]> average.org Git - loctrkd.git/commitdiff
move stream parser/deframer to the protocol module
authorEugene Crosser <crosser@average.org>
Tue, 28 Jun 2022 20:27:31 +0000 (22:27 +0200)
committerEugene Crosser <crosser@average.org>
Thu, 14 Jul 2022 20:29:10 +0000 (22:29 +0200)
gps303/collector.py
gps303/gps303proto.py

index ce41cb57b1c91cdf7b3d1f656a1fe98057a672b7..5215d7b42289762b8b8bc79b2da0de0ec2203e09 100644 (file)
@@ -18,6 +18,8 @@ import zmq
 
 from . import common
 from .gps303proto import (
+    GPS303Conn,
+    StreamError,
     HIBERNATION,
     LOGIN,
     inline_response,
@@ -37,13 +39,15 @@ class Client:
     def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
         self.sock = sock
         self.addr = addr
-        self.buffer = b""
+        self.stream = GPS303Conn()
         self.imei: Optional[str] = None
 
     def close(self) -> None:
         log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
         self.sock.close()
-        self.buffer = b""
+        rest = self.stream.close()
+        if rest:
+            log.warning("%d bytes in buffer on close: %s", len(rest), rest)
 
     def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]:
         """Read from the socket and parse complete messages"""
@@ -65,55 +69,20 @@ class Client:
             )
             return None
         when = time()
-        self.buffer += segment
-        if len(self.buffer) > MAXBUFFER:
-            # We are receiving junk. Let's drop it or we run out of memory.
-            log.warning("More than %d unparseable data, dropping", MAXBUFFER)
-            self.buffer = b""
-        msgs = []
         while True:
-            framestart = self.buffer.find(b"xx")
-            if framestart == -1:  # No frames, return whatever we have
-                break
-            if framestart > 0:  # Should not happen, report
+            try:
+                return [
+                    (when, self.addr, packet)
+                    for packet in self.stream.recv(segment)
+                ]
+            except StreamError as e:
                 log.warning(
-                    'Undecodable data (%d) "%s" from fd %d (IMEI %s)',
-                    framestart,
-                    self.buffer[:framestart][:64].hex(),
-                    self.sock.fileno(),
-                    self.imei,
+                    "%s from fd %d (IMEI %s)", e, self.sock.fileno(), self.imei
                 )
-                self.buffer = self.buffer[framestart:]
-            # At this point, buffer starts with a packet
-            if len(self.buffer) < 6:  # no len and proto - cannot proceed
-                break
-            exp_end = self.buffer[2] + 3  # Expect '\r\n' here
-            frameend = 0
-            # Length field can legitimeely be much less than the
-            # length of the packet (e.g. WiFi positioning), but
-            # it _should not_ be greater. Still sometimes it is.
-            # Luckily, not by too much: by maybe two or three bytes?
-            # Do this embarrassing hack to avoid accidental match
-            # of some binary data in the packet against '\r\n'.
-            while True:
-                frameend = self.buffer.find(b"\r\n", frameend + 1)
-                if frameend == -1 or frameend >= (
-                    exp_end - 3
-                ):  # Found realistic match or none
-                    break
-            if frameend == -1:  # Incomplete frame, return what we have
-                break
-            packet = self.buffer[2:frameend]
-            self.buffer = self.buffer[frameend + 2 :]
-            if len(packet) < 2:  # frameend comes too early
-                log.warning("Packet too short: %s", packet)
-                break
-            msgs.append((when, self.addr, packet))
-        return msgs
 
     def send(self, buffer: bytes) -> None:
         try:
-            self.sock.send(b"xx" + buffer + b"\r\n")
+            self.sock.send(self.stream.enframe(buffer))
         except OSError as e:
             log.error(
                 "Sending to fd %d (IMEI %s): %s",
@@ -122,6 +91,9 @@ class Client:
                 e,
             )
 
+    def set_imei(self, imei: str) -> None:
+        self.imei = imei
+
 
 class Clients:
     def __init__(self) -> None:
index 0a222d05634930679995e6e4807d939d35104346..e6a80fe5118f4df0c8370539290a13c44e6de821 100755 (executable)
@@ -18,6 +18,7 @@ from datetime import datetime, timezone
 from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
+from time import time
 from typing import (
     Any,
     Callable,
@@ -31,6 +32,8 @@ from typing import (
 )
 
 __all__ = (
+    "GPS303Conn",
+    "StreamError",
     "class_by_prefix",
     "inline_response",
     "parse_message",
@@ -77,6 +80,76 @@ __all__ = (
     "UNKNOWN_B3",
 )
 
+### Deframer ###
+
+MAXBUFFER: int = 4096
+
+
+class StreamError(Exception):
+    pass
+
+
+class GPS303Conn:
+    def __init__(self) -> None:
+        self.buffer = b""
+
+    @staticmethod
+    def enframe(buffer: bytes) -> bytes:
+        return b"xx" + buffer + b"\r\n"
+
+    def recv(self, segment: bytes) -> List[bytes]:
+        when = time()
+        self.buffer += segment
+        if len(self.buffer) > MAXBUFFER:
+            # We are receiving junk. Let's drop it or we run out of memory.
+            self.buffer = b""
+            raise StreamError(
+                f"More than {MAXBUFFER} unparseable data, dropping"
+            )
+        msgs = []
+        while True:
+            framestart = self.buffer.find(b"xx")
+            if framestart == -1:  # No frames, return whatever we have
+                break
+            if framestart > 0:  # Should not happen, report
+                self.buffer = self.buffer[framestart:]
+                raise StreamError(
+                    f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
+                )
+            # At this point, buffer starts with a packet
+            if len(self.buffer) < 6:  # no len and proto - cannot proceed
+                break
+            exp_end = self.buffer[2] + 3  # Expect '\r\n' here
+            frameend = 0
+            # Length field can legitimeely be much less than the
+            # length of the packet (e.g. WiFi positioning), but
+            # it _should not_ be greater. Still sometimes it is.
+            # Luckily, not by too much: by maybe two or three bytes?
+            # Do this embarrassing hack to avoid accidental match
+            # of some binary data in the packet against '\r\n'.
+            while True:
+                frameend = self.buffer.find(b"\r\n", frameend + 1)
+                if frameend == -1 or frameend >= (
+                    exp_end - 3
+                ):  # Found realistic match or none
+                    break
+            if frameend == -1:  # Incomplete frame, return what we have
+                break
+            packet = self.buffer[2:frameend]
+            self.buffer = self.buffer[frameend + 2 :]
+            if len(packet) < 2:  # frameend comes too early
+                raise StreamError(f"Packet too short: {packet.hex()}")
+            msgs.append(packet)
+        return msgs
+
+    def close(self) -> bytes:
+        ret = self.buffer
+        self.buffer = b""
+        return ret
+
+
+### Parser/Constructor ###
+
 
 class DecodeError(Exception):
     def __init__(self, e: Exception, **kwargs: Any) -> None: