X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fcollector.py;h=0efc669fd908e83a480c3e86839efda8b2254d45;hb=e84104c8d7e93efc4ab2f543e7dfef4cc0208187;hp=8bcca303a627e75b9220fa89d0cdf5f81f761357;hpb=faa8ce9d87530105ec2ad2f4809a9ace581a2ad6;p=loctrkd.git diff --git a/gps303/collector.py b/gps303/collector.py index 8bcca30..0efc669 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -1,6 +1,7 @@ """ TCP server that communicates with terminals """ from logging import getLogger +from os import umask from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR from time import time from struct import pack @@ -37,7 +38,7 @@ class Client: """Read from the socket and parse complete messages""" try: segment = self.sock.recv(4096) - except OSError: + except OSError as e: log.warning( "Reading from fd %d (IMEI %s): %s", self.sock.fileno(), @@ -68,7 +69,20 @@ class Client: ) self.buffer = self.buffer[framestart:] # At this point, buffer starts with a packet - frameend = self.buffer.find(b"\r\n", 4) + if len(self.buffer) < 6: # no len and proto - cannot proceed + break + exp_end = self.buffer[2] + 3 # Expect '\r\n' here + frameend = 0 + # Length field can legitimeely be much less than the + # length of the packet (e.g. WiFi positioning), but + # it _should not_ be greater. Still sometimes it is. + # Luckily, not by too much: by maybe two or three bytes? + # Do this embarrassing hack to avoid accidental match + # of some binary data in the packet against '\r\n'. + while True: + frameend = self.buffer.find(b"\r\n", frameend) + if frameend >= (exp_end - 3): # Found realistic match + break if frameend == -1: # Incomplete frame, return what we have break packet = self.buffer[2:frameend] @@ -140,9 +154,11 @@ class Clients: def runserver(conf): zctx = zmq.Context() zpub = zctx.socket(zmq.PUB) - zpub.bind(conf.get("collector", "publishurl")) zpull = zctx.socket(zmq.PULL) + oldmask = umask(0o117) + zpub.bind(conf.get("collector", "publishurl")) zpull.bind(conf.get("collector", "listenurl")) + umask(oldmask) tcpl = socket(AF_INET6, SOCK_STREAM) tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) tcpl.bind(("", conf.getint("collector", "port"))) @@ -163,7 +179,8 @@ def runserver(conf): while True: try: msg = zpull.recv(zmq.NOBLOCK) - tosend.append(Resp(msg)) + zmsg = Resp(msg) + tosend.append(zmsg) except zmq.Again: break elif sk == tcpfd: @@ -172,9 +189,7 @@ def runserver(conf): elif fl & zmq.POLLIN: received = clients.recv(sk) if received is None: - log.debug( - "Terminal gone from fd %d (IMEI %s)", sk, imei - ) + log.debug("Terminal gone from fd %d", sk) tostop.append(sk) else: for imei, when, peeraddr, packet in received: @@ -197,8 +212,8 @@ def runserver(conf): tostop.append(sk) respmsg = inline_response(packet) if respmsg is not None: - clients.response( - Resp(imei=imei, packet=respmsg) + tosend.append( + Resp(imei=imei, when=when, packet=respmsg) ) else: log.debug("Stray event: %s on socket %s", fl, sk) @@ -207,6 +222,15 @@ def runserver(conf): poller.unregister(fd) clients.stop(fd) for zmsg in tosend: + zpub.send( + Bcast( + is_incoming=False, + proto=proto_of_message(zmsg.packet), + when=zmsg.when, + imei=zmsg.imei, + packet=zmsg.packet, + ).packed + ) log.debug("Sending to the client: %s", zmsg) clients.response(zmsg) for clntsock, clntaddr in topoll: