1 """ Store zmq broadcasts to sqlite """
3 from configparser import ConfigParser
4 from datetime import datetime, timezone
6 from logging import getLogger
10 from .evstore import initdb, stow, stowloc
11 from .zmsg import Bcast, Rept
# Module-level logger, registered under the name "loctrkd/storage".
13 log = getLogger("loctrkd/storage")
# Run the storage service: read settings from `conf`, connect two ZeroMQ SUB
# sockets (collector and rectifier publishers), register both with a poller
# and process what arrives until interrupted from the keyboard.
# NOTE(review): several statements of this function (loop and `try` headers,
# the calls that receive the arguments shown below) are not visible in this
# excerpt; the comments describe only the statements that are shown.
16 def runserver(conf: ConfigParser) -> None:
# The database file name is taken from section [storage], option "dbfn".
17 dbname = conf.get("storage", "dbfn")
18 log.info('Using Sqlite3 database "%s"', dbname)
20 # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
21 zctx = zmq.Context() # type: ignore
# `zraw`: SUB socket connected to the URL in [collector] publishurl.
22 zraw = zctx.socket(zmq.SUB) # type: ignore
23 zraw.connect(conf.get("collector", "publishurl"))
# [storage] events is read as a boolean and defaults to False when absent.
# NOTE(review): the empty-prefix subscription on `zraw` is presumably the
# body of this `if` (indentation is not visible here) - confirm.
24 if conf.getboolean("storage", "events", fallback=False):
25 zraw.setsockopt(zmq.SUBSCRIBE, b"")
# `zrep`: SUB socket connected to the URL in [rectifier] publishurl,
# subscribed with an empty prefix.
26 zrep = zctx.socket(zmq.SUB) # type: ignore
27 zrep.connect(conf.get("rectifier", "publishurl"))
28 zrep.setsockopt(zmq.SUBSCRIBE, b"")
# Both sockets are registered for incoming-data (POLLIN) events.
29 poller = zmq.Poller() # type: ignore
30 poller.register(zraw, flags=zmq.POLLIN)
31 poller.register(zrep, flags=zmq.POLLIN)
# Poll with a timeout argument of 1000 (milliseconds, per pyzmq's Poller.poll).
35 events = poller.poll(1000)
# Receive from `zraw` with the NOBLOCK flag and wrap the bytes in a Bcast.
40 zmsg = Bcast(zraw.recv(zmq.NOBLOCK))
# Format string and arguments of a log call whose opening line is not visible:
# direction is rendered as "I" for incoming messages and "O" otherwise.
44 "%s IMEI %s from %s at %s: %s",
45 "I" if zmsg.is_incoming else "O",
# `zmsg.when` is converted to a datetime; the astimezone() argument is not
# visible in this excerpt.
48 datetime.fromtimestamp(zmsg.when).astimezone(
# Keyword arguments of a call whose opening line is not visible
# (presumably `stow` from .evstore - confirm).
54 is_incoming=zmsg.is_incoming,
55 peeraddr=str(zmsg.peeraddr),
# Receive from `zrep` with the NOBLOCK flag and wrap the bytes in a Rept.
64 rept = Rept(zrep.recv(zmq.NOBLOCK))
# NOTE(review): `loads` is presumably json.loads (import not visible) - confirm.
67 data = loads(rept.payload)
68 log.debug("R IMEI %s %s", rept.imei, data)
# pop() removes the "type" key from `data` before the comparison; for
# "location" reports the IMEI of the report is added to `data`.
69 if data.pop("type") == "location":
70 data["imei"] = rept.imei
# Logged for an event whose socket is neither of the two handled above
# (the branch header is not visible in this excerpt).
74 log.error("Event %s on unknown socket %s", fl, sk)
75 except KeyboardInterrupt:
# Tear down the ZeroMQ context.
78 zctx.destroy() # type: ignore
# Script entry point: when the module name ends with "__main__", call
# runserver() with the value returned by common.init(log).
# NOTE(review): common.init presumably returns the ConfigParser that
# runserver() expects - confirm against the `common` module.
81 if __name__.endswith("__main__"):
82 runserver(common.init(log))