__all__ = (
"Stream",
"class_by_prefix",
+ "enframe",
"inline_response",
"parse_message",
"probe_buffer",
self.imei: Optional[str] = None
self.datalen: int = 0
- @staticmethod
- def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
- assert imei is not None and len(imei) == 10
- return f"[LT*{imei:10s}*{len(buffer):04X}*".encode() + buffer + b"]"
-
def recv(self, segment: bytes) -> List[Union[bytes, str]]:
"""
Process next segment of the stream. Return successfully deframed
)
self.buffer = self.buffer[toskip:]
# From this point, buffer starts with a packet header
- if self.imei is not None and self.imei != imei:
+ if self.imei is None:
+ self.imei = imei
+ if self.imei != imei:
msgs.append(
f"Packet's imei {imei} mismatches"
- f" previous value {self.imei}"
+ f" previous value {self.imei}, old value kept"
)
- self.imei = imei
self.datalen = datalen
if len(self.buffer) < self.datalen + 21: # Incomplete packet
break
return ret
+def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
+ assert imei is not None and len(imei) == 10
+ off, vid, _, dlen = _framestart(buffer)
+ assert off == 0
+ return f"[{vid:2s}*{imei:10s}*{dlen:04X}*".encode() + buffer[20:]
+
+
### Parser/Constructor ###
@property
def packed(self) -> bytes:
- return self.encode().encode() # first is object's, second str's
+ buffer = self.encode().encode()
+ return f"[LT*0000000000*{len(buffer):04X}*".encode() + buffer + b"]"
class UNKNOWN(BeeSurePkt):
def proto_of_message(packet: bytes) -> str:
- return PROTO_PREFIX + packet.split(b",")[0].decode()
+ return PROTO_PREFIX + packet[20:-1].split(b",")[0].decode()
def imei_from_packet(packet: bytes) -> Optional[str]:
class ProtoModule:
class Stream:
- @staticmethod
- def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
- ...
-
def recv(self, segment: bytes) -> List[Union[bytes, str]]:
...
def close(self) -> bytes:
...
+ @staticmethod
+ def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
+ ...
+
@staticmethod
def probe_buffer(buffer: bytes) -> bool:
...
return msgs
def send(self, buffer: bytes) -> None:
- assert self.stream is not None
+ assert self.stream is not None and self.pmod is not None
try:
- self.sock.send(self.stream.enframe(buffer, imei=self.imei))
+ self.sock.send(self.pmod.enframe(buffer, imei=self.imei))
except OSError as e:
log.error(
"Sending to fd %d (IMEI %s): %s",
try:
while True:
zmsg = Bcast(zsub.recv())
- print("Bcast:", zmsg)
+ print("I" if zmsg.is_incoming else "O", zmsg.proto, zmsg.imei)
for pmod in pmods:
if zmsg.proto.startswith(pmod.PROTO_PREFIX):
msg = pmod.parse_message(zmsg.packet, zmsg.is_incoming)
- print("I" if zmsg.is_incoming else "O", zmsg.imei, msg)
+ print(msg)
except KeyboardInterrupt:
pass
def __init__(self) -> None:
self.buffer = b""
- @staticmethod
- def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
- return b"xx" + buffer + b"\r\n"
-
def recv(self, segment: bytes) -> List[Union[bytes, str]]:
"""
Process next segment of the stream. Return successfully deframed
return ret
+def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
+ return b"xx" + buffer + b"\r\n"
+
+
### Parser/Constructor ###