]> average.org Git - loctrkd.git/blobdiff - gps303/GT06mod.py
Work with cell location data; use opencellid
[loctrkd.git] / gps303 / GT06mod.py
index 31be5d3bc5bbe354a4f2ff5b94b4f88c7d2c9f93..c3918c754b0e5350ad1601f9f66326b2a9758180 100755 (executable)
@@ -3,18 +3,48 @@ Implementation of the protocol used by zx303 GPS+GPRS module
 Description from https://github.com/tobadia/petGPS/tree/master/resources
 """
 
 Description from https://github.com/tobadia/petGPS/tree/master/resources
 """
 
-from datetime import datetime
+from datetime import datetime, timezone
 from inspect import isclass
 from logging import getLogger
 from struct import pack, unpack
 
 from inspect import isclass
 from logging import getLogger
 from struct import pack, unpack
 
-__all__ = ("handle_packet", "make_response")
+__all__ = (
+    "handle_packet",
+    "make_object",
+    "make_response",
+    "set_config",
+    "UNKNOWN",
+    "LOGIN",
+    "SUPERVISION",
+    "HEARTBEAT",
+    "GPS_POSITIONING",
+    "GPS_OFFLINE_POSITIONING",
+    "STATUS",
+    "HIBERNATION",
+    "RESET",
+    "WHITELIST_TOTAL",
+    "WIFI_OFFLINE_POSITIONING",
+    "TIME",
+    "MOM_PHONE",
+    "STOP_ALARM",
+    "SETUP",
+    "SYNCHRONOUS_WHITELIST",
+    "RESTORE_PASSWORD",
+    "WIFI_POSITIONING",
+    "MANUAL_POSITIONING",
+    "BATTERY_CHARGE",
+    "CHARGER_CONNECTED",
+    "CHARGER_DISCONNECTED",
+    "VIBRATION_RECEIVED",
+    "POSITION_UPLOAD_INTERVAL",
+)
 
 log = getLogger("gps303")
 
 
 class _GT06pkt:
     PROTO: int
 
 log = getLogger("gps303")
 
 
 class _GT06pkt:
     PROTO: int
+    CONFIG = None
 
     def __init__(self, *args, **kwargs):
         assert len(args) == 0
 
     def __init__(self, *args, **kwargs):
         assert len(args) == 0
@@ -38,12 +68,7 @@ class _GT06pkt:
 
     @classmethod
     def from_packet(cls, length, proto, payload):
 
     @classmethod
     def from_packet(cls, length, proto, payload):
-        adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
-        if length > 1 and len(payload) + adjust != length:
-            log.warning(
-                "length is %d but payload length is %d", length, len(payload)
-            )
-        return cls(length=length, proto=proto, payload=payload)
+        return cls(proto=proto, payload=payload, length=length)
 
     def response(self, *args):
         if len(args) == 0:
 
     def response(self, *args):
         if len(args) == 0:
@@ -83,12 +108,27 @@ class HEARTBEAT(_GT06pkt):
 
 
 class _GPS_POSITIONING(_GT06pkt):
 
 
 class _GPS_POSITIONING(_GT06pkt):
-
     @classmethod
     def from_packet(cls, length, proto, payload):
         self = super().from_packet(length, proto, payload)
         self.dtime = payload[:6]
     @classmethod
     def from_packet(cls, length, proto, payload):
         self = super().from_packet(length, proto, payload)
         self.dtime = payload[:6]
-        # TODO parse the rest
+        if self.dtime == b"\0\0\0\0\0\0":
+            self.devtime = None
+        else:
+            self.devtime = datetime(
+                *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
+            )
+        self.gps_data_length = payload[6] >> 4
+        self.gps_nb_sat = payload[6] & 0x0F
+        lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
+        self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
+        flip_lon = bool(flags & 0b0000100000000000)  # bit 4
+        flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
+        self.heading = flags & 0b0000001111111111  # bits 6 - last
+        self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
+        self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
+        self.speed = speed
+        self.flags = flags
         return self
 
     def response(self):
         return self
 
     def response(self):
