)
__all__ = (
- "GPS303Conn",
+ "Stream",
"class_by_prefix",
"inline_response",
"parse_message",
+ "probe_buffer",
"proto_by_name",
"DecodeError",
"Respond",
MAXBUFFER: int = 4096
-class GPS303Conn:
+class Stream:
def __init__(self) -> None:
self.buffer = b""
return None
+def probe_buffer(buffer: bytes) -> bool:
+ framestart = buffer.find(b"xx")
+ if framestart < 0:
+ return False
+ if len(buffer) - framestart < 6:
+ return False
+ return True
+
+
def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
"""From a packet (without framing bytes) derive the XXX.In object"""
length, proto = unpack("BB", packet[:2])