""" Zeromq messages """
+from datetime import datetime, timezone
+from json import dumps, loads
import ipaddress as ip
from struct import pack, unpack
-__all__ = "Bcast", "Resp"
+__all__ = "Bcast", "LocEvt", "Resp"
def pack_peer(peeraddr):
),
)
+ def __eq__(self, other):
+ return all(
+ [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS]
+ )
+
def decode(self, buffer):
- raise RuntimeError(
- self.__class__.__name__ + "must implement `encode()` method"
+ raise NotImplementedError(
+ self.__class__.__name__ + "must implement `decode()` method"
)
@property
def packed(self):
- raise RuntimeError(
- self.__class__.__name__ + "must implement `encode()` method"
+ raise NotImplementedError(
+ self.__class__.__name__ + "must implement `packed()` property"
)
def decode(self, buffer):
self.imei = buffer[:16].decode()
self.packet = buffer[16:]
+
+
+class LocEvt(_Zmsg):
+ """Zmq message with original or approximated location from lookaside"""
+
+ KWARGS = (
+ ("imei", "0000000000000000"),
+ ("devtime", datetime(1970, 1, 1, tzinfo=timezone.utc)),
+ ("lat", 0.0),
+ ("lon", 0.0),
+ ("is_gps", True),
+ )
+
+ # This message is for external consumption, so use json encoding,
+ # except imei that forms 16 byte prefix that can be used as the
+ # topic to subscribe.
+ @property
+ def packed(self):
+ return (
+ self.imei.encode()
+ + dumps(
+ {
+ "devtime": str(self.devtime),
+ "latitude": self.lat,
+ "longitude": self.lon,
+ "is-gps": self.is_gps,
+ }
+ ).encode()
+ )
+
+ # And this is full json that can be sent over websocket etc.
+ @property
+ def json(self):
+ return dumps(
+ {
+ "imei": self.imei,
+ "devtime": str(self.devtime),
+ "latitude": self.lat,
+ "longitude": self.lon,
+ "is-gps": self.is_gps,
+ }
+ )
+
+ def decode(self, buffer):
+ self.imei = buffer[:16].decode()
+ json_data = loads(buffer[16:])
+ self.devtime = datetime.fromisoformat(json_data["devtime"])
+ self.lat = json_data["latitude"]
+ self.lon = json_data["longitude"]
+ self.is_gps = json_data["is-gps"]