X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fcollector.py;h=c2efd79b82182e13319526a644abccb0e85071b9;hb=84c11e46ef8ec36824b0d040972de37e41943b5d;hp=00594e1d9f180bd542cdc99823f917f016f67f5d;hpb=10665d8aca2b350782616da33888343e8fcc1e65;p=loctrkd.git diff --git a/gps303/collector.py b/gps303/collector.py index 00594e1..c2efd79 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -21,6 +21,8 @@ from .zmsg import Bcast, Resp log = getLogger("gps303/collector") +MAXBUFFER: int = 4096 + class Client: """Connected socket to the terminal plus buffer and metadata""" @@ -39,7 +41,7 @@ class Client: def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]: """Read from the socket and parse complete messages""" try: - segment = self.sock.recv(4096) + segment = self.sock.recv(MAXBUFFER) except OSError as e: log.warning( "Reading from fd %d (IMEI %s): %s", @@ -57,6 +59,10 @@ class Client: return None when = time() self.buffer += segment + if len(self.buffer) > MAXBUFFER: + # We are receiving junk. Let's drop it or we run out of memory. + log.warning("More than %d unparseable data, dropping", MAXBUFFER) + self.buffer = b"" msgs = [] while True: framestart = self.buffer.find(b"xx") @@ -64,8 +70,9 @@ class Client: break if framestart > 0: # Should not happen, report log.warning( - 'Undecodable data "%s" from fd %d (IMEI %s)', - self.buffer[:framestart].hex(), + 'Undecodable data (%d) "%s" from fd %d (IMEI %s)', + framestart, + self.buffer[:framestart][:64].hex(), self.sock.fileno(), self.imei, ) @@ -82,8 +89,10 @@ class Client: # Do this embarrassing hack to avoid accidental match # of some binary data in the packet against '\r\n'. while True: - frameend = self.buffer.find(b"\r\n", frameend) - if frameend >= (exp_end - 3): # Found realistic match + frameend = self.buffer.find(b"\r\n", frameend + 1) + if frameend == -1 or frameend >= ( + exp_end - 3 + ): # Found realistic match or none break if frameend == -1: # Incomplete frame, return what we have break @@ -162,7 +171,7 @@ class Clients: log.info("Not connected (IMEI %s)", resp.imei) -def runserver(conf: ConfigParser) -> None: +def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None: # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?! zctx = zmq.Context() # type: ignore zpub = zctx.socket(zmq.PUB) # type: ignore @@ -215,7 +224,7 @@ def runserver(conf: ConfigParser) -> None: packet=packet, ).packed ) - if proto == HIBERNATION.PROTO: + if proto == HIBERNATION.PROTO and handle_hibernate: log.debug( "HIBERNATION from fd %d (IMEI %s)", sk, @@ -249,7 +258,10 @@ def runserver(conf: ConfigParser) -> None: fd = clients.add(clntsock, clntaddr) poller.register(fd, flags=zmq.POLLIN) except KeyboardInterrupt: - pass + zpub.close() + zpull.close() + zctx.destroy() # type: ignore + tcpl.close() if __name__.endswith("__main__"):