]> average.org Git - loctrkd.git/blobdiff - gps303/GT06mod.py
define more protocol units
[loctrkd.git] / gps303 / GT06mod.py
index d10c9de6a6ed6bfd7f592d81548992af197e8b95..86cc8aacdc6542bab99a64a40b4befe90e42ddad 100755 (executable)
@@ -3,17 +3,48 @@ Implementation of the protocol used by zx303 GPS+GPRS module
 Description from https://github.com/tobadia/petGPS/tree/master/resources
 """
 
 Description from https://github.com/tobadia/petGPS/tree/master/resources
 """
 
+from datetime import datetime, timezone
 from inspect import isclass
 from logging import getLogger
 from struct import pack, unpack
 
 from inspect import isclass
 from logging import getLogger
 from struct import pack, unpack
 
-__all__ = ("handle_packet", "make_response")
+__all__ = (
+    "handle_packet",
+    "make_object",
+    "make_response",
+    "set_config",
+    "UNKNOWN",
+    "LOGIN",
+    "SUPERVISION",
+    "HEARTBEAT",
+    "GPS_POSITIONING",
+    "GPS_OFFLINE_POSITIONING",
+    "STATUS",
+    "HIBERNATION",
+    "RESET",
+    "WHITELIST_TOTAL",
+    "WIFI_OFFLINE_POSITIONING",
+    "TIME",
+    "MOM_PHONE",
+    "STOP_ALARM",
+    "SETUP",
+    "SYNCHRONOUS_WHITELIST",
+    "RESTORE_PASSWORD",
+    "WIFI_POSITIONING",
+    "MANUAL_POSITIONING",
+    "BATTERY_CHARGE",
+    "CHARGER_CONNECTED",
+    "CHARGER_DISCONNECTED",
+    "VIBRATION_RECEIVED",
+    "POSITION_UPLOAD_INTERVAL",
+)
 
 log = getLogger("gps303")
 
 
 class _GT06pkt:
     PROTO: int
 
 log = getLogger("gps303")
 
 
 class _GT06pkt:
     PROTO: int
+    CONFIG = None
 
     def __init__(self, *args, **kwargs):
         assert len(args) == 0
 
     def __init__(self, *args, **kwargs):
         assert len(args) == 0
@@ -37,12 +68,7 @@ class _GT06pkt:
 
     @classmethod
     def from_packet(cls, length, proto, payload):
 
     @classmethod
     def from_packet(cls, length, proto, payload):
-        adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
-        if length > 1 and len(payload) + adjust != length:
-            log.warning(
-                "length is %d but payload length is %d", length, len(payload)
-            )
-        return cls(length=length, proto=proto, payload=payload)
+        return cls(proto=proto, payload=payload, length=length)
 
     def response(self, *args):
         if len(args) == 0:
 
     def response(self, *args):
         if len(args) == 0:
@@ -73,19 +99,53 @@ class LOGIN(_GT06pkt):
         return super().response(b"")
 
 
         return super().response(b"")
 
 
-class SUPERVISION(_GT06pkt):
+class SUPERVISION(_GT06pkt):  # Server sends supervision number status
     PROTO = 0x05
 
     PROTO = 0x05
 
+    def response(self, supnum=0):
+        # 1: The device automatically answers Pickup effect
+        # 2: Automatically Answering Two-way Calls
+        # 3: Ring manually answer the two-way call
+        return super().response(b"")
+
 
 class HEARTBEAT(_GT06pkt):
     PROTO = 0x08
 
 
 
 class HEARTBEAT(_GT06pkt):
     PROTO = 0x08
 
 
