X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fcollector.py;h=921833c083be0ac8aecda838a338a4f72b5f818c;hb=3f74a195c346809d3075b1351705eb9ad543afd5;hp=be1bdeccc609c35b85bea167ceda15fed65bb6b9;hpb=3dea189c7bb47f02db07b52fdcda53fdb986fd2b;p=loctrkd.git diff --git a/gps303/collector.py b/gps303/collector.py index be1bdec..921833c 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -32,7 +32,6 @@ class Client: log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei) self.sock.close() self.buffer = b"" - self.imei = None def recv(self): """Read from the socket and parse complete messages""" @@ -127,21 +126,23 @@ class Clients: def response(self, resp): if resp.imei in self.by_imei: self.by_imei[resp.imei].send(resp.packet) + else: + log.info("Not connected (IMEI %s)", resp.imei) def runserver(conf): zctx = zmq.Context() zpub = zctx.socket(zmq.PUB) zpub.bind(conf.get("collector", "publishurl")) - zsub = zctx.socket(zmq.SUB) - zsub.connect(conf.get("collector", "listenurl")) + zpull = zctx.socket(zmq.PULL) + zpull.bind(conf.get("collector", "listenurl")) tcpl = socket(AF_INET6, SOCK_STREAM) tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) tcpl.bind(("", conf.getint("collector", "port"))) tcpl.listen(5) tcpfd = tcpl.fileno() poller = zmq.Poller() - poller.register(zsub, flags=zmq.POLLIN) + poller.register(zpull, flags=zmq.POLLIN) poller.register(tcpfd, flags=zmq.POLLIN) clients = Clients() try: @@ -149,19 +150,19 @@ def runserver(conf): tosend = [] topoll = [] tostop = [] - events = poller.poll(10) + events = poller.poll(1000) for sk, fl in events: - if sk is zsub: + if sk is zpull: while True: try: - msg = zsub.recv(zmq.NOBLOCK) + msg = zpull.recv(zmq.NOBLOCK) tosend.append(Resp(msg)) except zmq.Again: break elif sk == tcpfd: clntsock, clntaddr = tcpl.accept() topoll.append((clntsock, clntaddr)) - else: + elif fl & zmq.POLLIN: received = clients.recv(sk) if received is None: log.debug( @@ -192,15 +193,18 @@ def runserver(conf): clients.response( Resp(imei=imei, packet=respmsg) ) + else: + log.debug("Stray event: %s on socket %s", fl, sk) # poll queue consumed, make changes now for fd in tostop: poller.unregister(fd) clients.stop(fd) for zmsg in tosend: + log.debug("Sending to the client: %s", zmsg) clients.response(zmsg) for clntsock, clntaddr in topoll: fd = clients.add(clntsock, clntaddr) - poller.register(fd) + poller.register(fd, flags=zmq.POLLIN) except KeyboardInterrupt: pass