X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=loctrkd%2Fwatch.py;h=5338d03495564d25bd5d100769c7294dfae944d3;hb=a4a6606a30e8ba743c269083f1922222b0e1e81a;hp=738cfe230be94dfb415a6c40b21045e8e1583960;hpb=ba4cb894d37f24ac333b316cf9487dfc913eaf74;p=loctrkd.git diff --git a/loctrkd/watch.py b/loctrkd/watch.py index 738cfe2..5338d03 100644 --- a/loctrkd/watch.py +++ b/loctrkd/watch.py @@ -8,44 +8,57 @@ from typing import Any, cast, List import zmq from . import common -from .zmsg import Bcast +from .protomodule import ProtoModule +from .zmsg import Bcast, Rept log = getLogger("loctrkd/watch") -class ProtoModule: - @staticmethod - def proto_handled(proto: str) -> bool: - ... - - @staticmethod - def parse_message(packet: bytes, is_incoming: bool = True) -> Any: - ... - - -pmods: List[ProtoModule] = [] - - def runserver(conf: ConfigParser) -> None: - global pmods - pmods = [ - cast(ProtoModule, import_module("." + modnm, __package__)) - for modnm in conf.get("collector", "protocols").split(",") - ] # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?! zctx = zmq.Context() # type: ignore - zsub = zctx.socket(zmq.SUB) # type: ignore - zsub.connect(conf.get("collector", "publishurl")) - zsub.setsockopt(zmq.SUBSCRIBE, b"") + zraw = zctx.socket(zmq.SUB) # type: ignore + zraw.connect(conf.get("collector", "publishurl")) + zraw.setsockopt(zmq.SUBSCRIBE, b"") + zrep = zctx.socket(zmq.SUB) # type: ignore + zrep.connect(conf.get("rectifier", "publishurl")) + zrep.setsockopt(zmq.SUBSCRIBE, b"") + poller = zmq.Poller() # type: ignore + poller.register(zraw, flags=zmq.POLLIN) + poller.register(zrep, flags=zmq.POLLIN) try: while True: - zmsg = Bcast(zsub.recv()) - print("I" if zmsg.is_incoming else "O", zmsg.proto, zmsg.imei) - for pmod in pmods: - if pmod.proto_handled(zmsg.proto.startswith): - msg = pmod.parse_message(zmsg.packet, zmsg.is_incoming) - print(msg) + events = poller.poll(1000) + for sk, fl in events: + if sk is zraw: + while True: + try: + zmsg = Bcast(zraw.recv(zmq.NOBLOCK)) + except zmq.Again: + break + print( + "I" if zmsg.is_incoming else "O", + zmsg.proto, + zmsg.imei, + ) + pmod = common.pmod_for_proto(zmsg.proto) + if pmod is not None: + msg = pmod.parse_message( + zmsg.packet, zmsg.is_incoming + ) + print(msg) + if zmsg.is_incoming and hasattr(msg, "rectified"): + print("Rectified:", msg.rectified()) + elif sk is zrep: + while True: + try: + rept = Rept(zrep.recv(zmq.NOBLOCK)) + except zmq.Again: + break + print(rept) + else: + print("what is this socket?!", sk) except KeyboardInterrupt: pass