"class_by_prefix",
"enframe",
"inline_response",
+ "proto_handled",
"parse_message",
"probe_buffer",
"proto_by_name",
return CLASSES[proto]
+def proto_handled(proto: str) -> bool:
+ return proto.startswith(PROTO_PREFIX)
+
+
def proto_name(obj: Union[MetaPkt, BeeSurePkt]) -> str:
return PROTO_PREFIX + (
obj.__class__.__name__ if isinstance(obj, BeeSurePkt) else obj.__name__
class ProtoModule:
- PROTO_PREFIX: str
+ @staticmethod
+ def proto_handled(proto: str) -> bool:
+ ...
@staticmethod
def parse_message(packet: bytes, is_incoming: bool = True) -> Any:
zmsg = Bcast(zsub.recv())
print("I" if zmsg.is_incoming else "O", zmsg.proto, zmsg.imei)
for pmod in pmods:
- if zmsg.proto.startswith(pmod.PROTO_PREFIX):
+ if pmod.proto_handled(zmsg.proto.startswith):
msg = pmod.parse_message(zmsg.packet, zmsg.is_incoming)
print(msg)
except KeyboardInterrupt:
"Stream",
"class_by_prefix",
"inline_response",
+ "proto_handled",
"parse_message",
"probe_buffer",
"proto_by_name",
return CLASSES[proto]
+def proto_handled(proto: str) -> bool:
+ return proto.startswith(PROTO_PREFIX)
+
+
def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
return PROTO_PREFIX + (
obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__