]> average.org Git - loctrkd.git/blobdiff - gps303/GT06mod.py
make object parser and sqlite store
[loctrkd.git] / gps303 / GT06mod.py
diff --git a/gps303/GT06mod.py b/gps303/GT06mod.py
new file mode 100755 (executable)
index 0000000..d10c9de
--- /dev/null
@@ -0,0 +1,238 @@
+"""
+Implementation of the protocol used by zx303 GPS+GPRS module
+Description from https://github.com/tobadia/petGPS/tree/master/resources
+"""
+
+from inspect import isclass
+from logging import getLogger
+from struct import pack, unpack
+
+__all__ = ("handle_packet", "make_response")
+
+log = getLogger("gps303")
+
+
+class _GT06pkt:
+    PROTO: int
+
+    def __init__(self, *args, **kwargs):
+        assert len(args) == 0
+        for k, v in kwargs.items():
+            setattr(self, k, v)
+
+    def __repr__(self):
+        return "{}({})".format(
+            self.__class__.__name__,
+            ", ".join(
+                "{}={}".format(
+                    k,
+                    'bytes.fromhex("{}")'.format(v.hex())
+                    if isinstance(v, bytes)
+                    else v.__repr__(),
+                )
+                for k, v in self.__dict__.items()
+                if not k.startswith("_")
+            ),
+        )
+
+    @classmethod
+    def from_packet(cls, length, proto, payload):
+        adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
+        if length > 1 and len(payload) + adjust != length:
+            log.warning(
+                "length is %d but payload length is %d", length, len(payload)
+            )
+        return cls(length=length, proto=proto, payload=payload)
+
+    def response(self, *args):
+        if len(args) == 0:
+            return None
+        assert len(args) == 1 and isinstance(args[0], bytes)
+        payload = args[0]
+        length = len(payload) + 1
+        if length > 6:
+            length -= 6
+        return b"xx" + pack("BB", length, self.proto) + payload + b"\r\n"
+
+
+class UNKNOWN(_GT06pkt):
+    pass
+
+
+class LOGIN(_GT06pkt):
+    PROTO = 0x01
+
+    @classmethod
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
+        self.imei = payload[:-1].hex()
+        self.ver = unpack("B", payload[-1:])[0]
+        return self
+
+    def response(self):
+        return super().response(b"")
+
+
+class SUPERVISION(_GT06pkt):
+    PROTO = 0x05
+
+
+class HEARTBEAT(_GT06pkt):
+    PROTO = 0x08
+
+
+class GPS_POSITIONING(_GT06pkt):
+    PROTO = 0x10
+
+
+class GPS_OFFLINE_POSITIONING(_GT06pkt):
+    PROTO = 0x11
+
+
+class STATUS(_GT06pkt):
+    PROTO = 0x13
+
+    @classmethod
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
+        if len(payload) == 5:
+            self.batt, self.ver, self.intvl, self.signal, _ = unpack(
+                "BBBBB", payload
+            )
+        elif len(payload) == 4:
+            self.batt, self.ver, self.intvl, _ = unpack("BBBB", payload)
+            self.signal = None
+        return self
+
+
+class HIBERNATION(_GT06pkt):
+    PROTO = 0x14
+
+
+class RESET(_GT06pkt):
+    PROTO = 0x15
+
+
+class WHITELIST_TOTAL(_GT06pkt):
+    PROTO = 0x16
+
+
+class WIFI_OFFLINE_POSITIONING(_GT06pkt):
+    PROTO = 0x17
+
+
+class TIME(_GT06pkt):
+    PROTO = 0x30
+
+
+class MOM_PHONE(_GT06pkt):
+    PROTO = 0x43
+
+
+class STOP_ALARM(_GT06pkt):
+    PROTO = 0x56
+
+
+class SETUP(_GT06pkt):
+    PROTO = 0x57
+
+    def response(
+        self,
+        uploadIntervalSeconds=0x0300,
+        binarySwitch=0b00110001,
+        alarms=[0, 0, 0],
+        dndTimeSwitch=0,
+        dndTimes=[0, 0, 0],
+        gpsTimeSwitch=0,
+        gpsTimeStart=0,
+        gpsTimeStop=0,
+        phoneNumbers=["", "", ""],
+    ):
+        def pack3b(x):
+            return pack("!I", x)[1:]
+
+        payload = b"".join(
+            [
+                pack("!H", uploadIntervalSeconds),
+                pack("B", binarySwitch),
+            ]
+            + [pack3b(el) for el in alarms]
+            + [
+                pack("B", dndTimeSwitch),
+            ]
+            + [pack3b(el) for el in dndTimes]
+            + [
+                pack("B", gpsTimeSwitch),
+                pack("!H", gpsTimeStart),
+                pack("!H", gpsTimeStop),
+            ]
+            + [b";".join([el.encode() for el in phoneNumbers])]
+        )
+        return super().response(payload)
+
+
+class SYNCHRONOUS_WHITELIST(_GT06pkt):
+    PROTO = 0x58
+
+
+class RESTORE_PASSWORD(_GT06pkt):
+    PROTO = 0x67
+
+
+class WIFI_POSITIONING(_GT06pkt):
+    PROTO = 0x69
+
+
+class MANUAL_POSITIONING(_GT06pkt):
+    PROTO = 0x80
+
+
+class BATTERY_CHARGE(_GT06pkt):
+    PROTO = 0x81
+
+
+class CHARGER_CONNECTED(_GT06pkt):
+    PROTO = 0x82
+
+
+class CHARGER_DISCONNECTED(_GT06pkt):
+    PROTO = 0x83
+
+
+class VIBRATION_RECEIVED(_GT06pkt):
+    PROTO = 0x94
+
+
+class POSITION_UPLOAD_INTERVAL(_GT06pkt):
+    PROTO = 0x98
+
+
+# Build a dict protocol number -> class
+CLASSES = {}
+if True:  # just to indent the code, sorry!
+    for cls in [
+        cls
+        for name, cls in globals().items()
+        if isclass(cls)
+        and issubclass(cls, _GT06pkt)
+        and not name.startswith("_")
+    ]:
+        if hasattr(cls, "PROTO"):
+            CLASSES[cls.PROTO] = cls
+
+
+def handle_packet(packet, addr, when):
+    if len(packet) < 6:
+        msg = UNKNOWN.from_packet(0, 0, packet)
+    else:
+        xx, length, proto = unpack("!2sBB", packet[:4])
+        crlf = packet[-2:]
+        payload = packet[4:-2]
+        if xx != b"xx" or crlf != b"\r\n" or proto not in CLASSES:
+            msg = UNKNOWN.from_packet(length, proto, packet)
+        else:
+            msg = CLASSES[proto].from_packet(length, proto, payload)
+    return msg
+
+def make_response(msg):
+    return msg.response()