""" Zeromq messages """
from datetime import datetime, timezone
+from json import dumps, loads
import ipaddress as ip
from struct import pack, unpack
),
)
+ def __eq__(self, other):
+ return all(
+ [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS]
+ )
+
def decode(self, buffer):
- raise RuntimeError(
- self.__class__.__name__ + "must implement `encode()` method"
+ raise NotImplementedError(
+ self.__class__.__name__ + "must implement `decode()` method"
)
@property
def packed(self):
- raise RuntimeError(
- self.__class__.__name__ + "must implement `encode()` method"
+ raise NotImplementedError(
+ self.__class__.__name__ + "must implement `packed()` property"
)
("is_gps", True),
)
+ # This message is for external consumption, so use json encoding,
+ # except imei that forms 16 byte prefix that can be used as the
+ # topic to subscribe.
@property
def packed(self):
- return self.imei.encode() + pack(
- "!dddB",
- self.devtime.replace(tzinfo=timezone.utc).timestamp(),
- self.lat,
- self.lon,
- int(self.is_gps),
+ return (
+ self.imei.encode()
+ + dumps(
+ {
+ "devtime": str(self.devtime),
+ "latitude": self.lat,
+ "longitude": self.lon,
+ "is-gps": self.is_gps,
+ }
+ ).encode()
)
def decode(self, buffer):
self.imei = buffer[:16].decode()
- when, self.lat, self.lon, is_gps = unpack("!dddB", buffer[16:])
- self.devtime = datetime.fromtimestamp(when).astimezone(tz=timezone.utc)
- self.is_gps = bool(is_gps)
+ json_data = loads(buffer[16:])
+ self.devtime = datetime.fromisoformat(json_data["devtime"])
+ self.lat = json_data["latitude"]
+ self.lon = json_data["longitude"]
+ self.is_gps = json_data["is-gps"]