From 2ad23fd8e9afe8d6c46263124ecd88ea28fd3059 Mon Sep 17 00:00:00 2001 From: Eugene Crosser Date: Tue, 28 Jun 2022 22:27:31 +0200 Subject: [PATCH] move stream parser/deframer to the protocol module --- gps303/collector.py | 62 ++++++++++-------------------------- gps303/gps303proto.py | 73 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 45 deletions(-) diff --git a/gps303/collector.py b/gps303/collector.py index ce41cb5..5215d7b 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -18,6 +18,8 @@ import zmq from . import common from .gps303proto import ( + GPS303Conn, + StreamError, HIBERNATION, LOGIN, inline_response, @@ -37,13 +39,15 @@ class Client: def __init__(self, sock: socket, addr: Tuple[str, int]) -> None: self.sock = sock self.addr = addr - self.buffer = b"" + self.stream = GPS303Conn() self.imei: Optional[str] = None def close(self) -> None: log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei) self.sock.close() - self.buffer = b"" + rest = self.stream.close() + if rest: + log.warning("%d bytes in buffer on close: %s", len(rest), rest) def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]: """Read from the socket and parse complete messages""" @@ -65,55 +69,20 @@ class Client: ) return None when = time() - self.buffer += segment - if len(self.buffer) > MAXBUFFER: - # We are receiving junk. Let's drop it or we run out of memory. - log.warning("More than %d unparseable data, dropping", MAXBUFFER) - self.buffer = b"" - msgs = [] while True: - framestart = self.buffer.find(b"xx") - if framestart == -1: # No frames, return whatever we have - break - if framestart > 0: # Should not happen, report + try: + return [ + (when, self.addr, packet) + for packet in self.stream.recv(segment) + ] + except StreamError as e: log.warning( - 'Undecodable data (%d) "%s" from fd %d (IMEI %s)', - framestart, - self.buffer[:framestart][:64].hex(), - self.sock.fileno(), - self.imei, + "%s from fd %d (IMEI %s)", e, self.sock.fileno(), self.imei ) - self.buffer = self.buffer[framestart:] - # At this point, buffer starts with a packet - if len(self.buffer) < 6: # no len and proto - cannot proceed - break - exp_end = self.buffer[2] + 3 # Expect '\r\n' here - frameend = 0 - # Length field can legitimeely be much less than the - # length of the packet (e.g. WiFi positioning), but - # it _should not_ be greater. Still sometimes it is. - # Luckily, not by too much: by maybe two or three bytes? - # Do this embarrassing hack to avoid accidental match - # of some binary data in the packet against '\r\n'. - while True: - frameend = self.buffer.find(b"\r\n", frameend + 1) - if frameend == -1 or frameend >= ( - exp_end - 3 - ): # Found realistic match or none - break - if frameend == -1: # Incomplete frame, return what we have - break - packet = self.buffer[2:frameend] - self.buffer = self.buffer[frameend + 2 :] - if len(packet) < 2: # frameend comes too early - log.warning("Packet too short: %s", packet) - break - msgs.append((when, self.addr, packet)) - return msgs def send(self, buffer: bytes) -> None: try: - self.sock.send(b"xx" + buffer + b"\r\n") + self.sock.send(self.stream.enframe(buffer)) except OSError as e: log.error( "Sending to fd %d (IMEI %s): %s", @@ -122,6 +91,9 @@ class Client: e, ) + def set_imei(self, imei: str) -> None: + self.imei = imei + class Clients: def __init__(self) -> None: diff --git a/gps303/gps303proto.py b/gps303/gps303proto.py index 0a222d0..e6a80fe 100755 --- a/gps303/gps303proto.py +++ b/gps303/gps303proto.py @@ -18,6 +18,7 @@ from datetime import datetime, timezone from enum import Enum from inspect import isclass from struct import error, pack, unpack +from time import time from typing import ( Any, Callable, @@ -31,6 +32,8 @@ from typing import ( ) __all__ = ( + "GPS303Conn", + "StreamError", "class_by_prefix", "inline_response", "parse_message", @@ -77,6 +80,76 @@ __all__ = ( "UNKNOWN_B3", ) +### Deframer ### + +MAXBUFFER: int = 4096 + + +class StreamError(Exception): + pass + + +class GPS303Conn: + def __init__(self) -> None: + self.buffer = b"" + + @staticmethod + def enframe(buffer: bytes) -> bytes: + return b"xx" + buffer + b"\r\n" + + def recv(self, segment: bytes) -> List[bytes]: + when = time() + self.buffer += segment + if len(self.buffer) > MAXBUFFER: + # We are receiving junk. Let's drop it or we run out of memory. + self.buffer = b"" + raise StreamError( + f"More than {MAXBUFFER} unparseable data, dropping" + ) + msgs = [] + while True: + framestart = self.buffer.find(b"xx") + if framestart == -1: # No frames, return whatever we have + break + if framestart > 0: # Should not happen, report + self.buffer = self.buffer[framestart:] + raise StreamError( + f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"' + ) + # At this point, buffer starts with a packet + if len(self.buffer) < 6: # no len and proto - cannot proceed + break + exp_end = self.buffer[2] + 3 # Expect '\r\n' here + frameend = 0 + # Length field can legitimeely be much less than the + # length of the packet (e.g. WiFi positioning), but + # it _should not_ be greater. Still sometimes it is. + # Luckily, not by too much: by maybe two or three bytes? + # Do this embarrassing hack to avoid accidental match + # of some binary data in the packet against '\r\n'. + while True: + frameend = self.buffer.find(b"\r\n", frameend + 1) + if frameend == -1 or frameend >= ( + exp_end - 3 + ): # Found realistic match or none + break + if frameend == -1: # Incomplete frame, return what we have + break + packet = self.buffer[2:frameend] + self.buffer = self.buffer[frameend + 2 :] + if len(packet) < 2: # frameend comes too early + raise StreamError(f"Packet too short: {packet.hex()}") + msgs.append(packet) + return msgs + + def close(self) -> bytes: + ret = self.buffer + self.buffer = b"" + return ret + + +### Parser/Constructor ### + class DecodeError(Exception): def __init__(self, e: Exception, **kwargs: Any) -> None: -- 2.43.0