From 6eff65f7b03bc66a479df0fd694250e1e0b7c5ae Mon Sep 17 00:00:00 2001 From: Eugene Crosser Date: Thu, 10 Nov 2022 23:17:36 +0100 Subject: [PATCH] Send backlog only the the ws client that requested --- loctrkd/wsgateway.py | 41 +++++++++++++++++++++++++++++------------ 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/loctrkd/wsgateway.py b/loctrkd/wsgateway.py index e568584..e94845e 100644 --- a/loctrkd/wsgateway.py +++ b/loctrkd/wsgateway.py @@ -209,16 +209,28 @@ class Clients: clnt.close() del self.by_fd[fd] - def recv(self, fd: int) -> Optional[List[Dict[str, Any]]]: + def recv(self, fd: int) -> Tuple[Client, Optional[List[Dict[str, Any]]]]: clnt = self.by_fd[fd] - return clnt.recv() + return (clnt, clnt.recv()) - def send(self, msg: Dict[str, Any]) -> Set[int]: + def send(self, clnt: Optional[Client], msg: Dict[str, Any]) -> Set[int]: towrite = set() - for fd, clnt in self.by_fd.items(): - if clnt.wants(msg["imei"]): + if clnt is None: + for fd, cl in self.by_fd.items(): + if cl.wants(msg["imei"]): + cl.send(msg) + towrite.add(fd) + else: + fd = clnt.sock.fileno() + if self.by_fd.get(fd, None) == clnt: clnt.send(msg) towrite.add(fd) + else: + log.info( + "Trying to send %s to client at %d, not in service", + msg, + fd, + ) return towrite def write(self, towrite: Set[int]) -> Set[int]: @@ -264,7 +276,7 @@ def runserver(conf: ConfigParser) -> None: zsub.setsockopt(zmq.UNSUBSCRIBE, rtopic(imei)) activesubs = neededsubs log.debug("Subscribed to: %s", activesubs) - tosend = [] + tosend: List[Tuple[Optional[Client], Dict[str, Any]]] = [] topoll = [] tostop = [] towrite = set() @@ -277,14 +289,14 @@ def runserver(conf: ConfigParser) -> None: msg = loads(zmsg.payload) msg["imei"] = zmsg.imei log.debug("Got %s, sending %s", zmsg, msg) - tosend.append(msg) + tosend.append((None, msg)) except zmq.Again: break elif sk == tcpfd: clntsock, clntaddr = tcpl.accept() topoll.append((clntsock, clntaddr)) elif fl & zmq.POLLIN: - received = clients.recv(sk) + clnt, received = clients.recv(sk) if received is None: log.debug("Client gone from fd %d", sk) tostop.append(sk) @@ -297,7 +309,12 @@ def runserver(conf: ConfigParser) -> None: imeis = cast(List[str], wsmsg.get("imei")) numback: int = wsmsg.get("backlog", 5) for imei in imeis: - tosend.extend(backlog(imei, numback)) + tosend.extend( + [ + (clnt, msg) + for msg in backlog(imei, numback) + ] + ) towrite.add(sk) elif fl & zmq.POLLOUT: log.debug("Write now open for fd %d", sk) @@ -309,9 +326,9 @@ def runserver(conf: ConfigParser) -> None: for fd in tostop: poller.unregister(fd) # type: ignore clients.stop(fd) - for wsmsg in tosend: - log.debug("Sending to the clients: %s", wsmsg) - towrite |= clients.send(wsmsg) + for towhom, wsmsg in tosend: + log.debug("Sending to the client %s: %s", towhom, wsmsg) + towrite |= clients.send(towhom, wsmsg) for clntsock, clntaddr in topoll: fd = clients.add(clntsock, clntaddr) poller.register(fd, flags=zmq.POLLIN) -- 2.43.0