From ea3dc3f2096472d502d376e13050a59f97efd4a8 Mon Sep 17 00:00:00 2001 From: Eugene Crosser Date: Sat, 30 Jul 2022 00:42:17 +0200 Subject: [PATCH] storage: save both raw and rectified reports --- debian/loctrkd.conf | 2 ++ loctrkd/common.py | 12 +++---- loctrkd/evstore.py | 47 +++++++++++++++++++++++--- loctrkd/rectifier.py | 8 ++--- loctrkd/storage.py | 78 ++++++++++++++++++++++++++++++------------- loctrkd/zx303proto.py | 8 ++--- test/common.py | 1 + 7 files changed, 114 insertions(+), 42 deletions(-) diff --git a/debian/loctrkd.conf b/debian/loctrkd.conf index a281043..5bc733c 100644 --- a/debian/loctrkd.conf +++ b/debian/loctrkd.conf @@ -15,6 +15,8 @@ htmlfile = /var/lib/loctrkd/index.html [storage] dbfn = /var/lib/loctrkd/trkloc.sqlite +# store raw events from the collector. Rectified reports are always stored. +events = yes [rectifier] # "opencellid" and "googlemaps" can be here. Both require an access token, diff --git a/loctrkd/common.py b/loctrkd/common.py index 81f8bb9..941a93e 100644 --- a/loctrkd/common.py +++ b/loctrkd/common.py @@ -106,11 +106,11 @@ class CoordReport(Report): self, *, devtime: str, - battery_percentage: int, - accuracy: float, - altitude: float, - speed: float, - direction: float, + battery_percentage: Optional[int], + accuracy: Optional[float], + altitude: Optional[float], + speed: Optional[float], + direction: Optional[float], latitude: float, longitude: float, ) -> None: @@ -131,7 +131,7 @@ class HintReport(Report): self, *, devtime: str, - battery_percentage: int, + battery_percentage: Optional[int], mcc: int, mnc: int, gsm_cells: List[Tuple[int, int, int]], diff --git a/loctrkd/evstore.py b/loctrkd/evstore.py index da34cb9..b026505 100644 --- a/loctrkd/evstore.py +++ b/loctrkd/evstore.py @@ -1,20 +1,32 @@ """ sqlite event store """ +from datetime import datetime +from json import dumps from sqlite3 import connect, OperationalError -from typing import Any, List, Tuple +from typing import Any, Dict, List, Tuple -__all__ = "fetch", "initdb", "stow" +__all__ = "fetch", "initdb", "stow", "stowloc" DB = None -SCHEMA = """create table if not exists events ( +SCHEMA = ( + """create table if not exists events ( tstamp real not null, imei text, peeraddr text not null, is_incoming int not null default TRUE, proto text not null, packet blob -)""" +)""", + """create table if not exists reports ( + imei text, + devtime text not null, + accuracy real, + latitude real, + longitude real, + remainder text +)""", +) def initdb(dbname: str) -> None: @@ -26,7 +38,8 @@ def initdb(dbname: str) -> None: is_incoming int not null default TRUE""" ) except OperationalError: - DB.execute(SCHEMA) + for stmt in SCHEMA: + DB.execute(stmt) def stow(**kwargs: Any) -> None: @@ -54,6 +67,30 @@ def stow(**kwargs: Any) -> None: DB.commit() +def stowloc(**kwargs: Dict[str, Any]) -> None: + assert DB is not None + parms = { + k: kwargs.pop(k) if k in kwargs else v + for k, v in ( + ("imei", None), + ("devtime", str(datetime.now())), + ("accuracy", None), + ("latitude", None), + ("longitude", None), + ) + } + parms["remainder"] = dumps(kwargs) + DB.execute( + """insert or ignore into reports + (imei, devtime, accuracy, latitude, longitude, remainder) + values + (:imei, :devtime, :accuracy, :latitude, :longitude, :remainder) + """, + parms, + ) + DB.commit() + + def fetch( imei: str, matchlist: List[Tuple[bool, str]], backlog: int ) -> List[Tuple[bool, float, str, bytes]]: diff --git a/loctrkd/rectifier.py b/loctrkd/rectifier.py index 1da5752..e73aeae 100644 --- a/loctrkd/rectifier.py +++ b/loctrkd/rectifier.py @@ -92,10 +92,10 @@ def runserver(conf: ConfigParser) -> None: rept = CoordReport( devtime=rect.devtime, battery_percentage=rect.battery_percentage, - accuracy=-1, - altitude=-1, - speed=-1, - direction=-1, + accuracy=None, + altitude=None, + speed=None, + direction=None, latitude=lat, longitude=lon, ) diff --git a/loctrkd/storage.py b/loctrkd/storage.py index 128a457..5ea3b2c 100644 --- a/loctrkd/storage.py +++ b/loctrkd/storage.py @@ -2,12 +2,13 @@ from configparser import ConfigParser from datetime import datetime, timezone +from json import loads from logging import getLogger import zmq from . import common -from .evstore import initdb, stow -from .zmsg import Bcast +from .evstore import initdb, stow, stowloc +from .zmsg import Bcast, Rept log = getLogger("loctrkd/storage") @@ -18,31 +19,62 @@ def runserver(conf: ConfigParser) -> None: initdb(dbname) # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?! zctx = zmq.Context() # type: ignore - zsub = zctx.socket(zmq.SUB) # type: ignore - zsub.connect(conf.get("collector", "publishurl")) - zsub.setsockopt(zmq.SUBSCRIBE, b"") + zraw = zctx.socket(zmq.SUB) # type: ignore + zraw.connect(conf.get("collector", "publishurl")) + if conf.getboolean("storage", "events", fallback=False): + zraw.setsockopt(zmq.SUBSCRIBE, b"") + zrep = zctx.socket(zmq.SUB) # type: ignore + zrep.connect(conf.get("rectifier", "publishurl")) + zrep.setsockopt(zmq.SUBSCRIBE, b"") + poller = zmq.Poller() # type: ignore + poller.register(zraw, flags=zmq.POLLIN) + poller.register(zrep, flags=zmq.POLLIN) try: while True: - zmsg = Bcast(zsub.recv()) - log.debug( - "%s IMEI %s from %s at %s: %s", - "I" if zmsg.is_incoming else "O", - zmsg.imei, - zmsg.peeraddr, - datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc), - zmsg.packet.hex(), - ) - stow( - is_incoming=zmsg.is_incoming, - peeraddr=str(zmsg.peeraddr), - when=zmsg.when, - imei=zmsg.imei, - proto=zmsg.proto, - packet=zmsg.packet, - ) + events = poller.poll(1000) + for sk, fl in events: + if sk is zraw: + while True: + try: + zmsg = Bcast(zraw.recv(zmq.NOBLOCK)) + except zmq.Again: + break + log.debug( + "%s IMEI %s from %s at %s: %s", + "I" if zmsg.is_incoming else "O", + zmsg.imei, + zmsg.peeraddr, + datetime.fromtimestamp(zmsg.when).astimezone( + tz=timezone.utc + ), + zmsg.packet.hex(), + ) + stow( + is_incoming=zmsg.is_incoming, + peeraddr=str(zmsg.peeraddr), + when=zmsg.when, + imei=zmsg.imei, + proto=zmsg.proto, + packet=zmsg.packet, + ) + elif sk is zrep: + while True: + try: + rept = Rept(zrep.recv(zmq.NOBLOCK)) + except zmq.Again: + break + data = loads(rept.payload) + log.debug("R IMEI %s %s", rept.imei, data) + if data.pop("type") == "location": + data["imei"] = rept.imei + stowloc(**data) + + else: + log.error("Event %s on unknown socket %s", fl, sk) except KeyboardInterrupt: - zsub.close() + zrep.close() + zraw.close() zctx.destroy() # type: ignore diff --git a/loctrkd/zx303proto.py b/loctrkd/zx303proto.py index 63dc3f5..aff3405 100755 --- a/loctrkd/zx303proto.py +++ b/loctrkd/zx303proto.py @@ -372,9 +372,9 @@ class _GPS_POSITIONING(GPS303Pkt): def rectified(self) -> CoordReport: # JSON-able dict return CoordReport( devtime=str(self.devtime), - battery_percentage=-1, - accuracy=-1.0, - altitude=-1.0, + battery_percentage=None, + accuracy=None, + altitude=None, speed=self.speed, direction=self.heading, latitude=self.latitude, @@ -503,7 +503,7 @@ class _WIFI_POSITIONING(GPS303Pkt): def rectified(self) -> HintReport: return HintReport( devtime=str(self.devtime), - battery_percentage=-1, + battery_percentage=None, mcc=self.mcc, mnc=self.mnc, gsm_cells=self.gsm_cells, diff --git a/test/common.py b/test/common.py index 58954a2..7a86310 100644 --- a/test/common.py +++ b/test/common.py @@ -51,6 +51,7 @@ class TestWithServers(TestCase): } self.conf["storage"] = { "dbfn": self.tmpfilebase + ".storage.sqlite", + "events": "yes", } self.conf["opencellid"] = { "dbfn": self.tmpfilebase + ".opencellid.sqlite", -- 2.43.0