]> average.org Git - loctrkd.git/blobdiff - gps303/gps303proto.py
move stream parser/deframer to the protocol module
[loctrkd.git] / gps303 / gps303proto.py
index 0a222d05634930679995e6e4807d939d35104346..e6a80fe5118f4df0c8370539290a13c44e6de821 100755 (executable)
@@ -18,6 +18,7 @@ from datetime import datetime, timezone
 from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
 from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
+from time import time
 from typing import (
     Any,
     Callable,
 from typing import (
     Any,
     Callable,
@@ -31,6 +32,8 @@ from typing import (
 )
 
 __all__ = (
 )
 
 __all__ = (
+    "GPS303Conn",
+    "StreamError",
     "class_by_prefix",
     "inline_response",
     "parse_message",
     "class_by_prefix",
     "inline_response",
     "parse_message",
@@ -77,6 +80,76 @@ __all__ = (
     "UNKNOWN_B3",
 )
 
     "UNKNOWN_B3",
 )
 
+### Deframer ###
+
+MAXBUFFER: int = 4096
+
+
+class StreamError(Exception):
+    pass
+
+
+class GPS303Conn:
+    def __init__(self) -> None:
+        self.buffer = b""
+
+    @staticmethod
+    def enframe(buffer: bytes) -> bytes:
+        return b"xx" + buffer + b"\r\n"
+
+    def recv(self, segment: bytes) -> List[bytes]:
+        when = time()
+        self.buffer += segment
+        if len(self.buffer) > MAXBUFFER:
+            # We are receiving junk. Let's drop it or we run out of memory.
+            self.buffer = b""
+            raise StreamError(
+                f"More than {MAXBUFFER} unparseable data, dropping"
+            )
+        msgs = []
+        while True:
+            framestart = self.buffer.find(b"xx")
+            if framestart == -1:  # No frames, return whatever we have
+                break
+            if framestart > 0:  # Should not happen, report
+                self.buffer = self.buffer[framestart:]
+                raise StreamError(
+                    f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
+                )
+            # At this point, buffer starts with a packet
+            if len(self.buffer) < 6:  # no len and proto - cannot proceed
+                break
+            exp_end = self.buffer[2] + 3  # Expect '\r\n' here
+            frameend = 0
+            # Length field can legitimeely be much less than the
+            # length of the packet (e.g. WiFi positioning), but
+            # it _should not_ be greater. Still sometimes it is.
+            # Luckily, not by too much: by maybe two or three bytes?
+            # Do this embarrassing hack to avoid accidental match
+            # of some binary data in the packet against '\r\n'.
+            while True:
+                frameend = self.buffer.find(b"\r\n", frameend + 1)
+                if frameend == -1 or frameend >= (
+                    exp_end - 3
+                ):  # Found realistic match or none
+                    break
+            if frameend == -1:  # Incomplete frame, return what we have
+                break
+            packet = self.buffer[2:frameend]
+            self.buffer = self.buffer[frameend + 2 :]
+            if len(packet) < 2:  # frameend comes too early
+                raise StreamError(f"Packet too short: {packet.hex()}")
+            msgs.append(packet)
+        return msgs
+
+    def close(self) -> bytes:
+        ret = self.buffer
+        self.buffer = b""
+        return ret
+
+
+### Parser/Constructor ###
+
 
 class DecodeError(Exception):
     def __init__(self, e: Exception, **kwargs: Any) -> None:
 
 class DecodeError(Exception):
     def __init__(self, e: Exception, **kwargs: Any) -> None: