1 """ Zeromq messages """
4 from struct import pack, unpack
5 from typing import Any, cast, Optional, Tuple, Type, Union
7 __all__ = "Bcast", "Resp", "topic"
def pack_peer(
    peeraddr: Union[None, Tuple[str, int], Tuple[str, int, Any, Any]]
) -> bytes:
    """Serialize a peer address into a fixed 18-byte record.

    The record is the 16-byte packed IPv6 address followed by the port as
    a network-order (big-endian) unsigned 16-bit integer. An IPv4 address
    is stored in IPv4-mapped form (``::ffff:a.b.c.d``). ``None`` is stored
    as the all-zero address with port 0.

    Args:
        peeraddr: ``None``, a ``(host, port)`` pair, or a 4-tuple
            ``(host, port, x, y)`` whose last two items are ignored.

    Returns:
        Exactly 18 bytes: packed address followed by the port.

    Raises:
        ValueError: if the tuple length is neither 2 nor 4, or ``host`` is
            not a valid IP address literal.
        struct.error: if ``port`` does not fit in 16 bits.
    """
    # NOTE(review): the `def` line and the `None` branch were incomplete in
    # the listing this was restored from; port 0 for `None` is assumed.
    if peeraddr is None:
        addr: Union[ip.IPv4Address, ip.IPv6Address] = ip.IPv6Address(0)
        port = 0
    elif len(peeraddr) in (2, 4):
        # Both tuple shapes carry host and port first; anything after the
        # port is not part of the record.
        saddr, port = peeraddr[0], peeraddr[1]
        addr = ip.ip_address(saddr)
    else:
        # Fail with a clear message instead of falling through with
        # `addr` and `port` unbound.
        raise ValueError(
            "peer address must have 2 or 4 items, got {}".format(
                len(peeraddr)
            )
        )
    if isinstance(addr, ip.IPv4Address):
        # Embed the 4-byte address behind the ::ffff:0:0/96 prefix.
        addr = ip.IPv6Address(b"\0" * 10 + b"\xff\xff" + addr.packed)
    return addr.packed + pack("!H", port)
def unpack_peer(buffer: bytes) -> Optional[Tuple[str, int]]:
    """Decode an 18-byte peer record back into a ``(host, port)`` pair.

    Reads a 16-byte packed IPv6 address followed by a network-order
    unsigned 16-bit port. Bytes after the first 18 are ignored.

    Args:
        buffer: at least 18 bytes; address first, then port.

    Returns:
        ``(dotted_quad, port)`` when the address is IPv4-mapped, ``None``
        when the address is the unspecified address ``::`` (whatever the
        port), otherwise ``(ipv6_text, port)``.

    Raises:
        ValueError: if fewer than 16 address bytes are supplied
            (``ipaddress.AddressValueError``).
        struct.error: if fewer than 2 port bytes are supplied.
    """
    # NOTE(review): the `def` line, the derivation of `a4` and the result
    # for `::` were missing from the listing this was restored from;
    # `ipv4_mapped` and `None` are assumed -- confirm.
    a6 = ip.IPv6Address(buffer[:16])
    # Slice exactly two bytes so a longer buffer does not break unpack().
    port = unpack("!H", buffer[16:18])[0]
    a4 = a6.ipv4_mapped
    if a4 is not None:
        return (str(a4), port)
    if a6.is_unspecified:
        return None
    return (str(a6), port)
# Declared here without a value: a tuple of (attribute name, default value)
# pairs. The methods below iterate over it to set, print and compare the
# attributes of a message.
43 KWARGS: Tuple[Tuple[str, Any], ...]
# Constructor.
# NOTE(review): statements are missing between the visible lines of this
# listing, so only the visible parts are described.
45 def __init__(self, *args: Any, **kwargs: Any) -> None:
# For each (name, default) pair in KWARGS, set the attribute from the keyword
# argument of the same name, falling back to the declared default.
49 for k, v in self.KWARGS:
50 setattr(self, k, kwargs.get(k, v))
# Class name used inside an expression whose surrounding lines are not
# visible -- presumably part of an error message; TODO confirm.
53 self.__class__.__name__
# Text representation of the form `ClassName(...)`.
# NOTE(review): the lines that join and label the individual fields are
# missing from this listing.
60 def __repr__(self) -> str:
61 return "{}({})".format(
62 self.__class__.__name__,
# A bytes value is rendered as the text `bytes.fromhex("<hex digits>")`;
# any other value is passed to the formatter unchanged.
67 'bytes.fromhex("{}")'.format(getattr(self, k).hex())
68 if isinstance(getattr(self, k), bytes)
69 else getattr(self, k),
# One entry per attribute name listed in KWARGS; the defaults are not used.
71 for k, _ in self.KWARGS
# Equality check based on the attributes named in KWARGS.
# NOTE(review): how the per-attribute results are combined, and what is
# returned for an unrelated `other`, is on lines missing from this listing.
76 def __eq__(self, other: object) -> bool:
# Compare field by field only when `other` is an instance of this object's
# class (isinstance also accepts subclasses of it).
77 if isinstance(other, self.__class__):
# One boolean per KWARGS attribute name: same value on both objects?
79 [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS]
def decode(self, buffer: bytes) -> None:
    """Placeholder for filling in the message from its serialized bytes.

    This version never decodes anything: it always raises, naming the
    class that lacks its own ``decode()``.

    Args:
        buffer: serialized message bytes (unused here).

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError(
        # Leading space keeps the class name and the text apart.
        self.__class__.__name__ + " must implement `decode()` method"
    )
def packed(self) -> bytes:
    """Placeholder for producing the serialized bytes of the message.

    This version never serializes anything: it always raises, naming the
    class that lacks its own ``packed``.

    NOTE(review): the error text calls this a property, but no decorator
    line is visible above this definition in the listing -- confirm.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError(
        # Leading space keeps the class name and the text apart.
        self.__class__.__name__ + " must implement `packed()` property"
    )
def topic(
    proto: int, is_incoming: bool = True, imei: Optional[str] = None
) -> bytes:
    """Build the leading bytes that identify a class of messages.

    The result is one byte for the direction flag, one byte for the
    protocol number and, only when *imei* is given, the IMEI as a 16-byte
    field. This is the same order in which ``Bcast.decode`` reads the
    first bytes of a message.

    Args:
        proto: protocol number, 0..255.
        is_incoming: direction flag, stored as 1 or 0.
        imei: optional IMEI text; encoded, then cut or NUL-padded to 16
            bytes by the ``16s`` format.

    Returns:
        2 bytes without *imei*, 18 bytes with it.

    Raises:
        struct.error: if *proto* does not fit in one unsigned byte.
    """
    # int() mirrors how Bcast.packed stores the same flag.
    prefix = pack("BB", int(is_incoming), proto)
    if imei is None:
        return prefix
    return prefix + pack("16s", imei.encode())
104 """Zmq message to broadcast what was received from the terminal"""
# One (attribute name, default) entry of this message's KWARGS tuple: the
# direction flag defaults to True.
# NOTE(review): the remaining entries and the enclosing assignment are
# missing from this listing.
107 ("is_incoming", True),
# Serialized form of the broadcast message.
# NOTE(review): most of the expression is missing from this listing; the
# visible pieces appear in the same order in which `decode` reads them.
116 def packed(self) -> bytes:
# Direction flag converted to an integer before packing.
120 int(self.is_incoming),
# Else-branch of a conditional whose test is not visible: the IMEI text is
# encoded to bytes.
124 else self.imei.encode(),
# Else-branch of a conditional whose test is not visible: `when` is packed as
# an 8-byte network-order double.
129 else pack("!d", self.when)
# Peer address appended in the form produced by pack_peer().
131 + pack_peer(self.peeraddr)
def decode(self, buffer: bytes) -> None:
    """Fill in this message's fields from its serialized form.

    Layout read from *buffer*:

    * byte 0       -- ``is_incoming`` (any non-zero value is True)
    * byte 1       -- ``proto``
    * bytes 2-17   -- ``imei`` as 16 bytes of text; sixteen ``"0"``
      characters stand for "no IMEI"
    * bytes 18-25  -- ``when`` as a network-order double
    * bytes 26-43  -- peer address, decoded by ``unpack_peer``
    * bytes 44-    -- ``packet``, the remaining raw bytes

    Args:
        buffer: serialized message, expected to be at least 44 bytes.
    """
    self.is_incoming = bool(buffer[0])
    self.proto = buffer[1]
    imei = buffer[2:18].decode()
    # NOTE(review): the statement under the placeholder test was missing
    # from the listing this was restored from; mapping the placeholder to
    # None is assumed from the Optional[str] annotation -- confirm.
    self.imei: Optional[str] = None if imei == "0000000000000000" else imei
    self.when = unpack("!d", buffer[18:26])[0]
    self.peeraddr = unpack_peer(buffer[26:44])
    self.packet = buffer[44:]
147 """Zmq message received from a third party to send to the terminal"""
# (attribute name, default) pairs for this message: `imei` and `when` default
# to None, `packet` defaults to empty bytes.
149 KWARGS = (("imei", None), ("when", None), ("packet", b""))
# Serialized form of the response message.
# NOTE(review): most of the expression is missing from this listing; the
# visible pieces appear in the same order in which `decode` reads them.
152 def packed(self) -> bytes:
# Else-branch of a conditional whose test is not visible: the IMEI text is
# encoded to bytes.
158 else self.imei.encode(),
# Else-branch of a conditional whose test is not visible: `when` is packed as
# an 8-byte network-order double.
163 else pack("!d", self.when)
def decode(self, buffer: bytes) -> None:
    """Fill in this message's fields from its serialized form.

    Layout read from *buffer*:

    * bytes 0-15   -- ``imei`` as 16 bytes of text
    * bytes 16-23  -- ``when`` as a network-order double
    * bytes 24-    -- ``packet``, the remaining raw bytes

    Args:
        buffer: serialized message, expected to be at least 24 bytes.

    Raises:
        struct.error: if fewer than 8 bytes are available for ``when``.
    """
    imei = buffer[:16].decode()
    # KWARGS declares None as the "no IMEI" value, and Bcast.decode maps
    # sixteen "0" characters back to None; do the same here so a message
    # without an IMEI decodes to None.
    # NOTE(review): confirm that `packed` writes this same placeholder.
    self.imei: Optional[str] = None if imei == "0000000000000000" else imei
    self.when = unpack("!d", buffer[16:24])[0]
    self.packet = buffer[24:]