from datetime import datetime, timezone
from enum import Enum
from inspect import isclass
-from struct import pack, unpack
+from struct import error, pack, unpack
__all__ = (
"class_by_prefix",
"inline_response",
"parse_message",
"proto_by_name",
+ "DecodeError",
"Respond",
"GPS303Pkt",
"UNKNOWN",
)
+class DecodeError(Exception):
+ def __init__(self, e, **kwargs):
+ super().__init__(e)
+ for k, v in kwargs.items():
+ setattr(self, k, v)
+
def intx(x):
if isinstance(x, str):
x = int(x, 0)
OUT_KWARGS = ()
def __init__(self, *args, **kwargs):
- assert len(args) == 0
- for kw, typ, dfl in self.KWARGS:
- setattr(self, kw, typ(kwargs.pop(kw, dfl)))
- if kwargs:
- print("KWARGS", self.KWARGS)
- print("kwargs", kwargs)
- raise TypeError(
- self.__class__.__name__ + " stray kwargs " + str(kwargs)
- )
+ """
+ Construct the object _either_ from (length, payload),
+ _or_ from the values of individual fields
+ """
+ assert not args or (len(args) == 2 and not kwargs)
+ if args: # guaranteed to be two arguments at this point
+ self.length, self.payload = args
+ try:
+ self.decode(self.length, self.payload)
+ except error as e:
+ raise DecodeError(e, obj=self)
+ else:
+ for kw, typ, dfl in self.KWARGS:
+ setattr(self, kw, typ(kwargs.pop(kw, dfl)))
+ if kwargs:
+ raise ValueError(
+ self.__class__.__name__ + " stray kwargs " + str(kwargs)
+ )
def __repr__(self):
return "{}({})".format(
)
def in_decode(self, length, packet):
+ # Overridden in subclasses, otherwise do not decode payload
return
def out_decode(self, length, packet):
- raise NotImplementedError(
- self.__class__.__name__ + ".decode() not implemented"
- )
+ # Overridden in subclasses, otherwise do not decode payload
+ return
def in_encode(self):
+ # Necessary to emulate terminal, which is not implemented
raise NotImplementedError(
self.__class__.__name__ + ".encode() not implemented"
)
def out_encode(self):
+ # Overridden in subclasses, otherwise make empty payload
return b""
@property
length = len(payload) + 1
return pack("BB", length, self.PROTO) + payload
- @classmethod
- def from_packet(cls, length, payload):
- self = cls.In()
- self.length = length
- self.payload = payload
- self.decode(length, payload)
- return self
-
class UNKNOWN(GPS303Pkt):
PROTO = 256 # > 255 is impossible in real packets
if self.dtime == b"\0\0\0\0\0\0":
self.devtime = None
else:
+ yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
self.devtime = datetime(
- *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
+ 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
)
self.gps_data_length = payload[6] >> 4
self.gps_nb_sat = payload[6] & 0x0F
class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
PROTO = 0x14
- RESPOND = Respond.INL
class RESET(GPS303Pkt):
class WIFI_POSITIONING(_WIFI_POSITIONING):
PROTO = 0x69
RESPOND = Respond.EXT
- OUT_KWARGS = (("lat", float, None), ("lon", float, None))
+ OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
def out_encode(self):
- if self.lat is None or self.lon is None:
+ if self.latitude is None or self.longitude is None:
return b""
- return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
+ return "{:+#010.8g},{:+#010.8g}".format(
+ self.latitude, self.longitude
+ ).encode()
+
+ def out_decode(self, length, payload):
+ lat, lon = payload.decode().split(",")
+ self.latitude = float(lat)
+ self.longitude = float(lon)
class MANUAL_POSITIONING(GPS303Pkt):
return None
-def parse_message(packet):
+def parse_message(packet, is_incoming=True):
"""From a packet (without framing bytes) derive the XXX.In object"""
length, proto = unpack("BB", packet[:2])
payload = packet[2:]
- if proto in CLASSES:
- return CLASSES[proto].from_packet(length, payload)
+ if proto not in CLASSES:
+ cause = ValueError(f"Proto {proto} is unknown")
+ else:
+ try:
+ if is_incoming:
+ return CLASSES[proto].In(length, payload)
+ else:
+ return CLASSES[proto].Out(length, payload)
+ except DecodeError as e:
+ cause = e
+ if is_incoming:
+ retobj = UNKNOWN.In(length, payload)
else:
- retobj = UNKNOWN.from_packet(length, payload)
- retobj.PROTO = proto # Override class attr with object attr
- return retobj
+ retobj = UNKNOWN.Out(length, payload)
+ retobj.PROTO = proto # Override class attr with object attr
+ retobj.cause = cause
+ return retobj