X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=loctrkd%2Fbeesure.py;h=e782c736189ca5d74a3f0cc188c7973672660c13;hb=2cf0fd9d215dda17eae4261ab7967367f6aa0028;hp=9e9e22b65667c3494338efd242ce19d33ad58321;hpb=42613c3084ae1371ce1141b0d66f8af5ce0ade39;p=loctrkd.git diff --git a/loctrkd/beesure.py b/loctrkd/beesure.py index 9e9e22b..e782c73 100755 --- a/loctrkd/beesure.py +++ b/loctrkd/beesure.py @@ -43,6 +43,7 @@ __all__ = ( "Respond", ) +MODNAME = __name__.split(".")[-1] PROTO_PREFIX = "BS:" ### Deframer ### @@ -116,7 +117,7 @@ class Stream: else: msgs.append( f"Packet does not end with ']'" - f" at {self.datalen+20}: {self.buffer=!r}" + f" at {self.datalen+20}: {self.buffer[:64]=!r}" ) self.buffer = self.buffer[self.datalen + 21 :] self.datalen = 0 @@ -380,12 +381,12 @@ class _LOC_DATA(BeeSurePkt): self.latitude = p.lat * p.nors self.longitude = p.lon * p.eorw - def rectified(self) -> Report: + def rectified(self) -> Tuple[str, Report]: # self.gps_valid is supposed to mean it, but it does not. Perfectly # good looking coordinates, with ten satellites, still get 'V'. # I suspect that in reality, 'A' means "hint data is absent". if self.gps_valid or self.num_of_sats > 3: - return CoordReport( + return MODNAME, CoordReport( devtime=str(self.devtime), battery_percentage=self.battery_percentage, accuracy=self.positioning_accuracy, @@ -396,7 +397,7 @@ class _LOC_DATA(BeeSurePkt): longitude=self.longitude, ) else: - return HintReport( + return MODNAME, HintReport( devtime=str(self.devtime), battery_percentage=self.battery_percentage, mcc=self.mcc, @@ -602,8 +603,15 @@ def proto_handled(proto: str) -> bool: return proto.startswith(PROTO_PREFIX) +def _local_proto(packet: bytes) -> str: + try: + return packet[20:-1].split(b",")[0].decode() + except UnicodeDecodeError: + return "UNKNOWN" + + def proto_of_message(packet: bytes) -> str: - return PROTO_PREFIX + packet[20:-1].split(b",")[0].decode() + return PROTO_PREFIX + _local_proto(packet) def imei_from_packet(packet: bytes) -> Optional[str]: @@ -618,7 +626,7 @@ def is_goodbye_packet(packet: bytes) -> bool: def inline_response(packet: bytes) -> Optional[bytes]: - proto = packet[20:-1].split(b",")[0].decode() + proto = _local_proto(packet) if proto in CLASSES: cls = CLASSES[proto] if cls.RESPOND is Respond.INL: @@ -672,3 +680,13 @@ def exposed_protos() -> List[Tuple[str, bool]]: for cls in CLASSES.values() if hasattr(cls, "rectified") ] + + +def make_response(cmd: str, imei: str, **kwargs: Any) -> Optional[BeeSurePkt]: + if cmd == "poweroff": + return POWEROFF.Out() + elif cmd == "refresh": + return MONITOR.Out() + elif cmd == "message": + return MESSAGE.Out(message=kwargs.get("txt", "Hello")) + return None