)
__all__ = (
- "GPS303Conn",
+ "Stream",
"class_by_prefix",
"inline_response",
"parse_message",
+ "probe_buffer",
"proto_by_name",
"DecodeError",
"Respond",
MAXBUFFER: int = 4096
-class GPS303Conn:
+class Stream:
def __init__(self) -> None:
self.buffer = b""
return packet[1]
+def imei_from_packet(packet: bytes) -> Optional[str]:
+ if proto_of_message(packet) == LOGIN.PROTO:
+ msg = parse_message(packet)
+ if isinstance(msg, LOGIN):
+ return msg.imei
+ return None
+
+
+def is_goodbye_packet(packet: bytes) -> bool:
+ return proto_of_message(packet) == HIBERNATION.PROTO
+
+
def inline_response(packet: bytes) -> Optional[bytes]:
proto = proto_of_message(packet)
if proto in CLASSES:
return None
+def probe_buffer(buffer: bytes) -> bool:
+ framestart = buffer.find(b"xx")
+ if framestart < 0:
+ return False
+ if len(buffer) - framestart < 6:
+ return False
+ return True
+
+
def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
"""From a packet (without framing bytes) derive the XXX.In object"""
length, proto = unpack("BB", packet[:2])