clnt.close()
del self.by_fd[fd]
- def recv(self, fd: int) -> Optional[List[Dict[str, Any]]]:
+ def recv(self, fd: int) -> Tuple[Client, Optional[List[Dict[str, Any]]]]:
clnt = self.by_fd[fd]
- return clnt.recv()
+ return (clnt, clnt.recv())
- def send(self, msg: Dict[str, Any]) -> Set[int]:
+ def send(self, clnt: Optional[Client], msg: Dict[str, Any]) -> Set[int]:
towrite = set()
- for fd, clnt in self.by_fd.items():
- if clnt.wants(msg["imei"]):
+ if clnt is None:
+ for fd, cl in self.by_fd.items():
+ if cl.wants(msg["imei"]):
+ cl.send(msg)
+ towrite.add(fd)
+ else:
+ fd = clnt.sock.fileno()
+ if self.by_fd.get(fd, None) == clnt:
clnt.send(msg)
towrite.add(fd)
+ else:
+ log.info(
+ "Trying to send %s to client at %d, not in service",
+ msg,
+ fd,
+ )
return towrite
def write(self, towrite: Set[int]) -> Set[int]:
zsub.setsockopt(zmq.UNSUBSCRIBE, rtopic(imei))
activesubs = neededsubs
log.debug("Subscribed to: %s", activesubs)
- tosend = []
+ tosend: List[Tuple[Optional[Client], Dict[str, Any]]] = []
topoll = []
tostop = []
towrite = set()
msg = loads(zmsg.payload)
msg["imei"] = zmsg.imei
log.debug("Got %s, sending %s", zmsg, msg)
- tosend.append(msg)
+ tosend.append((None, msg))
except zmq.Again:
break
elif sk == tcpfd:
clntsock, clntaddr = tcpl.accept()
topoll.append((clntsock, clntaddr))
elif fl & zmq.POLLIN:
- received = clients.recv(sk)
+ clnt, received = clients.recv(sk)
if received is None:
log.debug("Client gone from fd %d", sk)
tostop.append(sk)
imeis = cast(List[str], wsmsg.get("imei"))
numback: int = wsmsg.get("backlog", 5)
for imei in imeis:
- tosend.extend(backlog(imei, numback))
+ tosend.extend(
+ [
+ (clnt, msg)
+ for msg in backlog(imei, numback)
+ ]
+ )
towrite.add(sk)
elif fl & zmq.POLLOUT:
log.debug("Write now open for fd %d", sk)
for fd in tostop:
poller.unregister(fd) # type: ignore
clients.stop(fd)
- for wsmsg in tosend:
- log.debug("Sending to the clients: %s", wsmsg)
- towrite |= clients.send(wsmsg)
+ for towhom, wsmsg in tosend:
+ log.debug("Sending to the client %s: %s", towhom, wsmsg)
+ towrite |= clients.send(towhom, wsmsg)
for clntsock, clntaddr in topoll:
fd = clients.add(clntsock, clntaddr)
poller.register(fd, flags=zmq.POLLIN)