X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fcollector.py;h=3cb7b0bda98b09a961af7f5cc834ea95ae46a76a;hb=7bf96d53196eb6caf035335260a2dc25cec72e57;hp=9d694b0ae4c83180a7a718aa21be5daa4e7fe6f4;hpb=057cce452eb53d5fbe365a66669bd8dec7dfe989;p=loctrkd.git diff --git a/gps303/collector.py b/gps303/collector.py index 9d694b0..3cb7b0b 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -18,6 +18,7 @@ import zmq from . import common from .gps303proto import ( + GPS303Conn, HIBERNATION, LOGIN, inline_response, @@ -37,13 +38,15 @@ class Client: def __init__(self, sock: socket, addr: Tuple[str, int]) -> None: self.sock = sock self.addr = addr - self.buffer = b"" + self.stream = GPS303Conn() self.imei: Optional[str] = None def close(self) -> None: log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei) self.sock.close() - self.buffer = b"" + rest = self.stream.close() + if rest: + log.warning("%d bytes in buffer on close: %s", len(rest), rest) def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]: """Read from the socket and parse complete messages""" @@ -65,64 +68,22 @@ class Client: ) return None when = time() - self.buffer += segment - if len(self.buffer) > MAXBUFFER: - # We are receiving junk. Let's drop it or we run out of memory. - log.warning("More than %d unparseable data, dropping", MAXBUFFER) - self.buffer = b"" msgs = [] - while True: - framestart = self.buffer.find(b"xx") - if framestart == -1: # No frames, return whatever we have - break - if framestart > 0: # Should not happen, report + for elem in self.stream.recv(segment): + if isinstance(elem, bytes): + msgs.append((when, self.addr, elem)) + else: log.warning( - 'Undecodable data (%d) "%s" from fd %d (IMEI %s)', - framestart, - self.buffer[:framestart][:64].hex(), + "%s from fd %d (IMEI %s)", + elem, self.sock.fileno(), self.imei, ) - self.buffer = self.buffer[framestart:] - # At this point, buffer starts with a packet - if len(self.buffer) < 6: # no len and proto - cannot proceed - break - exp_end = self.buffer[2] + 3 # Expect '\r\n' here - frameend = 0 - # Length field can legitimeely be much less than the - # length of the packet (e.g. WiFi positioning), but - # it _should not_ be greater. Still sometimes it is. - # Luckily, not by too much: by maybe two or three bytes? - # Do this embarrassing hack to avoid accidental match - # of some binary data in the packet against '\r\n'. - while True: - frameend = self.buffer.find(b"\r\n", frameend + 1) - if frameend == -1 or frameend >= ( - exp_end - 3 - ): # Found realistic match or none - break - if frameend == -1: # Incomplete frame, return what we have - break - packet = self.buffer[2:frameend] - self.buffer = self.buffer[frameend + 2 :] - if len(packet) < 2: # frameend comes too early - log.warning("Packet too short: %s", packet) - break - if proto_of_message(packet) == LOGIN.PROTO: - msg = parse_message(packet) - if isinstance(msg, LOGIN): # Can be unparseable - self.imei = msg.imei - log.info( - "LOGIN from fd %d (IMEI %s)", - self.sock.fileno(), - self.imei, - ) - msgs.append((when, self.addr, packet)) return msgs def send(self, buffer: bytes) -> None: try: - self.sock.send(b"xx" + buffer + b"\r\n") + self.sock.send(self.stream.enframe(buffer)) except OSError as e: log.error( "Sending to fd %d (IMEI %s): %s", @@ -131,6 +92,9 @@ class Client: e, ) + def set_imei(self, imei: str) -> None: + self.imei = imei + class Clients: def __init__(self) -> None: @@ -160,8 +124,23 @@ class Clients: return None result = [] for when, peeraddr, packet in msgs: - if proto_of_message(packet) == LOGIN.PROTO: # Could do blindly... - if clnt.imei: + if proto_of_message(packet) == LOGIN.PROTO: + msg = parse_message(packet) + if isinstance(msg, LOGIN): # Can be unparseable + if clnt.imei is None: + clnt.imei = msg.imei + log.info( + "LOGIN from fd %d (IMEI %s)", + clnt.sock.fileno(), + clnt.imei, + ) + oldclnt = self.by_imei.get(clnt.imei) + if oldclnt is not None: + log.info( + "Orphaning fd %d with the same IMEI", + oldclnt.sock.fileno(), + ) + oldclnt.imei = None self.by_imei[clnt.imei] = clnt else: log.warning(