from configparser import ConfigParser
from datetime import datetime, timezone
+from json import loads
from logging import getLogger
import zmq
from . import common
-from .evstore import initdb, stow
-from .zmsg import Bcast
+from .evstore import initdb, stow, stowloc
+from .zmsg import Bcast, Rept
log = getLogger("loctrkd/storage")
initdb(dbname)
# Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
zctx = zmq.Context() # type: ignore
- zsub = zctx.socket(zmq.SUB) # type: ignore
- zsub.connect(conf.get("collector", "publishurl"))
- zsub.setsockopt(zmq.SUBSCRIBE, b"")
+ zraw = zctx.socket(zmq.SUB) # type: ignore
+ zraw.connect(conf.get("collector", "publishurl"))
+ if conf.getboolean("storage", "events", fallback=False):
+ zraw.setsockopt(zmq.SUBSCRIBE, b"")
+ zrep = zctx.socket(zmq.SUB) # type: ignore
+ zrep.connect(conf.get("rectifier", "publishurl"))
+ zrep.setsockopt(zmq.SUBSCRIBE, b"")
+ poller = zmq.Poller() # type: ignore
+ poller.register(zraw, flags=zmq.POLLIN)
+ poller.register(zrep, flags=zmq.POLLIN)
try:
while True:
- zmsg = Bcast(zsub.recv())
- log.debug(
- "%s IMEI %s from %s at %s: %s",
- "I" if zmsg.is_incoming else "O",
- zmsg.imei,
- zmsg.peeraddr,
- datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
- zmsg.packet.hex(),
- )
- stow(
- is_incoming=zmsg.is_incoming,
- peeraddr=str(zmsg.peeraddr),
- when=zmsg.when,
- imei=zmsg.imei,
- proto=zmsg.proto,
- packet=zmsg.packet,
- )
+ events = poller.poll(1000)
+ for sk, fl in events:
+ if sk is zraw:
+ while True:
+ try:
+ zmsg = Bcast(zraw.recv(zmq.NOBLOCK))
+ except zmq.Again:
+ break
+ log.debug(
+ "%s IMEI %s from %s at %s: %s",
+ "I" if zmsg.is_incoming else "O",
+ zmsg.imei,
+ zmsg.peeraddr,
+ datetime.fromtimestamp(zmsg.when).astimezone(
+ tz=timezone.utc
+ ),
+ zmsg.packet.hex(),
+ )
+ stow(
+ is_incoming=zmsg.is_incoming,
+ peeraddr=str(zmsg.peeraddr),
+ when=zmsg.when,
+ imei=zmsg.imei,
+ proto=zmsg.proto,
+ packet=zmsg.packet,
+ )
+ elif sk is zrep:
+ while True:
+ try:
+ rept = Rept(zrep.recv(zmq.NOBLOCK))
+ except zmq.Again:
+ break
+ data = loads(rept.payload)
+ log.debug("R IMEI %s %s", rept.imei, data)
+ if data.pop("type") == "location":
+ data["imei"] = rept.imei
+ stowloc(**data)
+
+ else:
+ log.error("Event %s on unknown socket %s", fl, sk)
except KeyboardInterrupt:
- zsub.close()
+ zrep.close()
+ zraw.close()
zctx.destroy() # type: ignore