from inspect import isclass
from struct import error, pack, unpack
from time import time
+from types import SimpleNamespace
from typing import (
Any,
Callable,
Union,
)
+from .common import CoordReport, HintReport, StatusReport
+from .protomodule import ProtoClass
+
__all__ = (
"Stream",
"class_by_prefix",
+ "enframe",
+ "exposed_protos",
"inline_response",
+ "proto_handled",
"parse_message",
"probe_buffer",
- "proto_by_name",
- "proto_name",
"DecodeError",
"Respond",
- "GPS303Pkt",
- "UNKNOWN",
- "LOGIN",
- "SUPERVISION",
- "HEARTBEAT",
- "GPS_POSITIONING",
- "GPS_OFFLINE_POSITIONING",
- "STATUS",
- "HIBERNATION",
- "RESET",
- "WHITELIST_TOTAL",
- "WIFI_OFFLINE_POSITIONING",
- "TIME",
- "PROHIBIT_LBS",
- "GPS_LBS_SWITCH_TIMES",
- "REMOTE_MONITOR_PHONE",
- "SOS_PHONE",
- "DAD_PHONE",
- "MOM_PHONE",
- "STOP_UPLOAD",
- "GPS_OFF_PERIOD",
- "DND_PERIOD",
- "RESTART_SHUTDOWN",
- "DEVICE",
- "ALARM_CLOCK",
- "STOP_ALARM",
- "SETUP",
- "SYNCHRONOUS_WHITELIST",
- "RESTORE_PASSWORD",
- "WIFI_POSITIONING",
- "MANUAL_POSITIONING",
- "BATTERY_CHARGE",
- "CHARGER_CONNECTED",
- "CHARGER_DISCONNECTED",
- "VIBRATION_RECEIVED",
- "POSITION_UPLOAD_INTERVAL",
- "SOS_ALARM",
- "UNKNOWN_B3",
)
-PROTO_PREFIX = "ZX"
+PROTO_PREFIX: str = "ZX:"
### Deframer ###
def __init__(self) -> None:
self.buffer = b""
- @staticmethod
- def enframe(buffer: bytes) -> bytes:
- return b"xx" + buffer + b"\r\n"
-
def recv(self, segment: bytes) -> List[Union[bytes, str]]:
"""
Process next segment of the stream. Return successfully deframed
return ret
+def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
+ return b"xx" + buffer + b"\r\n"
+
+
### Parser/Constructor ###
return lx
-class MetaPkt(type):
- """
- For each class corresponding to a message, automatically create
- two nested classes `In` and `Out` that also inherit from their
- "nest". Class attribute `IN_KWARGS` defined in the "nest" is
- copied to the `In` nested class under the name `KWARGS`, and
- likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
- to the nested class `Out`. In addition, method `encode` is
- defined in both classes equal to `in_encode()` and `out_encode()`
- respectively.
- """
-
- if TYPE_CHECKING:
-
- def __getattr__(self, name: str) -> Any:
- pass
-
- def __setattr__(self, name: str, value: Any) -> None:
- pass
-
- def __new__(
- cls: Type["MetaPkt"],
- name: str,
- bases: Tuple[type, ...],
- attrs: Dict[str, Any],
- ) -> "MetaPkt":
- newcls = super().__new__(cls, name, bases, attrs)
- newcls.In = super().__new__(
- cls,
- name + ".In",
- (newcls,) + bases,
- {
- "KWARGS": newcls.IN_KWARGS,
- "decode": newcls.in_decode,
- "encode": newcls.in_encode,
- },
- )
- newcls.Out = super().__new__(
- cls,
- name + ".Out",
- (newcls,) + bases,
- {
- "KWARGS": newcls.OUT_KWARGS,
- "decode": newcls.out_decode,
- "encode": newcls.out_encode,
- },
- )
- return newcls
-
-
class Respond(Enum):
NON = 0 # Incoming, no response needed
INL = 1 # Birirectional, use `inline_response()`
EXT = 2 # Birirectional, use external responder
-class GPS303Pkt(metaclass=MetaPkt):
+class GPS303Pkt(ProtoClass):
RESPOND = Respond.NON # Do not send anything back by default
PROTO: int
IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
# Overridden in subclasses, otherwise make empty payload
return b""
+ @classmethod
+ def proto_name(cls) -> str:
+ """Name of the command as used externally"""
+ return (PROTO_PREFIX + cls.__name__)[:16]
+
@property
def packed(self) -> bytes:
payload = self.encode()
ttup = (tup[0] % 100,) + tup[1:6]
return pack("BBBBBB", *ttup)
+ def rectified(self) -> CoordReport: # JSON-able dict
+ return CoordReport(
+ devtime=str(self.devtime),
+ battery_percentage=None,
+ accuracy=None,
+ altitude=None,
+ speed=self.speed,
+ direction=self.heading,
+ latitude=self.latitude,
+ longitude=self.longitude,
+ )
+
class GPS_POSITIONING(_GPS_POSITIONING):
PROTO = 0x10
def out_encode(self) -> bytes: # Set interval in minutes
return pack("B", self.upload_interval)
+ def rectified(self) -> StatusReport:
+ return StatusReport(battery_percentage=self.batt)
+
class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
PROTO = 0x14
]
)
+ def rectified(self) -> HintReport:
+ return HintReport(
+ devtime=str(self.devtime),
+ battery_percentage=None,
+ mcc=self.mcc,
+ mnc=self.mnc,
+ gsm_cells=self.gsm_cells,
+ wifi_aps=[("<UNKNOWN>", mac, sig) for mac, sig in self.wifi_aps],
+ )
+
class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
PROTO = 0x17
def class_by_prefix(
prefix: str,
-) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
+) -> Union[Type[GPS303Pkt], List[str]]:
+ if prefix.startswith(PROTO_PREFIX):
+ pname = prefix[len(PROTO_PREFIX) :]
+ else:
+ raise KeyError(pname)
lst = [
(name, proto)
for name, proto in PROTOS.items()
if name.upper().startswith(prefix.upper())
]
if len(lst) != 1:
- return lst
+ return [name for name, _ in lst]
_, proto = lst[0]
return CLASSES[proto]
-def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
- return (
- PROTO_PREFIX
- + ":"
- + (
- obj.__class__.__name__
- if isinstance(obj, GPS303Pkt)
- else obj.__name__
- )
- ).ljust(16, "\0")[:16]
-
-
-def proto_by_name(name: str) -> int:
- return PROTOS.get(name, -1)
+def proto_handled(proto: str) -> bool:
+ return proto.startswith(PROTO_PREFIX)
def proto_of_message(packet: bytes) -> str:
- return proto_name(CLASSES.get(packet[1], UNKNOWN))
+ return CLASSES.get(packet[1], UNKNOWN).proto_name()
def imei_from_packet(packet: bytes) -> Optional[str]:
retobj.PROTO = proto # Override class attr with object attr
retobj.cause = cause
return retobj
+
+
+def exposed_protos() -> List[Tuple[str, bool]]:
+ return [
+ (cls.proto_name(), cls.RESPOND is Respond.EXT)
+ for cls in CLASSES.values()
+ if hasattr(cls, "rectified")
+ ]