1 """ Zeromq messages """
4 from struct import pack, unpack
6 __all__ = "Bcast", "Resp"
def pack_peer(peeraddr):
    """Serialize a socket peer address into a fixed 18-byte field.

    The result is the 16-byte packed IPv6 form of the host followed by the
    port as an unsigned 16-bit integer in network byte order.  An IPv4 host
    is stored as an IPv4-mapped IPv6 address (``::ffff:a.b.c.d``), so the
    field has the same width for both address families.

    :param peeraddr: ``(host, port)`` or ``(host, port, x, y)``; only the
        first two items are used.
    :return: ``bytes`` of length 18.
    :raises ValueError: if ``peeraddr`` has neither two nor four items, or
        if the host is not a valid IP address.
    """
    fields = tuple(peeraddr)
    if len(fields) not in (2, 4):
        raise ValueError(
            "peer address must have 2 or 4 items, got {}".format(len(fields))
        )
    host, port = fields[0], fields[1]
    addr = ip.ip_address(host)
    if addr.version == 4:
        # Ten zero bytes + ff ff is the IPv4-mapped IPv6 prefix; map on the
        # parsed address family rather than on the tuple length so that the
        # output is always 16 + 2 bytes.
        addr = ip.IPv6Address(
            b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + addr.packed
        )
    return addr.packed + pack("!H", port)
20 def unpack_peer(buffer):
21 a6 = ip.IPv6Address(buffer[:16])
22 port = unpack("!H", buffer[16:])[0]
30 def __init__(self, *args, **kwargs):
34 for k, v in self.KWARGS:
35 setattr(self, k, kwargs.get(k, v))
38 self.__class__.__name__
46 return "{}({})".format(
47 self.__class__.__name__,
52 'bytes.fromhex("{}")'.format(getattr(self, k).hex())
53 if isinstance(getattr(self, k), bytes)
54 else getattr(self, k),
56 for k, _ in self.KWARGS
61 def decode(self, buffer):
63 self.__class__.__name__ + "must implement `encode()` method"
69 self.__class__.__name__ + "must implement `encode()` method"
74 """Zmq message to broadcast what was received from the terminal"""
88 + ("0000000000000000" if self.imei is None else self.imei).encode()
92 else pack("!d", self.when)
94 + pack_peer(self.peeraddr)
def decode(self, buffer):
    """Populate this message's fields from its packed wire form.

    Layout of ``buffer`` as read here:

    * byte 0       -- ``proto``, taken as a single byte value
    * bytes 1..16  -- ``imei``, 16 bytes decoded as text
    * bytes 17..24 -- ``when``, a double in network byte order
    * bytes 25..42 -- ``peeraddr``, decoded by ``unpack_peer``
    * bytes 43..   -- ``packet``, the remaining payload

    :param buffer: packed message as ``bytes``.
    :raises IndexError: if ``buffer`` is empty.
    :raises struct.error: if the timestamp field is shorter than 8 bytes.
    """
    self.proto = buffer[0]
    imei = buffer[1:17].decode()
    # Sixteen zeros is how `packed` writes a missing IMEI; turn the
    # placeholder back into None so that encode/decode round-trips.
    self.imei = None if imei == "0000000000000000" else imei
    self.when = unpack("!d", buffer[17:25])[0]
    self.peeraddr = unpack_peer(buffer[25:43])
    self.packet = buffer[43:]
109 """Zmq message received from a third party to send to the terminal"""
111 KWARGS = (("imei", None), ("packet", b""))
116 "0000000000000000" if self.imei is None else self.imei.encode()
def decode(self, buffer):
    """Populate ``imei`` and ``packet`` from the packed wire form.

    The first 16 bytes of ``buffer`` are the IMEI as text; everything
    after them is the raw packet payload.

    :param buffer: packed message as ``bytes``.
    :raises UnicodeDecodeError: if the IMEI field is not valid UTF-8.
    """
    imei = buffer[:16].decode()
    # `packed` writes sixteen zeros when imei is None; map the placeholder
    # back to None so a decoded message matches the one that was encoded.
    self.imei = None if imei == "0000000000000000" else imei
    self.packet = buffer[16:]