]> average.org Git - loctrkd.git/commitdiff
Multiprotocol support in zmq messages and storage
authorEugene Crosser <crosser@average.org>
Thu, 7 Jul 2022 21:28:38 +0000 (23:28 +0200)
committerEugene Crosser <crosser@average.org>
Thu, 14 Jul 2022 20:38:12 +0000 (22:38 +0200)
debian/gps303.conf
gps303/collector.py
gps303/evstore.py
gps303/googlemaps.py
gps303/gps303proto.py
gps303/lookaside.py
gps303/mkgpx.py
gps303/opencellid.py
gps303/termconfig.py
gps303/wsgateway.py
gps303/zmsg.py

index 8bfff8e140f28faa3b271475585a116fb6cc39e4..88bf74a8ebb2d2e4ae6ec9cbdd3076631cd8be3b 100644 (file)
@@ -12,7 +12,7 @@ port = 5049
 htmlfile = /var/lib/gps303/index.html
 
 [storage]
-dbfn = /var/lib/gps303/gps303.sqlite
+dbfn = /var/lib/gps303/trkloc.sqlite
 
 [lookaside]
 # "opencellid" and "googlemaps" can be here. Both require an access token,
index 4fc946c3dcb7795a6ff414411d5b44d757b97d4f..cb45d8144bb9ec132265f8bed3d18881b6dab6de 100644 (file)
@@ -58,7 +58,7 @@ class ProtoModule:
         ...
 
     @staticmethod
-    def proto_of_message(packet: bytes) -> int:
+    def proto_of_message(packet: bytes) -> str:
         ...
 
     @staticmethod
index 2b23e8e92ef98e4c5ff21a584be786df179859ad..07b6dc4b760b7de08349611e844168ab0e80470a 100644 (file)
@@ -12,7 +12,7 @@ SCHEMA = """create table if not exists events (
     imei text,
     peeraddr text not null,
     is_incoming int not null default TRUE,
-    proto int not null,
+    proto text not null,
     packet blob
 )"""
 
@@ -38,7 +38,7 @@ def stow(**kwargs: Any) -> None:
             ("peeraddr", None),
             ("when", 0.0),
             ("imei", None),
-            ("proto", -1),
+            ("proto", "UNKNOWN"),
             ("packet", b""),
         )
     }
@@ -55,7 +55,7 @@ def stow(**kwargs: Any) -> None:
 
 
 def fetch(
-    imei: str, matchlist: List[Tuple[bool, int]], backlog: int
+    imei: str, matchlist: List[Tuple[bool, str]], backlog: int
 ) -> List[Tuple[bool, float, bytes]]:
     # matchlist is a list of tuples (is_incoming, proto)
     # returns a list of tuples (is_incoming, timestamp, packet)
index d4deea2438dcae736ba2685b62d8f1e13c53b965..dac607959b6e102579dc66030275aec8e8e4f0f9 100644 (file)
@@ -58,7 +58,7 @@ if __name__.endswith("__main__"):
     c.execute(
         """select tstamp, packet from events
             where proto in (?, ?)""",
-        (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO),
+        (proto_name(WIFI_POSITIONING), proto_name(WIFI_OFFLINE_POSITIONING)),
     )
     init({"googlemaps": {"accesstoken": sys.argv[2]}})
     count = 0
