X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fwsgateway.py;h=1843ce9b01d426aa897e4625d6ef6721bbc7421c;hb=dfde66b74725ce489425f320082aec3985e1e299;hp=80926fbceb53b381d067eb2560d82cbeb2aa9aad;hpb=5e1e7a4d37a1e149d5e899dada7b55a863cd8e64;p=loctrkd.git diff --git a/gps303/wsgateway.py b/gps303/wsgateway.py index 80926fb..1843ce9 100644 --- a/gps303/wsgateway.py +++ b/gps303/wsgateway.py @@ -1,6 +1,7 @@ """ Websocket Gateway """ -from json import loads +from datetime import datetime, timezone +from json import dumps, loads from logging import getLogger from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR from time import time @@ -17,9 +18,10 @@ from wsproto.utilities import RemoteProtocolError import zmq from . import common -from .backlog import blinit, backlog +from .evstore import initdb, fetch from .gps303proto import ( GPS_POSITIONING, + STATUS, WIFI_POSITIONING, parse_message, ) @@ -29,6 +31,33 @@ log = getLogger("gps303/wsgateway") htmlfile = None +def backlog(imei, numback): + result = [] + for is_incoming, timestamp, packet in fetch( + imei, + ((True, GPS_POSITIONING.PROTO), (False, WIFI_POSITIONING.PROTO)), + numback, + ): + msg = parse_message(packet, is_incoming=is_incoming) + result.append( + { + "type": "location", + "imei": imei, + "timestamp": str( + datetime.fromtimestamp(timestamp).astimezone( + tz=timezone.utc + ) + ), + "longitude": msg.longitude, + "latitude": msg.latitude, + "accuracy": "gps" + if isinstance(msg, GPS_POSITIONING) + else "approximate", + } + ) + return result + + def try_http(data, fd, e): global htmlfile try: @@ -44,19 +73,14 @@ def try_http(data, fd, e): fd, headers, ) - try: - pos = resource.index("?") - resource = resource[:pos] - except ValueError: - pass if op == "GET": if htmlfile is None: return ( f"{proto} 500 No data configured\r\n" f"Content-Type: text/plain\r\n\r\n" - f"HTML data not configure on the server\r\n".encode() + f"HTML data not configured on the server\r\n".encode() ) - elif resource == "/": + else: try: with open(htmlfile, "rb") as fl: htmldata = fl.read() @@ -72,12 +96,6 @@ def try_http(data, fd, e): f"Content-Type: text/plain\r\n\r\n" f"HTML file could not be opened\r\n".encode() ) - else: - return ( - f"{proto} 404 File not found\r\n" - f"Content-Type: text/plain\r\n\r\n" - f'We can only serve "/"\r\n'.encode() - ) else: return ( f"{proto} 400 Bad request\r\n" @@ -146,7 +164,6 @@ class Client: log.debug("%s on fd %d", event, self.sock.fileno()) self.ws_data += self.ws.send(event.response()) elif isinstance(event, TextMessage): - # TODO: save imei "subscription" log.debug("%s on fd %d", event, self.sock.fileno()) msg = loads(event.data) msgs.append(msg) @@ -168,11 +185,11 @@ class Client: self.imeis, self.sock.fileno(), ) - return True # TODO: check subscriptions + return imei in self.imeis def send(self, message): - if self.ready and message.imei in self.imeis: - self.ws_data += self.ws.send(Message(data=message.json)) + if self.ready and message["imei"] in self.imeis: + self.ws_data += self.ws.send(Message(data=dumps(message))) def write(self): if self.ws_data: @@ -212,7 +229,7 @@ class Clients: def send(self, msg): towrite = set() for fd, clnt in self.by_fd.items(): - if clnt.wants(msg.imei): + if clnt.wants(msg["imei"]): clnt.send(msg) towrite.add(fd) return towrite @@ -234,7 +251,7 @@ class Clients: def runserver(conf): global htmlfile - blinit(conf.get("storage", "dbfn")) + initdb(conf.get("storage", "dbfn")) htmlfile = conf.get("wsgateway", "htmlfile") zctx = zmq.Context() zsub = zctx.socket(zmq.SUB) @@ -257,20 +274,28 @@ def runserver(conf): for imei in neededsubs - activesubs: zsub.setsockopt( zmq.SUBSCRIBE, - topic(GPS_POSITIONING.PROTO, True), + topic(GPS_POSITIONING.PROTO, True, imei), + ) + zsub.setsockopt( + zmq.SUBSCRIBE, + topic(WIFI_POSITIONING.PROTO, False, imei), ) zsub.setsockopt( zmq.SUBSCRIBE, - topic(WIFI_POSITIONING.PROTO, False), + topic(STATUS.PROTO, True, imei), ) for imei in activesubs - neededsubs: zsub.setsockopt( zmq.UNSUBSCRIBE, - topic(GPS_POSITIONING.PROTO, True), + topic(GPS_POSITIONING.PROTO, True, imei), + ) + zsub.setsockopt( + zmq.UNSUBSCRIBE, + topic(WIFI_POSITIONING.PROTO, False, imei), ) zsub.setsockopt( zmq.UNSUBSCRIBE, - topic(WIFI_POSITIONING.PROTO, False), + topic(STATUS.PROTO, True, imei), ) activesubs = neededsubs log.debug("Subscribed to: %s", activesubs) @@ -284,9 +309,38 @@ def runserver(conf): while True: try: zmsg = Bcast(zsub.recv(zmq.NOBLOCK)) - msg = parse_message(zmsg.packet) - tosend.append(zmsg) - log.debug("Got %s", zmsg) + msg = parse_message(zmsg.packet, zmsg.is_incoming) + log.debug("Got %s with %s", zmsg, msg) + if isinstance(msg, STATUS): + tosend.append( + { + "type": "status", + "imei": zmsg.imei, + "timestamp": str( + datetime.fromtimestamp( + zmsg.when + ).astimezone(tz=timezone.utc) + ), + "battery": msg.batt, + } + ) + else: + tosend.append( + { + "type": "location", + "imei": zmsg.imei, + "timestamp": str( + datetime.fromtimestamp( + zmsg.when + ).astimezone(tz=timezone.utc) + ), + "longitude": msg.longitude, + "latitude": msg.latitude, + "accuracy": "gps" + if zmsg.is_incoming + else "approximate", + } + ) except zmq.Again: break elif sk == tcpfd: @@ -302,10 +356,10 @@ def runserver(conf): for msg in received: log.debug("Received from %d: %s", sk, msg) if msg.get("type", None) == "subscribe": - imei = msg.get("imei") + imeis = msg.get("imei") numback = msg.get("backlog", 5) - for elem in imei: - tosend.extend(backlog(elem, numback)) + for imei in imeis: + tosend.extend(backlog(imei, numback)) towrite.add(sk) elif fl & zmq.POLLOUT: log.debug("Write now open for fd %d", sk)