]> average.org Git - loctrkd.git/commitdiff
storage: save both raw and rectified reports
authorEugene Crosser <crosser@average.org>
Fri, 29 Jul 2022 22:42:17 +0000 (00:42 +0200)
committerEugene Crosser <crosser@average.org>
Fri, 29 Jul 2022 22:50:43 +0000 (00:50 +0200)
debian/loctrkd.conf
loctrkd/common.py
loctrkd/evstore.py
loctrkd/rectifier.py
loctrkd/storage.py
loctrkd/zx303proto.py
test/common.py

index a28104335b9468519538643ddff578d2fba8a04f..5bc733cfde5b3f04d1d66d13fcae29573af2052e 100644 (file)
@@ -15,6 +15,8 @@ htmlfile = /var/lib/loctrkd/index.html
 
 [storage]
 dbfn = /var/lib/loctrkd/trkloc.sqlite
+# store raw events from the collector. Rectified reports are always stored.
+events = yes
 
 [rectifier]
 # "opencellid" and "googlemaps" can be here. Both require an access token,
index 81f8bb99c914644e55ba3274a4e756cb178fae49..941a93e008dbf793cad604d0df1fd86a81809b63 100644 (file)
@@ -106,11 +106,11 @@ class CoordReport(Report):
         self,
         *,
         devtime: str,
-        battery_percentage: int,
-        accuracy: float,
-        altitude: float,
-        speed: float,
-        direction: float,
+        battery_percentage: Optional[int],
+        accuracy: Optional[float],
+        altitude: Optional[float],
+        speed: Optional[float],
+        direction: Optional[float],
         latitude: float,
         longitude: float,
     ) -> None:
@@ -131,7 +131,7 @@ class HintReport(Report):
         self,
         *,
         devtime: str,
-        battery_percentage: int,
+        battery_percentage: Optional[int],
         mcc: int,
         mnc: int,
         gsm_cells: List[Tuple[int, int, int]],
index da34cb9fdd1bfca083a44f03a9b4ecc738810fab..b0265055dec85cd55785f626e6ae690aeb1599f5 100644 (file)
@@ -1,20 +1,32 @@
 """ sqlite event store """
 
+from datetime import datetime
+from json import dumps
 from sqlite3 import connect, OperationalError
-from typing import Any, List, Tuple
+from typing import Any, Dict, List, Tuple
 
-__all__ = "fetch", "initdb", "stow"
+__all__ = "fetch", "initdb", "stow", "stowloc"
 
 DB = None
 
-SCHEMA = """create table if not exists events (
+SCHEMA = (
+    """create table if not exists events (
     tstamp real not null,
     imei text,
     peeraddr text not null,
     is_incoming int not null default TRUE,
     proto text not null,
     packet blob
-)"""
+)""",
+    """create table if not exists reports (
+    imei text,
+    devtime text not null,
+    accuracy real,
+    latitude real,
+    longitude real,
+    remainder text
+)""",
+)
 
 
 def initdb(dbname: str) -> None:
