from struct import pack, unpack
from typing import Any, cast, Optional, Tuple, Type, Union
-__all__ = "Bcast", "Resp", "topic"
+__all__ = "Bcast", "Resp", "topic", "rtopic"
def pack_peer( # 18 bytes
)
+def rtopic(imei: str) -> bytes:
+ return pack("16s", imei.encode())
+
+
class Bcast(_Zmsg):
"""Zmq message to broadcast what was received from the terminal"""
self.when = when
self.packet = buffer[24:]
+
+
+class Rept(_Zmsg):
+ """Broadcast Zzmq message with "rectified" proto-agnostic json data"""
+
+ KWARGS = (("imei", None), ("payload", ""))
+
+ @property
+ def packed(self) -> bytes:
+ return (
+ pack(
+ "16s",
+ "0000000000000000"
+ if self.imei is None
+ else self.imei.encode(),
+ )
+ + self.payload.encode()
+ )
+
+ def decode(self, buffer: bytes) -> None:
+ imei = buffer[:16]
+ self.imei = (
+ None if imei == b"0000000000000000" else imei.decode().strip("\0")
+ )
+ self.payload = buffer[16:].decode()