]> average.org Git - loctrkd.git/blobdiff - gps303/GT06mod.py
Work with cell location data; use opencellid
[loctrkd.git] / gps303 / GT06mod.py
index aa8246facd3823946cb6aaef89df746ab8d345d4..c3918c754b0e5350ad1601f9f66326b2a9758180 100755 (executable)
@@ -3,7 +3,7 @@ Implementation of the protocol used by zx303 GPS+GPRS module
 Description from https://github.com/tobadia/petGPS/tree/master/resources
 """
 
 Description from https://github.com/tobadia/petGPS/tree/master/resources
 """
 
-from datetime import datetime
+from datetime import datetime, timezone
 from inspect import isclass
 from logging import getLogger
 from struct import pack, unpack
 from inspect import isclass
 from logging import getLogger
 from struct import pack, unpack
@@ -67,8 +67,8 @@ class _GT06pkt:
         )
 
     @classmethod
         )
 
     @classmethod
-    def from_packet(cls, proto, payload):
-        return cls(proto=proto, payload=payload)
+    def from_packet(cls, length, proto, payload):
+        return cls(proto=proto, payload=payload, length=length)
 
     def response(self, *args):
         if len(args) == 0:
 
     def response(self, *args):
         if len(args) == 0:
@@ -89,8 +89,8 @@ class LOGIN(_GT06pkt):
     PROTO = 0x01
 
     @classmethod
     PROTO = 0x01
 
     @classmethod
-    def from_packet(cls, proto, payload):
-        self = super().from_packet(proto, payload)
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
         self.imei = payload[:-1].hex()
         self.ver = unpack("B", payload[-1:])[0]
         return self
         self.imei = payload[:-1].hex()
         self.ver = unpack("B", payload[-1:])[0]
         return self
@@ -109,10 +109,26 @@ class HEARTBEAT(_GT06pkt):
 
 class _GPS_POSITIONING(_GT06pkt):
     @classmethod
 
 class _GPS_POSITIONING(_GT06pkt):
     @classmethod
-    def from_packet(cls, proto, payload):
-        self = super().from_packet(proto, payload)
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
         self.dtime = payload[:6]
         self.dtime = payload[:6]
-        # TODO parse the rest
+        if self.dtime == b"\0\0\0\0\0\0":
+            self.devtime = None
+        else:
+            self.devtime = datetime(
+                *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
+            )
+        self.gps_data_length = payload[6] >> 4
+        self.gps_nb_sat = payload[6] & 0x0F
+        lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
+        self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
+        flip_lon = bool(flags & 0b0000100000000000)  # bit 4
+        flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
+        self.heading = flags & 0b0000001111111111  # bits 6 - last
+        self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
+        self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
+        self.speed = speed
+        self.flags = flags
         return self
 
     def response(self):
         return self
 
     def response(self):
@@ -131,8 +147,8 @@ class STATUS(_GT06pkt):
     PROTO = 0x13
 
     @classmethod
     PROTO = 0x13
 
     @classmethod