@@ -26,7 +38,8 @@ def initdb(dbname: str) -> None:
                 is_incoming int not null default TRUE"""
         )
     except OperationalError:
-        DB.execute(SCHEMA)
+        for stmt in SCHEMA:
+            DB.execute(stmt)
 
 
 def stow(**kwargs: Any) -> None:
@@ -54,6 +67,30 @@ def stow(**kwargs: Any) -> None:
     DB.commit()
 
 
+def stowloc(**kwargs: Dict[str, Any]) -> None:
+    assert DB is not None
+    parms = {
+        k: kwargs.pop(k) if k in kwargs else v
+        for k, v in (
+            ("imei", None),
+            ("devtime", str(datetime.now())),
+            ("accuracy", None),
+            ("latitude", None),
+            ("longitude", None),
+        )
+    }
+    parms["remainder"] = dumps(kwargs)
+    DB.execute(
+        """insert or ignore into reports
+                (imei, devtime, accuracy, latitude, longitude, remainder)
+                values
+                (:imei, :devtime, :accuracy, :latitude, :longitude, :remainder)
+        """,
+        parms,
+    )
+    DB.commit()
+
+
 def fetch(
     imei: str, matchlist: List[Tuple[bool, str]], backlog: int
 ) -> List[Tuple[bool, float, str, bytes]]:
index 1da57528d733a52df0be0955f18d7d29ce86d49b..e73aeaed3188ca5433d9abfadbe2ed7b69052d22 100644 (file)
@@ -92,10 +92,10 @@ def runserver(conf: ConfigParser) -> None:
                     rept = CoordReport(
                         devtime=rect.devtime,
                         battery_percentage=rect.battery_percentage,
-                        accuracy=-1,
-                        altitude=-1,
-                        speed=-1,
-                        direction=-1,
+                        accuracy=None,
+                        altitude=None,
+                        speed=None,
+                        direction=None,
                         latitude=lat,
                         longitude=lon,
                     )
index 128a4573cbacfe550d30b5c6f757a9de56b0feeb..5ea3b2c823c15618ed2845e6633b384fd2f93289 100644 (file)
@@ -2,12 +2,13 @@
 
 from configparser import ConfigParser
 from datetime import datetime, timezone
+from json import loads
 from logging import getLogger
 import zmq
 
 from . import common
-from .evstore import initdb, stow
-from .zmsg import Bcast
+from .evstore import initdb, stow, stowloc
+from .zmsg import Bcast, Rept
 
 log = getLogger("loctrkd/storage")
 
@@ -18,31 +19,62 @@ def runserver(conf: ConfigParser) -> None:
     initdb(dbname)
     # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
     zctx = zmq.Context()  # type: ignore
-    zsub = zctx.socket(zmq.SUB)  # type: ignore
-    zsub.connect(conf.get("collector", "publishurl"))
-    zsub.setsockopt(zmq.SUBSCRIBE, b"")
+    zraw = zctx.socket(zmq.SUB)  # type: ignore
+    zraw.connect(conf.get("collector", "publishurl"))
+    if conf.getboolean("storage", "events", fallback=False):
+        zraw.setsockopt(zmq.SUBSCRIBE, b"")
+    zrep = zctx.socket(zmq.SUB)  # type: ignore
+    zrep.connect(conf.get("rectifier", "publishurl"))
+    zrep.setsockopt(zmq.SUBSCRIBE, b"")
+    poller = zmq.Poller()  # type: ignore
+    poller.register(zraw, flags=zmq.POLLIN)
+    poller.register(zrep, flags=zmq.POLLIN)
 
     try:
         while True:
-            zmsg = Bcast(zsub.recv())
-            log.debug(
-                "%s IMEI %s from %s at %s: %s",
-                "I" if zmsg.is_incoming else "O",
-                zmsg.imei,
-                zmsg.peeraddr,
-                datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
-                zmsg.packet.hex(),
-            )
-            stow(
-                is_incoming=zmsg.is_incoming,
-                peeraddr=str(zmsg.peeraddr),
-                when=zmsg.when,
-                imei=zmsg.imei,
-                proto=zmsg.proto,
-                packet=zmsg.packet,
-            )
+            events = poller.poll(1000)
+            for sk, fl in events:
+                if sk is zraw:
+                    while True:
+                        try:
+                            zmsg = Bcast(zraw.recv(zmq.NOBLOCK))
+                        except zmq.Again:
+                            break
+                        log.debug(
+                            "%s IMEI %s from %s at %s: %s",
+                            "I" if zmsg.is_incoming else "O",
+                            zmsg.imei,
+                            zmsg.peeraddr,
+                            datetime.fromtimestamp(zmsg.when).astimezone(
+                                tz=timezone.utc
+                            ),
+                            zmsg.packet.hex(),
+                        )
+                        stow(
+                            is_incoming=zmsg.is_incoming,
+                            peeraddr=str(zmsg.peeraddr),
+                            when=zmsg.when,
+                            imei=zmsg.imei,
+                            proto=zmsg.proto,
+                            packet=zmsg.packet,
+                        )
+                elif sk is zrep:
+                    while True:
+                        try:
+                            rept = Rept(zrep.recv(zmq.NOBLOCK))
+                        except zmq.Again:
+                            break
+                        data = loads(rept.payload)
+                        log.debug("R IMEI %s %s", rept.imei, data)
+                        if data.pop("type") == "location":
+                            data["imei"] = rept.imei
+                            stowloc(**data)
+
+                else:
+                    log.error("Event %s on unknown socket %s", fl, sk)
     except KeyboardInterrupt:
-        zsub.close()
+        zrep.close()
+        zraw.close()
         zctx.destroy()  # type: ignore
 
 
index 63dc3f5f294149c48d6ea8e910e283c3f174cdb3..aff3405691baa596ca36ca90e00f7067880cef3f 100755 (executable)
@@ -372,9 +372,9 @@ class _GPS_POSITIONING(GPS303Pkt):
     def rectified(self) -> CoordReport:  # JSON-able dict
         return CoordReport(
             devtime=str(self.devtime),
-            battery_percentage=-1,
-            accuracy=-1.0,
-            altitude=-1.0,
+            battery_percentage=None,
+            accuracy=None,
+            altitude=None,
             speed=self.speed,
             direction=self.heading,
             latitude=self.latitude,
@@ -503,7 +503,7 @@ class _WIFI_POSITIONING(GPS303Pkt):
     def rectified(self) -> HintReport:
         return HintReport(
             devtime=str(self.devtime),
-            battery_percentage=-1,
+            battery_percentage=None,
             mcc=self.mcc,
             mnc=self.mnc,
             gsm_cells=self.gsm_cells,
index 58954a2b217acbd0fa5a930476cee1882dea8ff1..7a863106309e61072511641a7a975f5e344abbdf 100644 (file)
@@ -51,6 +51,7 @@ class TestWithServers(TestCase):
         }
         self.conf["storage"] = {
             "dbfn": self.tmpfilebase + ".storage.sqlite",
+            "events": "yes",
         }
         self.conf["opencellid"] = {
             "dbfn": self.tmpfilebase + ".opencellid.sqlite",