From 513eaed4a832748bd77294679939627eaaf69612 Mon Sep 17 00:00:00 2001 From: Eugene Crosser Date: Wed, 20 Apr 2022 23:11:25 +0200 Subject: [PATCH] full encoder/decoder for zmq messages --- gps303/collector.py | 53 ++++++++++--------------- gps303/storage.py | 20 +++++++--- gps303/zmsg.py | 97 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 131 insertions(+), 39 deletions(-) create mode 100644 gps303/zmsg.py diff --git a/gps303/collector.py b/gps303/collector.py index 3852854..4cb7956 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -14,33 +14,11 @@ from .gps303proto import ( parse_message, proto_of_message, ) +from .zmsg import Bcast, Resp log = getLogger("gps303/collector") -class Bcast: - """Zmq message to broadcast what was received from the terminal""" - - def __init__(self, imei, msg): - self.as_bytes = ( - pack("B", proto_of_message(msg)) - + ("0000000000000000" if imei is None else imei).encode() - + msg - ) - - -class Resp: - """Zmq message received from a third party to send to the terminal""" - - def __init__(self, *args, **kwargs): - if not kwargs and len(args) == 1 and isinstance(args[0], bytes): - self.imei = msg[:16].decode() - self.payload = msg[16:] - elif len(args) == 0: - self.imei = kwargs["imei"] - self.payload = kwargs["payload"] - - class Client: """Connected socket to the terminal plus buffer and metadata""" @@ -101,7 +79,7 @@ class Client: log.info( "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei ) - msgs.append(packet) + msgs.append((when, self.addr, packet)) return msgs def send(self, buffer): @@ -140,15 +118,15 @@ class Clients: if msgs is None: return None result = [] - for msg in msgs: - if proto_of_message(msg) == LOGIN.PROTO: # Could do blindly... + for when, peeraddr, packet in msgs: + if proto_of_message(packet) == LOGIN.PROTO: # Could do blindly... self.by_imei[clnt.imei] = clnt - result.append((clnt.imei, msg)) + result.append((clnt.imei, when, peeraddr, packet)) return result def response(self, resp): if resp.imei in self.by_imei: - self.by_imei[resp.imei].send(resp.payload) + self.by_imei[resp.imei].send(resp.packet) def runserver(conf): @@ -191,19 +169,28 @@ def runserver(conf): ) tostop.append(sk) else: - for imei, msg in received: - zpub.send(Bcast(imei, msg).as_bytes) - if proto_of_message(msg) == HIBERNATION.PROTO: + for imei, when, peeraddr, packet in received: + proto = proto_of_message(packet) + zpub.send( + Bcast( + proto=proto, + imei=imei, + when=when, + peeraddr=peeraddr, + packet=packet, + ).packed + ) + if proto == HIBERNATION.PROTO: log.debug( "HIBERNATION from fd %d (IMEI %s)", sk, imei, ) tostop.append(sk) - respmsg = inline_response(msg) + respmsg = inline_response(packet) if respmsg is not None: clients.response( - Resp(imei=imei, payload=respmsg) + Resp(imei=imei, packet=respmsg) ) # poll queue consumed, make changes now for fd in tostop: diff --git a/gps303/storage.py b/gps303/storage.py index 9da1d6d..6047c39 100644 --- a/gps303/storage.py +++ b/gps303/storage.py @@ -1,5 +1,6 @@ """ Store zmq broadcasts to sqlite """ +from datetime import datetime, timezone from getopt import getopt from logging import getLogger from logging.handlers import SysLogHandler @@ -10,9 +11,11 @@ import zmq from . import common from .evstore import initdb, stow from .gps303proto import parse_message +from .zmsg import Bcast log = getLogger("gps303/storage") + def runserver(conf): dbname = conf.get("storage", "dbfn") log.info('Using Sqlite3 database "%s"', dbname) @@ -24,12 +27,17 @@ def runserver(conf): try: while True: - zmsg = zsub.recv() - imei = zmsg[1:17].decode() - packet = zmsg[17:] - msg = parse_message(packet) - log.debug("From IMEI %s: %s", imei, msg) - stow("", time(), imei, msg.length, msg.PROTO, msg.payload) + zmsg = Bcast(zsub.recv()) + msg = parse_message(zmsg.packet) + log.debug("IMEI %s from %s at %s: %s", zmsg.imei, zmsg.peeraddr, datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc), msg) + stow( + zmsg.peeraddr, + zmsg.when, + zmsg.imei, + msg.length, + msg.PROTO, + msg.payload, + ) except KeyboardInterrupt: pass diff --git a/gps303/zmsg.py b/gps303/zmsg.py new file mode 100644 index 0000000..cdd23fc --- /dev/null +++ b/gps303/zmsg.py @@ -0,0 +1,97 @@ +""" Zeromq messages """ + +import ipaddress as ip +from struct import pack, unpack + +__all__ = "Bcast", "Resp" + +def pack_peer(peeraddr): + saddr, port = peeraddr + addr = ip.ip_address(saddr) + return pack("B", addr.version) + (addr.packed + b"\0\0\0\0\0\0\0\0\0\0\0\0")[:16] + pack("!H", port) + +def unpack_peer(buffer): + version = buffer[0] + if version not in (4, 6): + return None + if version == 4: + addr = ip.IPv4Address(buffer[1:5]) + else: + addr = ip.IPv6Address(buffer[1:17]) + port = unpack("!H", buffer[17:19])[0] + return (addr, port) + + +class _Zmsg: + def __init__(self, *args, **kwargs): + if len(args) == 1: + self.decode(args[0]) + elif bool(kwargs): + for k, v in self.KWARGS: + setattr(self, k, kwargs.get(k, v)) + else: + raise RuntimeError( + self.__class__.__name__ + + ": both args " + + str(args) + + " and kwargs " + + str(kwargs) + ) + + def decode(self, buffer): + raise RuntimeError( + self.__class__.__name__ + "must implement `encode()` method" + ) + + @property + def packed(self): + raise RuntimeError( + self.__class__.__name__ + "must implement `encode()` method" + ) + + +class Bcast(_Zmsg): + """Zmq message to broadcast what was received from the terminal""" + + KWARGS = ( + ("proto", 256), + ("imei", None), + ("when", None), + ("peeraddr", None), + ("packet", b""), + ) + + @property + def packed(self): + return ( + pack("B", self.proto) + + ("0000000000000000" if self.imei is None else self.imei).encode() + + (b"\0\0\0\0\0\0\0\0" if self.when is None else pack("!d", self.when)) + + pack_peer(self.peeraddr) + + self.packet + ) + + def decode(self, buffer): + self.proto = buffer[0] + self.imei = buffer[1:17].decode() + if self.imei == "0000000000000000": + self.imei = None + self.when = unpack("!d", buffer[17:25])[0] + self.peeraddr = unpack_peer(buffer[25:44]) + self.packet = buffer[44:] + + +class Resp(_Zmsg): + """Zmq message received from a third party to send to the terminal""" + + KWARGS = (("imei", None), ("packet", b"")) + + @property + def packed(self): + return ( + "0000000000000000" if self.imei is None else self.imei.encode() + ) + self.packet + + def decode(self, buffer): + self.imei = buffer[:16].decode() + self.packet = buffer[16:] -- 2.43.0