]> average.org Git - loctrkd.git/commitdiff
WIP converting wsgateway to multiprotocols
authorEugene Crosser <crosser@average.org>
Thu, 14 Jul 2022 22:04:10 +0000 (00:04 +0200)
committerEugene Crosser <crosser@average.org>
Thu, 14 Jul 2022 22:04:10 +0000 (00:04 +0200)
loctrkd/beesure.py
loctrkd/evstore.py
loctrkd/wsgateway.py
loctrkd/zx303proto.py

index 369abc2db39bdd3be3ebaa15347f94dddee333d2..9346b55979eaf49540172a2b61810d1812084031 100755 (executable)
@@ -26,6 +26,7 @@ __all__ = (
     "Stream",
     "class_by_prefix",
     "enframe",
+    "exposed_protos",
     "inline_response",
     "proto_handled",
     "parse_message",
@@ -565,3 +566,10 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> BeeSurePkt:
     retobj.proto = proto  # Override class attr with object attr
     retobj.cause = cause
     return retobj
+
+
+def exposed_protos() -> List[Tuple[str, bool]]:
+    return [
+        (proto_name(UD), True),
+        (proto_name(UD2), False),
+    ]
index 07b6dc4b760b7de08349611e844168ab0e80470a..da34cb9fdd1bfca083a44f03a9b4ecc738810fab 100644 (file)
@@ -56,7 +56,7 @@ def stow(**kwargs: Any) -> None:
 
 def fetch(
     imei: str, matchlist: List[Tuple[bool, str]], backlog: int
-) -> List[Tuple[bool, float, bytes]]:
+) -> List[Tuple[bool, float, str, bytes]]:
     # matchlist is a list of tuples (is_incoming, proto)
     # returns a list of tuples (is_incoming, timestamp, packet)
     assert DB is not None
