class Client:
"""Connected socket to the terminal plus buffer and metadata"""
- def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
+ def __init__(self, sock: socket, addr: Any) -> None:
self.sock = sock
self.addr = addr
self.pmod: Optional[ProtoModule] = None
"%d bytes in buffer on close: %s", len(rest), rest[:64].hex()
)
- def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]:
+ def recv(self) -> Optional[List[Tuple[float, Any, bytes]]]:
"""Read from the socket and parse complete messages"""
try:
segment = self.sock.recv(MAXBUFFER)
def __init__(self) -> None:
self.by_fd: Dict[int, Client] = {}
self.by_imei: Dict[str, Client] = {}
- self.tostop: Set[int] = set()
- def add(self, clntsock: socket, clntaddr: Tuple[str, int]) -> int:
+ def fds(self) -> Set[int]:
+ return set(self.by_fd.keys())
+
+ def add(self, clntsock: socket, clntaddr: Any) -> int:
fd = clntsock.fileno()
log.info("Start serving fd %d from %s", fd, clntaddr)
self.by_fd[fd] = Client(clntsock, clntaddr)
return fd
def stop(self, fd: int) -> None:
+ if fd not in self.by_fd:
+ log.debug("Fd %d is not served, ingore stop", fd)
+ return
clnt = self.by_fd[fd]
log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
clnt.close()
def recv(
self, fd: int
- ) -> Optional[
- List[Tuple[ProtoModule, Optional[str], float, Tuple[str, int], bytes]]
- ]:
+ ) -> Optional[List[Tuple[ProtoModule, Optional[str], float, Any, bytes]]]:
+ if fd not in self.by_fd:
+ log.debug("Client at fd %d gone, ingore event", fd)
+ return None
clnt = self.by_fd[fd]
msgs = clnt.recv()
if msgs is None:
oldfd = oldclnt.sock.fileno()
log.info("Removing stale connection on fd %d", oldfd)
oldclnt.imei = None
- self.tostop.add(oldfd)
+ self.stop(oldfd)
self.by_imei[clnt.imei] = clnt
result.append((clnt.pmod, clnt.imei, when, peeraddr, packet))
log.debug(
poller.register(zpull, flags=zmq.POLLIN)
poller.register(tcpfd, flags=zmq.POLLIN)
clients = Clients()
+ pollingfds: Set[int] = set()
try:
while True:
- tosend = []
- topoll = []
- clients.tostop = set()
+ tosend: List[Resp] = []
+ toadd: List[Tuple[socket, Any]] = []
events = poller.poll(1000)
for sk, fl in events:
if sk is zpull:
elif sk == tcpfd:
clntsock, clntaddr = tcpl.accept()
clntsock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1)
- topoll.append((clntsock, clntaddr))
+ toadd.append((clntsock, clntaddr))
elif fl & zmq.POLLIN:
received = clients.recv(sk)
if received is None:
log.debug("Terminal gone from fd %d", sk)
- clients.tostop.add(sk)
+ clients.stop(sk)
else:
for pmod, imei, when, peeraddr, packet in received:
proto = pmod.proto_of_message(packet)
sk,
imei,
)
- clients.tostop.add(sk)
+ clients.stop(sk)
respmsg = pmod.inline_response(packet)
if respmsg is not None:
tosend.append(
packet=zmsg.packet,
).packed
)
- for fd in clients.tostop:
+ for fd in pollingfds - clients.fds():
poller.unregister(fd) # type: ignore
- clients.stop(fd)
- for clntsock, clntaddr in topoll:
+ for clntsock, clntaddr in toadd:
fd = clients.add(clntsock, clntaddr)
+ for fd in clients.fds() - pollingfds:
poller.register(fd, flags=zmq.POLLIN)
+ pollingfds = clients.fds()
except KeyboardInterrupt:
zpub.close()
zpull.close()