]> average.org Git - loctrkd.git/commitdiff
wsgateway aggregate subscriptions upstream
authorEugene Crosser <crosser@average.org>
Fri, 6 May 2022 22:22:07 +0000 (00:22 +0200)
committerEugene Crosser <crosser@average.org>
Fri, 6 May 2022 22:22:07 +0000 (00:22 +0200)
gps303/wsgateway.py

index f2c6596c3e6a584c1280a9d3c2025e6ee265ebae..f9f5c6a2ad936f33c9961419245c65de14414dcd 100644 (file)
@@ -220,6 +220,12 @@ class Clients:
                 waiting.add(fd)
         return waiting
 
+    def subs(self):
+        result = set()
+        for clnt in self.by_fd.values():
+            result |= clnt.imeis
+        return result
+
 
 def runserver(conf):
     global htmlfile
@@ -228,7 +234,6 @@ def runserver(conf):
     zctx = zmq.Context()
     zsub = zctx.socket(zmq.SUB)
     zsub.connect(conf.get("lookaside", "publishurl"))
-    zsub.setsockopt(zmq.SUBSCRIBE, b"")
     tcpl = socket(AF_INET6, SOCK_STREAM)
     tcpl.setblocking(False)
     tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
@@ -239,9 +244,17 @@ def runserver(conf):
     poller.register(zsub, flags=zmq.POLLIN)
     poller.register(tcpfd, flags=zmq.POLLIN)
     clients = Clients()
+    activesubs = set()
     try:
         towait = set()
         while True:
+            neededsubs = clients.subs()
+            for imei in neededsubs - activesubs:
+                zsub.setsockopt(zmq.SUBSCRIBE, imei.encode())
+            for imei in activesubs - neededsubs:
+                zsub.setsockopt(zmq.UNSUBSCRIBE, imei.encode())
+            activesubs = neededsubs
+            log.debug("Subscribed to: %s", activesubs)
             tosend = []
             topoll = []
             tostop = []