1 """ Zeromq messages """
3 from datetime import datetime, timezone
4 from json import dumps, loads
6 from struct import pack, unpack
8 __all__ = "Bcast", "LocEvt", "Resp"
11 def pack_peer(peeraddr):
13 saddr, port, _x, _y = peeraddr
14 addr = ip.ip_address(saddr)
16 saddr, port = peeraddr
17 a4 = ip.ip_address(saddr)
18 addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + a4.packed)
19 return addr.packed + pack("!H", port)
22 def unpack_peer(buffer):
23 a6 = ip.IPv6Address(buffer[:16])
24 port = unpack("!H", buffer[16:])[0]
32 def __init__(self, *args, **kwargs):
36 for k, v in self.KWARGS:
37 setattr(self, k, kwargs.get(k, v))
40 self.__class__.__name__
48 return "{}({})".format(
49 self.__class__.__name__,
54 'bytes.fromhex("{}")'.format(getattr(self, k).hex())
55 if isinstance(getattr(self, k), bytes)
56 else getattr(self, k),
58 for k, _ in self.KWARGS
63 def __eq__(self, other):
65 [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS]
68 def decode(self, buffer):
69 raise NotImplementedError(
70 self.__class__.__name__ + "must implement `decode()` method"
75 raise NotImplementedError(
76 self.__class__.__name__ + "must implement `packed()` property"
81 """Zmq message to broadcast what was received from the terminal"""
95 + ("0000000000000000" if self.imei is None else self.imei).encode()
99 else pack("!d", self.when)
101 + pack_peer(self.peeraddr)
105 def decode(self, buffer):
106 self.proto = buffer[0]
107 self.imei = buffer[1:17].decode()
108 if self.imei == "0000000000000000":
110 self.when = unpack("!d", buffer[17:25])[0]
111 self.peeraddr = unpack_peer(buffer[25:43])
112 self.packet = buffer[43:]
116 """Zmq message received from a third party to send to the terminal"""
118 KWARGS = (("imei", None), ("packet", b""))
123 "0000000000000000" if self.imei is None else self.imei.encode()
126 def decode(self, buffer):
127 self.imei = buffer[:16].decode()
128 self.packet = buffer[16:]
132 """Zmq message with original or approximated location from lookaside"""
135 ("imei", "0000000000000000"),
136 ("devtime", datetime(1970, 1, 1, tzinfo=timezone.utc)),
142 # This message is for external consumption, so use json encoding,
143 # except imei that forms 16 byte prefix that can be used as the
144 # topic to subscribe.
148 ("0000000000000000" + self.imei)[-16:].encode()
151 "devtime": str(self.devtime),
152 "latitude": self.lat,
153 "longitude": self.lon,
154 "is-gps": self.is_gps,
159 # And this is full json that can be sent over websocket etc.
165 "devtime": str(self.devtime),
166 "latitude": self.lat,
167 "longitude": self.lon,
168 "is-gps": self.is_gps,
172 def decode(self, buffer):
173 self.imei = buffer[:16].decode()
174 json_data = loads(buffer[16:])
175 self.devtime = datetime.fromisoformat(json_data["devtime"])
176 self.lat = json_data["latitude"]
177 self.lon = json_data["longitude"]
178 self.is_gps = json_data["is-gps"]