X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fwsgateway.py;h=8a5af7f43ec37c94d1500c9ab3a46bf934991e33;hb=ccfb47bba216d7f1b99f1d2e4f488afec4c37738;hp=f2c6596c3e6a584c1280a9d3c2025e6ee265ebae;hpb=88213d8cfd248de750f53ec11feb30473818ac13;p=loctrkd.git diff --git a/gps303/wsgateway.py b/gps303/wsgateway.py index f2c6596..8a5af7f 100644 --- a/gps303/wsgateway.py +++ b/gps303/wsgateway.py @@ -17,6 +17,7 @@ from wsproto.utilities import RemoteProtocolError import zmq from . import common +from .backlog import blinit, backlog from .zmsg import LocEvt log = getLogger("gps303/wsgateway") @@ -196,14 +197,7 @@ class Clients: def recv(self, fd): clnt = self.by_fd[fd] - msgs = clnt.recv() - if msgs is None: - return None - result = [] - for msg in msgs: - log.debug("Received: %s", msg) - result.append(msg) - return result + return clnt.recv() def send(self, msg): towrite = set() @@ -220,15 +214,21 @@ class Clients: waiting.add(fd) return waiting + def subs(self): + result = set() + for clnt in self.by_fd.values(): + result |= clnt.imeis + return result + def runserver(conf): global htmlfile + blinit(conf.get("storage", "dbfn"), conf.get("opencellid", "dbfn")) htmlfile = conf.get("wsgateway", "htmlfile") zctx = zmq.Context() zsub = zctx.socket(zmq.SUB) zsub.connect(conf.get("lookaside", "publishurl")) - zsub.setsockopt(zmq.SUBSCRIBE, b"") tcpl = socket(AF_INET6, SOCK_STREAM) tcpl.setblocking(False) tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) @@ -239,9 +239,17 @@ def runserver(conf): poller.register(zsub, flags=zmq.POLLIN) poller.register(tcpfd, flags=zmq.POLLIN) clients = Clients() + activesubs = set() try: towait = set() while True: + neededsubs = clients.subs() + for imei in neededsubs - activesubs: + zsub.setsockopt(zmq.SUBSCRIBE, imei.encode()) + for imei in activesubs - neededsubs: + zsub.setsockopt(zmq.UNSUBSCRIBE, imei.encode()) + activesubs = neededsubs + log.debug("Subscribed to: %s", activesubs) tosend = [] topoll = [] tostop = [] @@ -267,6 +275,11 @@ def runserver(conf): else: for msg in received: log.debug("Received from %d: %s", sk, msg) + if msg.get("type", None) == "subscribe": + imei = msg.get("imei") + numback = msg.get("backlog", 5) + for elem in imei: + tosend.extend(backlog(elem, numback)) towrite.add(sk) elif fl & zmq.POLLOUT: log.debug("Write now open for fd %d", sk)