]> average.org Git - loctrkd.git/commitdiff
Initial multiprotocol support
authorEugene Crosser <crosser@average.org>
Fri, 1 Jul 2022 23:33:07 +0000 (01:33 +0200)
committerEugene Crosser <crosser@average.org>
Thu, 14 Jul 2022 20:38:10 +0000 (22:38 +0200)
Protocol module is loaded dynamically

debian/gps303.conf
gps303/collector.py
gps303/gps303proto.py
test/common.py
test/test_storage.py

index 9d5ed3ca161c977f4018153d85765309cf502303..8bfff8e140f28faa3b271475585a116fb6cc39e4 100644 (file)
@@ -4,6 +4,8 @@
 port = 4303
 publishurl = ipc:///var/lib/gps303/collected
 listenurl = ipc:///var/lib/gps303/responses
+# comma-separated list of tracker protocols to accept
+protocols = gps303proto
 
 [wsgateway]
 port = 5049
index df1a474f057e0fb68c9af477fc43c714b083de01..8adaeb36fa4671b7f60f7409ecec45e320806215 100644 (file)
@@ -1,6 +1,7 @@
 """ TCP server that communicates with terminals """
 
 from configparser import ConfigParser
+from importlib import import_module
 from logging import getLogger
 from os import umask
 from socket import (
@@ -13,18 +14,10 @@ from socket import (
 )
 from struct import pack
 from time import time
-from typing import Dict, List, Optional, Tuple
+from typing import Any, cast, Dict, List, Optional, Tuple, Union
 import zmq
 
 from . import common
-from .gps303proto import (
-    GPS303Conn,
-    is_goodbye_packet,
-    imei_from_packet,
-    inline_response,
-    parse_message,
-    proto_of_message,
-)
 from .zmsg import Bcast, Resp
 
 log = getLogger("gps303/collector")
@@ -32,19 +25,67 @@ log = getLogger("gps303/collector")
 MAXBUFFER: int = 4096
 
 
+class ProtoModule:
+    class Stream:
+        @staticmethod
+        def enframe(buffer: bytes) -> bytes:
+            ...
+
+        def recv(self, segment: bytes) -> List[Union[bytes, str]]:
+            ...
+
+        def close(self) -> bytes:
+            ...
+
+    @staticmethod
+    def probe_buffer(buffer: bytes) -> bool:
+        ...
+
+    @staticmethod
+    def parse_message(packet: bytes, is_incoming: bool = True) -> Any:
+        ...
+
+    @staticmethod
+    def inline_response(packet: bytes) -> Optional[bytes]:
+        ...
+
+    @staticmethod
+    def is_goodbye_packet(packet: bytes) -> bool:
+        ...
+
+    @staticmethod
+    def imei_from_packet(packet: bytes) -> Optional[str]:
+        ...
+
+    @staticmethod
+    def proto_of_message(packet: bytes) -> int:
+        ...
+
+    @staticmethod
+    def proto_by_name(name: str) -> int:
+        ...
+
+
+pmods: List[ProtoModule] = []
+
+
 class Client:
     """Connected socket to the terminal plus buffer and metadata"""
 
     def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
         self.sock = sock
         self.addr = addr
-        self.stream = GPS303Conn()
+        self.pmod: Optional[ProtoModule] = None
+        self.stream: Optional[ProtoModule.Stream] = None
         self.imei: Optional[str] = None
 
     def close(self) -> None:
         log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
         self.sock.close()
-        rest = self.stream.close()
+        if self.stream:
+            rest = self.stream.close()
+        else:
+            rest = b""
         if rest:
             log.warning("%d bytes in buffer on close: %s", len(rest), rest)
 
@@ -67,6 +108,20 @@ class Client:
                 self.imei,
             )
             return None
+        if self.stream is None:
+            for pmod in pmods:
+                if pmod.probe_buffer(segment):
+                    self.pmod = pmod
+                    self.stream = pmod.Stream()
+                    break
+        if self.stream is None:
+            log.info(
+                "unrecognizable %d bytes of data %s from fd %d",
+                len(segment),
+                segment[:32].hex(),
+                self.sock.fileno(),
+            )
+            return []
         when = time()
         msgs = []
         for elem in self.stream.recv(segment):
@@ -82,6 +137,7 @@ class Client:
         return msgs
 
     def send(self, buffer: bytes) -> None:
+        assert self.stream is not None
         try:
             self.sock.send(self.stream.enframe(buffer))
         except OSError as e:
