X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fcollector.py;h=9d694b0ae4c83180a7a718aa21be5daa4e7fe6f4;hb=945c7abb8b29898f46db4ad2833428d6db588a63;hp=c2efd79b82182e13319526a644abccb0e85071b9;hpb=dba371efd7945c29be62ec915f21794e8a737669;p=loctrkd.git diff --git a/gps303/collector.py b/gps303/collector.py index c2efd79..9d694b0 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -3,7 +3,14 @@ from configparser import ConfigParser from logging import getLogger from os import umask -from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR +from socket import ( + socket, + AF_INET6, + SOCK_STREAM, + SOL_SOCKET, + SO_KEEPALIVE, + SO_REUSEADDR, +) from struct import pack from time import time from typing import Dict, List, Optional, Tuple @@ -98,11 +105,18 @@ class Client: break packet = self.buffer[2:frameend] self.buffer = self.buffer[frameend + 2 :] + if len(packet) < 2: # frameend comes too early + log.warning("Packet too short: %s", packet) + break if proto_of_message(packet) == LOGIN.PROTO: - self.imei = parse_message(packet).imei - log.info( - "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei - ) + msg = parse_message(packet) + if isinstance(msg, LOGIN): # Can be unparseable + self.imei = msg.imei + log.info( + "LOGIN from fd %d (IMEI %s)", + self.sock.fileno(), + self.imei, + ) msgs.append((when, self.addr, packet)) return msgs @@ -112,7 +126,7 @@ class Client: except OSError as e: log.error( "Sending to fd %d (IMEI %s): %s", - self.sock.fileno, + self.sock.fileno(), self.imei, e, ) @@ -206,6 +220,7 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None: break elif sk == tcpfd: clntsock, clntaddr = tcpl.accept() + clntsock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) topoll.append((clntsock, clntaddr)) elif fl & zmq.POLLIN: received = clients.recv(sk) @@ -239,9 +254,6 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None: else: log.debug("Stray event: %s on socket %s", fl, sk) # poll queue consumed, make changes now - for fd in tostop: - poller.unregister(fd) # type: ignore - clients.stop(fd) for zmsg in tosend: zpub.send( Bcast( @@ -254,6 +266,9 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None: ) log.debug("Sending to the client: %s", zmsg) clients.response(zmsg) + for fd in tostop: + poller.unregister(fd) # type: ignore + clients.stop(fd) for clntsock, clntaddr in topoll: fd = clients.add(clntsock, clntaddr) poller.register(fd, flags=zmq.POLLIN)