X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=loctrkd%2Fzx303proto.py;h=aff3405691baa596ca36ca90e00f7067880cef3f;hb=ca67cd29fc86054f08bcbf4995d484bab77a4e60;hp=e27eb5f72c985456b925594a30066973b252bb64;hpb=84861997657f7a8daab41aa13790981bd77749f8;p=loctrkd.git diff --git a/loctrkd/zx303proto.py b/loctrkd/zx303proto.py index e27eb5f..aff3405 100755 --- a/loctrkd/zx303proto.py +++ b/loctrkd/zx303proto.py @@ -19,6 +19,7 @@ from enum import Enum from inspect import isclass from struct import error, pack, unpack from time import time +from types import SimpleNamespace from typing import ( Any, Callable, @@ -31,6 +32,7 @@ from typing import ( Union, ) +from .common import CoordReport, HintReport, StatusReport from .protomodule import ProtoClass __all__ = ( @@ -42,12 +44,11 @@ __all__ = ( "proto_handled", "parse_message", "probe_buffer", - "proto_name", "DecodeError", "Respond", ) -PROTO_PREFIX = "ZX:" +PROTO_PREFIX: str = "ZX:" ### Deframer ### @@ -291,6 +292,11 @@ class GPS303Pkt(ProtoClass): # Overridden in subclasses, otherwise make empty payload return b"" + @classmethod + def proto_name(cls) -> str: + """Name of the command as used externally""" + return (PROTO_PREFIX + cls.__name__)[:16] + @property def packed(self) -> bytes: payload = self.encode() @@ -363,15 +369,17 @@ class _GPS_POSITIONING(GPS303Pkt): ttup = (tup[0] % 100,) + tup[1:6] return pack("BBBBBB", *ttup) - def rectified(self) -> Dict[str, Any]: # JSON-able dict - return { - "type": "location", - "devtime": str(self.devtime), - "speed": self.speed, - "direction": self.heading, - "latitude": self.latitude, - "longitude": self.longitude, - } + def rectified(self) -> CoordReport: # JSON-able dict + return CoordReport( + devtime=str(self.devtime), + battery_percentage=None, + accuracy=None, + altitude=None, + speed=self.speed, + direction=self.heading, + latitude=self.latitude, + longitude=self.longitude, + ) class GPS_POSITIONING(_GPS_POSITIONING): @@ -411,6 +419,9 @@ class STATUS(GPS303Pkt): def out_encode(self) -> bytes: # Set interval in minutes return pack("B", self.upload_interval) + def rectified(self) -> StatusReport: + return StatusReport(battery_percentage=self.batt) + class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep PROTO = 0x14 @@ -489,15 +500,15 @@ class _WIFI_POSITIONING(GPS303Pkt): ] ) - def rectified(self) -> Dict[str, Any]: # JSON-able dict - return { - "type": "approximate_location", - "devtime": str(self.devtime), - "mcc": self.mcc, - "mnc": self.mnc, - "base_stations": self.gsm_cells, - "wifi_aps": self.wifi_aps, - } + def rectified(self) -> HintReport: + return HintReport( + devtime=str(self.devtime), + battery_percentage=None, + mcc=self.mcc, + mnc=self.mnc, + gsm_cells=self.gsm_cells, + wifi_aps=[("", mac, sig) for mac, sig in self.wifi_aps], + ) class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING): @@ -819,14 +830,8 @@ def proto_handled(proto: str) -> bool: return proto.startswith(PROTO_PREFIX) -def proto_name(obj: Union[Type[GPS303Pkt], GPS303Pkt]) -> str: - return PROTO_PREFIX + ( - obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__ - ) - - def proto_of_message(packet: bytes) -> str: - return proto_name(CLASSES.get(packet[1], UNKNOWN)) + return CLASSES.get(packet[1], UNKNOWN).proto_name() def imei_from_packet(packet: bytes) -> Optional[str]: @@ -886,7 +891,7 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt: def exposed_protos() -> List[Tuple[str, bool]]: return [ - (proto_name(cls), cls.RESPOND is Respond.EXT) + (cls.proto_name(), cls.RESPOND is Respond.EXT) for cls in CLASSES.values() if hasattr(cls, "rectified") ]