From ba54a11f5710a714ae4df0c5d878229872386eb6 Mon Sep 17 00:00:00 2001 From: Eugene Crosser Date: Sat, 2 Jul 2022 01:33:07 +0200 Subject: [PATCH] Initial multiprotocol support Protocol module is loaded dynamically --- debian/gps303.conf | 2 + gps303/collector.py | 131 ++++++++++++++++++++++++++++++++---------- gps303/gps303proto.py | 14 ++++- test/common.py | 11 +++- test/test_storage.py | 23 ++++---- 5 files changed, 137 insertions(+), 44 deletions(-) diff --git a/debian/gps303.conf b/debian/gps303.conf index 9d5ed3c..8bfff8e 100644 --- a/debian/gps303.conf +++ b/debian/gps303.conf @@ -4,6 +4,8 @@ port = 4303 publishurl = ipc:///var/lib/gps303/collected listenurl = ipc:///var/lib/gps303/responses +# comma-separated list of tracker protocols to accept +protocols = gps303proto [wsgateway] port = 5049 diff --git a/gps303/collector.py b/gps303/collector.py index df1a474..8adaeb3 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -1,6 +1,7 @@ """ TCP server that communicates with terminals """ from configparser import ConfigParser +from importlib import import_module from logging import getLogger from os import umask from socket import ( @@ -13,18 +14,10 @@ from socket import ( ) from struct import pack from time import time -from typing import Dict, List, Optional, Tuple +from typing import Any, cast, Dict, List, Optional, Tuple, Union import zmq from . import common -from .gps303proto import ( - GPS303Conn, - is_goodbye_packet, - imei_from_packet, - inline_response, - parse_message, - proto_of_message, -) from .zmsg import Bcast, Resp log = getLogger("gps303/collector") @@ -32,19 +25,67 @@ log = getLogger("gps303/collector") MAXBUFFER: int = 4096 +class ProtoModule: + class Stream: + @staticmethod + def enframe(buffer: bytes) -> bytes: + ... + + def recv(self, segment: bytes) -> List[Union[bytes, str]]: + ... + + def close(self) -> bytes: + ... + + @staticmethod + def probe_buffer(buffer: bytes) -> bool: + ... + + @staticmethod + def parse_message(packet: bytes, is_incoming: bool = True) -> Any: + ... + + @staticmethod + def inline_response(packet: bytes) -> Optional[bytes]: + ... + + @staticmethod + def is_goodbye_packet(packet: bytes) -> bool: + ... + + @staticmethod + def imei_from_packet(packet: bytes) -> Optional[str]: + ... + + @staticmethod + def proto_of_message(packet: bytes) -> int: + ... + + @staticmethod + def proto_by_name(name: str) -> int: + ... + + +pmods: List[ProtoModule] = [] + + class Client: """Connected socket to the terminal plus buffer and metadata""" def __init__(self, sock: socket, addr: Tuple[str, int]) -> None: self.sock = sock self.addr = addr - self.stream = GPS303Conn() + self.pmod: Optional[ProtoModule] = None + self.stream: Optional[ProtoModule.Stream] = None self.imei: Optional[str] = None def close(self) -> None: log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei) self.sock.close() - rest = self.stream.close() + if self.stream: + rest = self.stream.close() + else: + rest = b"" if rest: log.warning("%d bytes in buffer on close: %s", len(rest), rest) @@ -67,6 +108,20 @@ class Client: self.imei, ) return None + if self.stream is None: + for pmod in pmods: + if pmod.probe_buffer(segment): + self.pmod = pmod + self.stream = pmod.Stream() + break + if self.stream is None: + log.info( + "unrecognizable %d bytes of data %s from fd %d", + len(segment), + segment[:32].hex(), + self.sock.fileno(), + ) + return [] when = time() msgs = [] for elem in self.stream.recv(segment): @@ -82,6 +137,7 @@ class Client: return msgs def send(self, buffer: bytes) -> None: + assert self.stream is not None try: self.sock.send(self.stream.enframe(buffer)) except OSError as e: @@ -114,15 +170,18 @@ class Clients: def recv( self, fd: int - ) -> Optional[List[Tuple[Optional[str], float, Tuple[str, int], bytes]]]: + ) -> Optional[ + List[Tuple[ProtoModule, Optional[str], float, Tuple[str, int], bytes]] + ]: clnt = self.by_fd[fd] msgs = clnt.recv() if msgs is None: return None result = [] for when, peeraddr, packet in msgs: + assert clnt.pmod is not None if clnt.imei is None: - imei = imei_from_packet(packet) + imei = clnt.pmod.imei_from_packet(packet) if imei is not None: log.info("LOGIN from fd %d (IMEI %s)", fd, imei) clnt.imei = imei @@ -140,7 +199,7 @@ class Clients: peeraddr, packet, ) - result.append((clnt.imei, when, peeraddr, packet)) + result.append((clnt.pmod, clnt.imei, when, peeraddr, packet)) log.debug( "Received from %s (IMEI %s): %s", peeraddr, @@ -149,14 +208,22 @@ class Clients: ) return result - def response(self, resp: Resp) -> None: + def response(self, resp: Resp) -> Optional[ProtoModule]: if resp.imei in self.by_imei: - self.by_imei[resp.imei].send(resp.packet) + clnt = self.by_imei[resp.imei] + clnt.send(resp.packet) + return clnt.pmod else: log.info("Not connected (IMEI %s)", resp.imei) + return None def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None: + global pmods + pmods = [ + cast(ProtoModule, import_module("." + modnm, __package__)) + for modnm in conf.get("collector", "protocols").split(",") + ] # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?! zctx = zmq.Context() # type: ignore zpub = zctx.socket(zmq.PUB) # type: ignore @@ -199,8 +266,8 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None: log.debug("Terminal gone from fd %d", sk) tostop.append(sk) else: - for imei, when, peeraddr, packet in received: - proto = proto_of_message(packet) + for pmod, imei, when, peeraddr, packet in received: + proto = pmod.proto_of_message(packet) zpub.send( Bcast( proto=proto, @@ -210,14 +277,17 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None: packet=packet, ).packed ) - if is_goodbye_packet(packet) and handle_hibernate: + if ( + pmod.is_goodbye_packet(packet) + and handle_hibernate + ): log.debug( "Goodbye from fd %d (IMEI %s)", sk, imei, ) tostop.append(sk) - respmsg = inline_response(packet) + respmsg = pmod.inline_response(packet) if respmsg is not None: tosend.append( Resp(imei=imei, when=when, packet=respmsg) @@ -226,17 +296,18 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None: log.debug("Stray event: %s on socket %s", fl, sk) # poll queue consumed, make changes now for zmsg in tosend: - zpub.send( - Bcast( - is_incoming=False, - proto=proto_of_message(zmsg.packet), - when=zmsg.when, - imei=zmsg.imei, - packet=zmsg.packet, - ).packed - ) log.debug("Sending to the client: %s", zmsg) - clients.response(zmsg) + rpmod = clients.response(zmsg) + if rpmod is not None: + zpub.send( + Bcast( + is_incoming=False, + proto=rpmod.proto_of_message(zmsg.packet), + when=zmsg.when, + imei=zmsg.imei, + packet=zmsg.packet, + ).packed + ) for fd in tostop: poller.unregister(fd) # type: ignore clients.stop(fd) diff --git a/gps303/gps303proto.py b/gps303/gps303proto.py index e597b2b..e3776c3 100755 --- a/gps303/gps303proto.py +++ b/gps303/gps303proto.py @@ -32,10 +32,11 @@ from typing import ( ) __all__ = ( - "GPS303Conn", + "Stream", "class_by_prefix", "inline_response", "parse_message", + "probe_buffer", "proto_by_name", "DecodeError", "Respond", @@ -84,7 +85,7 @@ __all__ = ( MAXBUFFER: int = 4096 -class GPS303Conn: +class Stream: def __init__(self) -> None: self.buffer = b"" @@ -900,6 +901,15 @@ def inline_response(packet: bytes) -> Optional[bytes]: return None +def probe_buffer(buffer: bytes) -> bool: + framestart = buffer.find(b"xx") + if framestart < 0: + return False + if len(buffer) - framestart < 6: + return False + return True + + def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt: """From a packet (without framing bytes) derive the XXX.In object""" length, proto = unpack("BB", packet[:2]) diff --git a/test/common.py b/test/common.py index 1672f46..ba4c12a 100644 --- a/test/common.py +++ b/test/common.py @@ -4,6 +4,7 @@ from configparser import ConfigParser, SectionProxy from contextlib import closing, ExitStack from http.server import HTTPServer, SimpleHTTPRequestHandler from importlib import import_module +from logging import DEBUG, StreamHandler from multiprocessing import Process from os import kill, unlink from signal import SIGINT @@ -16,7 +17,7 @@ from socket import ( socket, SocketType, ) -from sys import exit +from sys import exit, stderr from tempfile import mkstemp from time import sleep from typing import Optional @@ -26,7 +27,9 @@ NUMPORTS = 3 class TestWithServers(TestCase): - def setUp(self, *args: str, httpd: bool = False) -> None: + def setUp( + self, *args: str, httpd: bool = False, verbose: bool = False + ) -> None: freeports = [] with ExitStack() as stack: for _ in range(NUMPORTS): @@ -40,6 +43,7 @@ class TestWithServers(TestCase): "port": str(freeports[0]), "publishurl": "ipc://" + self.tmpfilebase + ".pub", "listenurl": "ipc://" + self.tmpfilebase + ".pul", + "protocols": "gps303proto", } self.conf["storage"] = { "dbfn": self.tmpfilebase + ".storage.sqlite", @@ -61,6 +65,9 @@ class TestWithServers(TestCase): else: kwargs = {} cls = import_module("gps303." + srvname, package=".") + if verbose: + cls.log.addHandler(StreamHandler(stderr)) + cls.log.setLevel(DEBUG) p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs) p.start() self.children.append((srvname, p)) diff --git a/test/test_storage.py b/test/test_storage.py index 449aa51..1747737 100644 --- a/test/test_storage.py +++ b/test/test_storage.py @@ -13,7 +13,9 @@ from gps303.ocid_dload import SCHEMA class Storage(TestWithServers): def setUp(self, *args: str, **kwargs: Any) -> None: - super().setUp("collector", "storage", "lookaside", "termconfig") + super().setUp( + "collector", "storage", "lookaside", "termconfig", verbose=True + ) with connect(self.conf.get("opencellid", "dbfn")) as ldb: ldb.execute(SCHEMA) ldb.executemany( @@ -184,8 +186,8 @@ class Storage(TestWithServers): super().tearDown() def test_storage(self) -> None: - for buf in ( - LOGIN.In(imei="9999123456780000", ver=9).packed, + for msg in ( + LOGIN.In(imei="9999123456780000", ver=9), WIFI_POSITIONING.In( mnc=3, mcc=262, @@ -198,21 +200,22 @@ class Storage(TestWithServers): (24420, 36243, -78), (24420, 17012, -44), ], - ).packed, - SETUP.In().packed, - STATUS.In(signal=87).packed, - HIBERNATION.In().packed, + ), + SETUP.In(), + STATUS.In(signal=87), + HIBERNATION.In(), ): - send_and_drain(self.sock, b"xx" + buf + b"\r\n") - self.sock.close() + print("Send:", msg) + send_and_drain(self.sock, b"xx" + msg.packed + b"\r\n") sleep(1) + self.sock.close() got = set() with connect(self.conf.get("storage", "dbfn")) as db: for is_incoming, packet in db.execute( "select is_incoming, packet from events" ): msg = parse_message(packet, is_incoming=is_incoming) - print(msg) + print("Stored:", msg) got.add(type(msg)) self.assertEqual( got, -- 2.43.0