]> average.org Git - loctrkd.git/blobdiff - gps303/gps303proto.py
Don't make unneeded responses, better debug log
[loctrkd.git] / gps303 / gps303proto.py
index acfaea63c272aa74bc676dcac90765d027c8ddd2..dcc335b184fc6fa59249ce2c692f37c54d1587d5 100755 (executable)
@@ -21,12 +21,14 @@ from logging import getLogger
 from struct import pack, unpack
 
 __all__ = (
+    "class_by_prefix",
     "handle_packet",
     "inline_response",
     "make_object",
     "make_response",
     "parse_message",
     "proto_by_name",
+    "Dir",
     "GPS303Pkt",
     "UNKNOWN",
     "LOGIN",
@@ -66,7 +68,7 @@ class Dir(Enum):
 
 class GPS303Pkt:
     PROTO: int
-    DIR = Dir.INLINE  # Most packets anticipate simple acknowledgement
+    DIR = Dir.IN  # Do not send anything back by default
 
     def __init__(self, *args, **kwargs):
         assert len(args) == 0
@@ -113,11 +115,11 @@ class GPS303Pkt:
 
 class UNKNOWN(GPS303Pkt):
     PROTO = 256  # > 255 is impossible in real packets
-    DIR = Dir.IN
 
 
 class LOGIN(GPS303Pkt):
     PROTO = 0x01
+    DIR = Dir.INLINE
     # Default response for ACK, can also respond with STOP_UPLOAD
 
     @classmethod
@@ -132,18 +134,22 @@ class SUPERVISION(GPS303Pkt):
     PROTO = 0x05
     DIR = Dir.OUT
 
-    def response(self, status=0):
+    @classmethod
+    def response(cls, status=0):
         # 1: The device automatically answers Pickup effect
         # 2: Automatically Answering Two-way Calls
         # 3: Ring manually answer the two-way call
-        return self.make_packet(pack("B", status))
+        return cls.make_packet(pack("B", status))
 
 
 class HEARTBEAT(GPS303Pkt):
     PROTO = 0x08
+    DIR = Dir.INLINE
 
 
 class _GPS_POSITIONING(GPS303Pkt):
+    DIR = Dir.INLINE
+
     @classmethod
     def from_packet(cls, length, payload):
         self = super().from_packet(length, payload)
@@ -204,32 +210,35 @@ class STATUS(GPS303Pkt):
             self.signal = None
         return self
 
-    def response(self, upload_interval=25):  # Set interval in minutes
-        return self.make_packet(pack("B", upload_interval))
+    @classmethod
+    def response(cls, upload_interval=25):  # Set interval in minutes
+        return cls.make_packet(pack("B", upload_interval))
 
 
 class HIBERNATION(GPS303Pkt):
     PROTO = 0x14
-    DIR = Dir.EXT
+    DIR = Dir.INLINE
 
-    def response(self):  # Server can send to send devicee to sleep
-        return self.make_packet(b"")
+    @classmethod
+    def response(cls):  # Server can send to send devicee to sleep
+        return cls.make_packet(b"")
 
 
 class RESET(GPS303Pkt):  # Device sends when it got reset SMS
     PROTO = 0x15
-    DIR = Dir.EXT
 
-    def response(self):  # Server can send to initiate factory reset
-        return self.make_packet(b"")
+    @classmethod
+    def response(cls):  # Server can send to initiate factory reset
+        return cls.make_packet(b"")
 
 
 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
     PROTO = 0x16
     DIR = Dir.OUT
 
-    def response(self, number=3):  # Number of whitelist entries
-        return self.make_packet(pack("B", number))
+    @classmethod
+    def response(cls, number=3):  # Number of whitelist entries
+        return cls.make_packet(pack("B", number))
 
 
 class _WIFI_POSITIONING(GPS303Pkt):
@@ -263,6 +272,7 @@ class _WIFI_POSITIONING(GPS303Pkt):
 
 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x17
+    DIR = Dir.INLINE
 
     @classmethod
     def inline_response(cls, packet):
@@ -273,6 +283,7 @@ class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
 
 class TIME(GPS303Pkt):
     PROTO = 0x30
+    DIR = Dir.INLINE
 
     @classmethod
     def inline_response(cls, packet):
@@ -285,15 +296,17 @@ class PROHIBIT_LBS(GPS303Pkt):
     PROTO = 0x33
     DIR = Dir.OUT
 
-    def response(self, status=1):  # Server sent, 0-off, 1-on
-        return self.make_packet(pack("B", status))
+    @classmethod
+    def response(cls, status=1):  # Server sent, 0-off, 1-on
+        return cls.make_packet(pack("B", status))
 
 
 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
     PROTO = 0x34
     DIR = Dir.OUT
 
-    def response(self):
+    @classmethod
+    def response(cls):
         # Data is in packed decimal
         # 00/01 - GPS on/off
         # 00/01 - Don't set / Set upload period
@@ -303,14 +316,15 @@ class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
         # HHMM  - Time of boot
         # 00/01 - Don't set / Set time of shutdown
         # HHMM  - Time of shutdown
-        return self.make_packet(b"")  # TODO
+        return cls.make_packet(b"")  # TODO
 
 
 class _SET_PHONE(GPS303Pkt):
     DIR = Dir.OUT
 
-    def response(self, phone):
-        return self.make_packet(phone.encode())
+    @classmethod
+    def response(cls, phone):
+        return cls.make_packet(phone.encode())
 
 
 class REMOTE_MONITOR_PHONE(_SET_PHONE):
@@ -333,16 +347,18 @@ class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
     PROTO = 0x44
     DIR = Dir.OUT
 
-    def response(self):
-        return self.make_packet(b"")
+    @classmethod
+    def response(cls):
+        return cls.make_packet(b"")
 
 
 class GPS_OFF_PERIOD(GPS303Pkt):
     PROTO = 0x46
     DIR = Dir.OUT
 
-    def response(self, onoff=0, fm="0000", to="2359"):
-        return self.make_packet(
+    @classmethod
+    def response(cls, onoff=0, fm="0000", to="2359"):
+        return cls.make_packet(
             pack("B", onoff) + bytes.fromhex(fm) + bytes.fromhex(to)
         )
 
@@ -351,10 +367,11 @@ class DND_PERIOD(GPS303Pkt):
     PROTO = 0x47
     DIR = Dir.OUT
 
+    @classmethod
     def response(
-        self, onoff=0, week=3, fm1="0000", to1="2359", fm2="0000", to2="2359"
+        cls, onoff=0, week=3, fm1="0000", to1="2359", fm2="0000", to2="2359"
     ):
-        return self.make_packet(
+        return cls.make_packet(
             pack("B", onoff)
             + pack("B", week)
             + bytes.fromhex(fm1)
@@ -368,29 +385,32 @@ class RESTART_SHUTDOWN(GPS303Pkt):
     PROTO = 0x48
     DIR = Dir.OUT
 
-    def response(self, flag=0):
+    @classmethod
+    def response(cls, flag=2):
         # 1 - restart
         # 2 - shutdown
-        return self.make_packet(pack("B", flag))
+        return cls.make_packet(pack("B", flag))
 
 
 class DEVICE(GPS303Pkt):
     PROTO = 0x49
     DIR = Dir.OUT
 
-    def response(self, flag=0):
+    @classmethod
+    def response(cls, flag=0):
         # 0 - Stop looking for equipment
         # 1 - Start looking for equipment
-        return self.make_packet(pack("B", flag))
+        return cls.make_packet(pack("B", flag))
 
 
 class ALARM_CLOCK(GPS303Pkt):
     PROTO = 0x50
     DIR = Dir.OUT
 
-    def response(self, alarms=((0, "0000"), (0, "0000"), (0, "0000"))):
+    @classmethod
+    def response(cls, alarms=((0, "0000"), (0, "0000"), (0, "0000"))):
         return b"".join(
-            pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
+            cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
         )
 
 
@@ -401,14 +421,16 @@ class STOP_ALARM(GPS303Pkt):
     def from_packet(cls, length, payload):
         self = super().from_packet(length, payload)
         self.flag = payload[0]
+        return self
 
 
 class SETUP(GPS303Pkt):
     PROTO = 0x57
     DIR = Dir.EXT
 
+    @classmethod
     def response(
-        self,
+        cls,
         uploadintervalseconds=0x0300,
         binaryswitch=0b00110001,
         alarms=[0, 0, 0],
@@ -439,7 +461,7 @@ class SETUP(GPS303Pkt):
             ]
             + [b";".join([el.encode() for el in phonenumbers])]
         )
-        return self.make_packet(payload)
+        return cls.make_packet(payload)
 
 
 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
@@ -454,24 +476,25 @@ class WIFI_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x69
     DIR = Dir.EXT
 
-    def response(self, lat=None, lon=None):
+    @classmethod
+    def response(cls, lat=None, lon=None):
         if lat is None or lon is None:
             payload = b""
         else:
             payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
                 "ascii"
             )
-        return self.make_packet(payload)
+        return cls.make_packet(payload)
 
 
 class MANUAL_POSITIONING(GPS303Pkt):
     PROTO = 0x80
-    DIR = Dir.EXT
+    DIR = Dir.OUT
 
     @classmethod
     def from_packet(cls, length, payload):
         self = super().from_packet(length, payload)
-        self.flag = payload[0]
+        self.flag = payload[0] if len(payload) > 0 else None
         self.reason = {
             1: "Incorrect time",
             2: "LBS less",
@@ -481,9 +504,11 @@ class MANUAL_POSITIONING(GPS303Pkt):
             6: "LBS prohibited, WiFi absent",
             7: "GPS spacing < 50 m",
         }.get(self.flag, "Unknown")
+        return self
 
-    def response(self):
-        return self.make_packet(b"")
+    @classmethod
+    def response(cls):
+        return cls.make_packet(b"")
 
 
 class BATTERY_CHARGE(GPS303Pkt):
@@ -512,8 +537,9 @@ class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
         self.interval = unpack("!H", payload[:2])
         return self
 
-    def response(self, interval=10):
-        return self.make_packet(pack("!H", interval))
+    @classmethod
+    def response(cls, interval=10):
+        return cls.make_packet(pack("!H", interval))
 
 
 class SOS_ALARM(GPS303Pkt):
@@ -536,6 +562,18 @@ if True:  # just to indent the code, sorry!
             PROTOS[cls.__name__] = cls.PROTO
 
 
+def class_by_prefix(prefix):
+    lst = [
+        (name, proto)
+        for name, proto in PROTOS.items()
+        if name.upper().startswith(prefix.upper())
+    ]
+    if len(lst) != 1:
+        return lst
+    _, proto = lst[0]
+    return CLASSES[proto]
+
+
 def proto_by_name(name):
     return PROTOS.get(name, -1)