1 """ Zeromq messages """
4 from struct import pack, unpack
5 from typing import Any, cast, Optional, Tuple, Type, Union
7 __all__ = "Bcast", "Resp", "topic", "rtopic"
10 def pack_peer( # 18 bytes
11 peeraddr: Union[None, Tuple[str, int], Tuple[str, int, Any, Any]]
14 addr: Union[ip.IPv4Address, ip.IPv6Address] = ip.IPv6Address(0)
16 elif len(peeraddr) == 2:
17 peeraddr = cast(Tuple[str, int], peeraddr)
18 saddr, port = peeraddr
19 addr = ip.ip_address(saddr)
20 elif len(peeraddr) == 4:
21 peeraddr = cast(Tuple[str, int, Any, Any], peeraddr)
22 saddr, port, _x, _y = peeraddr
23 addr = ip.ip_address(saddr)
24 if isinstance(addr, ip.IPv4Address):
25 addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + addr.packed)
26 return addr.packed + pack("!H", port)
32 a6 = ip.IPv6Address(buffer[:16])
33 port = unpack("!H", buffer[16:])[0]
36 return (str(a4), port)
37 elif a6 == ip.IPv6Address("::"):
39 return (str(a6), port)
43 KWARGS: Tuple[Tuple[str, Any], ...]
45 def __init__(self, *args: Any, **kwargs: Any) -> None:
49 for k, v in self.KWARGS:
50 setattr(self, k, kwargs.get(k, v))
53 self.__class__.__name__
60 def __repr__(self) -> str:
61 return "{}({})".format(
62 self.__class__.__name__,
67 'bytes.fromhex("{}")'.format(getattr(self, k).hex())
68 if isinstance(getattr(self, k), bytes)
69 else getattr(self, k),
71 for k, _ in self.KWARGS
76 def __eq__(self, other: object) -> bool:
77 if isinstance(other, self.__class__):
79 [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS]
83 def decode(self, buffer: bytes) -> None:
84 raise NotImplementedError(
85 self.__class__.__name__ + "must implement `decode()` method"
89 def packed(self) -> bytes:
90 raise NotImplementedError(
91 self.__class__.__name__ + "must implement `packed()` property"
96 proto: str, is_incoming: bool = True, imei: Optional[str] = None
98 return pack("B16s", is_incoming, proto.encode()) + (
99 b"" if imei is None else pack("16s", imei.encode())
103 def rtopic(imei: str) -> bytes:
104 return pack("16s", imei.encode())
108 """Zmq message to broadcast what was received from the terminal"""
111 ("is_incoming", True),
112 ("proto", "UNKNOWN"),
121 def packed(self) -> bytes:
125 int(self.is_incoming),
126 self.proto[:16].ljust(16, "\0").encode(),
129 else self.imei.encode(),
130 0 if self.when is None else self.when,
133 else self.pmod.encode(),
135 + pack_peer(self.peeraddr)
139 def decode(self, buffer: bytes) -> None:
140 is_incoming, proto, imei, when, pmod = unpack(
141 "!B16s16sd16s", buffer[:57]
143 self.is_incoming = bool(is_incoming)
144 self.proto = proto.decode().rstrip("\0")
146 None if imei == b"0000000000000000" else imei.decode().strip("\0")
150 None if pmod == b" " else pmod.decode().strip("\0")
152 self.peeraddr = unpack_peer(buffer[57:75])
153 self.packet = buffer[75:]
157 """Zmq message received from a third party to send to the terminal"""
159 KWARGS = (("imei", None), ("when", None), ("packet", b""))
162 def packed(self) -> bytes:
168 else self.imei.encode(),
169 0 if self.when is None else self.when,
174 def decode(self, buffer: bytes) -> None:
175 imei, when = unpack("!16sd", buffer[:24])
177 None if imei == b"0000000000000000" else imei.decode().strip("\0")
181 self.packet = buffer[24:]
185 """Broadcast zmq message with "rectified" proto-agnostic json data"""
187 KWARGS = (("imei", None), ("payload", ""))
190 def packed(self) -> bytes:
196 else self.imei.encode(),
198 + self.payload.encode()
201 def decode(self, buffer: bytes) -> None:
204 None if imei == b"0000000000000000" else imei.decode().strip("\0")
206 self.payload = buffer[16:].decode()