__all__ = "Bcast", "Resp", "topic"
-def pack_peer(
+def pack_peer( # 18 bytes
peeraddr: Union[None, Tuple[str, int], Tuple[str, int, Any, Any]]
) -> bytes:
if peeraddr is None:
def topic(
- proto: int, is_incoming: bool = True, imei: Optional[str] = None
+ proto: str, is_incoming: bool = True, imei: Optional[str] = None
) -> bytes:
- return pack("BB", is_incoming, proto) + (
+ return pack("B16s", is_incoming, proto.encode()) + (
b"" if imei is None else pack("16s", imei.encode())
)
KWARGS = (
("is_incoming", True),
- ("proto", 256),
+ ("proto", "UNKNOWN"),
("imei", None),
("when", None),
("peeraddr", None),
def packed(self) -> bytes:
return (
pack(
- "BB16s",
+ "!B16s16sd",
int(self.is_incoming),
- self.proto,
+ self.proto[:16].ljust(16, "\0").encode(),
b"0000000000000000"
if self.imei is None
else self.imei.encode(),
- )
- + (
- b"\0\0\0\0\0\0\0\0"
- if self.when is None
- else pack("!d", self.when)
+ 0 if self.when is None else self.when,
)
+ pack_peer(self.peeraddr)
+ self.packet
)
def decode(self, buffer: bytes) -> None:
- self.is_incoming = bool(buffer[0])
- self.proto = buffer[1]
- self.imei: Optional[str] = buffer[2:18].decode()
- if self.imei == "0000000000000000":
- self.imei = None
- self.when = unpack("!d", buffer[18:26])[0]
- self.peeraddr = unpack_peer(buffer[26:44])
- self.packet = buffer[44:]
+ is_incoming, proto, imei, when = unpack("!B16s16sd", buffer[:41])
+ self.is_incoming = bool(is_incoming)
+ self.proto = proto.decode()
+ self.imei = (
+ None if imei == b"0000000000000000" else imei.decode().strip("\0")
+ )
+ self.when = when
+ self.peeraddr = unpack_peer(buffer[41:59])
+ self.packet = buffer[59:]
class Resp(_Zmsg):
def packed(self) -> bytes:
return (
pack(
- "16s",
+ "!16sd",
"0000000000000000"
if self.imei is None
else self.imei.encode(),
- )
- + (
- b"\0\0\0\0\0\0\0\0"
- if self.when is None
- else pack("!d", self.when)
+ 0 if self.when is None else self.when,
)
+ self.packet
)
def decode(self, buffer: bytes) -> None:
- self.imei = buffer[:16].decode()
- self.when = unpack("!d", buffer[16:24])[0]
+ imei, when = unpack("!16sd", buffer[:24])
+ self.imei = (
+ None if imei == b"0000000000000000" else imei.decode().strip("\0")
+ )
+
+ self.when = when
self.packet = buffer[24:]