]> average.org Git - loctrkd.git/commitdiff
wsgateway: switch to the use of cooked reports
authorEugene Crosser <crosser@average.org>
Tue, 2 Aug 2022 22:37:06 +0000 (00:37 +0200)
committerEugene Crosser <crosser@average.org>
Tue, 2 Aug 2022 22:37:06 +0000 (00:37 +0200)
loctrkd/evstore.py
loctrkd/wsgateway.py
loctrkd/zmsg.py

index b0265055dec85cd55785f626e6ae690aeb1599f5..85e8c9d25767de36fbf4e30444c9caa6f7bd6d5c 100644 (file)
@@ -1,8 +1,8 @@
 """ sqlite event store """
 
 from datetime import datetime
-from json import dumps
-from sqlite3 import connect, OperationalError
+from json import dumps, loads
+from sqlite3 import connect, OperationalError, Row
 from typing import Any, Dict, List, Tuple
 
 __all__ = "fetch", "initdb", "stow", "stowloc"
@@ -32,14 +32,9 @@ SCHEMA = (
 def initdb(dbname: str) -> None:
     global DB
     DB = connect(dbname)
-    try:
-        DB.execute(
-            """alter table events add column
-                is_incoming int not null default TRUE"""
-        )
-    except OperationalError:
-        for stmt in SCHEMA:
-            DB.execute(stmt)
+    DB.row_factory = Row
+    for stmt in SCHEMA:
+        DB.execute(stmt)
 
 
 def stow(**kwargs: Any) -> None:
@@ -91,23 +86,20 @@ def stowloc(**kwargs: Dict[str, Any]) -> None:
     DB.commit()
 
 
-def fetch(
-    imei: str, matchlist: List[Tuple[bool, str]], backlog: int
-) -> List[Tuple[bool, float, str, bytes]]:
-    # matchlist is a list of tuples (is_incoming, proto)
-    # returns a list of tuples (is_incoming, timestamp, packet)
+def fetch(imei: str, backlog: int) -> List[Dict[str, Any]]:
     assert DB is not None
-    selector = " or ".join(
-        (f"(is_incoming = ? and proto = ?)" for _ in range(len(matchlist)))
-    )
     cur = DB.cursor()
     cur.execute(
-        f"""select is_incoming, tstamp, proto, packet from events
-                    where ({selector}) and imei = ?
-                    order by tstamp desc limit ?""",
-        tuple(item for sublist in matchlist for item in sublist)
-        + (imei, backlog),
+        """select imei, devtime, accuracy, latitude, longitude, remainder
+                    from reports where imei = ?
+                    order by devtime desc limit ?""",
+        (imei, backlog),
     )
-    result = list(cur)
+    result = []
+    for row in cur:
+        dic = dict(row)
+        remainder = loads(dic.pop("remainder"))
+        dic.update(remainder)
+        result.append(dic)
     cur.close()
     return list(reversed(result))
index b6d10e84798f89e7b36e322074f9afb115d50489..e568584054dd571e0a83c04039a0f2cbfeeaa828 100644 (file)
@@ -24,42 +24,20 @@ import zmq
 from . import common
 from .evstore import initdb, fetch
 from .protomodule import ProtoModule
-from .zmsg import Bcast, topic
+from .zmsg import Rept, rtopic
 
 log = getLogger("loctrkd/wsgateway")
 
-
 htmlfile = None
-pmods: List[ProtoModule] = []
-selector: List[Tuple[bool, str]] = []
 
 
 def backlog(imei: str, numback: int) -> List[Dict[str, Any]]:
     result = []
-    for is_incoming, timestamp, proto, packet in fetch(
-        imei,
-        selector,
-        numback,
-    ):
-        for pmod in pmods:
-            if pmod.proto_handled(proto):
-                msg = pmod.parse_message(packet, is_incoming=is_incoming)
-        result.append(
-            {
-                "type": "location",
-                "imei": imei,
-                "timestamp": str(
-                    datetime.fromtimestamp(timestamp).astimezone(
-                        tz=timezone.utc
-                    )
-                ),
-                "longitude": msg.longitude,
-                "latitude": msg.latitude,
-                "accuracy": "gps"
-                if True  # TODO isinstance(msg, GPS_POSITIONING)
-                else "approximate",
-            }
-        )
+    for report in fetch(imei, numback):
+        report["type"] = "location"
+        timestamp = report.pop("devtime")
+        report["timestamp"] = timestamp
+        result.append(report)
     return result
 
 
@@ -258,21 +236,13 @@ class Clients:
 
 
 def runserver(conf: ConfigParser) -> None:
-    global htmlfile, pmods, selector
-    pmods = [
-        cast(ProtoModule, import_module("." + modnm, __package__))
-        for modnm in conf.get("common", "protocols").split(",")
-    ]
-    for pmod in pmods:
-        for proto, is_incoming in pmod.exposed_protos():
-            if proto != "ZX:STATUS":  # TODO make it better
-                selector.append((is_incoming, proto))
+    global htmlfile
     initdb(conf.get("storage", "dbfn"))
     htmlfile = conf.get("wsgateway", "htmlfile", fallback=None)
     # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
     zctx = zmq.Context()  # type: ignore
     zsub = zctx.socket(zmq.SUB)  # type: ignore
-    zsub.connect(conf.get("collector", "publishurl"))
+    zsub.connect(conf.get("rectifier", "publishurl"))
     tcpl = socket(AF_INET6, SOCK_STREAM)
     tcpl.setblocking(False)
     tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
@@ -288,18 +258,10 @@ def runserver(conf: ConfigParser) -> None:
         towait: Set[int] = set()
         while True:
             neededsubs = clients.subs()
-            for pmod in pmods:
-                for proto, is_incoming in pmod.exposed_protos():
-                    for imei in neededsubs - activesubs:
-                        zsub.setsockopt(
-                            zmq.SUBSCRIBE,
-                            topic(proto, is_incoming, imei),
-                        )
-                    for imei in activesubs - neededsubs:
-                        zsub.setsockopt(
-                            zmq.UNSUBSCRIBE,
-                            topic(proto, is_incoming, imei),
-                        )
+            for imei in neededsubs - activesubs:
+                zsub.setsockopt(zmq.SUBSCRIBE, rtopic(imei))
+            for imei in activesubs - neededsubs:
+                zsub.setsockopt(zmq.UNSUBSCRIBE, rtopic(imei))
             activesubs = neededsubs
             log.debug("Subscribed to: %s", activesubs)
             tosend = []
@@ -311,43 +273,11 @@ def runserver(conf: ConfigParser) -> None:
                 if sk is zsub:
                     while True:
                         try:
-                            zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
-                            for pmod in pmods:
-                                if pmod.proto_handled(zmsg.proto):
-                                    msg = pmod.parse_message(
-                                        zmsg.packet, zmsg.is_incoming
-                                    )
-                            log.debug("Got %s with %s", zmsg, msg)
-                            if zmsg.proto == "ZX:STATUS":
-                                tosend.append(
-                                    {
-                                        "type": "status",
-                                        "imei": zmsg.imei,
-                                        "timestamp": str(
-                                            datetime.fromtimestamp(
-                                                zmsg.when
-                                            ).astimezone(tz=timezone.utc)
-                                        ),
-                                        "battery": msg.batt,
-                                    }
-                                )
-                            else:
-                                tosend.append(
-                                    {
-                                        "type": "location",
-                                        "imei": zmsg.imei,
-                                        "timestamp": str(
-                                            datetime.fromtimestamp(
-                                                zmsg.when
-                                            ).astimezone(tz=timezone.utc)
-                                        ),
-                                        "longitude": msg.longitude,
-                                        "latitude": msg.latitude,
-                                        "accuracy": "gps"
-                                        if zmsg.is_incoming
-                                        else "approximate",
-                                    }
-                                )
+                            zmsg = Rept(zsub.recv(zmq.NOBLOCK))
+                            msg = loads(zmsg.payload)
+                            msg["imei"] = zmsg.imei
+                            log.debug("Got %s, sending %s", zmsg, msg)
+                            tosend.append(msg)
                         except zmq.Again:
                             break
                 elif sk == tcpfd:
index 4da88d2f6754e7e78db97ef8ecc9e989dcd1475c..da0dc77e39da6f38d7eaefe5adbdfe897f928b28 100644 (file)
@@ -4,7 +4,7 @@ import ipaddress as ip
 from struct import pack, unpack
 from typing import Any, cast, Optional, Tuple, Type, Union
 
-__all__ = "Bcast", "Resp", "topic"
+__all__ = "Bcast", "Resp", "topic", "rtopic"
 
 
 def pack_peer(  # 18 bytes
@@ -100,6 +100,10 @@ def topic(
     )
 
 
+def rtopic(imei: str) -> bytes:
+    return pack("16s", imei.encode())
+
+
 class Bcast(_Zmsg):
     """Zmq message to broadcast what was received from the terminal"""