""" Zeromq messages """
+from datetime import datetime, timezone
import ipaddress as ip
from struct import pack, unpack
-__all__ = "Bcast", "Resp"
+__all__ = "Bcast", "LocEvt", "Resp"
def pack_peer(peeraddr):
- saddr, port, _x, _y = peeraddr
- addr6 = ip.ip_address(saddr)
- addr = addr6.ipv4_mapped
- if addr is None:
- addr = addr6
- return (
- pack("B", addr.version)
- + (addr.packed + b"\0\0\0\0\0\0\0\0\0\0\0\0")[:16]
- + pack("!H", port)
- )
+ try:
+ saddr, port, _x, _y = peeraddr
+ addr = ip.ip_address(saddr)
+ except ValueError:
+ saddr, port = peeraddr
+ a4 = ip.ip_address(saddr)
+ addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + a4.packed)
+ return addr.packed + pack("!H", port)
def unpack_peer(buffer):
- version = buffer[0]
- if version not in (4, 6):
- return None
- if version == 4:
- addr = ip.IPv4Address(buffer[1:5])
- else:
- addr = ip.IPv6Address(buffer[1:17])
- port = unpack("!H", buffer[17:19])[0]
+ a6 = ip.IPv6Address(buffer[:16])
+ port = unpack("!H", buffer[16:])[0]
+ addr = a6.ipv4_mapped
+ if addr is None:
+ addr = a6
return (addr, port)
if self.imei == "0000000000000000":
self.imei = None
self.when = unpack("!d", buffer[17:25])[0]
- self.peeraddr = unpack_peer(buffer[25:44])
- self.packet = buffer[44:]
+ self.peeraddr = unpack_peer(buffer[25:43])
+ self.packet = buffer[43:]
class Resp(_Zmsg):
def decode(self, buffer):
self.imei = buffer[:16].decode()
self.packet = buffer[16:]
+
+
+class LocEvt(_Zmsg):
+ """Zmq message with original or approximated location from lookaside"""
+
+ KWARGS = (
+ ("imei", "0000000000000000"),
+ ("devtime", datetime(1970, 1, 1, tzinfo=timezone.utc)),
+ ("lat", 0.0),
+ ("lon", 0.0),
+ ("is_gps", True),
+ )
+
+ @property
+ def packed(self):
+ return self.imei.encode() + pack(
+ "!dddB",
+ self.devtime.replace(tzinfo=timezone.utc).timestamp(),
+ self.lat,
+ self.lon,
+ int(self.is_gps),
+ )
+
+ def decode(self, buffer):
+ self.imei = buffer[:16].decode()
+ when, self.lat, self.lon, is_gps = unpack("!dddB", buffer[16:])
+ self.devtime = datetime.fromtimestamp(when).astimezone(tz=timezone.utc)
+ self.is_gps = bool(is_gps)