X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fwsgateway.py;h=79937d8e5b1f5d1835e1dfab8a627f2844701f31;hb=311d3cc7b0692e66edb9b9bb9285b2bfc094d571;hp=00770ebd88032c35f9b9e7172dba3c63233d7434;hpb=d6c20543cf997f20cb72bf7380674bf012a5af88;p=loctrkd.git diff --git a/gps303/wsgateway.py b/gps303/wsgateway.py index 00770eb..79937d8 100644 --- a/gps303/wsgateway.py +++ b/gps303/wsgateway.py @@ -1,6 +1,7 @@ """ Websocket Gateway """ -from json import loads +from datetime import datetime, timezone +from json import dumps, loads from logging import getLogger from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR from time import time @@ -17,7 +18,7 @@ from wsproto.utilities import RemoteProtocolError import zmq from . import common -from .backlog import blinit, backlog +from .evstore import initdb, fetch from .gps303proto import ( GPS_POSITIONING, WIFI_POSITIONING, @@ -29,6 +30,29 @@ log = getLogger("gps303/wsgateway") htmlfile = None +def backlog(imei, numback): + result = [] + for is_incoming, timestamp, packet in fetch( + imei, + ((True, GPS_POSITIONING.PROTO), (False, WIFI_POSITIONING.PROTO)), + numback, + ): + msg = parse_message(packet, is_incoming=is_incoming) + result.append( + { + "imei": imei, + "timestamp": str( + datetime.fromtimestamp(timestamp).astimezone( + tz=timezone.utc + ) + ), + "longitude": msg.longitude, + "latitude": msg.latitude, + } + ) + return result + + def try_http(data, fd, e): global htmlfile try: @@ -171,8 +195,8 @@ class Client: return True # TODO: check subscriptions def send(self, message): - if self.ready and message.imei in self.imeis: - self.ws_data += self.ws.send(Message(data=message.json)) + if self.ready and message["imei"] in self.imeis: + self.ws_data += self.ws.send(Message(data=dumps(message))) def write(self): if self.ws_data: @@ -212,7 +236,7 @@ class Clients: def send(self, msg): towrite = set() for fd, clnt in self.by_fd.items(): - if clnt.wants(msg.imei): + if clnt.wants(msg["imei"]): clnt.send(msg) towrite.add(fd) return towrite @@ -234,7 +258,7 @@ class Clients: def runserver(conf): global htmlfile - blinit(conf.get("storage", "dbfn")) + initdb(conf.get("storage", "dbfn")) htmlfile = conf.get("wsgateway", "htmlfile") zctx = zmq.Context() zsub = zctx.socket(zmq.SUB) @@ -255,7 +279,6 @@ def runserver(conf): while True: neededsubs = clients.subs() for imei in neededsubs - activesubs: - log.debug("topics: %s", [tpc.hex() for tpc in [topic(GPS_POSITIONING.PROTO, True, imei), topic(WIFI_POSITIONING.PROTO, False, imei)]]) zsub.setsockopt( zmq.SUBSCRIBE, topic(GPS_POSITIONING.PROTO, True, imei), @@ -284,13 +307,21 @@ def runserver(conf): if sk is zsub: while True: try: - buf = zsub.recv(zmq.NOBLOCK) - zmsg = Bcast(buf) - log.debug("zmq packet: %s", buf.hex()) - # zmsg = Bcast(zsub.recv(zmq.NOBLOCK)) - msg = parse_message(zmsg.packet) - tosend.append(zmsg) - log.debug("Got %s", zmsg) + zmsg = Bcast(zsub.recv(zmq.NOBLOCK)) + msg = parse_message(zmsg.packet, zmsg.is_incoming) + log.debug("Got %s with %s", zmsg, msg) + tosend.append( + { + "imei": zmsg.imei, + "timestamp": str( + datetime.fromtimestamp( + zmsg.when + ).astimezone(tz=timezone.utc) + ), + "longitude": msg.longitude, + "latitude": msg.latitude, + } + ) except zmq.Again: break elif sk == tcpfd: @@ -306,10 +337,10 @@ def runserver(conf): for msg in received: log.debug("Received from %d: %s", sk, msg) if msg.get("type", None) == "subscribe": - imei = msg.get("imei") + imeis = msg.get("imei") numback = msg.get("backlog", 5) - for elem in imei: - tosend.extend(backlog(elem, numback)) + for imei in imeis: + tosend.extend(backlog(imei, numback)) towrite.add(sk) elif fl & zmq.POLLOUT: log.debug("Write now open for fd %d", sk)