@@ -114,15 +170,18 @@ class Clients:
 
     def recv(
         self, fd: int
-    ) -> Optional[List[Tuple[Optional[str], float, Tuple[str, int], bytes]]]:
+    ) -> Optional[
+        List[Tuple[ProtoModule, Optional[str], float, Tuple[str, int], bytes]]
+    ]:
         clnt = self.by_fd[fd]
         msgs = clnt.recv()
         if msgs is None:
             return None
         result = []
         for when, peeraddr, packet in msgs:
+            assert clnt.pmod is not None
             if clnt.imei is None:
-                imei = imei_from_packet(packet)
+                imei = clnt.pmod.imei_from_packet(packet)
                 if imei is not None:
                     log.info("LOGIN from fd %d (IMEI %s)", fd, imei)
                     clnt.imei = imei
@@ -140,7 +199,7 @@ class Clients:
                         peeraddr,
                         packet,
                     )
-            result.append((clnt.imei, when, peeraddr, packet))
+            result.append((clnt.pmod, clnt.imei, when, peeraddr, packet))
             log.debug(
                 "Received from %s (IMEI %s): %s",
                 peeraddr,
@@ -149,14 +208,22 @@ class Clients:
             )
         return result
 
-    def response(self, resp: Resp) -> None:
+    def response(self, resp: Resp) -> Optional[ProtoModule]:
         if resp.imei in self.by_imei:
-            self.by_imei[resp.imei].send(resp.packet)
+            clnt = self.by_imei[resp.imei]
+            clnt.send(resp.packet)
+            return clnt.pmod
         else:
             log.info("Not connected (IMEI %s)", resp.imei)
+            return None
 
 
 def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
+    global pmods
+    pmods = [
+        cast(ProtoModule, import_module("." + modnm, __package__))
+        for modnm in conf.get("collector", "protocols").split(",")
+    ]
     # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
     zctx = zmq.Context()  # type: ignore
     zpub = zctx.socket(zmq.PUB)  # type: ignore
@@ -199,8 +266,8 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
                         log.debug("Terminal gone from fd %d", sk)
                         tostop.append(sk)
                     else:
-                        for imei, when, peeraddr, packet in received:
-                            proto = proto_of_message(packet)
+                        for pmod, imei, when, peeraddr, packet in received:
+                            proto = pmod.proto_of_message(packet)
                             zpub.send(
                                 Bcast(
                                     proto=proto,
@@ -210,14 +277,17 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
                                     packet=packet,
                                 ).packed
                             )
-                            if is_goodbye_packet(packet) and handle_hibernate:
+                            if (
+                                pmod.is_goodbye_packet(packet)
+                                and handle_hibernate
+                            ):
                                 log.debug(
                                     "Goodbye from fd %d (IMEI %s)",
                                     sk,
                                     imei,
                                 )
                                 tostop.append(sk)