@@ -65,7 +65,7 @@ def fetch(
     )
     cur = DB.cursor()
     cur.execute(
-        f"""select is_incoming, tstamp, packet from events
+        f"""select is_incoming, tstamp, proto, packet from events
                     where ({selector}) and imei = ?
                     order by tstamp desc limit ?""",
         tuple(item for sublist in matchlist for item in sublist)
index 522cd59269d8380596c0befb7f95dffc058fffd9..8f2c6484b2fea8515909be63e22c45d21b3abd8e 100644 (file)
@@ -2,6 +2,7 @@
 
 from configparser import ConfigParser
 from datetime import datetime, timezone
+from importlib import import_module
 from json import dumps, loads
 from logging import getLogger
 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
@@ -22,30 +23,40 @@ import zmq
 
 from . import common
 from .evstore import initdb, fetch
-from .zx303proto import (
-    GPS_POSITIONING,
-    STATUS,
-    WIFI_POSITIONING,
-    parse_message,
-    proto_name,
-)
 from .zmsg import Bcast, topic
 
 log = getLogger("loctrkd/wsgateway")
+
+
+class ProtoModule:
+    @staticmethod
+    def parse_message(packet: bytes, is_incoming: bool = True) -> Any:
+        ...
+
+    @staticmethod
+    def exposed_protos() -> List[Tuple[str, bool]]:
+        ...
+
+    @staticmethod
+    def proto_handled(proto: str) -> bool:
+        ...
+
+
 htmlfile = None
+pmods: List[ProtoModule] = []
+selector: List[Tuple[bool, str]] = []
 
 
 def backlog(imei: str, numback: int) -> List[Dict[str, Any]]:
     result = []
-    for is_incoming, timestamp, packet in fetch(
+    for is_incoming, timestamp, proto, packet in fetch(
         imei,
-        [
-            (True, proto_name(GPS_POSITIONING)),
-            (False, proto_name(WIFI_POSITIONING)),
-        ],
+        selector,
         numback,
     ):
-        msg = parse_message(packet, is_incoming=is_incoming)
+        for pmod in pmods:
+            if pmod.proto_handled(proto):
+                msg = pmod.parse_message(packet, is_incoming=is_incoming)
         result.append(
             {
                 "type": "location",
@@ -58,7 +69,7 @@ def backlog(imei: str, numback: int) -> List[Dict[str, Any]]:
                 "longitude": msg.longitude,
                 "latitude": msg.latitude,
                 "accuracy": "gps"
-                if isinstance(msg, GPS_POSITIONING)
+                if True  # TODO isinstance(msg, GPS_POSITIONING)
                 else "approximate",
             }
         )
@@ -260,8 +271,15 @@ class Clients:
 
 
 def runserver(conf: ConfigParser) -> None:
-    global htmlfile
-
+    global htmlfile, pmods, selector
+    pmods = [
+        cast(ProtoModule, import_module("." + modnm, __package__))
+        for modnm in conf.get("collector", "protocols").split(",")
+    ]
+    for pmod in pmods:
+        for proto, is_incoming in pmod.exposed_protos():
+            if proto != "ZX:STATUS":  # TODO make it better
+                selector.append((is_incoming, proto))
     initdb(conf.get("storage", "dbfn"))
     htmlfile = conf.get("wsgateway", "htmlfile", fallback=None)
     # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
@@ -283,32 +301,18 @@ def runserver(conf: ConfigParser) -> None:
         towait: Set[int] = set()
         while True:
             neededsubs = clients.subs()
-            for imei in neededsubs - activesubs:
-                zsub.setsockopt(
-                    zmq.SUBSCRIBE,
-                    topic(proto_name(GPS_POSITIONING), True, imei),
-                )
-                zsub.setsockopt(
-                    zmq.SUBSCRIBE,
-                    topic(proto_name(WIFI_POSITIONING), False, imei),
-                )
-                zsub.setsockopt(
-                    zmq.SUBSCRIBE,
-                    topic(proto_name(STATUS), True, imei),
-                )
-            for imei in activesubs - neededsubs:
-                zsub.setsockopt(
-                    zmq.UNSUBSCRIBE,
-                    topic(proto_name(GPS_POSITIONING), True, imei),
-                )
-                zsub.setsockopt(
-                    zmq.UNSUBSCRIBE,
-                    topic(proto_name(WIFI_POSITIONING), False, imei),
-                )
-                zsub.setsockopt(
-                    zmq.UNSUBSCRIBE,
-                    topic(proto_name(STATUS), True, imei),
-                )
+            for pmod in pmods:
+                for proto, is_incoming in pmod.exposed_protos():
+                    for imei in neededsubs - activesubs:
+                        zsub.setsockopt(
+                            zmq.SUBSCRIBE,
+                            topic(proto, is_incoming, imei),
+                        )
+                    for imei in activesubs - neededsubs:
+                        zsub.setsockopt(
+                            zmq.UNSUBSCRIBE,
+                            topic(proto, is_incoming, imei),
+                        )
             activesubs = neededsubs
             log.debug("Subscribed to: %s", activesubs)
             tosend = []
@@ -321,9 +325,13 @@ def runserver(conf: ConfigParser) -> None:
                     while True:
                         try:
                             zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
-                            msg = parse_message(zmsg.packet, zmsg.is_incoming)
+                            for pmod in pmods:
+                                if pmod.proto_handled(zmsg.proto):
+                                    msg = pmod.parse_message(
+                                        zmsg.packet, zmsg.is_incoming
+                                    )
                             log.debug("Got %s with %s", zmsg, msg)
-                            if isinstance(msg, STATUS):
+                            if zmsg.proto == "ZX:STATUS":
                                 tosend.append(
                                     {
                                         "type": "status",
index 4d5730304fbead5492db2c1b08f5bf86381a0618..bd19e108c770221e3a0132dfc7c863e23181622f 100755 (executable)
@@ -34,6 +34,8 @@ from typing import (
 __all__ = (
     "Stream",
     "class_by_prefix",
+    "enframe",
+    "exposed_protos",
     "inline_response",
     "proto_handled",
     "parse_message",
@@ -946,3 +948,11 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
     retobj.PROTO = proto  # Override class attr with object attr
     retobj.cause = cause
     return retobj
+
+
+def exposed_protos() -> List[Tuple[str, bool]]:
+    return [
+        (proto_name(GPS_POSITIONING), True),
+        (proto_name(WIFI_POSITIONING), False),
+        (proto_name(STATUS), True),
+    ]