X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fzmsg.py;h=b6faa7025a1df310422e6e51603878d9c01254b1;hb=bf48ccad4b4b91e7d7e09d1087f5953bc2db97d7;hp=0ccef007db6a0ddc5a9491b6d661ec89a63c1a57;hpb=38f814dde6ea7ff8b33d8f5de49fa8fb007b7e2a;p=loctrkd.git diff --git a/gps303/zmsg.py b/gps303/zmsg.py index 0ccef00..b6faa70 100644 --- a/gps303/zmsg.py +++ b/gps303/zmsg.py @@ -1,35 +1,48 @@ """ Zeromq messages """ -from datetime import datetime, timezone -from json import dumps, loads import ipaddress as ip from struct import pack, unpack +from typing import Any, cast, Optional, Tuple, Type, Union -__all__ = "Bcast", "LocEvt", "Resp" +__all__ = "Bcast", "Resp", "topic" -def pack_peer(peeraddr): - try: +def pack_peer( # 18 bytes + peeraddr: Union[None, Tuple[str, int], Tuple[str, int, Any, Any]] +) -> bytes: + if peeraddr is None: + addr: Union[ip.IPv4Address, ip.IPv6Address] = ip.IPv6Address(0) + port = 0 + elif len(peeraddr) == 2: + peeraddr = cast(Tuple[str, int], peeraddr) + saddr, port = peeraddr + addr = ip.ip_address(saddr) + elif len(peeraddr) == 4: + peeraddr = cast(Tuple[str, int, Any, Any], peeraddr) saddr, port, _x, _y = peeraddr addr = ip.ip_address(saddr) - except ValueError: - saddr, port = peeraddr - a4 = ip.ip_address(saddr) - addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + a4.packed) + if isinstance(addr, ip.IPv4Address): + addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + addr.packed) return addr.packed + pack("!H", port) -def unpack_peer(buffer): +def unpack_peer( + buffer: bytes, +) -> Tuple[str, int]: a6 = ip.IPv6Address(buffer[:16]) port = unpack("!H", buffer[16:])[0] - addr = a6.ipv4_mapped - if addr is None: - addr = a6 - return (addr, port) + a4 = a6.ipv4_mapped + if a4 is not None: + return (str(a4), port) + elif a6 == ip.IPv6Address("::"): + return ("", 0) + return (str(a6), port) class _Zmsg: - def __init__(self, *args, **kwargs): + KWARGS: Tuple[Tuple[str, Any], ...] + + def __init__(self, *args: Any, **kwargs: Any) -> None: if len(args) == 1: self.decode(args[0]) elif bool(kwargs): @@ -44,7 +57,7 @@ class _Zmsg: + str(kwargs) ) - def __repr__(self): + def __repr__(self) -> str: return "{}({})".format( self.__class__.__name__, ", ".join( @@ -60,28 +73,39 @@ class _Zmsg: ), ) - def __eq__(self, other): - return all( - [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS] - ) + def __eq__(self, other: object) -> bool: + if isinstance(other, self.__class__): + return all( + [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS] + ) + return NotImplemented - def decode(self, buffer): + def decode(self, buffer: bytes) -> None: raise NotImplementedError( self.__class__.__name__ + "must implement `decode()` method" ) @property - def packed(self): + def packed(self) -> bytes: raise NotImplementedError( self.__class__.__name__ + "must implement `packed()` property" ) +def topic( + proto: str, is_incoming: bool = True, imei: Optional[str] = None +) -> bytes: + return pack("B16s", is_incoming, proto.encode()) + ( + b"" if imei is None else pack("16s", imei.encode()) + ) + + class Bcast(_Zmsg): """Zmq message to broadcast what was received from the terminal""" KWARGS = ( - ("proto", 256), + ("is_incoming", True), + ("proto", "UNKNOWN"), ("imei", None), ("when", None), ("peeraddr", None), @@ -89,90 +113,56 @@ class Bcast(_Zmsg): ) @property - def packed(self): + def packed(self) -> bytes: return ( - pack("B", self.proto) - + ("0000000000000000" if self.imei is None else self.imei).encode() - + ( - b"\0\0\0\0\0\0\0\0" - if self.when is None - else pack("!d", self.when) + pack( + "!B16s16sd", + int(self.is_incoming), + self.proto[:16].ljust(16, "\0").encode(), + b"0000000000000000" + if self.imei is None + else self.imei.encode(), + 0 if self.when is None else self.when, ) + pack_peer(self.peeraddr) + self.packet ) - def decode(self, buffer): - self.proto = buffer[0] - self.imei = buffer[1:17].decode() - if self.imei == "0000000000000000": - self.imei = None - self.when = unpack("!d", buffer[17:25])[0] - self.peeraddr = unpack_peer(buffer[25:43]) - self.packet = buffer[43:] + def decode(self, buffer: bytes) -> None: + is_incoming, proto, imei, when = unpack("!B16s16sd", buffer[:41]) + self.is_incoming = bool(is_incoming) + self.proto = proto.decode() + self.imei = ( + None if imei == b"0000000000000000" else imei.decode().strip("\0") + ) + self.when = when + self.peeraddr = unpack_peer(buffer[41:59]) + self.packet = buffer[59:] class Resp(_Zmsg): """Zmq message received from a third party to send to the terminal""" - KWARGS = (("imei", None), ("packet", b"")) - - @property - def packed(self): - return ( - "0000000000000000" if self.imei is None else self.imei.encode() - ) + self.packet - - def decode(self, buffer): - self.imei = buffer[:16].decode() - self.packet = buffer[16:] + KWARGS = (("imei", None), ("when", None), ("packet", b"")) - -class LocEvt(_Zmsg): - """Zmq message with original or approximated location from lookaside""" - - KWARGS = ( - ("imei", "0000000000000000"), - ("devtime", datetime(1970, 1, 1, tzinfo=timezone.utc)), - ("lat", 0.0), - ("lon", 0.0), - ("is_gps", True), - ) - - # This message is for external consumption, so use json encoding, - # except imei that forms 16 byte prefix that can be used as the - # topic to subscribe. @property - def packed(self): + def packed(self) -> bytes: return ( - self.imei.encode() - + dumps( - { - "devtime": str(self.devtime), - "latitude": self.lat, - "longitude": self.lon, - "is-gps": self.is_gps, - } - ).encode() + pack( + "!16sd", + "0000000000000000" + if self.imei is None + else self.imei.encode(), + 0 if self.when is None else self.when, + ) + + self.packet ) - # And this is full json that can be sent over websocket etc. - @property - def json(self): - return dumps( - { - "imei": self.imei, - "devtime": str(self.devtime), - "latitude": self.lat, - "longitude": self.lon, - "is-gps": self.is_gps, - } + def decode(self, buffer: bytes) -> None: + imei, when = unpack("!16sd", buffer[:24]) + self.imei = ( + None if imei == b"0000000000000000" else imei.decode().strip("\0") ) - def decode(self, buffer): - self.imei = buffer[:16].decode() - json_data = loads(buffer[16:]) - self.devtime = datetime.fromisoformat(json_data["devtime"]) - self.lat = json_data["latitude"] - self.lon = json_data["longitude"] - self.is_gps = json_data["is-gps"] + self.when = when + self.packet = buffer[24:]