X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=loctrkd%2Fzx303proto.py;h=bd19e108c770221e3a0132dfc7c863e23181622f;hb=1f1c34f9f42911958be5b282bd9092c1ed3de2a5;hp=c47c216134c1e6ff737d01c4107c79351c3c07ea;hpb=5ef83cb7db7464a5a625b0b7c86c4e25ebbb0de1;p=loctrkd.git diff --git a/loctrkd/zx303proto.py b/loctrkd/zx303proto.py index c47c216..bd19e10 100755 --- a/loctrkd/zx303proto.py +++ b/loctrkd/zx303proto.py @@ -34,10 +34,12 @@ from typing import ( __all__ = ( "Stream", "class_by_prefix", + "enframe", + "exposed_protos", "inline_response", + "proto_handled", "parse_message", "probe_buffer", - "proto_by_name", "proto_name", "DecodeError", "Respond", @@ -863,28 +865,32 @@ if True: # just to indent the code, sorry! def class_by_prefix( prefix: str, -) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]: +) -> Union[Type[GPS303Pkt], List[str]]: + if prefix.startswith(PROTO_PREFIX): + pname = prefix[len(PROTO_PREFIX) :] + else: + raise KeyError(pname) lst = [ (name, proto) for name, proto in PROTOS.items() if name.upper().startswith(prefix.upper()) ] if len(lst) != 1: - return lst + return [name for name, _ in lst] _, proto = lst[0] return CLASSES[proto] +def proto_handled(proto: str) -> bool: + return proto.startswith(PROTO_PREFIX) + + def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str: return PROTO_PREFIX + ( obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__ ) -def proto_by_name(name: str) -> int: - return PROTOS.get(name, -1) - - def proto_of_message(packet: bytes) -> str: return proto_name(CLASSES.get(packet[1], UNKNOWN)) @@ -942,3 +948,11 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt: retobj.PROTO = proto # Override class attr with object attr retobj.cause = cause return retobj + + +def exposed_protos() -> List[Tuple[str, bool]]: + return [ + (proto_name(GPS_POSITIONING), True), + (proto_name(WIFI_POSITIONING), False), + (proto_name(STATUS), True), + ]