"Stream",
"class_by_prefix",
"inline_response",
+ "proto_handled",
"parse_message",
"probe_buffer",
- "proto_by_name",
"proto_name",
"DecodeError",
"Respond",
"UNKNOWN_B3",
)
-PROTO_PREFIX = "ZX"
+PROTO_PREFIX = "ZX:"
### Deframer ###
def __init__(self) -> None:
self.buffer = b""
- @staticmethod
- def enframe(buffer: bytes) -> bytes:
- return b"xx" + buffer + b"\r\n"
-
def recv(self, segment: bytes) -> List[Union[bytes, str]]:
"""
Process next segment of the stream. Return successfully deframed
return ret
+def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
+ return b"xx" + buffer + b"\r\n"
+
+
### Parser/Constructor ###
def class_by_prefix(
prefix: str,
-) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
+) -> Union[Type[GPS303Pkt], List[str]]:
+ if prefix.startswith(PROTO_PREFIX):
+ pname = prefix[len(PROTO_PREFIX) :]
+ else:
+ raise KeyError(pname)
lst = [
(name, proto)
for name, proto in PROTOS.items()
if name.upper().startswith(prefix.upper())
]
if len(lst) != 1:
- return lst
+ return [name for name, _ in lst]
_, proto = lst[0]
return CLASSES[proto]
-def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
- return (
- PROTO_PREFIX
- + ":"
- + (
- obj.__class__.__name__
- if isinstance(obj, GPS303Pkt)
- else obj.__name__
- )
- ).ljust(16, "\0")[:16]
+def proto_handled(proto: str) -> bool:
+ return proto.startswith(PROTO_PREFIX)
-def proto_by_name(name: str) -> int:
- return PROTOS.get(name, -1)
+def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
+ return PROTO_PREFIX + (
+ obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__
+ )
def proto_of_message(packet: bytes) -> str: