1 """ Zeromq messages """
4 from struct import pack, unpack
6 __all__ = "Bcast", "Resp"
def pack_peer(peeraddr):
    """Serialize a socket peer address into a fixed 19-byte field.

    Layout: one byte IP version (4 or 6), sixteen bytes of address
    (an IPv4 address occupies the first four bytes, zero padded), and
    the port as a big-endian unsigned 16-bit integer.

    peeraddr is a 4-tuple whose first two items are the address string
    and the port; the remaining two items are ignored.  IPv4-mapped
    IPv6 addresses are stored as plain IPv4.  A missing or unparsable
    peer is stored as nineteen zero bytes, i.e. with a version byte
    that is neither 4 nor 6.
    """
    no_peer = b"\0" * 19
    if peeraddr is None:
        return no_peer
    try:
        saddr, port, _x, _y = peeraddr
        addr = ip.ip_address(saddr)
    except ValueError:
        # Wrong tuple arity or an address string that does not parse.
        return no_peer
    # Only IPv6 address objects carry `ipv4_mapped`, and it is None unless
    # the address really is an IPv4-mapped one.
    mapped = getattr(addr, "ipv4_mapped", None)
    if mapped is not None:
        addr = mapped
    return (
        pack("B", addr.version)
        + addr.packed.ljust(16, b"\0")
        + pack("!H", port)
    )
def unpack_peer(buffer):
    """Parse the 19-byte peer field: version byte, address bytes, port.

    Returns an `(address, port)` tuple, where address is an
    `ipaddress.IPv4Address` or `ipaddress.IPv6Address` chosen by the
    version byte, and port is read as a big-endian unsigned 16-bit
    integer from bytes 17-18.  Returns None when the version byte is
    neither 4 nor 6.
    """
    version = buffer[0]
    if version == 4:
        # IPv4 uses only the first four bytes of the address area.
        addr = ip.IPv4Address(buffer[1:5])
    elif version == 6:
        addr = ip.IPv6Address(buffer[1:17])
    else:
        return None
    (port,) = unpack("!H", buffer[17:19])
    return (addr, port)
35 def __init__(self, *args, **kwargs):
39 for k, v in self.KWARGS:
40 setattr(self, k, kwargs.get(k, v))
43 self.__class__.__name__
51 return "{}({})".format(
52 self.__class__.__name__,
57 'bytes.fromhex("{}")'.format(getattr(self, k).hex())
58 if isinstance(getattr(self, k), bytes)
59 else getattr(self, k),
61 for k, _ in self.KWARGS
def decode(self, buffer):
    """Placeholder for the hook that fills attributes from buffer.

    Always raises NotImplementedError naming the concrete class, so a
    subclass that forgets to provide its own `decode()` fails loudly.
    """
    raise NotImplementedError(
        self.__class__.__name__ + " must implement `decode()` method"
    )
74 self.__class__.__name__ + "must implement `encode()` method"
79 """Zmq message to broadcast what was received from the terminal"""
93 + ("0000000000000000" if self.imei is None else self.imei).encode()
97 else pack("!d", self.when)
99 + pack_peer(self.peeraddr)
def decode(self, buffer):
    """Populate this message's fields from its wire representation.

    Byte layout read here: [0] proto, [1:17] IMEI as text, [17:25]
    timestamp as a big-endian double, [25:44] peer address field
    (parsed by `unpack_peer`), [44:] the raw packet.  An all-zero IMEI
    string stands for "no IMEI" and is stored as None.
    """
    self.proto = buffer[0]
    imei = buffer[1:17].decode()
    # Sixteen ASCII zeros is the placeholder written for a missing IMEI.
    self.imei = None if imei == "0000000000000000" else imei
    (self.when,) = unpack("!d", buffer[17:25])
    self.peeraddr = unpack_peer(buffer[25:44])
    self.packet = buffer[44:]
114 """Zmq message received from a third party to send to the terminal"""
116 KWARGS = (("imei", None), ("packet", b""))
# Pick the placeholder first, then encode, so a None IMEI yields bytes too.
("0000000000000000" if self.imei is None else self.imei).encode()
def decode(self, buffer):
    """Split buffer into a 16-byte IMEI prefix and the packet payload.

    The prefix is decoded to text and stored as `self.imei`; everything
    after it is stored unchanged as `self.packet`.
    """
    head, payload = buffer[:16], buffer[16:]
    self.imei = head.decode()
    self.packet = payload