]> average.org Git - loctrkd.git/blobdiff - gps303/gps303proto.py
improve diagnistic message about left data
[loctrkd.git] / gps303 / gps303proto.py
index 0a222d05634930679995e6e4807d939d35104346..e3776c3a529276fd333d619ea1946a8264f8337b 100755 (executable)
@@ -18,6 +18,7 @@ from datetime import datetime, timezone
 from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
 from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
+from time import time
 from typing import (
     Any,
     Callable,
 from typing import (
     Any,
     Callable,
@@ -31,9 +32,11 @@ from typing import (
 )
 
 __all__ = (
 )
 
 __all__ = (
+    "Stream",
     "class_by_prefix",
     "inline_response",
     "parse_message",
     "class_by_prefix",
     "inline_response",
     "parse_message",
+    "probe_buffer",
     "proto_by_name",
     "DecodeError",
     "Respond",
     "proto_by_name",
     "DecodeError",
     "Respond",
@@ -77,6 +80,75 @@ __all__ = (
     "UNKNOWN_B3",
 )
 
     "UNKNOWN_B3",
 )
 
+### Deframer ###
+
+MAXBUFFER: int = 4096
+
+
+class Stream:
+    def __init__(self) -> None:
+        self.buffer = b""
+
+    @staticmethod
+    def enframe(buffer: bytes) -> bytes:
+        return b"xx" + buffer + b"\r\n"
+
+    def recv(self, segment: bytes) -> List[Union[bytes, str]]:
+        """
+        Process next segment of the stream. Return successfully deframed
+        packets as `bytes` and error messages as `str`.
+        """
+        when = time()
+        self.buffer += segment
+        if len(self.buffer) > MAXBUFFER:
+            # We are receiving junk. Let's drop it or we run out of memory.
+            self.buffer = b""
+            return [f"More than {MAXBUFFER} unparseable data, dropping"]
+        msgs: List[Union[bytes, str]] = []
+        while True:
+            framestart = self.buffer.find(b"xx")
+            if framestart == -1:  # No frames, return whatever we have
+                break
+            if framestart > 0:  # Should not happen, report
+                msgs.append(
+                    f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
+                )
+                self.buffer = self.buffer[framestart:]
+            # At this point, buffer starts with a packet
+            if len(self.buffer) < 6:  # no len and proto - cannot proceed
+                break
+            exp_end = self.buffer[2] + 3  # Expect '\r\n' here
+            frameend = 0
+            # Length field can legitimeely be much less than the
+            # length of the packet (e.g. WiFi positioning), but
+            # it _should not_ be greater. Still sometimes it is.
+            # Luckily, not by too much: by maybe two or three bytes?
+            # Do this embarrassing hack to avoid accidental match
+            # of some binary data in the packet against '\r\n'.
+            while True:
+                frameend = self.buffer.find(b"\r\n", frameend + 1)
+                if frameend == -1 or frameend >= (
+                    exp_end - 3
+                ):  # Found realistic match or none
+                    break
+            if frameend == -1:  # Incomplete frame, return what we have
+                break
+            packet = self.buffer[2:frameend]
+            self.buffer = self.buffer[frameend + 2 :]
+            if len(packet) < 2:  # frameend comes too early
+                msgs.append(f"Packet too short: {packet.hex()}")
+            else:
+                msgs.append(packet)
+        return msgs
+
+    def close(self) -> bytes:
+        ret = self.buffer
+        self.buffer = b""
+        return ret
+
+
+### Parser/Constructor ###
+
 
 class DecodeError(Exception):
     def __init__(self, e: Exception, **kwargs: Any) -> None:
 
 class DecodeError(Exception):
     def __init__(self, e: Exception, **kwargs: Any) -> None:
@@ -808,6 +880,18 @@ def proto_of_message(packet: bytes) -> int:
     return packet[1]
 
 
     return packet[1]
 
 
+def imei_from_packet(packet: bytes) -> Optional[str]:
+    if proto_of_message(packet) == LOGIN.PROTO:
+        msg = parse_message(packet)
+        if isinstance(msg, LOGIN):
+            return msg.imei
+    return None
+
+
+def is_goodbye_packet(packet: bytes) -> bool:
+    return proto_of_message(packet) == HIBERNATION.PROTO
+
+
 def inline_response(packet: bytes) -> Optional[bytes]:
     proto = proto_of_message(packet)
     if proto in CLASSES:
 def inline_response(packet: bytes) -> Optional[bytes]:
     proto = proto_of_message(packet)
     if proto in CLASSES:
@@ -817,6 +901,15 @@ def inline_response(packet: bytes) -> Optional[bytes]:
     return None
 
 
     return None
 
 
+def probe_buffer(buffer: bytes) -> bool:
+    framestart = buffer.find(b"xx")
+    if framestart < 0:
+        return False
+    if len(buffer) - framestart < 6:
+        return False
+    return True
+
+
 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
     """From a packet (without framing bytes) derive the XXX.In object"""
     length, proto = unpack("BB", packet[:2])
 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
     """From a packet (without framing bytes) derive the XXX.In object"""
     length, proto = unpack("BB", packet[:2])