]> average.org Git - loctrkd.git/blobdiff - gps303/GT06mod.py
define more protocol units
[loctrkd.git] / gps303 / GT06mod.py
index aa8246facd3823946cb6aaef89df746ab8d345d4..86cc8aacdc6542bab99a64a40b4befe90e42ddad 100755 (executable)
@@ -3,7 +3,7 @@ Implementation of the protocol used by zx303 GPS+GPRS module
 Description from https://github.com/tobadia/petGPS/tree/master/resources
 """
 
 Description from https://github.com/tobadia/petGPS/tree/master/resources
 """
 
-from datetime import datetime
+from datetime import datetime, timezone
 from inspect import isclass
 from logging import getLogger
 from struct import pack, unpack
 from inspect import isclass
 from logging import getLogger
 from struct import pack, unpack
@@ -67,8 +67,8 @@ class _GT06pkt:
         )
 
     @classmethod
         )
 
     @classmethod
-    def from_packet(cls, proto, payload):
-        return cls(proto=proto, payload=payload)
+    def from_packet(cls, length, proto, payload):
+        return cls(proto=proto, payload=payload, length=length)
 
     def response(self, *args):
         if len(args) == 0:
 
     def response(self, *args):
         if len(args) == 0:
@@ -89,8 +89,8 @@ class LOGIN(_GT06pkt):
     PROTO = 0x01
 
     @classmethod
     PROTO = 0x01
 
     @classmethod
-    def from_packet(cls, proto, payload):
-        self = super().from_packet(proto, payload)
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
         self.imei = payload[:-1].hex()
         self.ver = unpack("B", payload[-1:])[0]
         return self
         self.imei = payload[:-1].hex()
         self.ver = unpack("B", payload[-1:])[0]
         return self
@@ -99,9 +99,15 @@ class LOGIN(_GT06pkt):
         return super().response(b"")
 
 
         return super().response(b"")
 
 
-class SUPERVISION(_GT06pkt):
+class SUPERVISION(_GT06pkt):  # Server sends supervision number status
     PROTO = 0x05
 
     PROTO = 0x05
 
+    def response(self, supnum=0):
+        # 1: The device automatically answers Pickup effect
+        # 2: Automatically Answering Two-way Calls
+        # 3: Ring manually answer the two-way call
+        return super().response(b"")
+
 
 class HEARTBEAT(_GT06pkt):
     PROTO = 0x08
 
 class HEARTBEAT(_GT06pkt):
     PROTO = 0x08
@@ -109,10 +115,26 @@ class HEARTBEAT(_GT06pkt):
 
 class _GPS_POSITIONING(_GT06pkt):
     @classmethod
 
 class _GPS_POSITIONING(_GT06pkt):
     @classmethod
-    def from_packet(cls, proto, payload):
-        self = super().from_packet(proto, payload)
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
         self.dtime = payload[:6]
         self.dtime = payload[:6]
-        # TODO parse the rest
+        if self.dtime == b"\0\0\0\0\0\0":
+            self.devtime = None
+        else:
+            self.devtime = datetime(
+                *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
+            )
+        self.gps_data_length = payload[6] >> 4
+        self.gps_nb_sat = payload[6] & 0x0F
+        lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
+        self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
+        flip_lon = bool(flags & 0b0000100000000000)  # bit 4
+        flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
+        self.heading = flags & 0b0000001111111111  # bits 6 - last
+        self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
+        self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
+        self.speed = speed
+        self.flags = flags
         return self
 
     def response(self):
         return self
 
     def response(self):
@@ -131,33 +153,80 @@ class STATUS(_GT06pkt):
     PROTO = 0x13
 
     @classmethod
     PROTO = 0x13
 
     @classmethod
-    def from_packet(cls, proto, payload):
-        self = super().from_packet(proto, payload)
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
         if len(payload) == 5:
         if len(payload) == 5:
-            self.batt, self.ver, self.intvl, self.signal, _ = unpack(
-                "BBBBB", payload
-            )
+            (
+                self.batt,
+                self.ver,
+                self.timezone,
+                self.intvl,
+                self.signal,
+            ) = unpack("BBBBB", payload)
         elif len(payload) == 4:
         elif len(payload) == 4:
-            self.batt, self.ver, self.intvl, _ = unpack("BBBB", payload)
+            self.batt, self.ver, self.timezone, self.intvl = unpack(
+                "BBBB", payload
+            )
             self.signal = None
         return self
 
             self.signal = None
         return self
 
+    def response(self, upload_interval=25):  # Set interval in minutes
+        return super().response(pack("B", upload_interval))
+
 
 class HIBERNATION(_GT06pkt):
     PROTO = 0x14
 
 
 
 class HIBERNATION(_GT06pkt):
     PROTO = 0x14
 
 
-class RESET(_GT06pkt):
+class RESET(_GT06pkt):  # Device sends when it got reset SMS
     PROTO = 0x15
 
     PROTO = 0x15
 
+    def response(self):  # Server can send to initiate factory reset
+        return super().response(b"")
+
 
 
-class WHITELIST_TOTAL(_GT06pkt):
+class WHITELIST_TOTAL(_GT06pkt):  # Server sends to initiage sync (0x58)
     PROTO = 0x16
 
     PROTO = 0x16
 
+    def response(self, number=3):  # Number of whitelist entries
+        return super().response(pack("B", number))
+
+
+class _WIFI_POSITIONING(_GT06pkt):
+    @classmethod
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
+        self.dtime = payload[:6]
+        if self.dtime == b"\0\0\0\0\0\0":
+            self.devtime = None
+        else:
+            self.devtime = datetime.strptime(
+                self.dtime.hex(), "%y%m%d%H%M%S"
+            ).astimezone(tz=timezone.utc)
+        self.wifi_aps = []
+        for i in range(self.length):  # length has special meaning here
+            slice = payload[6 + i * 7 : 13 + i * 7]
+            self.wifi_aps.append(
+                (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
+            )
+        gsm_slice = payload[6 + self.length * 7 :]
+        ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
+        self.gsm_cells = []
+        for i in range(ncells):
+            slice = gsm_slice[4 + i * 5 : 9 + i * 5]
+            locac, cellid, sigstr = unpack(
+                "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
+            )
+            self.gsm_cells.append((locac, cellid, -sigstr))
+        return self
+
 
 
-class WIFI_OFFLINE_POSITIONING(_GT06pkt):
+class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x17
 
     PROTO = 0x17
 
+    def response(self):
+        return super().response(self.dtime)
+
 
 class TIME(_GT06pkt):
     PROTO = 0x30
 
 class TIME(_GT06pkt):
     PROTO = 0x30
@@ -167,10 +236,24 @@ class TIME(_GT06pkt):
         return super().response(payload)
 
 
         return super().response(payload)
 
 
+class PROHIBIT_LBS(_GT06pkt):
+    PROTO = 0x33
+
+    def response(self, status=1):  # Server sent, 0-off, 1-on
+        return super().response(pack("B", status))
+
+
 class MOM_PHONE(_GT06pkt):
     PROTO = 0x43
 
 
 class MOM_PHONE(_GT06pkt):
     PROTO = 0x43
 
 
+class STOP_UPLOAD(_GT06pkt):  # Server response to LOGIN to thwart the device
+    PROTO = 0x44
+
+    def response(self):
+        return super().response(b"")
+
+
 class STOP_ALARM(_GT06pkt):
     PROTO = 0x56
 
 class STOP_ALARM(_GT06pkt):
     PROTO = 0x56
 
@@ -221,9 +304,13 @@ class RESTORE_PASSWORD(_GT06pkt):
     PROTO = 0x67
 
 
     PROTO = 0x67
 
 
-class WIFI_POSITIONING(_GT06pkt):
+class WIFI_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x69
 
     PROTO = 0x69
 
+    def response(self):
+        payload = b""  # TODO fill payload
+        return super().response(payload)
+
 
 class MANUAL_POSITIONING(_GT06pkt):
     PROTO = 0x80
 
 class MANUAL_POSITIONING(_GT06pkt):
     PROTO = 0x80
@@ -249,8 +336,8 @@ class POSITION_UPLOAD_INTERVAL(_GT06pkt):
     PROTO = 0x98
 
     @classmethod
     PROTO = 0x98
 
     @classmethod
-    def from_packet(cls, proto, payload):
-        self = super().from_packet(proto, payload)
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
         self.interval = unpack("!H", payload[:2])
         return self
 
         self.interval = unpack("!H", payload[:2])
         return self
 
@@ -258,6 +345,10 @@ class POSITION_UPLOAD_INTERVAL(_GT06pkt):
         return super().response(pack("!H", self.interval))
 
 
         return super().response(pack("!H", self.interval))
 
 
+class SOS_ALARM(_GT06pkt):
+    PROTO = 0x99
+
+
 # Build a dict protocol number -> class
 CLASSES = {}
 if True:  # just to indent the code, sorry!
 # Build a dict protocol number -> class
 CLASSES = {}
 if True:  # just to indent the code, sorry!
@@ -271,28 +362,39 @@ if True:  # just to indent the code, sorry!
         if hasattr(cls, "PROTO"):
             CLASSES[cls.PROTO] = cls
 
         if hasattr(cls, "PROTO"):
             CLASSES[cls.PROTO] = cls
 
-def make_object(proto, payload):
+
+def make_object(length, proto, payload):
     if proto in CLASSES:
     if proto in CLASSES:
-        return CLASSES[proto].from_packet(proto, payload)
+        return CLASSES[proto].from_packet(length, proto, payload)
     else:
     else:
-        return UNKNOWN.from_packet(proto, payload)
+        return UNKNOWN.from_packet(length, proto, payload)
+
 
 def handle_packet(packet, addr, when):
     if len(packet) < 6:
 
 def handle_packet(packet, addr, when):
     if len(packet) < 6:
-        return UNKNOWN.from_packet(0, packet)
+        return UNKNOWN.from_packet(0, 0, packet)
     else:
         xx, length, proto = unpack("!2sBB", packet[:4])
         crlf = packet[-2:]
         payload = packet[4:-2]
         adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
     else:
         xx, length, proto = unpack("!2sBB", packet[:4])
         crlf = packet[-2:]
         payload = packet[4:-2]
         adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
-        if length > 1 and len(payload) + adjust != length:
+        if (
+            proto
+            not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
+            and length > 1
+            and len(payload) + adjust != length
+        ):
             log.warning(
             log.warning(
-                "length is %d but payload length is %d", length, len(payload)
+                "With proto %d length is %d but payload length is %d+%d",
+                proto,
+                length,
+                len(payload),
+                adjust,
             )
         if xx != b"xx" or crlf != b"\r\n":
             )
         if xx != b"xx" or crlf != b"\r\n":
-            return UNKNOWN.from_packet(proto, packet)  # full packet as payload
+            return UNKNOWN.from_packet(length, proto, packet)  # full packet
         else:
         else:
-            return make_object(proto, payload)
+            return make_object(length, proto, payload)
 
 
 def make_response(msg):
 
 
 def make_response(msg):