X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fzmsg.py;h=80cf0003cbb97eee3baf55684a9085cb010d2c3f;hb=80e795c08def3466884223357798cd1aff265212;hp=6e591ab65c2fb9a8c4ec7ff7a9bcab8d69e7de08;hpb=ecb31fcdc354471235ba44edbad3b24a6bded905;p=loctrkd.git diff --git a/gps303/zmsg.py b/gps303/zmsg.py index 6e591ab..80cf000 100644 --- a/gps303/zmsg.py +++ b/gps303/zmsg.py @@ -1,33 +1,29 @@ """ Zeromq messages """ +from datetime import datetime, timezone import ipaddress as ip from struct import pack, unpack -__all__ = "Bcast", "Resp" +__all__ = "Bcast", "LocEvt", "Resp" def pack_peer(peeraddr): - saddr, port, _x, _y = peeraddr - addr6 = ip.ip_address(saddr) - addr = addr6.ipv4_mapped - if addr is None: - addr = addr6 - return ( - pack("B", addr.version) - + (addr.packed + b"\0\0\0\0\0\0\0\0\0\0\0\0")[:16] - + pack("!H", port) - ) + try: + saddr, port, _x, _y = peeraddr + addr = ip.ip_address(saddr) + except ValueError: + saddr, port = peeraddr + a4 = ip.ip_address(saddr) + addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + a4.packed) + return addr.packed + pack("!H", port) def unpack_peer(buffer): - version = buffer[0] - if version not in (4, 6): - return None - if version == 4: - addr = ip.IPv4Address(buffer[1:5]) - else: - addr = ip.IPv6Address(buffer[1:17]) - port = unpack("!H", buffer[17:19])[0] + a6 = ip.IPv6Address(buffer[:16]) + port = unpack("!H", buffer[16:])[0] + addr = a6.ipv4_mapped + if addr is None: + addr = a6 return (addr, port) @@ -106,8 +102,8 @@ class Bcast(_Zmsg): if self.imei == "0000000000000000": self.imei = None self.when = unpack("!d", buffer[17:25])[0] - self.peeraddr = unpack_peer(buffer[25:44]) - self.packet = buffer[44:] + self.peeraddr = unpack_peer(buffer[25:43]) + self.packet = buffer[43:] class Resp(_Zmsg): @@ -124,3 +120,31 @@ class Resp(_Zmsg): def decode(self, buffer): self.imei = buffer[:16].decode() self.packet = buffer[16:] + + +class LocEvt(_Zmsg): + """Zmq message with original or approximated location from lookaside""" + + KWARGS = ( + ("imei", "0000000000000000"), + ("devtime", datetime(1970, 1, 1, tzinfo=timezone.utc)), + ("lat", 0.0), + ("lon", 0.0), + ("is_gps", True), + ) + + @property + def packed(self): + return self.imei.encode() + pack( + "!dddB", + self.devtime.replace(tzinfo=timezone.utc).timestamp(), + self.lat, + self.lon, + int(self.is_gps), + ) + + def decode(self, buffer): + self.imei = buffer[:16].decode() + when, self.lat, self.lon, is_gps = unpack("!dddB", buffer[16:]) + self.devtime = datetime.fromtimestamp(when).astimezone(tz=timezone.utc) + self.is_gps = bool(is_gps)