-    def from_packet(cls, proto, payload):
-        self = super().from_packet(proto, payload)
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
         if len(payload) == 5:
             self.batt, self.ver, self.intvl, self.signal, _ = unpack(
                 "BBBBB", payload
         if len(payload) == 5:
             self.batt, self.ver, self.intvl, self.signal, _ = unpack(
                 "BBBBB", payload
@@ -155,9 +171,41 @@ class WHITELIST_TOTAL(_GT06pkt):
     PROTO = 0x16
 
 
     PROTO = 0x16
 
 
-class WIFI_OFFLINE_POSITIONING(_GT06pkt):
+class _WIFI_POSITIONING(_GT06pkt):
+    @classmethod
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
+        self.dtime = payload[:6]
+        if self.dtime == b"\0\0\0\0\0\0":
+            self.devtime = None
+        else:
+            self.devtime = datetime.strptime(
+                self.dtime.hex(), "%y%m%d%H%M%S"
+            ).astimezone(tz=timezone.utc)
+        self.wifi_aps = []
+        for i in range(self.length):  # length has special meaning here
+            slice = payload[6 + i * 7 : 13 + i * 7]
+            self.wifi_aps.append(
+                (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
+            )
+        gsm_slice = payload[6 + self.length * 7 :]
+        ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
+        self.gsm_cells = []
+        for i in range(ncells):
+            slice = gsm_slice[4 + i * 5 : 9 + i * 5]
+            locac, cellid, sigstr = unpack(
+                "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
+            )
+            self.gsm_cells.append((locac, cellid, -sigstr))
+        return self
+
+
+class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x17
 
     PROTO = 0x17
 
+    def response(self):
+        return super().response(self.dtime)
+
 
 class TIME(_GT06pkt):
     PROTO = 0x30
 
 class TIME(_GT06pkt):
     PROTO = 0x30
@@ -221,9 +269,13 @@ class RESTORE_PASSWORD(_GT06pkt):
     PROTO = 0x67
 
 
     PROTO = 0x67
 
 
-class WIFI_POSITIONING(_GT06pkt):
+class WIFI_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x69
 
     PROTO = 0x69
 
+    def response(self):
+        payload = b""  # TODO fill payload
+        return super().response(payload)
+
 
 class MANUAL_POSITIONING(_GT06pkt):
     PROTO = 0x80
 
 class MANUAL_POSITIONING(_GT06pkt):
     PROTO = 0x80
@@ -249,8 +301,8 @@ class POSITION_UPLOAD_INTERVAL(_GT06pkt):
     PROTO = 0x98
 
     @classmethod
     PROTO = 0x98
 
     @classmethod
-    def from_packet(cls, proto, payload):
-        self = super().from_packet(proto, payload)
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
         self.interval = unpack("!H", payload[:2])
         return self
 
         self.interval = unpack("!H", payload[:2])
         return self
 
@@ -271,28 +323,39 @@ if True:  # just to indent the code, sorry!
         if hasattr(cls, "PROTO"):
             CLASSES[cls.PROTO] = cls
 
         if hasattr(cls, "PROTO"):
             CLASSES[cls.PROTO] = cls
 
-def make_object(proto, payload):
+
+def make_object(length, proto, payload):
     if proto in CLASSES:
     if proto in CLASSES:
-        return CLASSES[proto].from_packet(proto, payload)
+        return CLASSES[proto].from_packet(length, proto, payload)
     else:
     else:
-        return UNKNOWN.from_packet(proto, payload)
+        return UNKNOWN.from_packet(length, proto, payload)
+
 
 def handle_packet(packet, addr, when):
     if len(packet) < 6:
 
 def handle_packet(packet, addr, when):
     if len(packet) < 6:
-        return UNKNOWN.from_packet(0, packet)
+        return UNKNOWN.from_packet(0, 0, packet)
     else:
         xx, length, proto = unpack("!2sBB", packet[:4])
         crlf = packet[-2:]
         payload = packet[4:-2]
         adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
     else:
         xx, length, proto = unpack("!2sBB", packet[:4])
         crlf = packet[-2:]
         payload = packet[4:-2]
         adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
-        if length > 1 and len(payload) + adjust != length:
+        if (
+            proto
+            not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
+            and length > 1
+            and len(payload) + adjust != length
+        ):
             log.warning(
             log.warning(
-                "length is %d but payload length is %d", length, len(payload)
+                "With proto %d length is %d but payload length is %d+%d",
+                proto,
+                length,
+                len(payload),
+                adjust,
             )
         if xx != b"xx" or crlf != b"\r\n":
             )
         if xx != b"xx" or crlf != b"\r\n":
-            return UNKNOWN.from_packet(proto, packet)  # full packet as payload
+            return UNKNOWN.from_packet(length, proto, packet)  # full packet
         else:
         else:
-            return make_object(proto, payload)
+            return make_object(length, proto, payload)
 
 
 def make_response(msg):
 
 
 def make_response(msg):