From ecb31fcdc354471235ba44edbad3b24a6bded905 Mon Sep 17 00:00:00 2001 From: Eugene Crosser Date: Fri, 22 Apr 2022 00:58:15 +0200 Subject: [PATCH] the whole shebang is working now --- gps303/collector.py | 22 +++++++++------ gps303/gps303proto.py | 61 ++++++++++++++------------------------- gps303/lookaside.py | 66 ++++++++++++++++++++++++++++++++++++------- gps303/termconfig.py | 54 +++++++++++++++++++++++++++++++++++ gps303/zmsg.py | 30 ++++++++++++++++++-- 5 files changed, 172 insertions(+), 61 deletions(-) create mode 100644 gps303/termconfig.py diff --git a/gps303/collector.py b/gps303/collector.py index be1bdec..921833c 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -32,7 +32,6 @@ class Client: log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei) self.sock.close() self.buffer = b"" - self.imei = None def recv(self): """Read from the socket and parse complete messages""" @@ -127,21 +126,23 @@ class Clients: def response(self, resp): if resp.imei in self.by_imei: self.by_imei[resp.imei].send(resp.packet) + else: + log.info("Not connected (IMEI %s)", resp.imei) def runserver(conf): zctx = zmq.Context() zpub = zctx.socket(zmq.PUB) zpub.bind(conf.get("collector", "publishurl")) - zsub = zctx.socket(zmq.SUB) - zsub.connect(conf.get("collector", "listenurl")) + zpull = zctx.socket(zmq.PULL) + zpull.bind(conf.get("collector", "listenurl")) tcpl = socket(AF_INET6, SOCK_STREAM) tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) tcpl.bind(("", conf.getint("collector", "port"))) tcpl.listen(5) tcpfd = tcpl.fileno() poller = zmq.Poller() - poller.register(zsub, flags=zmq.POLLIN) + poller.register(zpull, flags=zmq.POLLIN) poller.register(tcpfd, flags=zmq.POLLIN) clients = Clients() try: @@ -149,19 +150,19 @@ def runserver(conf): tosend = [] topoll = [] tostop = [] - events = poller.poll(10) + events = poller.poll(1000) for sk, fl in events: - if sk is zsub: + if sk is zpull: while True: try: - msg = zsub.recv(zmq.NOBLOCK) + msg = zpull.recv(zmq.NOBLOCK) tosend.append(Resp(msg)) except zmq.Again: break elif sk == tcpfd: clntsock, clntaddr = tcpl.accept() topoll.append((clntsock, clntaddr)) - else: + elif fl & zmq.POLLIN: received = clients.recv(sk) if received is None: log.debug( @@ -192,15 +193,18 @@ def runserver(conf): clients.response( Resp(imei=imei, packet=respmsg) ) + else: + log.debug("Stray event: %s on socket %s", fl, sk) # poll queue consumed, make changes now for fd in tostop: poller.unregister(fd) clients.stop(fd) for zmsg in tosend: + log.debug("Sending to the client: %s", zmsg) clients.response(zmsg) for clntsock, clntaddr in topoll: fd = clients.add(clntsock, clntaddr) - poller.register(fd) + poller.register(fd, flags=zmq.POLLIN) except KeyboardInterrupt: pass diff --git a/gps303/gps303proto.py b/gps303/gps303proto.py index 445c34a..32aae55 100755 --- a/gps303/gps303proto.py +++ b/gps303/gps303proto.py @@ -58,6 +58,7 @@ log = getLogger("gps303") class GPS303Pkt: PROTO: int + INLINE = True def __init__(self, *args, **kwargs): assert len(args) == 0 @@ -96,15 +97,15 @@ class GPS303Pkt: @classmethod def inline_response(cls, packet): - return cls.make_packet(b"") + if cls.INLINE: + return cls.make_packet(b"") + else: + return None class UNKNOWN(GPS303Pkt): PROTO = 256 # > 255 is impossible in real packets - - @classmethod - def inline_response(cls, packet): - return None + INLINE = False class LOGIN(GPS303Pkt): @@ -117,18 +118,16 @@ class LOGIN(GPS303Pkt): self.ver = unpack("B", payload[-1:])[0] return self - def response(self): - return self.make_packet(b"") - class SUPERVISION(GPS303Pkt): # Server sends supervision number status PROTO = 0x05 + INLINE = False def response(self, supnum=0): # 1: The device automatically answers Pickup effect # 2: Automatically Answering Two-way Calls # 3: Ring manually answer the two-way call - return super().response(b"") + return self.make_packet(pack("B", supnum)) class HEARTBEAT(GPS303Pkt): @@ -163,9 +162,6 @@ class _GPS_POSITIONING(GPS303Pkt): def inline_response(cls, packet): return cls.make_packet(packet[2:8]) - def response(self): - return self.make_packet(self.dtime) - class GPS_POSITIONING(_GPS_POSITIONING): PROTO = 0x10 @@ -177,6 +173,7 @@ class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING): class STATUS(GPS303Pkt): PROTO = 0x13 + INLINE = False @classmethod def from_packet(cls, length, payload): @@ -196,12 +193,8 @@ class STATUS(GPS303Pkt): self.signal = None return self - @classmethod - def inline_response(cls, packet): - return None - def response(self, upload_interval=25): # Set interval in minutes - return super().response(pack("B", upload_interval)) + return self.make_packet(pack("B", upload_interval)) class HIBERNATION(GPS303Pkt): @@ -210,6 +203,7 @@ class HIBERNATION(GPS303Pkt): class RESET(GPS303Pkt): # Device sends when it got reset SMS PROTO = 0x15 + INLINE = False def response(self): # Server can send to initiate factory reset return self.make_packet(b"") @@ -217,9 +211,10 @@ class RESET(GPS303Pkt): # Device sends when it got reset SMS class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58) PROTO = 0x16 + INLINE = False def response(self, number=3): # Number of whitelist entries - return super().response(pack("B", number)) + return self.make_packet(pack("B", number)) class _WIFI_POSITIONING(GPS303Pkt): @@ -258,9 +253,6 @@ class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING): def inline_response(cls, packet): return cls.make_packet(packet[2:8]) - def response(self): - return super().response(self.dtime) - class TIME(GPS303Pkt): PROTO = 0x30 @@ -271,16 +263,13 @@ class TIME(GPS303Pkt): "!BBHBBBBB", 7, cls.PROTO, *datetime.utcnow().timetuple()[:6] ) - def response(self): - payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6]) - return super().response(payload) - class PROHIBIT_LBS(GPS303Pkt): PROTO = 0x33 + INLINE = False def response(self, status=1): # Server sent, 0-off, 1-on - return super().response(pack("B", status)) + return self.make_packet(pack("B", status)) class MOM_PHONE(GPS303Pkt): @@ -290,9 +279,6 @@ class MOM_PHONE(GPS303Pkt): class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device PROTO = 0x44 - def response(self): - return super().response(b"") - class STOP_ALARM(GPS303Pkt): PROTO = 0x56 @@ -300,6 +286,7 @@ class STOP_ALARM(GPS303Pkt): class SETUP(GPS303Pkt): PROTO = 0x57 + INLINE = False def response( self, @@ -333,7 +320,7 @@ class SETUP(GPS303Pkt): ] + [b";".join([el.encode() for el in phoneNumbers])] ) - return super().response(payload) + return self.make_packet(payload) class SYNCHRONOUS_WHITELIST(GPS303Pkt): @@ -346,10 +333,7 @@ class RESTORE_PASSWORD(GPS303Pkt): class WIFI_POSITIONING(_WIFI_POSITIONING): PROTO = 0x69 - - @classmethod - def inline_response(cls, packet): - return None + INLINE = False def response(self, lat=None, lon=None): if lat is None or lon is None: @@ -383,6 +367,7 @@ class VIBRATION_RECEIVED(GPS303Pkt): class POSITION_UPLOAD_INTERVAL(GPS303Pkt): PROTO = 0x98 + INLINE = False @classmethod def from_packet(cls, length, payload): @@ -390,12 +375,8 @@ class POSITION_UPLOAD_INTERVAL(GPS303Pkt): self.interval = unpack("!H", payload[:2]) return self - @classmethod - def inline_response(cls, packet): - return cls.make_packet(packet[2:4]) - - def response(self): - return self.make_packet(pack("!H", self.interval)) + def response(self, interval=10): + return self.make_packet(pack("!H", interval)) class SOS_ALARM(GPS303Pkt): diff --git a/gps303/lookaside.py b/gps303/lookaside.py index d9af989..05b7a49 100644 --- a/gps303/lookaside.py +++ b/gps303/lookaside.py @@ -1,13 +1,59 @@ -""" -For when responding to the terminal is not trivial -""" +""" Estimate coordinates from WIFI_POSITIONING and send back """ -from .gps303proto import * +from datetime import datetime, timezone +from logging import getLogger +from struct import pack +import zmq + +from . import common +from .gps303proto import parse_message, proto_by_name, WIFI_POSITIONING from .opencellid import qry_cell +from .zmsg import Bcast, Resp + +log = getLogger("gps303/lookaside") + + +def runserver(conf): + zctx = zmq.Context() + zsub = zctx.socket(zmq.SUB) + zsub.connect(conf.get("collector", "publishurl")) + topic = pack("B", proto_by_name("WIFI_POSITIONING")) + zsub.setsockopt(zmq.SUBSCRIBE, topic) + zpush = zctx.socket(zmq.PUSH) + zpush.connect(conf.get("collector", "listenurl")) + + try: + while True: + zmsg = Bcast(zsub.recv()) + msg = parse_message(zmsg.packet) + log.debug( + "IMEI %s from %s at %s: %s", + zmsg.imei, + zmsg.peeraddr, + datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc), + msg, + ) + if not isinstance(msg, WIFI_POSITIONING): + log.error( + "IMEI %s from %s at %s: %s", + zmsg.imei, + zmsg.peeraddr, + datetime.fromtimestamp(zmsg.when).astimezone( + tz=timezone.utc + ), + msg, + ) + continue + lat, lon = qry_cell( + conf["opencellid"]["dbfn"], msg.mcc, msg.gsm_cells + ) + resp = Resp(imei=zmsg.imei, packet=msg.response(lat=lat, lon=lon)) + log.debug("Response for lat=%s, lon=%s: %s", lat, lon, resp) + zpush.send(resp.packed) + + except KeyboardInterrupt: + pass + -def prepare_response(conf, msg): - if isinstance(msg, WIFI_POSITIONING): - lat, lon = qry_cell(conf["opencellid"]["dbfn"], - msg.mcc, msg.gsm_cells) - return {"lat": lat, "lon": lon} - return {} +if __name__.endswith("__main__"): + runserver(common.init(log)) diff --git a/gps303/termconfig.py b/gps303/termconfig.py new file mode 100644 index 0000000..10c6286 --- /dev/null +++ b/gps303/termconfig.py @@ -0,0 +1,54 @@ +""" For when responding to the terminal is not trivial """ + +from datetime import datetime, timezone +from logging import getLogger +from struct import pack +import zmq + +from . import common +from .gps303proto import parse_message, proto_by_name +from .zmsg import Bcast, Resp + +log = getLogger("gps303/termconfig") + + +def runserver(conf): + zctx = zmq.Context() + zsub = zctx.socket(zmq.SUB) + zsub.connect(conf.get("collector", "publishurl")) + for protoname in ( + "SUPERVISION", + "STATUS", + "RESET", + "WHITELIST_TOTAL", + "PROHIBIT_LBS", + "SETUP", + "POSITION_UPLOAD_INTERVAL", + ): + topic = pack("B", proto_by_name(protoname)) + zsub.setsockopt(zmq.SUBSCRIBE, topic) + zpush = zctx.socket(zmq.PUSH) + zpush.connect(conf.get("collector", "listenurl")) + + try: + while True: + zmsg = Bcast(zsub.recv()) + msg = parse_message(zmsg.packet) + log.debug( + "IMEI %s from %s at %s: %s", + zmsg.imei, + zmsg.peeraddr, + datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc), + msg, + ) + # TODO get data from the config + resp = Resp(imei=zmsg.imei, packet=msg.response()) + log.debug("Response: %s", resp) + zpush.send(resp.packed) + + except KeyboardInterrupt: + pass + + +if __name__.endswith("__main__"): + runserver(common.init(log)) diff --git a/gps303/zmsg.py b/gps303/zmsg.py index 7ab3ce4..6e591ab 100644 --- a/gps303/zmsg.py +++ b/gps303/zmsg.py @@ -5,13 +5,19 @@ from struct import pack, unpack __all__ = "Bcast", "Resp" + def pack_peer(peeraddr): saddr, port, _x, _y = peeraddr addr6 = ip.ip_address(saddr) addr = addr6.ipv4_mapped if addr is None: addr = addr6 - return pack("B", addr.version) + (addr.packed + b"\0\0\0\0\0\0\0\0\0\0\0\0")[:16] + pack("!H", port) + return ( + pack("B", addr.version) + + (addr.packed + b"\0\0\0\0\0\0\0\0\0\0\0\0")[:16] + + pack("!H", port) + ) + def unpack_peer(buffer): version = buffer[0] @@ -41,6 +47,22 @@ class _Zmsg: + str(kwargs) ) + def __repr__(self): + return "{}({})".format( + self.__class__.__name__, + ", ".join( + [ + "{}={}".format( + k, + 'bytes.fromhex("{}")'.format(getattr(self, k).hex()) + if isinstance(getattr(self, k), bytes) + else getattr(self, k), + ) + for k, _ in self.KWARGS + ] + ), + ) + def decode(self, buffer): raise RuntimeError( self.__class__.__name__ + "must implement `encode()` method" @@ -69,7 +91,11 @@ class Bcast(_Zmsg): return ( pack("B", self.proto) + ("0000000000000000" if self.imei is None else self.imei).encode() - + (b"\0\0\0\0\0\0\0\0" if self.when is None else pack("!d", self.when)) + + ( + b"\0\0\0\0\0\0\0\0" + if self.when is None + else pack("!d", self.when) + ) + pack_peer(self.peeraddr) + self.packet ) -- 2.43.0