]> average.org Git - loctrkd.git/blobdiff - gps303/gps303proto.py
move stream parser/deframer to the protocol module
[loctrkd.git] / gps303 / gps303proto.py
index 0d02b082c18688777aa9fa4a7bbbbc0ae3727276..e6a80fe5118f4df0c8370539290a13c44e6de821 100755 (executable)
@@ -18,6 +18,7 @@ from datetime import datetime, timezone
 from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
 from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
+from time import time
 from typing import (
     Any,
     Callable,
 from typing import (
     Any,
     Callable,
@@ -31,6 +32,8 @@ from typing import (
 )
 
 __all__ = (
 )
 
 __all__ = (
+    "GPS303Conn",
+    "StreamError",
     "class_by_prefix",
     "inline_response",
     "parse_message",
     "class_by_prefix",
     "inline_response",
     "parse_message",
@@ -77,6 +80,76 @@ __all__ = (
     "UNKNOWN_B3",
 )
 
     "UNKNOWN_B3",
 )
 
+### Deframer ###
+
+MAXBUFFER: int = 4096
+
+
+class StreamError(Exception):
+    pass
+
+
+class GPS303Conn:
+    def __init__(self) -> None:
+        self.buffer = b""
+
+    @staticmethod
+    def enframe(buffer: bytes) -> bytes:
+        return b"xx" + buffer + b"\r\n"
+
+    def recv(self, segment: bytes) -> List[bytes]:
+        when = time()
+        self.buffer += segment
+        if len(self.buffer) > MAXBUFFER:
+            # We are receiving junk. Let's drop it or we run out of memory.
+            self.buffer = b""
+            raise StreamError(
+                f"More than {MAXBUFFER} unparseable data, dropping"
+            )
+        msgs = []
+        while True:
+            framestart = self.buffer.find(b"xx")
+            if framestart == -1:  # No frames, return whatever we have
+                break
+            if framestart > 0:  # Should not happen, report
+                self.buffer = self.buffer[framestart:]
+                raise StreamError(
+                    f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
+                )
+            # At this point, buffer starts with a packet
+            if len(self.buffer) < 6:  # no len and proto - cannot proceed
+                break
+            exp_end = self.buffer[2] + 3  # Expect '\r\n' here
+            frameend = 0
+            # Length field can legitimeely be much less than the
+            # length of the packet (e.g. WiFi positioning), but
+            # it _should not_ be greater. Still sometimes it is.
+            # Luckily, not by too much: by maybe two or three bytes?
+            # Do this embarrassing hack to avoid accidental match
+            # of some binary data in the packet against '\r\n'.
+            while True:
+                frameend = self.buffer.find(b"\r\n", frameend + 1)
+                if frameend == -1 or frameend >= (
+                    exp_end - 3
+                ):  # Found realistic match or none
+                    break
+            if frameend == -1:  # Incomplete frame, return what we have
+                break
+            packet = self.buffer[2:frameend]
+            self.buffer = self.buffer[frameend + 2 :]
+            if len(packet) < 2:  # frameend comes too early
+                raise StreamError(f"Packet too short: {packet.hex()}")
+            msgs.append(packet)
+        return msgs
+
+    def close(self) -> bytes:
+        ret = self.buffer
+        self.buffer = b""
+        return ret
+
+
+### Parser/Constructor ###
+
 
 class DecodeError(Exception):
     def __init__(self, e: Exception, **kwargs: Any) -> None:
 
 class DecodeError(Exception):
     def __init__(self, e: Exception, **kwargs: Any) -> None:
@@ -85,8 +158,8 @@ class DecodeError(Exception):
             setattr(self, k, v)
 
 
             setattr(self, k, v)
 
 
-def maybe_int(x: Optional[int]) -> Optional[int]:
-    return None if x is None else int(x)
+def maybe(typ: type) -> Callable[[Any], Any]:
+    return lambda x: None if x is None else typ(x)
 
 
 def intx(x: Union[str, int]) -> int:
 
 
 def intx(x: Union[str, int]) -> int:
@@ -303,7 +376,7 @@ class GPS303Pkt(metaclass=MetaPkt):
     @property
     def packed(self) -> bytes:
         payload = self.encode()
     @property
     def packed(self) -> bytes:
         payload = self.encode()
-        length = len(payload) + 1
+        length = getattr(self, "length", len(payload) + 1)
         return pack("BB", length, self.PROTO) + payload
 
 
         return pack("BB", length, self.PROTO) + payload
 
 
@@ -389,7 +462,7 @@ class STATUS(GPS303Pkt):
         ("ver", int, 0),
         ("timezone", int, 0),
         ("intvl", int, 0),
         ("ver", int, 0),
         ("timezone", int, 0),
         ("intvl", int, 0),
-        ("signal", maybe_int, None),
+        ("signal", maybe(int), None),
     )
     OUT_KWARGS = (("upload_interval", int, 25),)
 
     )
     OUT_KWARGS = (("upload_interval", int, 25),)
 
@@ -403,10 +476,8 @@ class STATUS(GPS303Pkt):
             self.signal = None
 
     def in_encode(self) -> bytes:
             self.signal = None
 
     def in_encode(self) -> bytes:
-        return (
-            pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + b""
-            if self.signal is None
-            else pack("B", self.signal)
+        return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
+            b"" if self.signal is None else pack("B", self.signal)
         )
 
     def out_encode(self) -> bytes:  # Set interval in minutes
         )
 
     def out_encode(self) -> bytes:  # Set interval in minutes
@@ -435,6 +506,15 @@ class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
 
 
 class _WIFI_POSITIONING(GPS303Pkt):
 
 
 class _WIFI_POSITIONING(GPS303Pkt):
+    IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
+        # IN_KWARGS = (
+        ("dtime", bytes, b"\0\0\0\0\0\0"),
+        ("wifi_aps", list, []),
+        ("mcc", int, 0),
+        ("mnc", int, 0),
+        ("gsm_cells", list, []),
+    )
+
     def in_decode(self, length: int, payload: bytes) -> None:
         self.dtime = payload[:6]
         if self.dtime == b"\0\0\0\0\0\0":
     def in_decode(self, length: int, payload: bytes) -> None:
         self.dtime = payload[:6]
         if self.dtime == b"\0\0\0\0\0\0":
@@ -459,6 +539,28 @@ class _WIFI_POSITIONING(GPS303Pkt):
             )
             self.gsm_cells.append((locac, cellid, -sigstr))
 
             )
             self.gsm_cells.append((locac, cellid, -sigstr))
 
+    def in_encode(self) -> bytes:
+        self.length = len(self.wifi_aps)
+        return b"".join(
+            [
+                self.dtime,
+                b"".join(
+                    [
+                        bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
+                        + pack("B", -sigstr)
+                        for mac, sigstr in self.wifi_aps
+                    ]
+                ),
+                pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
+                b"".join(
+                    [
+                        pack("!HHB", locac, cellid, -sigstr)
+                        for locac, cellid, sigstr in self.gsm_cells
+                    ]
+                ),
+            ]
+        )
+
 
 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x17
 
 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x17
@@ -655,6 +757,9 @@ class SETUP(GPS303Pkt):
             + [b";".join([el.encode() for el in self.phonenumbers])]
         )
 
             + [b";".join([el.encode() for el in self.phonenumbers])]
         )
 
+    def in_encode(self) -> bytes:
+        return b""
+
 
 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
     PROTO = 0x58
 
 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
     PROTO = 0x58