@@ -131,9 +171,41 @@ class WHITELIST_TOTAL(_GT06pkt):
     PROTO = 0x16
 
 
     PROTO = 0x16
 
 
-class WIFI_OFFLINE_POSITIONING(_GT06pkt):
+class _WIFI_POSITIONING(_GT06pkt):
+    @classmethod
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
+        self.dtime = payload[:6]
+        if self.dtime == b"\0\0\0\0\0\0":
+            self.devtime = None
+        else:
+            self.devtime = datetime.strptime(
+                self.dtime.hex(), "%y%m%d%H%M%S"
+            ).astimezone(tz=timezone.utc)
+        self.wifi_aps = []
+        for i in range(self.length):  # length has special meaning here
+            slice = payload[6 + i * 7 : 13 + i * 7]
+            self.wifi_aps.append(
+                (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
+            )
+        gsm_slice = payload[6 + self.length * 7 :]
+        ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
+        self.gsm_cells = []
+        for i in range(ncells):
+            slice = gsm_slice[4 + i * 5 : 9 + i * 5]
+            locac, cellid, sigstr = unpack(
+                "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
+            )
+            self.gsm_cells.append((locac, cellid, -sigstr))
+        return self
+
+
+class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x17
 
     PROTO = 0x17
 
+    def response(self):
+        return super().response(self.dtime)
+
 
 class TIME(_GT06pkt):
     PROTO = 0x30
 
 class TIME(_GT06pkt):
     PROTO = 0x30
@@ -197,9 +269,13 @@ class RESTORE_PASSWORD(_GT06pkt):
     PROTO = 0x67
 
 
     PROTO = 0x67
 
 
-class WIFI_POSITIONING(_GT06pkt):
+class WIFI_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x69
 
     PROTO = 0x69
 
+    def response(self):
+        payload = b""  # TODO fill payload
+        return super().response(payload)
+
 
 class MANUAL_POSITIONING(_GT06pkt):
     PROTO = 0x80
 
 class MANUAL_POSITIONING(_GT06pkt):
     PROTO = 0x80
@@ -248,18 +324,43 @@ if True:  # just to indent the code, sorry!
             CLASSES[cls.PROTO] = cls
 
 
             CLASSES[cls.PROTO] = cls
 
 
+def make_object(length, proto, payload):
+    if proto in CLASSES:
+        return CLASSES[proto].from_packet(length, proto, payload)
+    else:
+        return UNKNOWN.from_packet(length, proto, payload)
+
+
 def handle_packet(packet, addr, when):
     if len(packet) < 6:
 def handle_packet(packet, addr, when):
     if len(packet) < 6:
-        msg = UNKNOWN.from_packet(0, 0, packet)
+        return UNKNOWN.from_packet(0, 0, packet)
     else:
         xx, length, proto = unpack("!2sBB", packet[:4])
         crlf = packet[-2:]
         payload = packet[4:-2]
     else:
         xx, length, proto = unpack("!2sBB", packet[:4])
         crlf = packet[-2:]
         payload = packet[4:-2]
-        if xx != b"xx" or crlf != b"\r\n" or proto not in CLASSES:
-            msg = UNKNOWN.from_packet(length, proto, packet)
+        adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
+        if (
+            proto
+            not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
+            and length > 1
+            and len(payload) + adjust != length
+        ):
+            log.warning(
+                "With proto %d length is %d but payload length is %d+%d",
+                proto,
+                length,
+                len(payload),
+                adjust,
+            )
+        if xx != b"xx" or crlf != b"\r\n":
+            return UNKNOWN.from_packet(length, proto, packet)  # full packet
         else:
         else:
-            msg = CLASSES[proto].from_packet(length, proto, payload)
-    return msg
+            return make_object(length, proto, payload)
+
 
 def make_response(msg):
     return msg.response()
 
 def make_response(msg):
     return msg.response()
+
+
+def set_config(config):  # Note that we are setting _class_ attribute
+    _GT06pkt.CONFIG = config