--- /dev/null
+**/__pycache__
+++ /dev/null
-#!/usr/bin/env python3
-
-from enum import Enum
-from select import poll, POLLIN, POLLERR, POLLHUP, POLLPRI
-from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
-from struct import pack, unpack
-import sys
-from time import time
-
-PORT = 4303
-
-class P(Enum):
- UNKNOWN = 0x00
- LOGIN = 0x01
- STATUS = 0x13
- HIBERNATION = 0x14
- time = 0x30
- SETUP = 0x57
-
-
-
-def answer_setup(data):
- return bytes.fromhex("0300310000000000000000000000000000000000000000000000003b3b3b")
-
-def handle_packet(packet, addr, when):
- xx, length, proto = unpack("!2sBB", packet[:4])
- crlf = packet[-2:]
- data = packet[4:-2]
- if xx != b"xx" or crlf != b"\r\n" or (length > 1 and len(packet) != length + 2):
- print("bad packet:", packet.hex())
- print("length", length, "proto", hex(proto))
- try:
- p = P(proto)
- except ValueError:
- p = P.UNKNOWN
- payload = b""
- if p == P.LOGIN:
- print("imei", data[:-1].hex(), "ver", data[-1:].hex())
- elif p == P.SETUP:
- payload = answer_setup(data)
- length = len(payload)+1
- if length > 6:
- length -= 6
- return b"xx" + pack("B", length) + pack("B", proto) + payload + b"\r\n"
-
-if __name__ == "__main__":
- ctlsock = socket(AF_INET, SOCK_STREAM)
- ctlsock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
- ctlsock.bind(("", PORT))
- ctlsock.listen(5)
- ctlfd = ctlsock.fileno()
- pollset = poll()
- pollset.register(ctlfd, POLLIN | POLLERR | POLLHUP | POLLPRI)
- clnt_dict = {}
- while True:
- try:
- events = pollset.poll(1000)
- except KeyboardInterrupt:
- print("Exiting")
- sys.exit(0)
- for fd, ev in events:
- if fd == ctlfd:
- if ev & POLLIN:
- clntsock, clntaddr = ctlsock.accept()
- clntfd = clntsock.fileno()
- clnt_dict[clntfd] = (clntsock, clntaddr)
- pollset.register(clntfd, POLLIN | POLLERR | POLLHUP | POLLPRI)
- print("accepted connection from", clntaddr, "fd", clntfd)
- if ev & ~POLLIN:
- print("unexpected event on ctlfd:", ev)
- else:
- try:
- clntsock, clntaddr = clnt_dict[fd]
- except KeyError: # this socket closed already
- continue
- if ev & POLLIN:
- packet = clntsock.recv(4096)
- when = time()
- print("packet", packet, "from", clntaddr, "from fd", fd)
- if packet:
- response = handle_packet(packet, clntaddr, when)
- if response:
- try:
- # Ignore possibility of blocking
- clntsock.send(response)
- except OSError as e:
- print("sending to socket", fd, "error", e)
- else:
- print("disconnect")
- pollset.unregister(fd)
- clntsock.close()
- del clnt_dict[fd]
- if ev & ~POLLIN:
- print("unexpected event", ev, "on fd", fd)
--- /dev/null
+"""
+Implementation of the protocol used by zx303 GPS+GPRS module
+Description from https://github.com/tobadia/petGPS/tree/master/resources
+"""
+
+from inspect import isclass
+from logging import getLogger
+from struct import pack, unpack
+
+__all__ = ("handle_packet", "make_response")
+
+log = getLogger("gps303")
+
+
+class _GT06pkt:
+ PROTO: int
+
+ def __init__(self, *args, **kwargs):
+ assert len(args) == 0
+ for k, v in kwargs.items():
+ setattr(self, k, v)
+
+ def __repr__(self):
+ return "{}({})".format(
+ self.__class__.__name__,
+ ", ".join(
+ "{}={}".format(
+ k,
+ 'bytes.fromhex("{}")'.format(v.hex())
+ if isinstance(v, bytes)
+ else v.__repr__(),
+ )
+ for k, v in self.__dict__.items()
+ if not k.startswith("_")
+ ),
+ )
+
+ @classmethod
+ def from_packet(cls, length, proto, payload):
+ adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case
+ if length > 1 and len(payload) + adjust != length:
+ log.warning(
+ "length is %d but payload length is %d", length, len(payload)
+ )
+ return cls(length=length, proto=proto, payload=payload)
+
+ def response(self, *args):
+ if len(args) == 0:
+ return None
+ assert len(args) == 1 and isinstance(args[0], bytes)
+ payload = args[0]
+ length = len(payload) + 1
+ if length > 6:
+ length -= 6
+ return b"xx" + pack("BB", length, self.proto) + payload + b"\r\n"
+
+
+class UNKNOWN(_GT06pkt):
+ pass
+
+
+class LOGIN(_GT06pkt):
+ PROTO = 0x01
+
+ @classmethod
+ def from_packet(cls, length, proto, payload):
+ self = super().from_packet(length, proto, payload)
+ self.imei = payload[:-1].hex()
+ self.ver = unpack("B", payload[-1:])[0]
+ return self
+
+ def response(self):
+ return super().response(b"")
+
+
+class SUPERVISION(_GT06pkt):
+ PROTO = 0x05
+
+
+class HEARTBEAT(_GT06pkt):
+ PROTO = 0x08
+
+
+class GPS_POSITIONING(_GT06pkt):
+ PROTO = 0x10
+
+
+class GPS_OFFLINE_POSITIONING(_GT06pkt):
+ PROTO = 0x11
+
+
+class STATUS(_GT06pkt):
+ PROTO = 0x13
+
+ @classmethod
+ def from_packet(cls, length, proto, payload):
+ self = super().from_packet(length, proto, payload)
+ if len(payload) == 5:
+ self.batt, self.ver, self.intvl, self.signal, _ = unpack(
+ "BBBBB", payload
+ )
+ elif len(payload) == 4:
+ self.batt, self.ver, self.intvl, _ = unpack("BBBB", payload)
+ self.signal = None
+ return self
+
+
+class HIBERNATION(_GT06pkt):
+ PROTO = 0x14
+
+
+class RESET(_GT06pkt):
+ PROTO = 0x15
+
+
+class WHITELIST_TOTAL(_GT06pkt):
+ PROTO = 0x16
+
+
+class WIFI_OFFLINE_POSITIONING(_GT06pkt):
+ PROTO = 0x17
+
+
+class TIME(_GT06pkt):
+ PROTO = 0x30
+
+
+class MOM_PHONE(_GT06pkt):
+ PROTO = 0x43
+
+
+class STOP_ALARM(_GT06pkt):
+ PROTO = 0x56
+
+
+class SETUP(_GT06pkt):
+ PROTO = 0x57
+
+ def response(
+ self,
+ uploadIntervalSeconds=0x0300,
+ binarySwitch=0b00110001,
+ alarms=[0, 0, 0],
+ dndTimeSwitch=0,
+ dndTimes=[0, 0, 0],
+ gpsTimeSwitch=0,
+ gpsTimeStart=0,
+ gpsTimeStop=0,
+ phoneNumbers=["", "", ""],
+ ):
+ def pack3b(x):
+ return pack("!I", x)[1:]
+
+ payload = b"".join(
+ [
+ pack("!H", uploadIntervalSeconds),
+ pack("B", binarySwitch),
+ ]
+ + [pack3b(el) for el in alarms]
+ + [
+ pack("B", dndTimeSwitch),
+ ]
+ + [pack3b(el) for el in dndTimes]
+ + [
+ pack("B", gpsTimeSwitch),
+ pack("!H", gpsTimeStart),
+ pack("!H", gpsTimeStop),
+ ]
+ + [b";".join([el.encode() for el in phoneNumbers])]
+ )
+ return super().response(payload)
+
+
+class SYNCHRONOUS_WHITELIST(_GT06pkt):
+ PROTO = 0x58
+
+
+class RESTORE_PASSWORD(_GT06pkt):
+ PROTO = 0x67
+
+
+class WIFI_POSITIONING(_GT06pkt):
+ PROTO = 0x69
+
+
+class MANUAL_POSITIONING(_GT06pkt):
+ PROTO = 0x80
+
+
+class BATTERY_CHARGE(_GT06pkt):
+ PROTO = 0x81
+
+
+class CHARGER_CONNECTED(_GT06pkt):
+ PROTO = 0x82
+
+
+class CHARGER_DISCONNECTED(_GT06pkt):
+ PROTO = 0x83
+
+
+class VIBRATION_RECEIVED(_GT06pkt):
+ PROTO = 0x94
+
+
+class POSITION_UPLOAD_INTERVAL(_GT06pkt):
+ PROTO = 0x98
+
+
+# Build a dict protocol number -> class
+CLASSES = {}
+if True: # just to indent the code, sorry!
+ for cls in [
+ cls
+ for name, cls in globals().items()
+ if isclass(cls)
+ and issubclass(cls, _GT06pkt)
+ and not name.startswith("_")
+ ]:
+ if hasattr(cls, "PROTO"):
+ CLASSES[cls.PROTO] = cls
+
+
+def handle_packet(packet, addr, when):
+ if len(packet) < 6:
+ msg = UNKNOWN.from_packet(0, 0, packet)
+ else:
+ xx, length, proto = unpack("!2sBB", packet[:4])
+ crlf = packet[-2:]
+ payload = packet[4:-2]
+ if xx != b"xx" or crlf != b"\r\n" or proto not in CLASSES:
+ msg = UNKNOWN.from_packet(length, proto, packet)
+ else:
+ msg = CLASSES[proto].from_packet(length, proto, payload)
+ return msg
+
+def make_response(msg):
+ return msg.response()
--- /dev/null
+from logging import getLogger, StreamHandler, DEBUG, INFO
+from logging.handlers import SysLogHandler
+from select import poll, POLLIN, POLLERR, POLLHUP, POLLPRI
+from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
+import sys
+from time import time
+
+from .GT06mod import handle_packet, make_response, LOGIN
+from .evstore import initdb, stow
+
+PORT = 4303
+log = getLogger("gps303")
+
+if __name__.endswith("__main__"):
+ if sys.stdout.isatty():
+ log.addHandler(StreamHandler(sys.stderr))
+ log.setLevel(DEBUG)
+ else:
+ log.addHandler(SysLogHandler(address="/dev/log"))
+ log.setLevel(INFO)
+
+ initdb("/tmp/gps303.sqlite")
+
+ ctlsock = socket(AF_INET, SOCK_STREAM)
+ ctlsock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
+ ctlsock.bind(("", PORT))
+ ctlsock.listen(5)
+ ctlfd = ctlsock.fileno()
+ pollset = poll()
+ pollset.register(ctlfd, POLLIN | POLLERR | POLLHUP | POLLPRI)
+ clnt_dict = {}
+ while True:
+ try:
+ events = pollset.poll(1000)
+ except KeyboardInterrupt:
+ log.info("Exiting")
+ sys.exit(0)
+ for fd, ev in events:
+ if fd == ctlfd:
+ if ev & POLLIN:
+ clntsock, clntaddr = ctlsock.accept()
+ clntfd = clntsock.fileno()
+ clnt_dict[clntfd] = (clntsock, clntaddr, None)
+ pollset.register(
+ clntfd, POLLIN | POLLERR | POLLHUP | POLLPRI
+ )
+ log.debug(
+ "accepted connection from %s as fd %d",
+ clntaddr,
+ clntfd,
+ )
+ if ev & ~POLLIN:
+ log.debug("unexpected event on ctlfd: %s", ev)
+ else:
+ try:
+ clntsock, clntaddr, imei = clnt_dict[fd]
+ except KeyError: # this socket closed already
+ continue
+ if ev & POLLIN:
+ packet = clntsock.recv(4096)
+ when = time()
+ if packet:
+ msg = handle_packet(packet, clntaddr, when)
+ log.debug("%s from %s fd %d'", msg, clntaddr, fd)
+ if isinstance(msg, LOGIN):
+ imei = msg.imei
+ clnt_dict[fd] = (clntsock, clntaddr, imei)
+ stow(clntaddr, when, imei, msg.proto, msg.payload)
+ response = make_response(msg)
+ if response:
+ try:
+ # Ignore possibility of blocking
+ clntsock.send(make_response(msg))
+ except OSError as e:
+ log.debug("sending to fd %d error %s", fd, e)
+ else:
+ log.info("disconnect fd %d imei %s", fd, imei)
+ pollset.unregister(fd)
+ clntsock.close()
+ del clnt_dict[fd]
+ if ev & ~POLLIN:
+ log.warning("unexpected event", ev, "on fd", fd)
--- /dev/null
+from logging import getLogger
+from sqlite3 import connect
+
+__all__ = ("initdb", "stow")
+
+log = getLogger("gps303")
+
+DB = None
+
+SCHEMA = """create table if not exists events (
+ timestamp real not null,
+ imei text,
+ clntaddr text not null,
+ proto int not null,
+ payload blob
+)"""
+
+
+def initdb(dbname):
+ global DB
+ DB = connect(dbname)
+ DB.execute(SCHEMA)
+
+
+def stow(clntaddr, timestamp, imei, proto, payload):
+ assert DB is not None
+ parms = dict(
+ zip(
+ ("clntaddr", "timestamp", "imei", "proto", "payload"),
+ (str(clntaddr), timestamp, imei, proto, payload),
+ )
+ )
+ log.debug("inserting %s", parms)
+ DB.execute(
+ """insert or ignore into events
+ (timestamp, imei, clntaddr, proto, payload)
+ values (:timestamp, :imei, :clntaddr, :proto, :payload)""",
+ parms,
+ )
+ DB.commit()