]> average.org Git - loctrkd.git/commitdiff
full encoder/decoder for zmq messages
authorEugene Crosser <crosser@average.org>
Wed, 20 Apr 2022 21:11:25 +0000 (23:11 +0200)
committerEugene Crosser <crosser@average.org>
Wed, 20 Apr 2022 21:11:25 +0000 (23:11 +0200)
gps303/collector.py
gps303/storage.py
gps303/zmsg.py [new file with mode: 0644]

index 385285494d7f0782bfc3c8ab20eff1ece22bbf7e..4cb7956d2809c4dfd57782d7a15b9bf57abfdbeb 100644 (file)
@@ -14,33 +14,11 @@ from .gps303proto import (
     parse_message,
     proto_of_message,
 )
+from .zmsg import Bcast, Resp
 
 log = getLogger("gps303/collector")
 
 
-class Bcast:
-    """Zmq message to broadcast what was received from the terminal"""
-
-    def __init__(self, imei, msg):
-        self.as_bytes = (
-            pack("B", proto_of_message(msg))
-            + ("0000000000000000" if imei is None else imei).encode()
-            + msg
-        )
-
-
-class Resp:
-    """Zmq message received from a third party to send to the terminal"""
-
-    def __init__(self, *args, **kwargs):
-        if not kwargs and len(args) == 1 and isinstance(args[0], bytes):
-            self.imei = msg[:16].decode()
-            self.payload = msg[16:]
-        elif len(args) == 0:
-            self.imei = kwargs["imei"]
-            self.payload = kwargs["payload"]
-
-
 class Client:
     """Connected socket to the terminal plus buffer and metadata"""
 
@@ -101,7 +79,7 @@ class Client:
                 log.info(
                     "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
                 )
-            msgs.append(packet)
+            msgs.append((when, self.addr, packet))
         return msgs
 
     def send(self, buffer):
@@ -140,15 +118,15 @@ class Clients:
         if msgs is None:
             return None
         result = []
-        for msg in msgs:
-            if proto_of_message(msg) == LOGIN.PROTO:  # Could do blindly...
+        for when, peeraddr, packet in msgs:
+            if proto_of_message(packet) == LOGIN.PROTO:  # Could do blindly...
                 self.by_imei[clnt.imei] = clnt
-            result.append((clnt.imei, msg))
+            result.append((clnt.imei, when, peeraddr, packet))
         return result
 
     def response(self, resp):
         if resp.imei in self.by_imei:
-            self.by_imei[resp.imei].send(resp.payload)
+            self.by_imei[resp.imei].send(resp.packet)
 
 
 def runserver(conf):
@@ -191,19 +169,28 @@ def runserver(conf):
                         )
                         tostop.append(sk)
                     else:
-                        for imei, msg in received:
-                            zpub.send(Bcast(imei, msg).as_bytes)
-                            if proto_of_message(msg) == HIBERNATION.PROTO:
+                        for imei, when, peeraddr, packet in received:
+                            proto = proto_of_message(packet)
+                            zpub.send(
+                                Bcast(
+                                    proto=proto,
+                                    imei=imei,
+                                    when=when,
+                                    peeraddr=peeraddr,
+                                    packet=packet,
+                                ).packed
+                            )
+                            if proto == HIBERNATION.PROTO:
                                 log.debug(
                                     "HIBERNATION from fd %d (IMEI %s)",
                                     sk,
                                     imei,
                                 )
                                 tostop.append(sk)
-                            respmsg = inline_response(msg)
+                            respmsg = inline_response(packet)
                             if respmsg is not None:
                                 clients.response(
-                                    Resp(imei=imei, payload=respmsg)
+                                    Resp(imei=imei, packet=respmsg)
                                 )
             # poll queue consumed, make changes now
             for fd in tostop:
index 9da1d6d7d1a475d5f0e2fbd7b4f83af26c83f363..6047c39bfc9686fd0a0643564a10007792209b4e 100644 (file)
@@ -1,5 +1,6 @@
 """ Store zmq broadcasts to sqlite """
 
+from datetime import datetime, timezone
 from getopt import getopt
 from logging import getLogger
 from logging.handlers import SysLogHandler
@@ -10,9 +11,11 @@ import zmq
 from . import common
 from .evstore import initdb, stow
 from .gps303proto import parse_message