-                            respmsg = inline_response(packet)
+                            respmsg = pmod.inline_response(packet)
                             if respmsg is not None:
                                 tosend.append(
                                     Resp(imei=imei, when=when, packet=respmsg)
@@ -226,17 +296,18 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
                     log.debug("Stray event: %s on socket %s", fl, sk)
             # poll queue consumed, make changes now
             for zmsg in tosend:
-                zpub.send(
-                    Bcast(
-                        is_incoming=False,
-                        proto=proto_of_message(zmsg.packet),
-                        when=zmsg.when,
-                        imei=zmsg.imei,
-                        packet=zmsg.packet,
-                    ).packed
-                )
                 log.debug("Sending to the client: %s", zmsg)
-                clients.response(zmsg)
+                rpmod = clients.response(zmsg)
+                if rpmod is not None:
+                    zpub.send(
+                        Bcast(
+                            is_incoming=False,
+                            proto=rpmod.proto_of_message(zmsg.packet),
+                            when=zmsg.when,
+                            imei=zmsg.imei,
+                            packet=zmsg.packet,
+                        ).packed
+                    )
             for fd in tostop:
                 poller.unregister(fd)  # type: ignore
                 clients.stop(fd)
index e597b2bbc564da0e24e37f39aa69b5e725b68e2b..e3776c3a529276fd333d619ea1946a8264f8337b 100755 (executable)
@@ -32,10 +32,11 @@ from typing import (
 )
 
 __all__ = (
-    "GPS303Conn",
+    "Stream",
     "class_by_prefix",
     "inline_response",
     "parse_message",
+    "probe_buffer",
     "proto_by_name",
     "DecodeError",
     "Respond",
@@ -84,7 +85,7 @@ __all__ = (
 MAXBUFFER: int = 4096
 
 
-class GPS303Conn:
+class Stream:
     def __init__(self) -> None:
         self.buffer = b""
 
@@ -900,6 +901,15 @@ def inline_response(packet: bytes) -> Optional[bytes]:
     return None
 
 
+def probe_buffer(buffer: bytes) -> bool:
+    framestart = buffer.find(b"xx")
+    if framestart < 0:
+        return False
+    if len(buffer) - framestart < 6:
+        return False
+    return True
+
+
 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
     """From a packet (without framing bytes) derive the XXX.In object"""
     length, proto = unpack("BB", packet[:2])
index 1672f46029c663eb79436d0add7bbc5ef9259227..ba4c12ade61f3bd19e42cbbdbd2646978377b363 100644 (file)
@@ -4,6 +4,7 @@ from configparser import ConfigParser, SectionProxy
 from contextlib import closing, ExitStack
 from http.server import HTTPServer, SimpleHTTPRequestHandler
 from importlib import import_module
+from logging import DEBUG, StreamHandler
 from multiprocessing import Process
 from os import kill, unlink
 from signal import SIGINT
@@ -16,7 +17,7 @@ from socket import (
     socket,
     SocketType,
 )
-from sys import exit
+from sys import exit, stderr
 from tempfile import mkstemp
 from time import sleep
 from typing import Optional
@@ -26,7 +27,9 @@ NUMPORTS = 3
 
 
 class TestWithServers(TestCase):
-    def setUp(self, *args: str, httpd: bool = False) -> None:
+    def setUp(
+        self, *args: str, httpd: bool = False, verbose: bool = False
+    ) -> None:
         freeports = []
         with ExitStack() as stack:
             for _ in range(NUMPORTS):
@@ -40,6 +43,7 @@ class TestWithServers(TestCase):
             "port": str(freeports[0]),
             "publishurl": "ipc://" + self.tmpfilebase + ".pub",
             "listenurl": "ipc://" + self.tmpfilebase + ".pul",
+            "protocols": "gps303proto",
         }
         self.conf["storage"] = {
             "dbfn": self.tmpfilebase + ".storage.sqlite",
@@ -61,6 +65,9 @@ class TestWithServers(TestCase):
             else:
                 kwargs = {}
             cls = import_module("gps303." + srvname, package=".")
+            if verbose:
+                cls.log.addHandler(StreamHandler(stderr))
+                cls.log.setLevel(DEBUG)
             p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
             p.start()
             self.children.append((srvname, p))
index 449aa514c8686baaaed232f33aef1f420a76052a..17477379edd6b91f8dd7b6c426d4ab04e777305b 100644 (file)
@@ -13,7 +13,9 @@ from gps303.ocid_dload import SCHEMA
 
 class Storage(TestWithServers):
     def setUp(self, *args: str, **kwargs: Any) -> None:
-        super().setUp("collector", "storage", "lookaside", "termconfig")
+        super().setUp(
+            "collector", "storage", "lookaside", "termconfig", verbose=True
+        )
         with connect(self.conf.get("opencellid", "dbfn")) as ldb:
             ldb.execute(SCHEMA)
             ldb.executemany(
@@ -184,8 +186,8 @@ class Storage(TestWithServers):
         super().tearDown()
 
     def test_storage(self) -> None:
-        for buf in (
-            LOGIN.In(imei="9999123456780000", ver=9).packed,
+        for msg in (
+            LOGIN.In(imei="9999123456780000", ver=9),
             WIFI_POSITIONING.In(
                 mnc=3,
                 mcc=262,
@@ -198,21 +200,22 @@ class Storage(TestWithServers):
                     (24420, 36243, -78),
                     (24420, 17012, -44),
                 ],
-            ).packed,
-            SETUP.In().packed,
-            STATUS.In(signal=87).packed,
-            HIBERNATION.In().packed,
+            ),
+            SETUP.In(),
+            STATUS.In(signal=87),
+            HIBERNATION.In(),
         ):
-            send_and_drain(self.sock, b"xx" + buf + b"\r\n")
-        self.sock.close()
+            print("Send:", msg)
+            send_and_drain(self.sock, b"xx" + msg.packed + b"\r\n")
         sleep(1)
+        self.sock.close()
         got = set()
         with connect(self.conf.get("storage", "dbfn")) as db:
             for is_incoming, packet in db.execute(
                 "select is_incoming, packet from events"
             ):
                 msg = parse_message(packet, is_incoming=is_incoming)
-                print(msg)
+                print("Stored:", msg)
                 got.add(type(msg))
         self.assertEqual(
             got,