1 """ Zeromq messages """
4 from struct import pack, unpack
5 from typing import Any, cast, Optional, Tuple, Type, Union
7 __all__ = "Bcast", "Resp", "topic", "rtopic"
10 def pack_peer( # 18 bytes
11 peeraddr: Union[None, Tuple[str, int], Tuple[str, int, Any, Any]]
14 addr: Union[ip.IPv4Address, ip.IPv6Address] = ip.IPv6Address(0)
16 elif len(peeraddr) == 2:
17 peeraddr = cast(Tuple[str, int], peeraddr)
18 saddr, port = peeraddr
19 addr = ip.ip_address(saddr)
20 elif len(peeraddr) == 4:
21 peeraddr = cast(Tuple[str, int, Any, Any], peeraddr)
22 saddr, port, _x, _y = peeraddr
23 addr = ip.ip_address(saddr)
24 if isinstance(addr, ip.IPv4Address):
25 addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + addr.packed)
26 return addr.packed + pack("!H", port)
32 a6 = ip.IPv6Address(buffer[:16])
33 port = unpack("!H", buffer[16:])[0]
36 return (str(a4), port)
37 elif a6 == ip.IPv6Address("::"):
39 return (str(a6), port)
43 KWARGS: Tuple[Tuple[str, Any], ...]
45 def __init__(self, *args: Any, **kwargs: Any) -> None:
49 for k, v in self.KWARGS:
50 setattr(self, k, kwargs.get(k, v))
53 self.__class__.__name__
60 def __repr__(self) -> str:
61 return "{}({})".format(
62 self.__class__.__name__,
67 'bytes.fromhex("{}")'.format(getattr(self, k).hex())
68 if isinstance(getattr(self, k), bytes)
69 else getattr(self, k),
71 for k, _ in self.KWARGS
76 def __eq__(self, other: object) -> bool:
77 if isinstance(other, self.__class__):
79 [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS]
83 def decode(self, buffer: bytes) -> None:
84 raise NotImplementedError(
85 self.__class__.__name__ + "must implement `decode()` method"
89 def packed(self) -> bytes:
90 raise NotImplementedError(
91 self.__class__.__name__ + "must implement `packed()` property"
96 proto: str, is_incoming: bool = True, imei: Optional[str] = None
98 return pack("B16s", is_incoming, proto.encode()) + (
99 b"" if imei is None else pack("16s", imei.encode())
103 def rtopic(imei: str) -> bytes:
104 return pack("16s", imei.encode())
108 """Zmq message to broadcast what was received from the terminal"""
111 ("is_incoming", True),
112 ("proto", "UNKNOWN"),
120 def packed(self) -> bytes:
124 int(self.is_incoming),
125 self.proto[:16].ljust(16, "\0").encode(),
128 else self.imei.encode(),
129 0 if self.when is None else self.when,
131 + pack_peer(self.peeraddr)
135 def decode(self, buffer: bytes) -> None:
136 is_incoming, proto, imei, when = unpack("!B16s16sd", buffer[:41])
137 self.is_incoming = bool(is_incoming)
138 self.proto = proto.decode().rstrip("\0")
140 None if imei == b"0000000000000000" else imei.decode().strip("\0")
143 self.peeraddr = unpack_peer(buffer[41:59])
144 self.packet = buffer[59:]
148 """Zmq message received from a third party to send to the terminal"""
150 KWARGS = (("imei", None), ("when", None), ("packet", b""))
153 def packed(self) -> bytes:
159 else self.imei.encode(),
160 0 if self.when is None else self.when,
165 def decode(self, buffer: bytes) -> None:
166 imei, when = unpack("!16sd", buffer[:24])
168 None if imei == b"0000000000000000" else imei.decode().strip("\0")
172 self.packet = buffer[24:]
176 """Broadcast Zzmq message with "rectified" proto-agnostic json data"""
178 KWARGS = (("imei", None), ("payload", ""))
181 def packed(self) -> bytes:
187 else self.imei.encode(),
189 + self.payload.encode()
192 def decode(self, buffer: bytes) -> None:
195 None if imei == b"0000000000000000" else imei.decode().strip("\0")
197 self.payload = buffer[16:].decode()