-class GPS_POSITIONING(_GT06pkt):
+class _GPS_POSITIONING(_GT06pkt):
+    @classmethod
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
+        self.dtime = payload[:6]
+        if self.dtime == b"\0\0\0\0\0\0":
+            self.devtime = None
+        else:
+            self.devtime = datetime(
+                *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
+            )
+        self.gps_data_length = payload[6] >> 4
+        self.gps_nb_sat = payload[6] & 0x0F
+        lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
+        self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
+        flip_lon = bool(flags & 0b0000100000000000)  # bit 4
+        flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
+        self.heading = flags & 0b0000001111111111  # bits 6 - last
+        self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
+        self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
+        self.speed = speed
+        self.flags = flags
+        return self
+
+    def response(self):
+        return super().response(self.dtime)
+
+
+class GPS_POSITIONING(_GPS_POSITIONING):
     PROTO = 0x10
 
 
     PROTO = 0x10
 
 
-class GPS_OFFLINE_POSITIONING(_GT06pkt):
+class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
     PROTO = 0x11
 
 
     PROTO = 0x11
 
 
@@ -96,39 +156,104 @@ class STATUS(_GT06pkt):
     def from_packet(cls, length, proto, payload):
         self = super().from_packet(length, proto, payload)
         if len(payload) == 5:
     def from_packet(cls, length, proto, payload):
         self = super().from_packet(length, proto, payload)
         if len(payload) == 5:
-            self.batt, self.ver, self.intvl, self.signal, _ = unpack(
-                "BBBBB", payload
-            )
+            (
+                self.batt,
+                self.ver,
+                self.timezone,
+                self.intvl,
+                self.signal,
+            ) = unpack("BBBBB", payload)
         elif len(payload) == 4:
         elif len(payload) == 4:
-            self.batt, self.ver, self.intvl, _ = unpack("BBBB", payload)
+            self.batt, self.ver, self.timezone, self.intvl = unpack(
+                "BBBB", payload
+            )
             self.signal = None
         return self
 
             self.signal = None
         return self
 
+    def response(self, upload_interval=25):  # Set interval in minutes
+        return super().response(pack("B", upload_interval))
+
 
 class HIBERNATION(_GT06pkt):
     PROTO = 0x14
 
 
 
 class HIBERNATION(_GT06pkt):
     PROTO = 0x14
 
 
-class RESET(_GT06pkt):
+class RESET(_GT06pkt):  # Device sends when it got reset SMS
     PROTO = 0x15
 
     PROTO = 0x15
 
+    def response(self):  # Server can send to initiate factory reset
+        return super().response(b"")
+
 
 
-class WHITELIST_TOTAL(_GT06pkt):
+class WHITELIST_TOTAL(_GT06pkt):  # Server sends to initiage sync (0x58)
     PROTO = 0x16
 
     PROTO = 0x16
 
