X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fzmsg.py;h=898918cab10204384986bf2b36ae710148dbab9c;hb=da8493cf8d5652664822afed88f073c683278d5f;hp=7ab3ce4caf755271bd91531cc81f9103240d8b35;hpb=3dea189c7bb47f02db07b52fdcda53fdb986fd2b;p=loctrkd.git diff --git a/gps303/zmsg.py b/gps303/zmsg.py index 7ab3ce4..898918c 100644 --- a/gps303/zmsg.py +++ b/gps303/zmsg.py @@ -1,27 +1,30 @@ """ Zeromq messages """ +from datetime import datetime, timezone +from json import dumps, loads import ipaddress as ip from struct import pack, unpack -__all__ = "Bcast", "Resp" +__all__ = "Bcast", "LocEvt", "Resp" + def pack_peer(peeraddr): - saddr, port, _x, _y = peeraddr - addr6 = ip.ip_address(saddr) - addr = addr6.ipv4_mapped - if addr is None: - addr = addr6 - return pack("B", addr.version) + (addr.packed + b"\0\0\0\0\0\0\0\0\0\0\0\0")[:16] + pack("!H", port) + try: + saddr, port, _x, _y = peeraddr + addr = ip.ip_address(saddr) + except ValueError: + saddr, port = peeraddr + a4 = ip.ip_address(saddr) + addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + a4.packed) + return addr.packed + pack("!H", port) + def unpack_peer(buffer): - version = buffer[0] - if version not in (4, 6): - return None - if version == 4: - addr = ip.IPv4Address(buffer[1:5]) - else: - addr = ip.IPv6Address(buffer[1:17]) - port = unpack("!H", buffer[17:19])[0] + a6 = ip.IPv6Address(buffer[:16]) + port = unpack("!H", buffer[16:])[0] + addr = a6.ipv4_mapped + if addr is None: + addr = a6 return (addr, port) @@ -41,15 +44,36 @@ class _Zmsg: + str(kwargs) ) + def __repr__(self): + return "{}({})".format( + self.__class__.__name__, + ", ".join( + [ + "{}={}".format( + k, + 'bytes.fromhex("{}")'.format(getattr(self, k).hex()) + if isinstance(getattr(self, k), bytes) + else getattr(self, k), + ) + for k, _ in self.KWARGS + ] + ), + ) + + def __eq__(self, other): + return all( + [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS] + ) + def decode(self, buffer): - raise RuntimeError( - self.__class__.__name__ + "must implement `encode()` method" + raise NotImplementedError( + self.__class__.__name__ + "must implement `decode()` method" ) @property def packed(self): - raise RuntimeError( - self.__class__.__name__ + "must implement `encode()` method" + raise NotImplementedError( + self.__class__.__name__ + "must implement `packed()` property" ) @@ -69,7 +93,11 @@ class Bcast(_Zmsg): return ( pack("B", self.proto) + ("0000000000000000" if self.imei is None else self.imei).encode() - + (b"\0\0\0\0\0\0\0\0" if self.when is None else pack("!d", self.when)) + + ( + b"\0\0\0\0\0\0\0\0" + if self.when is None + else pack("!d", self.when) + ) + pack_peer(self.peeraddr) + self.packet ) @@ -80,8 +108,8 @@ class Bcast(_Zmsg): if self.imei == "0000000000000000": self.imei = None self.when = unpack("!d", buffer[17:25])[0] - self.peeraddr = unpack_peer(buffer[25:44]) - self.packet = buffer[44:] + self.peeraddr = unpack_peer(buffer[25:43]) + self.packet = buffer[43:] class Resp(_Zmsg): @@ -98,3 +126,40 @@ class Resp(_Zmsg): def decode(self, buffer): self.imei = buffer[:16].decode() self.packet = buffer[16:] + + +class LocEvt(_Zmsg): + """Zmq message with original or approximated location from lookaside""" + + KWARGS = ( + ("imei", "0000000000000000"), + ("devtime", datetime(1970, 1, 1, tzinfo=timezone.utc)), + ("lat", 0.0), + ("lon", 0.0), + ("is_gps", True), + ) + + # This message is for external consumption, so use json encoding, + # except imei that forms 16 byte prefix that can be used as the + # topic to subscribe. + @property + def packed(self): + return ( + self.imei.encode() + + dumps( + { + "devtime": str(self.devtime), + "latitude": self.lat, + "longitude": self.lon, + "is-gps": self.is_gps, + } + ).encode() + ) + + def decode(self, buffer): + self.imei = buffer[:16].decode() + json_data = loads(buffer[16:]) + self.devtime = datetime.fromisoformat(json_data["devtime"]) + self.lat = json_data["latitude"] + self.lon = json_data["longitude"] + self.is_gps = json_data["is-gps"]