from . import common
from .gps303proto import (
+ GPS303Conn,
+ StreamError,
HIBERNATION,
LOGIN,
inline_response,
def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
self.sock = sock
self.addr = addr
- self.buffer = b""
+ self.stream = GPS303Conn()
self.imei: Optional[str] = None
def close(self) -> None:
log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
self.sock.close()
- self.buffer = b""
+ rest = self.stream.close()
+ if rest:
+ log.warning("%d bytes in buffer on close: %s", len(rest), rest)
def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]:
"""Read from the socket and parse complete messages"""
)
return None
when = time()
- self.buffer += segment
- if len(self.buffer) > MAXBUFFER:
- # We are receiving junk. Let's drop it or we run out of memory.
- log.warning("More than %d unparseable data, dropping", MAXBUFFER)
- self.buffer = b""
- msgs = []
while True:
- framestart = self.buffer.find(b"xx")
- if framestart == -1: # No frames, return whatever we have
- break
- if framestart > 0: # Should not happen, report
+ try:
+ return [
+ (when, self.addr, packet)
+ for packet in self.stream.recv(segment)
+ ]
+ except StreamError as e:
log.warning(
- 'Undecodable data (%d) "%s" from fd %d (IMEI %s)',
- framestart,
- self.buffer[:framestart][:64].hex(),
- self.sock.fileno(),
- self.imei,
+ "%s from fd %d (IMEI %s)", e, self.sock.fileno(), self.imei
)
- self.buffer = self.buffer[framestart:]
- # At this point, buffer starts with a packet
- if len(self.buffer) < 6: # no len and proto - cannot proceed
- break
- exp_end = self.buffer[2] + 3 # Expect '\r\n' here
- frameend = 0
- # Length field can legitimeely be much less than the
- # length of the packet (e.g. WiFi positioning), but
- # it _should not_ be greater. Still sometimes it is.
- # Luckily, not by too much: by maybe two or three bytes?
- # Do this embarrassing hack to avoid accidental match
- # of some binary data in the packet against '\r\n'.
- while True:
- frameend = self.buffer.find(b"\r\n", frameend + 1)
- if frameend == -1 or frameend >= (
- exp_end - 3
- ): # Found realistic match or none
- break
- if frameend == -1: # Incomplete frame, return what we have
- break
- packet = self.buffer[2:frameend]
- self.buffer = self.buffer[frameend + 2 :]
- if len(packet) < 2: # frameend comes too early
- log.warning("Packet too short: %s", packet)
- break
- msgs.append((when, self.addr, packet))
- return msgs
def send(self, buffer: bytes) -> None:
try:
- self.sock.send(b"xx" + buffer + b"\r\n")
+ self.sock.send(self.stream.enframe(buffer))
except OSError as e:
log.error(
"Sending to fd %d (IMEI %s): %s",
e,
)
+ def set_imei(self, imei: str) -> None:
+ self.imei = imei
+
class Clients:
def __init__(self) -> None:
from enum import Enum
from inspect import isclass
from struct import error, pack, unpack
+from time import time
from typing import (
Any,
Callable,
)
__all__ = (
+ "GPS303Conn",
+ "StreamError",
"class_by_prefix",
"inline_response",
"parse_message",
"UNKNOWN_B3",
)
+### Deframer ###
+
+MAXBUFFER: int = 4096
+
+
+class StreamError(Exception):
+ pass
+
+
+class GPS303Conn:
+ def __init__(self) -> None:
+ self.buffer = b""
+
+ @staticmethod
+ def enframe(buffer: bytes) -> bytes:
+ return b"xx" + buffer + b"\r\n"
+
+ def recv(self, segment: bytes) -> List[bytes]:
+ when = time()
+ self.buffer += segment
+ if len(self.buffer) > MAXBUFFER:
+ # We are receiving junk. Let's drop it or we run out of memory.
+ self.buffer = b""
+ raise StreamError(
+ f"More than {MAXBUFFER} unparseable data, dropping"
+ )
+ msgs = []
+ while True:
+ framestart = self.buffer.find(b"xx")
+ if framestart == -1: # No frames, return whatever we have
+ break
+ if framestart > 0: # Should not happen, report
+ self.buffer = self.buffer[framestart:]
+ raise StreamError(
+ f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
+ )
+ # At this point, buffer starts with a packet
+ if len(self.buffer) < 6: # no len and proto - cannot proceed
+ break
+ exp_end = self.buffer[2] + 3 # Expect '\r\n' here
+ frameend = 0
+ # Length field can legitimeely be much less than the
+ # length of the packet (e.g. WiFi positioning), but
+ # it _should not_ be greater. Still sometimes it is.
+ # Luckily, not by too much: by maybe two or three bytes?
+ # Do this embarrassing hack to avoid accidental match
+ # of some binary data in the packet against '\r\n'.
+ while True:
+ frameend = self.buffer.find(b"\r\n", frameend + 1)
+ if frameend == -1 or frameend >= (
+ exp_end - 3
+ ): # Found realistic match or none
+ break
+ if frameend == -1: # Incomplete frame, return what we have
+ break
+ packet = self.buffer[2:frameend]
+ self.buffer = self.buffer[frameend + 2 :]
+ if len(packet) < 2: # frameend comes too early
+ raise StreamError(f"Packet too short: {packet.hex()}")
+ msgs.append(packet)
+ return msgs
+
+ def close(self) -> bytes:
+ ret = self.buffer
+ self.buffer = b""
+ return ret
+
+
+### Parser/Constructor ###
+
class DecodeError(Exception):
def __init__(self, e: Exception, **kwargs: Any) -> None: