X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=loctrkd%2Fzx303proto.py;h=454214eb0c22d7f9e8713e7b367059f279886bb7;hb=5bd4b1bad2506f2f5f71161ac9e8a9b141e0fef4;hp=ebead92e0e28c88f64517547ed8ac1828fa5d301;hpb=63a086cf3956b93f760b1a0344afd757e0d0392f;p=loctrkd.git diff --git a/loctrkd/zx303proto.py b/loctrkd/zx303proto.py index ebead92..454214e 100755 --- a/loctrkd/zx303proto.py +++ b/loctrkd/zx303proto.py @@ -31,13 +31,17 @@ from typing import ( Union, ) +from .protomodule import ProtoClass + __all__ = ( "Stream", "class_by_prefix", + "enframe", + "exposed_protos", "inline_response", + "proto_handled", "parse_message", "probe_buffer", - "proto_by_name", "proto_name", "DecodeError", "Respond", @@ -92,10 +96,6 @@ class Stream: def __init__(self) -> None: self.buffer = b"" - @staticmethod - def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes: - return b"xx" + buffer + b"\r\n" - def recv(self, segment: bytes) -> List[Union[bytes, str]]: """ Process next segment of the stream. Return successfully deframed @@ -150,6 +150,10 @@ class Stream: return ret +def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes: + return b"xx" + buffer + b"\r\n" + + ### Parser/Constructor ### @@ -245,63 +249,13 @@ def l3int(x: Union[str, List[int]]) -> List[int]: return lx -class MetaPkt(type): - """ - For each class corresponding to a message, automatically create - two nested classes `In` and `Out` that also inherit from their - "nest". Class attribute `IN_KWARGS` defined in the "nest" is - copied to the `In` nested class under the name `KWARGS`, and - likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS` - to the nested class `Out`. In addition, method `encode` is - defined in both classes equal to `in_encode()` and `out_encode()` - respectively. - """ - - if TYPE_CHECKING: - - def __getattr__(self, name: str) -> Any: - pass - - def __setattr__(self, name: str, value: Any) -> None: - pass - - def __new__( - cls: Type["MetaPkt"], - name: str, - bases: Tuple[type, ...], - attrs: Dict[str, Any], - ) -> "MetaPkt": - newcls = super().__new__(cls, name, bases, attrs) - newcls.In = super().__new__( - cls, - name + ".In", - (newcls,) + bases, - { - "KWARGS": newcls.IN_KWARGS, - "decode": newcls.in_decode, - "encode": newcls.in_encode, - }, - ) - newcls.Out = super().__new__( - cls, - name + ".Out", - (newcls,) + bases, - { - "KWARGS": newcls.OUT_KWARGS, - "decode": newcls.out_decode, - "encode": newcls.out_encode, - }, - ) - return newcls - - class Respond(Enum): NON = 0 # Incoming, no response needed INL = 1 # Birirectional, use `inline_response()` EXT = 2 # Birirectional, use external responder -class GPS303Pkt(metaclass=MetaPkt): +class GPS303Pkt(ProtoClass): RESPOND = Respond.NON # Do not send anything back by default PROTO: int IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = () @@ -863,28 +817,32 @@ if True: # just to indent the code, sorry! def class_by_prefix( prefix: str, -) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]: +) -> Union[Type[GPS303Pkt], List[str]]: + if prefix.startswith(PROTO_PREFIX): + pname = prefix[len(PROTO_PREFIX) :] + else: + raise KeyError(pname) lst = [ (name, proto) for name, proto in PROTOS.items() if name.upper().startswith(prefix.upper()) ] if len(lst) != 1: - return lst + return [name for name, _ in lst] _, proto = lst[0] return CLASSES[proto] -def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str: +def proto_handled(proto: str) -> bool: + return proto.startswith(PROTO_PREFIX) + + +def proto_name(obj: Union[Type[GPS303Pkt], GPS303Pkt]) -> str: return PROTO_PREFIX + ( obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__ ) -def proto_by_name(name: str) -> int: - return PROTOS.get(name, -1) - - def proto_of_message(packet: bytes) -> str: return proto_name(CLASSES.get(packet[1], UNKNOWN)) @@ -942,3 +900,11 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt: retobj.PROTO = proto # Override class attr with object attr retobj.cause = cause return retobj + + +def exposed_protos() -> List[Tuple[str, bool]]: + return [ + (proto_name(GPS_POSITIONING), True), + (proto_name(WIFI_POSITIONING), False), + (proto_name(STATUS), True), + ]