X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=loctrkd%2Fzmsg.py;fp=loctrkd%2Fzmsg.py;h=b6faa7025a1df310422e6e51603878d9c01254b1;hb=dbdf9d63af31770ad57302e16b17a2fdc526773f;hp=0000000000000000000000000000000000000000;hpb=bf48ccad4b4b91e7d7e09d1087f5953bc2db97d7;p=loctrkd.git diff --git a/loctrkd/zmsg.py b/loctrkd/zmsg.py new file mode 100644 index 0000000..b6faa70 --- /dev/null +++ b/loctrkd/zmsg.py @@ -0,0 +1,168 @@ +""" Zeromq messages """ + +import ipaddress as ip +from struct import pack, unpack +from typing import Any, cast, Optional, Tuple, Type, Union + +__all__ = "Bcast", "Resp", "topic" + + +def pack_peer( # 18 bytes + peeraddr: Union[None, Tuple[str, int], Tuple[str, int, Any, Any]] +) -> bytes: + if peeraddr is None: + addr: Union[ip.IPv4Address, ip.IPv6Address] = ip.IPv6Address(0) + port = 0 + elif len(peeraddr) == 2: + peeraddr = cast(Tuple[str, int], peeraddr) + saddr, port = peeraddr + addr = ip.ip_address(saddr) + elif len(peeraddr) == 4: + peeraddr = cast(Tuple[str, int, Any, Any], peeraddr) + saddr, port, _x, _y = peeraddr + addr = ip.ip_address(saddr) + if isinstance(addr, ip.IPv4Address): + addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + addr.packed) + return addr.packed + pack("!H", port) + + +def unpack_peer( + buffer: bytes, +) -> Tuple[str, int]: + a6 = ip.IPv6Address(buffer[:16]) + port = unpack("!H", buffer[16:])[0] + a4 = a6.ipv4_mapped + if a4 is not None: + return (str(a4), port) + elif a6 == ip.IPv6Address("::"): + return ("", 0) + return (str(a6), port) + + +class _Zmsg: + KWARGS: Tuple[Tuple[str, Any], ...] + + def __init__(self, *args: Any, **kwargs: Any) -> None: + if len(args) == 1: + self.decode(args[0]) + elif bool(kwargs): + for k, v in self.KWARGS: + setattr(self, k, kwargs.get(k, v)) + else: + raise RuntimeError( + self.__class__.__name__ + + ": both args " + + str(args) + + " and kwargs " + + str(kwargs) + ) + + def __repr__(self) -> str: + return "{}({})".format( + self.__class__.__name__, + ", ".join( + [ + "{}={}".format( + k, + 'bytes.fromhex("{}")'.format(getattr(self, k).hex()) + if isinstance(getattr(self, k), bytes) + else getattr(self, k), + ) + for k, _ in self.KWARGS + ] + ), + ) + + def __eq__(self, other: object) -> bool: + if isinstance(other, self.__class__): + return all( + [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS] + ) + return NotImplemented + + def decode(self, buffer: bytes) -> None: + raise NotImplementedError( + self.__class__.__name__ + "must implement `decode()` method" + ) + + @property + def packed(self) -> bytes: + raise NotImplementedError( + self.__class__.__name__ + "must implement `packed()` property" + ) + + +def topic( + proto: str, is_incoming: bool = True, imei: Optional[str] = None +) -> bytes: + return pack("B16s", is_incoming, proto.encode()) + ( + b"" if imei is None else pack("16s", imei.encode()) + ) + + +class Bcast(_Zmsg): + """Zmq message to broadcast what was received from the terminal""" + + KWARGS = ( + ("is_incoming", True), + ("proto", "UNKNOWN"), + ("imei", None), + ("when", None), + ("peeraddr", None), + ("packet", b""), + ) + + @property + def packed(self) -> bytes: + return ( + pack( + "!B16s16sd", + int(self.is_incoming), + self.proto[:16].ljust(16, "\0").encode(), + b"0000000000000000" + if self.imei is None + else self.imei.encode(), + 0 if self.when is None else self.when, + ) + + pack_peer(self.peeraddr) + + self.packet + ) + + def decode(self, buffer: bytes) -> None: + is_incoming, proto, imei, when = unpack("!B16s16sd", buffer[:41]) + self.is_incoming = bool(is_incoming) + self.proto = proto.decode() + self.imei = ( + None if imei == b"0000000000000000" else imei.decode().strip("\0") + ) + self.when = when + self.peeraddr = unpack_peer(buffer[41:59]) + self.packet = buffer[59:] + + +class Resp(_Zmsg): + """Zmq message received from a third party to send to the terminal""" + + KWARGS = (("imei", None), ("when", None), ("packet", b"")) + + @property + def packed(self) -> bytes: + return ( + pack( + "!16sd", + "0000000000000000" + if self.imei is None + else self.imei.encode(), + 0 if self.when is None else self.when, + ) + + self.packet + ) + + def decode(self, buffer: bytes) -> None: + imei, when = unpack("!16sd", buffer[:24]) + self.imei = ( + None if imei == b"0000000000000000" else imei.decode().strip("\0") + ) + + self.when = when + self.packet = buffer[24:]