+    def response(self, number=3):  # Number of whitelist entries
+        return super().response(pack("B", number))
+
+
+class _WIFI_POSITIONING(_GT06pkt):
+    @classmethod
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
+        self.dtime = payload[:6]
+        if self.dtime == b"\0\0\0\0\0\0":
+            self.devtime = None
+        else:
+            self.devtime = datetime.strptime(
+                self.dtime.hex(), "%y%m%d%H%M%S"
+            ).astimezone(tz=timezone.utc)
+        self.wifi_aps = []
+        for i in range(self.length):  # length has special meaning here
+            slice = payload[6 + i * 7 : 13 + i * 7]
+            self.wifi_aps.append(
+                (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
+            )
+        gsm_slice = payload[6 + self.length * 7 :]
+        ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
+        self.gsm_cells = []
+        for i in range(ncells):
+            slice = gsm_slice[4 + i * 5 : 9 + i * 5]
+            locac, cellid, sigstr = unpack(
+                "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
+            )
+            self.gsm_cells.append((locac, cellid, -sigstr))
+        return self
+
 
 
-class WIFI_OFFLINE_POSITIONING(_GT06pkt):
+class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x17
 
     PROTO = 0x17
 
+    def response(self):
+        return super().response(self.dtime)
+
 
 class TIME(_GT06pkt):
     PROTO = 0x30
 
 
 class TIME(_GT06pkt):
     PROTO = 0x30
 
+    def response(self):
+        payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
+        return super().response(payload)
+
+
+class PROHIBIT_LBS(_GT06pkt):
+    PROTO = 0x33
+
+    def response(self, status=1):  # Server sent, 0-off, 1-on
+        return super().response(pack("B", status))
+
 
 class MOM_PHONE(_GT06pkt):
     PROTO = 0x43
 
 
 
 class MOM_PHONE(_GT06pkt):
     PROTO = 0x43
 
 
+class STOP_UPLOAD(_GT06pkt):  # Server response to LOGIN to thwart the device
+    PROTO = 0x44
+
+    def response(self):
+        return super().response(b"")
+
+
 class STOP_ALARM(_GT06pkt):
     PROTO = 0x56
 
 class STOP_ALARM(_GT06pkt):
     PROTO = 0x56
 
@@ -179,9 +304,13 @@ class RESTORE_PASSWORD(_GT06pkt):
     PROTO = 0x67
 
 
     PROTO = 0x67
 
 
-class WIFI_POSITIONING(_GT06pkt):
+class WIFI_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x69
 
     PROTO = 0x69
 
+    def response(self):
+        payload = b""  # TODO fill payload
+        return super().response(payload)
+
 
 class MANUAL_POSITIONING(_GT06pkt):
     PROTO = 0x80
 
 class MANUAL_POSITIONING(_GT06pkt):
     PROTO = 0x80
@@ -206,6 +335,19 @@ class VIBRATION_RECEIVED(_GT06pkt):
 class POSITION_UPLOAD_INTERVAL(_GT06pkt):
     PROTO = 0x98
 
 class POSITION_UPLOAD_INTERVAL(_GT06pkt):
     PROTO = 0x98
 
+    @classmethod
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
+        self.interval = unpack("!H", payload[:2])
+        return self
+
+    def response(self):
+        return super().response(pack("!H", self.interval))
+
+
+class SOS_ALARM(_GT06pkt):
+    PROTO = 0x99
+
 
 # Build a dict protocol number -> class
 CLASSES = {}
 
 # Build a dict protocol number -> class
 CLASSES = {}
@@ -221,18 +363,43 @@ if True:  # just to indent the code, sorry!
             CLASSES[cls.PROTO] = cls
 
 
             CLASSES[cls.PROTO] = cls
 
 
+def make_object(length, proto, payload):
+    if proto in CLASSES:
+        return CLASSES[proto].from_packet(length, proto, payload)
+    else:
+        return UNKNOWN.from_packet(length, proto, payload)
+
+
 def handle_packet(packet, addr, when):
     if len(packet) < 6:
 def handle_packet(packet, addr, when):
     if len(packet) < 6:
-        msg = UNKNOWN.from_packet(0, 0, packet)
+        return UNKNOWN.from_packet(0, 0, packet)
     else:
         xx, length, proto = unpack("!2sBB", packet[:4])
         crlf = packet[-2:]
         payload = packet[4:-2]
     else:
         xx, length, proto = unpack("!2sBB", packet[:4])
         crlf = packet[-2:]
         payload = packet[4:-2]
-        if xx != b"xx" or crlf != b"\r\n" or proto not in CLASSES:
-            msg = UNKNOWN.from_packet(length, proto, packet)
+        adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
+        if (
+            proto
+            not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
+            and length > 1
+            and len(payload) + adjust != length
+        ):
+            log.warning(
+                "With proto %d length is %d but payload length is %d+%d",
+                proto,
+                length,
+                len(payload),
+                adjust,
+            )
+        if xx != b"xx" or crlf != b"\r\n":
+            return UNKNOWN.from_packet(length, proto, packet)  # full packet
         else:
         else:
-            msg = CLASSES[proto].from_packet(length, proto, payload)
-    return msg
+            return make_object(length, proto, payload)
+
 
 def make_response(msg):
     return msg.response()
 
 def make_response(msg):
     return msg.response()
+
+
+def set_config(config):  # Note that we are setting _class_ attribute
+    _GT06pkt.CONFIG = config