]> average.org Git - loctrkd.git/blobdiff - loctrkd/zx303proto.py
rectifier: lookaside based on rectified objects
[loctrkd.git] / loctrkd / zx303proto.py
index 2c44a1382b1058b7b0a9e2f0325fde57c3b107a5..236f5daa16a1252f3203e660e1718370148465a5 100755 (executable)
@@ -19,6 +19,7 @@ from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
 from time import time
 from inspect import isclass
 from struct import error, pack, unpack
 from time import time
+from types import SimpleNamespace
 from typing import (
     Any,
     Callable,
 from typing import (
     Any,
     Callable,
@@ -31,55 +32,20 @@ from typing import (
     Union,
 )
 
     Union,
 )
 
+from .protomodule import ProtoClass
+
 __all__ = (
     "Stream",
     "class_by_prefix",
 __all__ = (
     "Stream",
     "class_by_prefix",
+    "enframe",
+    "exposed_protos",
     "inline_response",
     "proto_handled",
     "parse_message",
     "probe_buffer",
     "inline_response",
     "proto_handled",
     "parse_message",
     "probe_buffer",
-    "proto_by_name",
     "proto_name",
     "DecodeError",
     "Respond",
     "proto_name",
     "DecodeError",
     "Respond",
-    "GPS303Pkt",
-    "UNKNOWN",
-    "LOGIN",
-    "SUPERVISION",
-    "HEARTBEAT",
-    "GPS_POSITIONING",
-    "GPS_OFFLINE_POSITIONING",
-    "STATUS",
-    "HIBERNATION",
-    "RESET",
-    "WHITELIST_TOTAL",
-    "WIFI_OFFLINE_POSITIONING",
-    "TIME",
-    "PROHIBIT_LBS",
-    "GPS_LBS_SWITCH_TIMES",
-    "REMOTE_MONITOR_PHONE",
-    "SOS_PHONE",
-    "DAD_PHONE",
-    "MOM_PHONE",
-    "STOP_UPLOAD",
-    "GPS_OFF_PERIOD",
-    "DND_PERIOD",
-    "RESTART_SHUTDOWN",
-    "DEVICE",
-    "ALARM_CLOCK",
-    "STOP_ALARM",
-    "SETUP",
-    "SYNCHRONOUS_WHITELIST",
-    "RESTORE_PASSWORD",
-    "WIFI_POSITIONING",
-    "MANUAL_POSITIONING",
-    "BATTERY_CHARGE",
-    "CHARGER_CONNECTED",
-    "CHARGER_DISCONNECTED",
-    "VIBRATION_RECEIVED",
-    "POSITION_UPLOAD_INTERVAL",
-    "SOS_ALARM",
-    "UNKNOWN_B3",
 )
 
 PROTO_PREFIX = "ZX:"
 )
 
 PROTO_PREFIX = "ZX:"
@@ -246,63 +212,13 @@ def l3int(x: Union[str, List[int]]) -> List[int]:
     return lx
 
 
     return lx
 
 
-class MetaPkt(type):
-    """
-    For each class corresponding to a message, automatically create
-    two nested classes `In` and `Out` that also inherit from their
-    "nest". Class attribute `IN_KWARGS` defined in the "nest" is
-    copied to the `In` nested class under the name `KWARGS`, and
-    likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
-    to the nested class `Out`. In addition, method `encode` is
-    defined in both classes equal to `in_encode()` and `out_encode()`
-    respectively.
-    """
-
-    if TYPE_CHECKING:
-
-        def __getattr__(self, name: str) -> Any:
-            pass
-
-        def __setattr__(self, name: str, value: Any) -> None:
-            pass
-
-    def __new__(
-        cls: Type["MetaPkt"],
-        name: str,
-        bases: Tuple[type, ...],
-        attrs: Dict[str, Any],
-    ) -> "MetaPkt":
-        newcls = super().__new__(cls, name, bases, attrs)
-        newcls.In = super().__new__(
-            cls,
-            name + ".In",
-            (newcls,) + bases,
-            {
-                "KWARGS": newcls.IN_KWARGS,
-                "decode": newcls.in_decode,
-                "encode": newcls.in_encode,
-            },
-        )
-        newcls.Out = super().__new__(
-            cls,
-            name + ".Out",
-            (newcls,) + bases,
-            {
-                "KWARGS": newcls.OUT_KWARGS,
-                "decode": newcls.out_decode,
-                "encode": newcls.out_encode,
-            },
-        )
-        return newcls
-
-
 class Respond(Enum):
     NON = 0  # Incoming, no response needed
     INL = 1  # Birirectional, use `inline_response()`
     EXT = 2  # Birirectional, use external responder
 
 
 class Respond(Enum):
     NON = 0  # Incoming, no response needed
     INL = 1  # Birirectional, use `inline_response()`
     EXT = 2  # Birirectional, use external responder
 
 
-class GPS303Pkt(metaclass=MetaPkt):
+class GPS303Pkt(ProtoClass):
     RESPOND = Respond.NON  # Do not send anything back by default
     PROTO: int
     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
     RESPOND = Respond.NON  # Do not send anything back by default
     PROTO: int
     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
@@ -448,6 +364,16 @@ class _GPS_POSITIONING(GPS303Pkt):
         ttup = (tup[0] % 100,) + tup[1:6]
         return pack("BBBBBB", *ttup)
 
         ttup = (tup[0] % 100,) + tup[1:6]
         return pack("BBBBBB", *ttup)
 
+    def rectified(self) -> SimpleNamespace:  # JSON-able dict
+        return SimpleNamespace(
+            type="location",
+            devtime=str(self.devtime),
+            speed=self.speed,
+            direction=self.heading,
+            latitude=self.latitude,
+            longitude=self.longitude,
+        )
+
 
 class GPS_POSITIONING(_GPS_POSITIONING):
     PROTO = 0x10
 
 class GPS_POSITIONING(_GPS_POSITIONING):
     PROTO = 0x10
@@ -564,6 +490,16 @@ class _WIFI_POSITIONING(GPS303Pkt):
             ]
         )
 
             ]
         )
 