index e3776c3a529276fd333d619ea1946a8264f8337b..efb02d249f6c118185ce2a66b23675c9acf85efd 100755 (executable)
@@ -38,6 +38,7 @@ __all__ = (
     "parse_message",
     "probe_buffer",
     "proto_by_name",
+    "proto_name",
     "DecodeError",
     "Respond",
     "GPS303Pkt",
@@ -80,6 +81,8 @@ __all__ = (
     "UNKNOWN_B3",
 )
 
+PROTO_PREFIX = "ZX"
+
 ### Deframer ###
 
 MAXBUFFER: int = 4096
@@ -872,16 +875,28 @@ def class_by_prefix(
     return CLASSES[proto]
 
 
+def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
+    return (
+        PROTO_PREFIX
+        + ":"
+        + (
+            obj.__class__.__name__
+            if isinstance(obj, GPS303Pkt)
+            else obj.__name__
+        )
+    ).ljust(16, "\0")[:16]
+
+
 def proto_by_name(name: str) -> int:
     return PROTOS.get(name, -1)
 
 
-def proto_of_message(packet: bytes) -> int:
-    return packet[1]
+def proto_of_message(packet: bytes) -> str:
+    return proto_name(CLASSES.get(packet[1], UNKNOWN))
 
 
 def imei_from_packet(packet: bytes) -> Optional[str]:
-    if proto_of_message(packet) == LOGIN.PROTO:
+    if packet[1] == LOGIN.PROTO:
         msg = parse_message(packet)
         if isinstance(msg, LOGIN):
             return msg.imei
@@ -889,11 +904,11 @@ def imei_from_packet(packet: bytes) -> Optional[str]:
 
 
 def is_goodbye_packet(packet: bytes) -> bool:
-    return proto_of_message(packet) == HIBERNATION.PROTO
+    return packet[1] == HIBERNATION.PROTO
 
 
 def inline_response(packet: bytes) -> Optional[bytes]:
-    proto = proto_of_message(packet)
+    proto = packet[1]
     if proto in CLASSES:
         cls = CLASSES[proto]
         if cls.RESPOND is Respond.INL:
index 1d214896b81fc5e3c73db89fa6a68c9a10412d88..0c1e4cdcf19e8ca0b22c8b73615bfa178939b313 100644 (file)
@@ -9,7 +9,7 @@ from struct import pack
 import zmq
 
 from . import common
-from .gps303proto import parse_message, WIFI_POSITIONING
+from .gps303proto import parse_message, proto_name, WIFI_POSITIONING
 from .zmsg import Bcast, Resp, topic
 
 log = getLogger("gps303/lookaside")
@@ -22,7 +22,7 @@ def runserver(conf: ConfigParser) -> None:
     zctx = zmq.Context()  # type: ignore
     zsub = zctx.socket(zmq.SUB)  # type: ignore
     zsub.connect(conf.get("collector", "publishurl"))
-    zsub.setsockopt(zmq.SUBSCRIBE, topic(WIFI_POSITIONING.PROTO))
+    zsub.setsockopt(zmq.SUBSCRIBE, topic(proto_name(WIFI_POSITIONING)))
     zpush = zctx.socket(zmq.PUSH)  # type: ignore
     zpush.connect(conf.get("collector", "listenurl"))
 
index ef63e04bbc17e3054ab7bc235db5e8103f2e5eb9..456b94f58c4e9d83f58f7b98569112f72c3c8e92 100644 (file)
@@ -18,7 +18,7 @@ c.execute(
        and ((is_incoming = false and proto = ?) 
          or (is_incoming = true and proto = ?))
        order by tstamp""",
-    (sys.argv[2], WIFI_POSITIONING.PROTO, GPS_POSITIONING.PROTO),
+    (sys.argv[2], proto_name(WIFI_POSITIONING), proto_name(GPS_POSITIONING)),
 )
 
 print(
index 90a8a74bc78d7c426bb75e9a92d79b37a8babb1a..7b6e41306cf3973471d610403d693084f5aa0917 100644 (file)
@@ -64,7 +64,7 @@ if __name__.endswith("__main__"):
     c.execute(
         """select tstamp, packet from events
             where proto in (?, ?)""",
-        (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO),
+        (proto_name(WIFI_POSITIONING), proto_name(WIFI_OFFLINE_POSITIONING)),
     )
     init({"opencellid": {"dbfn": sys.argv[2]}})
     for timestamp, packet in c:
index 723e8f62766ef542dbdb337ae015c66203ff1b9c..6ec43fdc1c25f5063096121b459008d6fc3c5511 100644 (file)
@@ -19,9 +19,9 @@ def runserver(conf: ConfigParser) -> None:
     zsub = zctx.socket(zmq.SUB)  # type: ignore
     zsub.connect(conf.get("collector", "publishurl"))
     for proto in (
-        STATUS.PROTO,
-        SETUP.PROTO,
-        POSITION_UPLOAD_INTERVAL.PROTO,
+        proto_name(STATUS),
+        proto_name(SETUP),
+        proto_name(POSITION_UPLOAD_INTERVAL),
     ):
         zsub.setsockopt(zmq.SUBSCRIBE, topic(proto))
     zpush = zctx.socket(zmq.PUSH)  # type: ignore
index 381a8558a4218f9d56489425ece81d4c4406e6d8..80fdac750a63c8f1950b44dfc7d85d45e28cbfa1 100644 (file)
@@ -27,6 +27,7 @@ from .gps303proto import (
     STATUS,
     WIFI_POSITIONING,
     parse_message,
+    proto_name,
 )
 from .zmsg import Bcast, topic
 
@@ -38,7 +39,10 @@ def backlog(imei: str, numback: int) -> List[Dict[str, Any]]:
     result = []
     for is_incoming, timestamp, packet in fetch(
         imei,
-        [(True, GPS_POSITIONING.PROTO), (False, WIFI_POSITIONING.PROTO)],
+        [
+            (True, proto_name(GPS_POSITIONING)),
+            (False, proto_name(WIFI_POSITIONING)),
+        ],
         numback,
     ):
         msg = parse_message(packet, is_incoming=is_incoming)
@@ -282,28 +286,28 @@ def runserver(conf: ConfigParser) -> None:
             for imei in neededsubs - activesubs:
                 zsub.setsockopt(
                     zmq.SUBSCRIBE,
-                    topic(GPS_POSITIONING.PROTO, True, imei),
+                    topic(proto_name(GPS_POSITIONING), True, imei),
                 )
                 zsub.setsockopt(
                     zmq.SUBSCRIBE,
-                    topic(WIFI_POSITIONING.PROTO, False, imei),
+                    topic(proto_name(WIFI_POSITIONING), False, imei),
                 )
                 zsub.setsockopt(
                     zmq.SUBSCRIBE,
-                    topic(STATUS.PROTO, True, imei),
+                    topic(proto_name(STATUS), True, imei),
                 )
             for imei in activesubs - neededsubs:
                 zsub.setsockopt(
                     zmq.UNSUBSCRIBE,
-                    topic(GPS_POSITIONING.PROTO, True, imei),
+                    topic(proto_name(GPS_POSITIONING), True, imei),
                 )
                 zsub.setsockopt(
                     zmq.UNSUBSCRIBE,
-                    topic(WIFI_POSITIONING.PROTO, False, imei),
+                    topic(proto_name(WIFI_POSITIONING), False, imei),
                 )
                 zsub.setsockopt(
                     zmq.UNSUBSCRIBE,
-                    topic(STATUS.PROTO, True, imei),
+                    topic(proto_name(STATUS), True, imei),
                 )
             activesubs = neededsubs
             log.debug("Subscribed to: %s", activesubs)
index 2d497c0cd5a98997bb3aa89c101a39fb29213c54..b6faa7025a1df310422e6e51603878d9c01254b1 100644 (file)
@@ -7,7 +7,7 @@ from typing import Any, cast, Optional, Tuple, Type, Union
 __all__ = "Bcast", "Resp", "topic"
 
 
-def pack_peer(
+def pack_peer(  # 18 bytes
     peeraddr: Union[None, Tuple[str, int], Tuple[str, int, Any, Any]]
 ) -> bytes:
     if peeraddr is None:
@@ -93,9 +93,9 @@ class _Zmsg:
 
 
 def topic(
-    proto: int, is_incoming: bool = True, imei: Optional[str] = None
+    proto: str, is_incoming: bool = True, imei: Optional[str] = None
 ) -> bytes:
-    return pack("BB", is_incoming, proto) + (
+    return pack("B16s", is_incoming, proto.encode()) + (
         b"" if imei is None else pack("16s", imei.encode())
     )
 
@@ -105,7 +105,7 @@ class Bcast(_Zmsg):
 
     KWARGS = (
         ("is_incoming", True),
-        ("proto", 256),
+        ("proto", "UNKNOWN"),
         ("imei", None),
         ("when", None),
         ("peeraddr", None),
@@ -116,31 +116,28 @@ class Bcast(_Zmsg):
     def packed(self) -> bytes:
         return (
             pack(
-                "BB16s",
+                "!B16s16sd",
                 int(self.is_incoming),
-                self.proto,
+                self.proto[:16].ljust(16, "\0").encode(),
                 b"0000000000000000"
                 if self.imei is None
                 else self.imei.encode(),
-            )
-            + (
-                b"\0\0\0\0\0\0\0\0"
-                if self.when is None
-                else pack("!d", self.when)
+                0 if self.when is None else self.when,
             )
             + pack_peer(self.peeraddr)
             + self.packet
         )
 
     def decode(self, buffer: bytes) -> None:
-        self.is_incoming = bool(buffer[0])
-        self.proto = buffer[1]
-        self.imei: Optional[str] = buffer[2:18].decode()
-        if self.imei == "0000000000000000":
-            self.imei = None
-        self.when = unpack("!d", buffer[18:26])[0]
-        self.peeraddr = unpack_peer(buffer[26:44])
-        self.packet = buffer[44:]
+        is_incoming, proto, imei, when = unpack("!B16s16sd", buffer[:41])
+        self.is_incoming = bool(is_incoming)
+        self.proto = proto.decode()
+        self.imei = (
+            None if imei == b"0000000000000000" else imei.decode().strip("\0")
+        )
+        self.when = when
+        self.peeraddr = unpack_peer(buffer[41:59])
+        self.packet = buffer[59:]
 
 
 class Resp(_Zmsg):
@@ -152,20 +149,20 @@ class Resp(_Zmsg):
     def packed(self) -> bytes:
         return (
             pack(
-                "16s",
+                "!16sd",
                 "0000000000000000"
                 if self.imei is None
                 else self.imei.encode(),
-            )
-            + (
-                b"\0\0\0\0\0\0\0\0"
-                if self.when is None
-                else pack("!d", self.when)
+                0 if self.when is None else self.when,
             )
             + self.packet
         )
 
     def decode(self, buffer: bytes) -> None:
-        self.imei = buffer[:16].decode()
-        self.when = unpack("!d", buffer[16:24])[0]
+        imei, when = unpack("!16sd", buffer[:24])
+        self.imei = (
+            None if imei == b"0000000000000000" else imei.decode().strip("\0")
+        )
+
+        self.when = when
         self.packet = buffer[24:]