from configparser import ConfigParser
from logging import getLogger
from os import umask
-from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
+from socket import (
+ socket,
+ AF_INET6,
+ SOCK_STREAM,
+ SOL_SOCKET,
+ SO_KEEPALIVE,
+ SO_REUSEADDR,
+)
from struct import pack
from time import time
from typing import Dict, List, Optional, Tuple
log = getLogger("gps303/collector")
+MAXBUFFER: int = 4096
+
class Client:
"""Connected socket to the terminal plus buffer and metadata"""
def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]:
"""Read from the socket and parse complete messages"""
try:
- segment = self.sock.recv(4096)
+ segment = self.sock.recv(MAXBUFFER)
except OSError as e:
log.warning(
"Reading from fd %d (IMEI %s): %s",
return None
when = time()
self.buffer += segment
+ if len(self.buffer) > MAXBUFFER:
+ # We are receiving junk. Let's drop it or we run out of memory.
+ log.warning("More than %d unparseable data, dropping", MAXBUFFER)
+ self.buffer = b""
msgs = []
while True:
framestart = self.buffer.find(b"xx")
break
if framestart > 0: # Should not happen, report
log.warning(
- 'Undecodable data "%s" from fd %d (IMEI %s)',
- self.buffer[:framestart].hex(),
+ 'Undecodable data (%d) "%s" from fd %d (IMEI %s)',
+ framestart,
+ self.buffer[:framestart][:64].hex(),
self.sock.fileno(),
self.imei,
)
# Do this embarrassing hack to avoid accidental match
# of some binary data in the packet against '\r\n'.
while True:
- frameend = self.buffer.find(b"\r\n", frameend)
- if frameend >= (exp_end - 3): # Found realistic match
+ frameend = self.buffer.find(b"\r\n", frameend + 1)
+ if frameend == -1 or frameend >= (
+ exp_end - 3
+ ): # Found realistic match or none
break
if frameend == -1: # Incomplete frame, return what we have
break
packet = self.buffer[2:frameend]
self.buffer = self.buffer[frameend + 2 :]
+ if len(packet) < 2: # frameend comes too early
+ log.warning("Packet too short: %s", packet)
+ break
if proto_of_message(packet) == LOGIN.PROTO:
- self.imei = parse_message(packet).imei
- log.info(
- "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
- )
+ msg = parse_message(packet)
+ if isinstance(msg, LOGIN): # Can be unparseable
+ self.imei = msg.imei
+ log.info(
+ "LOGIN from fd %d (IMEI %s)",
+ self.sock.fileno(),
+ self.imei,
+ )
msgs.append((when, self.addr, packet))
return msgs
except OSError as e:
log.error(
"Sending to fd %d (IMEI %s): %s",
- self.sock.fileno,
+ self.sock.fileno(),
self.imei,
e,
)
del self.by_imei[clnt.imei]
del self.by_fd[fd]
- def recv(self, fd: int) -> Optional[List[Tuple[Optional[str], float, Tuple[str, int], bytes]]]:
+ def recv(
+ self, fd: int
+ ) -> Optional[List[Tuple[Optional[str], float, Tuple[str, int], bytes]]]:
clnt = self.by_fd[fd]
msgs = clnt.recv()
if msgs is None:
if clnt.imei:
self.by_imei[clnt.imei] = clnt
else:
- log.warning("Login message from %s: %s, but client imei unfilled", peeraddr, packet)
+ log.warning(
+ "Login message from %s: %s, but client imei unfilled",
+ peeraddr,
+ packet,
+ )
result.append((clnt.imei, when, peeraddr, packet))
log.debug(
"Received from %s (IMEI %s): %s",
log.info("Not connected (IMEI %s)", resp.imei)
-def runserver(conf: ConfigParser) -> None:
+def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
# Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
zctx = zmq.Context() # type: ignore
zpub = zctx.socket(zmq.PUB) # type: ignore
break
elif sk == tcpfd:
clntsock, clntaddr = tcpl.accept()
+ clntsock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1)
topoll.append((clntsock, clntaddr))
elif fl & zmq.POLLIN:
received = clients.recv(sk)
packet=packet,
).packed
)
- if proto == HIBERNATION.PROTO:
+ if proto == HIBERNATION.PROTO and handle_hibernate:
log.debug(
"HIBERNATION from fd %d (IMEI %s)",
sk,
fd = clients.add(clntsock, clntaddr)
poller.register(fd, flags=zmq.POLLIN)
except KeyboardInterrupt:
- pass
+ zpub.close()
+ zpull.close()
+ zctx.destroy() # type: ignore
+ tcpl.close()
if __name__.endswith("__main__"):