+    def rectified(self) -> SimpleNamespace:  # JSON-able dict
+        return SimpleNamespace(
+            type="approximate_location",
+            devtime=str(self.devtime),
+            mcc=self.mcc,
+            mnc=self.mnc,
+            base_stations=self.gsm_cells,
+            wifi_aps=self.wifi_aps,
+        )
+
 
 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x17
 
 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x17
@@ -864,14 +800,18 @@ if True:  # just to indent the code, sorry!
 
 def class_by_prefix(
     prefix: str,
 
 def class_by_prefix(
     prefix: str,
-) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
+) -> Union[Type[GPS303Pkt], List[str]]:
+    if prefix.startswith(PROTO_PREFIX):
+        pname = prefix[len(PROTO_PREFIX) :]
+    else:
+        raise KeyError(pname)
     lst = [
         (name, proto)
         for name, proto in PROTOS.items()
         if name.upper().startswith(prefix.upper())
     ]
     if len(lst) != 1:
     lst = [
         (name, proto)
         for name, proto in PROTOS.items()
         if name.upper().startswith(prefix.upper())
     ]
     if len(lst) != 1:
-        return lst
+        return [name for name, _ in lst]
     _, proto = lst[0]
     return CLASSES[proto]
 
     _, proto = lst[0]
     return CLASSES[proto]
 
@@ -880,16 +820,12 @@ def proto_handled(proto: str) -> bool:
     return proto.startswith(PROTO_PREFIX)
 
 
     return proto.startswith(PROTO_PREFIX)
 
 
-def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
+def proto_name(obj: Union[Type[GPS303Pkt], GPS303Pkt]) -> str:
     return PROTO_PREFIX + (
         obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__
     )
 
 
     return PROTO_PREFIX + (
         obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__
     )
 
 
-def proto_by_name(name: str) -> int:
-    return PROTOS.get(name, -1)
-
-
 def proto_of_message(packet: bytes) -> str:
     return proto_name(CLASSES.get(packet[1], UNKNOWN))
 
 def proto_of_message(packet: bytes) -> str:
     return proto_name(CLASSES.get(packet[1], UNKNOWN))
 
@@ -947,3 +883,11 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
     retobj.PROTO = proto  # Override class attr with object attr
     retobj.cause = cause
     return retobj
     retobj.PROTO = proto  # Override class attr with object attr
     retobj.cause = cause
     return retobj
+
+
+def exposed_protos() -> List[Tuple[str, bool]]:
+    return [
+        (proto_name(cls), cls.RESPOND is Respond.EXT)
+        for cls in CLASSES.values()
+        if hasattr(cls, "rectified")
+    ]