X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fwsgateway.py;h=80926fbceb53b381d067eb2560d82cbeb2aa9aad;hb=5e1e7a4d37a1e149d5e899dada7b55a863cd8e64;hp=7756fe9d52ae10d0bcdb8012869ac7e78f3393b9;hpb=96538346bd332d76d2cac5d6a0ef2b4e4a40de30;p=loctrkd.git diff --git a/gps303/wsgateway.py b/gps303/wsgateway.py index 7756fe9..80926fb 100644 --- a/gps303/wsgateway.py +++ b/gps303/wsgateway.py @@ -18,7 +18,12 @@ import zmq from . import common from .backlog import blinit, backlog -from .zmsg import LocEvt +from .gps303proto import ( + GPS_POSITIONING, + WIFI_POSITIONING, + parse_message, +) +from .zmsg import Bcast, topic log = getLogger("gps303/wsgateway") htmlfile = None @@ -157,7 +162,12 @@ class Client: return msgs def wants(self, imei): - log.debug("wants %s? set is %s on fd %d", imei, self.imeis, self.sock.fileno()) + log.debug( + "wants %s? set is %s on fd %d", + imei, + self.imeis, + self.sock.fileno(), + ) return True # TODO: check subscriptions def send(self, message): @@ -224,11 +234,11 @@ class Clients: def runserver(conf): global htmlfile - blinit(conf.get("storage", "dbfn"), conf.get("opencellid", "dbfn")) + blinit(conf.get("storage", "dbfn")) htmlfile = conf.get("wsgateway", "htmlfile") zctx = zmq.Context() zsub = zctx.socket(zmq.SUB) - zsub.connect(conf.get("lookaside", "publishurl")) + zsub.connect(conf.get("collector", "publishurl")) tcpl = socket(AF_INET6, SOCK_STREAM) tcpl.setblocking(False) tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) @@ -245,9 +255,23 @@ def runserver(conf): while True: neededsubs = clients.subs() for imei in neededsubs - activesubs: - zsub.setsockopt(zmq.SUBSCRIBE, imei.encode()) + zsub.setsockopt( + zmq.SUBSCRIBE, + topic(GPS_POSITIONING.PROTO, True), + ) + zsub.setsockopt( + zmq.SUBSCRIBE, + topic(WIFI_POSITIONING.PROTO, False), + ) for imei in activesubs - neededsubs: - zsub.setsockopt(zmq.UNSUBSCRIBE, imei.encode()) + zsub.setsockopt( + zmq.UNSUBSCRIBE, + topic(GPS_POSITIONING.PROTO, True), + ) + zsub.setsockopt( + zmq.UNSUBSCRIBE, + topic(WIFI_POSITIONING.PROTO, False), + ) activesubs = neededsubs log.debug("Subscribed to: %s", activesubs) tosend = [] @@ -259,8 +283,10 @@ def runserver(conf): if sk is zsub: while True: try: - zmsg = LocEvt(zsub.recv(zmq.NOBLOCK)) + zmsg = Bcast(zsub.recv(zmq.NOBLOCK)) + msg = parse_message(zmsg.packet) tosend.append(zmsg) + log.debug("Got %s", zmsg) except zmq.Again: break elif sk == tcpfd: @@ -277,8 +303,9 @@ def runserver(conf): log.debug("Received from %d: %s", sk, msg) if msg.get("type", None) == "subscribe": imei = msg.get("imei") - if imei: - tosend.extend(backlog(imei[0], 5)) + numback = msg.get("backlog", 5) + for elem in imei: + tosend.extend(backlog(elem, numback)) towrite.add(sk) elif fl & zmq.POLLOUT: log.debug("Write now open for fd %d", sk)