zpush = zctx.socket(zmq.PUSH) # type: ignore
zpush.connect(conf.get("collector", "listenurl"))
zpub = zctx.socket(zmq.PUB) # type: ignore
- zpub.connect(conf.get("rectifier", "publishurl"))
+ zpub.bind(conf.get("rectifier", "publishurl"))
try:
while True:
)
log.debug("Sending reponse %s", resp)
zpush.send(resp.packed)
+ rept = CoordReport(
+ devtime=rect.devtime,
+ battery_percentage=rect.battery_percentage,
+ accuracy=-1,
+ altitude=-1,
+ speed=-1,
+ direction=-1,
+ latitude=lat,
+ longitude=lon,
+ )
+ log.debug("Sending report %s", rept)
zpub.send(
Rept(
imei=zmsg.imei,
- payload=CoordReport(
- devtime=rect.devtime,
- battery_percentage=rect.battery_percentage,
- accuracy=-1,
- altitude=-1,
- speed=-1,
- direction=-1,
- latitude=lat,
- longitude=lon,
- ).json,
+ payload=rept.json,
).packed
)
except Exception as e:
from . import common
from .protomodule import ProtoModule
-from .zmsg import Bcast
+from .zmsg import Bcast, Rept
log = getLogger("loctrkd/watch")
def runserver(conf: ConfigParser) -> None:
# Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
zctx = zmq.Context() # type: ignore
- zsub = zctx.socket(zmq.SUB) # type: ignore
- zsub.connect(conf.get("collector", "publishurl"))
- zsub.setsockopt(zmq.SUBSCRIBE, b"")
+ zraw = zctx.socket(zmq.SUB) # type: ignore
+ zraw.connect(conf.get("collector", "publishurl"))
+ zraw.setsockopt(zmq.SUBSCRIBE, b"")
+ zrep = zctx.socket(zmq.SUB) # type: ignore
+ zrep.connect(conf.get("rectifier", "publishurl"))
+ zrep.setsockopt(zmq.SUBSCRIBE, b"")
+ poller = zmq.Poller() # type: ignore
+ poller.register(zraw, flags=zmq.POLLIN)
+ poller.register(zrep, flags=zmq.POLLIN)
try:
while True:
- zmsg = Bcast(zsub.recv())
- print("I" if zmsg.is_incoming else "O", zmsg.proto, zmsg.imei)
- pmod = common.pmod_for_proto(zmsg.proto)
- if pmod is not None:
- msg = pmod.parse_message(zmsg.packet, zmsg.is_incoming)
- print(msg)
- if zmsg.is_incoming and hasattr(msg, "rectified"):
- print(msg.rectified())
+ events = poller.poll(1000)
+ for sk, fl in events:
+ if sk is zraw:
+ while True:
+ try:
+ zmsg = Bcast(zraw.recv(zmq.NOBLOCK))
+ except zmq.Again:
+ break
+ print(
+ "I" if zmsg.is_incoming else "O",
+ zmsg.proto,
+ zmsg.imei,
+ )
+ pmod = common.pmod_for_proto(zmsg.proto)
+ if pmod is not None:
+ msg = pmod.parse_message(
+ zmsg.packet, zmsg.is_incoming
+ )
+ print(msg)
+ if zmsg.is_incoming and hasattr(msg, "rectified"):
+ print("Rectified:", msg.rectified())
+ elif sk is zrep:
+ while True:
+ try:
+ rept = Rept(zrep.recv(zmq.NOBLOCK))
+ except zmq.Again:
+ break
+ print(rept)
+ else:
+ print("what is this socket?!", sk)
except KeyboardInterrupt:
pass