1 """ Watch for locevt and print them """
3 from configparser import ConfigParser
4 from datetime import datetime, timezone
5 from importlib import import_module
6 from logging import getLogger
7 from typing import Any, cast, List
11 from .protomodule import ProtoModule
12 from .zmsg import Bcast, Rept
14 log = getLogger("loctrkd/watch")
17 def runserver(conf: ConfigParser) -> None:
18 # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
19 zctx = zmq.Context() # type: ignore
20 zraw = zctx.socket(zmq.SUB) # type: ignore
21 zraw.connect(conf.get("collector", "publishurl"))
22 zraw.setsockopt(zmq.SUBSCRIBE, b"")
23 zrep = zctx.socket(zmq.SUB) # type: ignore
24 zrep.connect(conf.get("rectifier", "publishurl"))
25 zrep.setsockopt(zmq.SUBSCRIBE, b"")
26 poller = zmq.Poller() # type: ignore
27 poller.register(zraw, flags=zmq.POLLIN)
28 poller.register(zrep, flags=zmq.POLLIN)
32 events = poller.poll(1000)
37 zmsg = Bcast(zraw.recv(zmq.NOBLOCK))
41 "I" if zmsg.is_incoming else "O",
45 pmod = common.pmod_for_proto(zmsg.proto)
47 msg = pmod.parse_message(
48 zmsg.packet, zmsg.is_incoming
51 if zmsg.is_incoming and hasattr(msg, "rectified"):
52 print("Rectified:", msg.rectified())
56 rept = Rept(zrep.recv(zmq.NOBLOCK))
61 print("what is this socket?!", sk)
62 except KeyboardInterrupt:
66 if __name__.endswith("__main__"):
67 runserver(common.init(log))