From 1f1c34f9f42911958be5b282bd9092c1ed3de2a5 Mon Sep 17 00:00:00 2001 From: Eugene Crosser Date: Mon, 18 Jul 2022 15:04:13 +0200 Subject: [PATCH] collector: streamline tracking of polled fd-s --- loctrkd/collector.py | 41 ++++++++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/loctrkd/collector.py b/loctrkd/collector.py index 26d37f3..6fc606a 100644 --- a/loctrkd/collector.py +++ b/loctrkd/collector.py @@ -68,7 +68,7 @@ pmods: List[ProtoModule] = [] class Client: """Connected socket to the terminal plus buffer and metadata""" - def __init__(self, sock: socket, addr: Tuple[str, int]) -> None: + def __init__(self, sock: socket, addr: Any) -> None: self.sock = sock self.addr = addr self.pmod: Optional[ProtoModule] = None @@ -87,7 +87,7 @@ class Client: "%d bytes in buffer on close: %s", len(rest), rest[:64].hex() ) - def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]: + def recv(self) -> Optional[List[Tuple[float, Any, bytes]]]: """Read from the socket and parse complete messages""" try: segment = self.sock.recv(MAXBUFFER) @@ -151,15 +151,20 @@ class Clients: def __init__(self) -> None: self.by_fd: Dict[int, Client] = {} self.by_imei: Dict[str, Client] = {} - self.tostop: Set[int] = set() - def add(self, clntsock: socket, clntaddr: Tuple[str, int]) -> int: + def fds(self) -> Set[int]: + return set(self.by_fd.keys()) + + def add(self, clntsock: socket, clntaddr: Any) -> int: fd = clntsock.fileno() log.info("Start serving fd %d from %s", fd, clntaddr) self.by_fd[fd] = Client(clntsock, clntaddr) return fd def stop(self, fd: int) -> None: + if fd not in self.by_fd: + log.debug("Fd %d is not served, ingore stop", fd) + return clnt = self.by_fd[fd] log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei) clnt.close() @@ -169,9 +174,10 @@ class Clients: def recv( self, fd: int - ) -> Optional[ - List[Tuple[ProtoModule, Optional[str], float, Tuple[str, int], bytes]] - ]: + ) -> Optional[List[Tuple[ProtoModule, Optional[str], float, Any, bytes]]]: + if fd not in self.by_fd: + log.debug("Client at fd %d gone, ingore event", fd) + return None clnt = self.by_fd[fd] msgs = clnt.recv() if msgs is None: @@ -189,7 +195,7 @@ class Clients: oldfd = oldclnt.sock.fileno() log.info("Removing stale connection on fd %d", oldfd) oldclnt.imei = None - self.tostop.add(oldfd) + self.stop(oldfd) self.by_imei[clnt.imei] = clnt result.append((clnt.pmod, clnt.imei, when, peeraddr, packet)) log.debug( @@ -233,11 +239,11 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None: poller.register(zpull, flags=zmq.POLLIN) poller.register(tcpfd, flags=zmq.POLLIN) clients = Clients() + pollingfds: Set[int] = set() try: while True: - tosend = [] - topoll = [] - clients.tostop = set() + tosend: List[Resp] = [] + toadd: List[Tuple[socket, Any]] = [] events = poller.poll(1000) for sk, fl in events: if sk is zpull: @@ -251,12 +257,12 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None: elif sk == tcpfd: clntsock, clntaddr = tcpl.accept() clntsock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) - topoll.append((clntsock, clntaddr)) + toadd.append((clntsock, clntaddr)) elif fl & zmq.POLLIN: received = clients.recv(sk) if received is None: log.debug("Terminal gone from fd %d", sk) - clients.tostop.add(sk) + clients.stop(sk) else: for pmod, imei, when, peeraddr, packet in received: proto = pmod.proto_of_message(packet) @@ -278,7 +284,7 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None: sk, imei, ) - clients.tostop.add(sk) + clients.stop(sk) respmsg = pmod.inline_response(packet) if respmsg is not None: tosend.append( @@ -300,12 +306,13 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None: packet=zmsg.packet, ).packed ) - for fd in clients.tostop: + for fd in pollingfds - clients.fds(): poller.unregister(fd) # type: ignore - clients.stop(fd) - for clntsock, clntaddr in topoll: + for clntsock, clntaddr in toadd: fd = clients.add(clntsock, clntaddr) + for fd in clients.fds() - pollingfds: poller.register(fd, flags=zmq.POLLIN) + pollingfds = clients.fds() except KeyboardInterrupt: zpub.close() zpull.close() -- 2.43.0