from . import common
from .gps303proto import (
GPS303Conn,
- StreamError,
HIBERNATION,
LOGIN,
inline_response,
)
return None
when = time()
- while True:
- try:
- return [
- (when, self.addr, packet)
- for packet in self.stream.recv(segment)
- ]
- except StreamError as e:
+ msgs = []
+ for elem in self.stream.recv(segment):
+ if isinstance(elem, bytes):
+ msgs.append((when, self.addr, elem))
+ else:
log.warning(
- "%s from fd %d (IMEI %s)", e, self.sock.fileno(), self.imei
+ "%s from fd %d (IMEI %s)",
+ elem,
+ self.sock.fileno(),
+ self.imei,
)
+ return msgs
def send(self, buffer: bytes) -> None:
try:
__all__ = (
"GPS303Conn",
- "StreamError",
"class_by_prefix",
"inline_response",
"parse_message",
MAXBUFFER: int = 4096
-class StreamError(Exception):
- pass
-
-
class GPS303Conn:
def __init__(self) -> None:
self.buffer = b""
def enframe(buffer: bytes) -> bytes:
return b"xx" + buffer + b"\r\n"
- def recv(self, segment: bytes) -> List[bytes]:
+ def recv(self, segment: bytes) -> List[Union[bytes, str]]:
+ """
+ Process next segment of the stream. Return successfully deframed
+ packets as `bytes` and error messages as `str`.
+ """
when = time()
self.buffer += segment
if len(self.buffer) > MAXBUFFER:
# We are receiving junk. Let's drop it or we run out of memory.
self.buffer = b""
- raise StreamError(
- f"More than {MAXBUFFER} unparseable data, dropping"
- )
- msgs = []
+ return [f"More than {MAXBUFFER} unparseable data, dropping"]
+ msgs: List[Union[bytes, str]] = []
while True:
framestart = self.buffer.find(b"xx")
if framestart == -1: # No frames, return whatever we have
break
if framestart > 0: # Should not happen, report
- self.buffer = self.buffer[framestart:]
- raise StreamError(
+ msgs.append(
f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
)
+ self.buffer = self.buffer[framestart:]
# At this point, buffer starts with a packet
if len(self.buffer) < 6: # no len and proto - cannot proceed
break
packet = self.buffer[2:frameend]
self.buffer = self.buffer[frameend + 2 :]
if len(packet) < 2: # frameend comes too early
- raise StreamError(f"Packet too short: {packet.hex()}")
- msgs.append(packet)
+ msgs.append(f"Packet too short: {packet.hex()}")
+ else:
+ msgs.append(packet)
return msgs
def close(self) -> bytes: