[storage]
dbfn = /var/lib/loctrkd/trkloc.sqlite
+# store raw events from the collector. Rectified reports are always stored.
+events = yes
[rectifier]
# "opencellid" and "googlemaps" can be here. Both require an access token,
self,
*,
devtime: str,
- battery_percentage: int,
- accuracy: float,
- altitude: float,
- speed: float,
- direction: float,
+ battery_percentage: Optional[int],
+ accuracy: Optional[float],
+ altitude: Optional[float],
+ speed: Optional[float],
+ direction: Optional[float],
latitude: float,
longitude: float,
) -> None:
self,
*,
devtime: str,
- battery_percentage: int,
+ battery_percentage: Optional[int],
mcc: int,
mnc: int,
gsm_cells: List[Tuple[int, int, int]],
""" sqlite event store """
+from datetime import datetime
+from json import dumps
from sqlite3 import connect, OperationalError
-from typing import Any, List, Tuple
+from typing import Any, Dict, List, Tuple
-__all__ = "fetch", "initdb", "stow"
+__all__ = "fetch", "initdb", "stow", "stowloc"
DB = None
-SCHEMA = """create table if not exists events (
+SCHEMA = (
+ """create table if not exists events (
tstamp real not null,
imei text,
peeraddr text not null,
is_incoming int not null default TRUE,
proto text not null,
packet blob
-)"""
+)""",
+ """create table if not exists reports (
+ imei text,
+ devtime text not null,
+ accuracy real,
+ latitude real,
+ longitude real,
+ remainder text
+)""",
+)
def initdb(dbname: str) -> None:
is_incoming int not null default TRUE"""
)
except OperationalError:
- DB.execute(SCHEMA)
+ for stmt in SCHEMA:
+ DB.execute(stmt)
def stow(**kwargs: Any) -> None:
DB.commit()
+def stowloc(**kwargs: Dict[str, Any]) -> None:
+ assert DB is not None
+ parms = {
+ k: kwargs.pop(k) if k in kwargs else v
+ for k, v in (
+ ("imei", None),
+ ("devtime", str(datetime.now())),
+ ("accuracy", None),
+ ("latitude", None),
+ ("longitude", None),
+ )
+ }
+ parms["remainder"] = dumps(kwargs)
+ DB.execute(
+ """insert or ignore into reports
+ (imei, devtime, accuracy, latitude, longitude, remainder)
+ values
+ (:imei, :devtime, :accuracy, :latitude, :longitude, :remainder)
+ """,
+ parms,
+ )
+ DB.commit()
+
+
def fetch(
imei: str, matchlist: List[Tuple[bool, str]], backlog: int
) -> List[Tuple[bool, float, str, bytes]]:
rept = CoordReport(
devtime=rect.devtime,
battery_percentage=rect.battery_percentage,
- accuracy=-1,
- altitude=-1,
- speed=-1,
- direction=-1,
+ accuracy=None,
+ altitude=None,
+ speed=None,
+ direction=None,
latitude=lat,
longitude=lon,
)
from configparser import ConfigParser
from datetime import datetime, timezone
+from json import loads
from logging import getLogger
import zmq
from . import common
-from .evstore import initdb, stow
-from .zmsg import Bcast
+from .evstore import initdb, stow, stowloc
+from .zmsg import Bcast, Rept
log = getLogger("loctrkd/storage")
initdb(dbname)
# Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
zctx = zmq.Context() # type: ignore
- zsub = zctx.socket(zmq.SUB) # type: ignore
- zsub.connect(conf.get("collector", "publishurl"))
- zsub.setsockopt(zmq.SUBSCRIBE, b"")
+ zraw = zctx.socket(zmq.SUB) # type: ignore
+ zraw.connect(conf.get("collector", "publishurl"))
+ if conf.getboolean("storage", "events", fallback=False):
+ zraw.setsockopt(zmq.SUBSCRIBE, b"")
+ zrep = zctx.socket(zmq.SUB) # type: ignore
+ zrep.connect(conf.get("rectifier", "publishurl"))
+ zrep.setsockopt(zmq.SUBSCRIBE, b"")
+ poller = zmq.Poller() # type: ignore
+ poller.register(zraw, flags=zmq.POLLIN)
+ poller.register(zrep, flags=zmq.POLLIN)
try:
while True:
- zmsg = Bcast(zsub.recv())
- log.debug(
- "%s IMEI %s from %s at %s: %s",
- "I" if zmsg.is_incoming else "O",
- zmsg.imei,
- zmsg.peeraddr,
- datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
- zmsg.packet.hex(),
- )
- stow(
- is_incoming=zmsg.is_incoming,
- peeraddr=str(zmsg.peeraddr),
- when=zmsg.when,
- imei=zmsg.imei,
- proto=zmsg.proto,
- packet=zmsg.packet,
- )
+ events = poller.poll(1000)
+ for sk, fl in events:
+ if sk is zraw:
+ while True:
+ try:
+ zmsg = Bcast(zraw.recv(zmq.NOBLOCK))
+ except zmq.Again:
+ break
+ log.debug(
+ "%s IMEI %s from %s at %s: %s",
+ "I" if zmsg.is_incoming else "O",
+ zmsg.imei,
+ zmsg.peeraddr,
+ datetime.fromtimestamp(zmsg.when).astimezone(
+ tz=timezone.utc
+ ),
+ zmsg.packet.hex(),
+ )
+ stow(
+ is_incoming=zmsg.is_incoming,
+ peeraddr=str(zmsg.peeraddr),
+ when=zmsg.when,
+ imei=zmsg.imei,
+ proto=zmsg.proto,
+ packet=zmsg.packet,
+ )
+ elif sk is zrep:
+ while True:
+ try:
+ rept = Rept(zrep.recv(zmq.NOBLOCK))
+ except zmq.Again:
+ break
+ data = loads(rept.payload)
+ log.debug("R IMEI %s %s", rept.imei, data)
+ if data.pop("type") == "location":
+ data["imei"] = rept.imei
+ stowloc(**data)
+
+ else:
+ log.error("Event %s on unknown socket %s", fl, sk)
except KeyboardInterrupt:
- zsub.close()
+ zrep.close()
+ zraw.close()
zctx.destroy() # type: ignore
def rectified(self) -> CoordReport: # JSON-able dict
return CoordReport(
devtime=str(self.devtime),
- battery_percentage=-1,
- accuracy=-1.0,
- altitude=-1.0,
+ battery_percentage=None,
+ accuracy=None,
+ altitude=None,
speed=self.speed,
direction=self.heading,
latitude=self.latitude,
def rectified(self) -> HintReport:
return HintReport(
devtime=str(self.devtime),
- battery_percentage=-1,
+ battery_percentage=None,
mcc=self.mcc,
mnc=self.mnc,
gsm_cells=self.gsm_cells,
}
self.conf["storage"] = {
"dbfn": self.tmpfilebase + ".storage.sqlite",
+ "events": "yes",
}
self.conf["opencellid"] = {
"dbfn": self.tmpfilebase + ".opencellid.sqlite",