X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fzmsg.py;h=28c233a3cafe7b45f89fc6b0c44f94c8a98d5251;hb=117fd17ef103bb32940433955eba22f7fa457b99;hp=56392007f1ff74ecd61cf59b8a95b1b9c2c06c48;hpb=faa0b11a46a7ff8797640433afdb1bfd11bce611;p=loctrkd.git diff --git a/gps303/zmsg.py b/gps303/zmsg.py index 5639200..28c233a 100644 --- a/gps303/zmsg.py +++ b/gps303/zmsg.py @@ -1,9 +1,11 @@ """ Zeromq messages """ +from datetime import datetime, timezone +from json import dumps, loads import ipaddress as ip from struct import pack, unpack -__all__ = "Bcast", "Resp" +__all__ = "Bcast", "LocEvt", "Resp" def pack_peer(peeraddr): @@ -58,15 +60,20 @@ class _Zmsg: ), ) + def __eq__(self, other): + return all( + [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS] + ) + def decode(self, buffer): - raise RuntimeError( - self.__class__.__name__ + "must implement `encode()` method" + raise NotImplementedError( + self.__class__.__name__ + "must implement `decode()` method" ) @property def packed(self): - raise RuntimeError( - self.__class__.__name__ + "must implement `encode()` method" + raise NotImplementedError( + self.__class__.__name__ + "must implement `packed()` property" ) @@ -119,3 +126,53 @@ class Resp(_Zmsg): def decode(self, buffer): self.imei = buffer[:16].decode() self.packet = buffer[16:] + + +class LocEvt(_Zmsg): + """Zmq message with original or approximated location from lookaside""" + + KWARGS = ( + ("imei", "0000000000000000"), + ("devtime", datetime(1970, 1, 1, tzinfo=timezone.utc)), + ("lat", 0.0), + ("lon", 0.0), + ("is_gps", True), + ) + + # This message is for external consumption, so use json encoding, + # except imei that forms 16 byte prefix that can be used as the + # topic to subscribe. + @property + def packed(self): + return ( + ("0000000000000000" + self.imei)[-16:].encode() + + dumps( + { + "devtime": str(self.devtime), + "latitude": self.lat, + "longitude": self.lon, + "is-gps": self.is_gps, + } + ).encode() + ) + + # And this is full json that can be sent over websocket etc. + @property + def json(self): + return dumps( + { + "imei": self.imei, + "devtime": str(self.devtime), + "latitude": self.lat, + "longitude": self.lon, + "is-gps": self.is_gps, + } + ) + + def decode(self, buffer): + self.imei = buffer[:16].decode() + json_data = loads(buffer[16:]) + self.devtime = datetime.fromisoformat(json_data["devtime"]) + self.lat = json_data["latitude"] + self.lon = json_data["longitude"] + self.is_gps = json_data["is-gps"]