X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fzmsg.py;h=73a4f801aa273079540ef9a83441b5d166feecbb;hb=b84a40a485b0563d572d14e748ad324185584344;hp=7ab3ce4caf755271bd91531cc81f9103240d8b35;hpb=3dea189c7bb47f02db07b52fdcda53fdb986fd2b;p=loctrkd.git diff --git a/gps303/zmsg.py b/gps303/zmsg.py index 7ab3ce4..73a4f80 100644 --- a/gps303/zmsg.py +++ b/gps303/zmsg.py @@ -2,31 +2,47 @@ import ipaddress as ip from struct import pack, unpack - -__all__ = "Bcast", "Resp" - -def pack_peer(peeraddr): - saddr, port, _x, _y = peeraddr - addr6 = ip.ip_address(saddr) - addr = addr6.ipv4_mapped - if addr is None: - addr = addr6 - return pack("B", addr.version) + (addr.packed + b"\0\0\0\0\0\0\0\0\0\0\0\0")[:16] + pack("!H", port) - -def unpack_peer(buffer): - version = buffer[0] - if version not in (4, 6): - return None - if version == 4: - addr = ip.IPv4Address(buffer[1:5]) - else: - addr = ip.IPv6Address(buffer[1:17]) - port = unpack("!H", buffer[17:19])[0] - return (addr, port) +from typing import Any, cast, Optional, Tuple, Type, Union + +__all__ = "Bcast", "Resp", "topic" + + +def pack_peer( + peeraddr: Union[None, Tuple[str, int], Tuple[str, int, Any, Any]] +) -> bytes: + if peeraddr is None: + addr: Union[ip.IPv4Address, ip.IPv6Address] = ip.IPv6Address(0) + port = 0 + elif len(peeraddr) == 2: + peeraddr = cast(Tuple[str, int], peeraddr) + saddr, port = peeraddr + addr = ip.ip_address(saddr) + elif len(peeraddr) == 4: + peeraddr = cast(Tuple[str, int, Any, Any], peeraddr) + saddr, port, _x, _y = peeraddr + addr = ip.ip_address(saddr) + if isinstance(addr, ip.IPv4Address): + addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + addr.packed) + return addr.packed + pack("!H", port) + + +def unpack_peer( + buffer: bytes, +) -> Tuple[str, int]: + a6 = ip.IPv6Address(buffer[:16]) + port = unpack("!H", buffer[16:])[0] + a4 = a6.ipv4_mapped + if a4 is not None: + return (str(a4), port) + elif a6 == ip.IPv6Address("::"): + return ("", 0) + return (str(a6), port) class _Zmsg: - def __init__(self, *args, **kwargs): + KWARGS: Tuple[Tuple[str, Any], ...] + + def __init__(self, *args: Any, **kwargs: Any) -> None: if len(args) == 1: self.decode(args[0]) elif bool(kwargs): @@ -41,22 +57,54 @@ class _Zmsg: + str(kwargs) ) - def decode(self, buffer): - raise RuntimeError( - self.__class__.__name__ + "must implement `encode()` method" + def __repr__(self) -> str: + return "{}({})".format( + self.__class__.__name__, + ", ".join( + [ + "{}={}".format( + k, + 'bytes.fromhex("{}")'.format(getattr(self, k).hex()) + if isinstance(getattr(self, k), bytes) + else getattr(self, k), + ) + for k, _ in self.KWARGS + ] + ), + ) + + def __eq__(self, other: object) -> bool: + if isinstance(other, self.__class__): + return all( + [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS] + ) + return NotImplemented + + def decode(self, buffer: bytes) -> None: + raise NotImplementedError( + self.__class__.__name__ + "must implement `decode()` method" ) @property - def packed(self): - raise RuntimeError( - self.__class__.__name__ + "must implement `encode()` method" + def packed(self) -> bytes: + raise NotImplementedError( + self.__class__.__name__ + "must implement `packed()` property" ) +def topic( + proto: int, is_incoming: bool = True, imei: Optional[str] = None +) -> bytes: + return pack("BB", is_incoming, proto) + ( + b"" if imei is None else pack("16s", imei.encode()) + ) + + class Bcast(_Zmsg): """Zmq message to broadcast what was received from the terminal""" KWARGS = ( + ("is_incoming", True), ("proto", 256), ("imei", None), ("when", None), @@ -65,36 +113,59 @@ class Bcast(_Zmsg): ) @property - def packed(self): + def packed(self) -> bytes: return ( - pack("B", self.proto) - + ("0000000000000000" if self.imei is None else self.imei).encode() - + (b"\0\0\0\0\0\0\0\0" if self.when is None else pack("!d", self.when)) + pack( + "BB16s", + int(self.is_incoming), + self.proto, + "0000000000000000" + if self.imei is None + else self.imei.encode(), + ) + + ( + b"\0\0\0\0\0\0\0\0" + if self.when is None + else pack("!d", self.when) + ) + pack_peer(self.peeraddr) + self.packet ) - def decode(self, buffer): - self.proto = buffer[0] - self.imei = buffer[1:17].decode() + def decode(self, buffer: bytes) -> None: + self.is_incoming = bool(buffer[0]) + self.proto = buffer[1] + self.imei: Optional[str] = buffer[2:18].decode() if self.imei == "0000000000000000": self.imei = None - self.when = unpack("!d", buffer[17:25])[0] - self.peeraddr = unpack_peer(buffer[25:44]) + self.when = unpack("!d", buffer[18:26])[0] + self.peeraddr = unpack_peer(buffer[26:44]) self.packet = buffer[44:] class Resp(_Zmsg): """Zmq message received from a third party to send to the terminal""" - KWARGS = (("imei", None), ("packet", b"")) + KWARGS = (("imei", None), ("when", None), ("packet", b"")) @property - def packed(self): + def packed(self) -> bytes: return ( - "0000000000000000" if self.imei is None else self.imei.encode() - ) + self.packet + pack( + "16s", + "0000000000000000" + if self.imei is None + else self.imei.encode(), + ) + + ( + b"\0\0\0\0\0\0\0\0" + if self.when is None + else pack("!d", self.when) + ) + + self.packet + ) - def decode(self, buffer): + def decode(self, buffer: bytes) -> None: self.imei = buffer[:16].decode() - self.packet = buffer[16:] + self.when = unpack("!d", buffer[16:24])[0] + self.packet = buffer[24:]