+from .zmsg import Bcast
 
 log = getLogger("gps303/storage")
 
+
 def runserver(conf):
     dbname = conf.get("storage", "dbfn")
     log.info('Using Sqlite3 database "%s"', dbname)
@@ -24,12 +27,17 @@ def runserver(conf):
 
     try:
         while True:
-            zmsg = zsub.recv()
-            imei = zmsg[1:17].decode()
-            packet = zmsg[17:]
-            msg = parse_message(packet)
-            log.debug("From IMEI %s: %s", imei, msg)
-            stow("", time(), imei, msg.length, msg.PROTO, msg.payload)
+            zmsg = Bcast(zsub.recv())
+            msg = parse_message(zmsg.packet)
+            log.debug("IMEI %s from %s at %s: %s", zmsg.imei, zmsg.peeraddr, datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc), msg)
+            stow(
+                zmsg.peeraddr,
+                zmsg.when,
+                zmsg.imei,
+                msg.length,
+                msg.PROTO,
+                msg.payload,
+            )
     except KeyboardInterrupt:
         pass
 
diff --git a/gps303/zmsg.py b/gps303/zmsg.py
new file mode 100644 (file)
index 0000000..cdd23fc
--- /dev/null
@@ -0,0 +1,97 @@
+""" Zeromq messages """
+
+import ipaddress as ip
+from struct import pack, unpack
+
+__all__ = "Bcast", "Resp"
+
+def pack_peer(peeraddr):
+    saddr, port = peeraddr
+    addr = ip.ip_address(saddr)
+    return pack("B", addr.version) + (addr.packed + b"\0\0\0\0\0\0\0\0\0\0\0\0")[:16] + pack("!H", port)
+
+def unpack_peer(buffer):
+    version = buffer[0]
+    if version not in (4, 6):
+        return None
+    if version == 4:
+        addr = ip.IPv4Address(buffer[1:5])
+    else:
+        addr = ip.IPv6Address(buffer[1:17])
+    port = unpack("!H", buffer[17:19])[0]
+    return (addr, port)
+
+
+class _Zmsg:
+    def __init__(self, *args, **kwargs):
+        if len(args) == 1:
+            self.decode(args[0])
+        elif bool(kwargs):
+            for k, v in self.KWARGS:
+                setattr(self, k, kwargs.get(k, v))
+        else:
+            raise RuntimeError(
+                self.__class__.__name__
+                + ": both args "
+                + str(args)
+                + " and kwargs "
+                + str(kwargs)
+            )
+
+    def decode(self, buffer):
+        raise RuntimeError(
+            self.__class__.__name__ + "must implement `encode()` method"
+        )
+
+    @property
+    def packed(self):
+        raise RuntimeError(
+            self.__class__.__name__ + "must implement `encode()` method"
+        )
+
+
+class Bcast(_Zmsg):
+    """Zmq message to broadcast what was received from the terminal"""
+
+    KWARGS = (
+        ("proto", 256),
+        ("imei", None),
+        ("when", None),
+        ("peeraddr", None),
+        ("packet", b""),
+    )
+
+    @property
+    def packed(self):
+        return (
+            pack("B", self.proto)
+            + ("0000000000000000" if self.imei is None else self.imei).encode()
+            + (b"\0\0\0\0\0\0\0\0" if self.when is None else pack("!d", self.when))
+            + pack_peer(self.peeraddr)
+            + self.packet
+        )
+
+    def decode(self, buffer):
+        self.proto = buffer[0]
+        self.imei = buffer[1:17].decode()
+        if self.imei == "0000000000000000":
+            self.imei = None
+        self.when = unpack("!d", buffer[17:25])[0]
+        self.peeraddr = unpack_peer(buffer[25:44])
+        self.packet = buffer[44:]
+
+
+class Resp(_Zmsg):
+    """Zmq message received from a third party to send to the terminal"""
+
+    KWARGS = (("imei", None), ("packet", b""))
+
+    @property
+    def packed(self):
+        return (
+            "0000000000000000" if self.imei is None else self.imei.encode()
+        ) + self.packet
+
+    def decode(self, buffer):
+        self.imei = buffer[:16].decode()
+        self.packet = buffer[16:]