]> average.org Git - loctrkd.git/commitdiff
clean up `from_packet()`
authorEugene Crosser <crosser@average.org>
Thu, 28 Apr 2022 21:38:04 +0000 (23:38 +0200)
committerEugene Crosser <crosser@average.org>
Thu, 28 Apr 2022 21:38:04 +0000 (23:38 +0200)
gps303/gps303proto.py

index 04f535057ae2d6b95d7a3f25c75de2309cd3a293..c19c7c1b300c78fe2ea511c1a7859f0c35c434e4 100755 (executable)
@@ -118,13 +118,21 @@ class MetaPkt(type):
             cls,
             name + ".In",
             (newcls,) + bases,
-            {"KWARGS": newcls.IN_KWARGS, "encode": newcls.in_encode},
+            {
+                "KWARGS": newcls.IN_KWARGS,
+                "decode": newcls.in_decode,
+                "encode": newcls.in_encode,
+            },
         )
         newcls.Out = super().__new__(
             cls,
             name + ".Out",
             (newcls,) + bases,
-            {"KWARGS": newcls.OUT_KWARGS, "encode": newcls.out_encode},
+            {
+                "KWARGS": newcls.OUT_KWARGS,
+                "decode": newcls.out_decode,
+                "encode": newcls.out_encode,
+            },
         )
         return newcls
 
@@ -138,8 +146,7 @@ class Respond(Enum):
 class GPS303Pkt(metaclass=MetaPkt):
     RESPOND = Respond.NON  # Do not send anything back by default
     PROTO: int
-    # Have these kwargs for now, TODO redo
-    IN_KWARGS = (("length", int, 0), ("payload", bytes, b""))
+    IN_KWARGS = ()
     OUT_KWARGS = ()
 
     def __init__(self, *args, **kwargs):
@@ -168,6 +175,14 @@ class GPS303Pkt(metaclass=MetaPkt):
             ),
         )
 
+    def in_decode(self, length, packet):
+        return
+
+    def out_decode(self, length, packet):
+        raise NotImplementedError(
+            self.__class__.__name__ + ".decode() not implemented"
+        )
+
     def in_encode(self):
         raise NotImplementedError(
             self.__class__.__name__ + ".encode() not implemented"
@@ -184,7 +199,11 @@ class GPS303Pkt(metaclass=MetaPkt):
 
     @classmethod
     def from_packet(cls, length, payload):
-        return cls.In(payload=payload, length=length)
+        self = cls.In()
+        self.length = length
+        self.payload = payload
+        self.decode(length, payload)
+        return self
 
 
 class UNKNOWN(GPS303Pkt):
@@ -196,9 +215,7 @@ class LOGIN(GPS303Pkt):
     RESPOND = Respond.INL
     # Default response for ACK, can also respond with STOP_UPLOAD
 
-    @classmethod
-    def from_packet(cls, length, payload):
-        self = super().from_packet(length, payload)
+    def in_decode(self, length, payload):
         self.imei = payload[:-1].hex()
         self.ver = unpack("B", payload[-1:])[0]
         return self
@@ -223,9 +240,7 @@ class HEARTBEAT(GPS303Pkt):
 class _GPS_POSITIONING(GPS303Pkt):
     RESPOND = Respond.INL
 
-    @classmethod
-    def from_packet(cls, length, payload):
-        self = super().from_packet(length, payload)
+    def in_decode(self, length, payload):
         self.dtime = payload[:6]
         if self.dtime == b"\0\0\0\0\0\0":
             self.devtime = None
@@ -241,7 +256,7 @@ class _GPS_POSITIONING(GPS303Pkt):
         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
         self.heading = flags & 0b0000001111111111  # bits 6 - last
         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
-        self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
+        self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
         self.speed = speed
         self.flags = flags
         return self
@@ -265,21 +280,13 @@ class STATUS(GPS303Pkt):
     RESPOND = Respond.EXT
     OUT_KWARGS = (("upload_interval", int, 25),)
 
-    @classmethod
-    def from_packet(cls, length, payload):
-        self = super().from_packet(length, payload)
-        if len(payload) == 5:
-            (
-                self.batt,
-                self.ver,
-                self.timezone,
-                self.intvl,
-                self.signal,
-            ) = unpack("BBBBB", payload)
-        elif len(payload) == 4:
-            self.batt, self.ver, self.timezone, self.intvl = unpack(
-                "BBBB", payload
-            )
+    def in_decode(self, length, payload):
+        self.batt, self.ver, self.timezone, self.intvl = unpack(
+            "BBBB", payload[:4]
+        )
+        if len(payload) > 4:
+            self.signal = payload[4]
+        else:
             self.signal = None
         return self
 
@@ -307,9 +314,7 @@ class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
 
 
 class _WIFI_POSITIONING(GPS303Pkt):
-    @classmethod
-    def from_packet(cls, length, payload):
-        self = super().from_packet(length, payload)
+    def in_decode(self, length, payload):
         self.dtime = payload[:6]
         if self.dtime == b"\0\0\0\0\0\0":
             self.devtime = None
@@ -474,9 +479,7 @@ class ALARM_CLOCK(GPS303Pkt):
 class STOP_ALARM(GPS303Pkt):
     PROTO = 0x56
 
-    @classmethod
-    def from_packet(cls, length, payload):
-        self = super().from_packet(length, payload)
+    def in_decode(self, length, payload):
         self.flag = payload[0]
         return self
 
@@ -541,9 +544,7 @@ class WIFI_POSITIONING(_WIFI_POSITIONING):
 class MANUAL_POSITIONING(GPS303Pkt):
     PROTO = 0x80
 
-    @classmethod
-    def from_packet(cls, length, payload):
-        self = super().from_packet(length, payload)
+    def in_decode(self, length, payload):
         self.flag = payload[0] if len(payload) > 0 else None
         self.reason = {
             1: "Incorrect time",
@@ -578,9 +579,7 @@ class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
     RESPOND = Respond.EXT
     OUT_KWARGS = (("interval", int, 10),)
 
-    @classmethod
-    def from_packet(cls, length, payload):
-        self = super().from_packet(length, payload)
+    def in_decode(self, length, payload):
         self.interval = unpack("!H", payload[:2])
         return self
 
@@ -593,12 +592,10 @@ class SOS_ALARM(GPS303Pkt):
 
 
 class UNKNOWN_B3(GPS303Pkt):
-    PROTO = 0xb3
+    PROTO = 0xB3
     IN_KWARGS = (("asciidata", str, ""),)
 
-    @classmethod
-    def from_packet(cls, length, payload):
-        self = super().from_packet(length, payload)
+    def in_decode(self, length, payload):
         self.asciidata = payload.decode()
         return self
 
@@ -649,7 +646,7 @@ def inline_response(packet):
 
 
 def parse_message(packet):
-    """ From a packet (without framing bytes) derive the XXX.In object """
+    """From a packet (without framing bytes) derive the XXX.In object"""
     length, proto = unpack("BB", packet[:2])
     payload = packet[2:]
     if proto in CLASSES: