""" Websocket Gateway """
+from datetime import datetime, timezone
+from json import dumps, loads
from logging import getLogger
from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from time import time
from wsproto import ConnectionType, WSConnection
-from wsproto.events import AcceptConnection, CloseConnection, Message, Ping, Request, TextMessage
+from wsproto.events import (
+ AcceptConnection,
+ CloseConnection,
+ Message,
+ Ping,
+ Request,
+ TextMessage,
+)
+from wsproto.utilities import RemoteProtocolError
import zmq
from . import common
-from .zmsg import LocEvt
+from .evstore import initdb, fetch
+from .gps303proto import (
+ GPS_POSITIONING,
+ WIFI_POSITIONING,
+ parse_message,
+)
+from .zmsg import Bcast, topic
log = getLogger("gps303/wsgateway")
+htmlfile = None
+
+
+def backlog(imei, numback):
+ result = []
+ for is_incoming, timestamp, packet in fetch(
+ imei,
+ ((True, GPS_POSITIONING.PROTO), (False, WIFI_POSITIONING.PROTO)),
+ numback,
+ ):
+ msg = parse_message(packet, is_incoming=is_incoming)
+ result.append(
+ {
+ "imei": imei,
+ "timestamp": str(
+ datetime.fromtimestamp(timestamp).astimezone(
+ tz=timezone.utc
+ )
+ ),
+ "longitude": msg.longitude,
+ "latitude": msg.latitude,
+ }
+ )
+ return result
+
+
+def try_http(data, fd, e):
+ global htmlfile
+ try:
+ lines = data.decode().split("\r\n")
+ request = lines[0]
+ headers = lines[1:]
+ op, resource, proto = request.split(" ")
+ log.debug(
+ "HTTP %s for %s, proto %s from fd %d, headers: %s",
+ op,
+ resource,
+ proto,
+ fd,
+ headers,
+ )
+ try:
+ pos = resource.index("?")
+ resource = resource[:pos]
+ except ValueError:
+ pass
+ if op == "GET":
+ if htmlfile is None:
+ return (
+ f"{proto} 500 No data configured\r\n"
+ f"Content-Type: text/plain\r\n\r\n"
+ f"HTML data not configure on the server\r\n".encode()
+ )
+ elif resource == "/":
+ try:
+ with open(htmlfile, "rb") as fl:
+ htmldata = fl.read()
+ length = len(htmldata)
+ return (
+ f"{proto} 200 Ok\r\n"
+ f"Content-Type: text/html; charset=utf-8\r\n"
+ f"Content-Length: {len(htmldata):d}\r\n\r\n"
+ ).encode("utf-8") + htmldata
+ except OSError:
+ return (
+ f"{proto} 500 File not found\r\n"
+ f"Content-Type: text/plain\r\n\r\n"
+ f"HTML file could not be opened\r\n".encode()
+ )
+ else:
+ return (
+ f"{proto} 404 File not found\r\n"
+ f"Content-Type: text/plain\r\n\r\n"
+ f'We can only serve "/"\r\n'.encode()
+ )
+ else:
+ return (
+ f"{proto} 400 Bad request\r\n"
+ "Content-Type: text/plain\r\n\r\n"
+ "Bad request\r\n".encode()
+ )
+ except ValueError:
+ log.warning("Unparseable data from fd %d: %s", fd, data)
+ raise e
class Client:
self.addr = addr
self.ws = WSConnection(ConnectionType.SERVER)
self.ws_data = b""
+ self.ready = False
+ self.imeis = set()
def close(self):
log.debug("Closing fd %d", self.sock.fileno())
)
self.ws.receive_data(None)
return None
- self.ws.receive_data(data)
- msgs = []
- for event in self.ws.events():
- if isinstance(event, Request):
- log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
- #self.ws_data += self.ws.send(event.response()) # Why not?!
- self.ws_data += self.ws.send(AcceptConnection())
- elif isinstance(event, (CloseConnection, Ping)):
- log.debug("%s on fd %d", event, self.sock.fileno())
- self.ws_data += self.ws.send(event.response())
- elif isinstance(event, TextMessage):
- # TODO: save imei "subscription"
- log.debug("%s on fd %d", event, self.sock.fileno())
- msgs.append(event.data)
- else:
- log.warning("%s on fd %d", event, self.sock.fileno())
- if self.ws_data: # Temp hack
- self.write()
- return msgs
-
- def send(self, imei, message):
- # TODO: filter only wanted imei got from the client
- self.ws_data += self.ws.send(Message(data=message))
-
- def write(self):
try:
- sent = self.sock.send(self.ws_data)
- self.ws_data = self.ws_data[sent:]
- except OSError as e:
- log.error(
- "Sending to fd %d (IMEI %s): %s",
+ self.ws.receive_data(data)
+ except RemoteProtocolError as e:
+ log.debug(
+ "Websocket error on fd %d, try plain http (%s)",
self.sock.fileno(),
- self.imei,
e,
)
- self.ws_data = b""
+ self.ws_data = try_http(data, self.sock.fileno(), e)
+ self.write() # TODO this is a hack
+ log.debug("Sending HTTP response to %d", self.sock.fileno())
+ msgs = None
+ else:
+ msgs = []
+ for event in self.ws.events():
+ if isinstance(event, Request):
+ log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
+ # self.ws_data += self.ws.send(event.response()) # Why not?!
+ self.ws_data += self.ws.send(AcceptConnection())
+ self.ready = True
+ elif isinstance(event, (CloseConnection, Ping)):
+ log.debug("%s on fd %d", event, self.sock.fileno())
+ self.ws_data += self.ws.send(event.response())
+ elif isinstance(event, TextMessage):
+ # TODO: save imei "subscription"
+ log.debug("%s on fd %d", event, self.sock.fileno())
+ msg = loads(event.data)
+ msgs.append(msg)
+ if msg.get("type", None) == "subscribe":
+ self.imeis = set(msg.get("imei", []))
+ log.debug(
+ "subs list on fd %s is %s",
+ self.sock.fileno(),
+ self.imeis,
+ )
+ else:
+ log.warning("%s on fd %d", event, self.sock.fileno())
+ return msgs
+
+ def wants(self, imei):
+ log.debug(
+ "wants %s? set is %s on fd %d",
+ imei,
+ self.imeis,
+ self.sock.fileno(),
+ )
+ return True # TODO: check subscriptions
+
+ def send(self, message):
+ if self.ready and message["imei"] in self.imeis:
+ self.ws_data += self.ws.send(Message(data=dumps(message)))
+
+ def write(self):
+ if self.ws_data:
+ try:
+ sent = self.sock.send(self.ws_data)
+ self.ws_data = self.ws_data[sent:]
+ except OSError as e:
+ log.error(
+ "Sending to fd %d: %s",
+ self.sock.fileno(),
+ e,
+ )
+ self.ws_data = b""
+ return bool(self.ws_data)
class Clients:
def recv(self, fd):
clnt = self.by_fd[fd]
- msgs = clnt.recv()
- if msgs is None:
- return None
- result = []
- for msg in msgs:
- log.debug("Received: %s", msg)
- return result
+ return clnt.recv()
+
+ def send(self, msg):
+ towrite = set()
+ for fd, clnt in self.by_fd.items():
+ if clnt.wants(msg["imei"]):
+ clnt.send(msg)
+ towrite.add(fd)
+ return towrite
- def send(self, msgs):
+ def write(self, towrite):
+ waiting = set()
+ for fd, clnt in [(fd, self.by_fd.get(fd)) for fd in towrite]:
+ if clnt.write():
+ waiting.add(fd)
+ return waiting
+
+ def subs(self):
+ result = set()
for clnt in self.by_fd.values():
- clnt.send(msgs)
- clnt.write()
+ result |= clnt.imeis
+ return result
+
def runserver(conf):
+ global htmlfile
+
+ initdb(conf.get("storage", "dbfn"))
+ htmlfile = conf.get("wsgateway", "htmlfile")
zctx = zmq.Context()
zsub = zctx.socket(zmq.SUB)
- zsub.connect(conf.get("lookaside", "publishurl"))
- zsub.setsockopt(zmq.SUBSCRIBE, b"")
+ zsub.connect(conf.get("collector", "publishurl"))
tcpl = socket(AF_INET6, SOCK_STREAM)
tcpl.setblocking(False)
tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
poller.register(zsub, flags=zmq.POLLIN)
poller.register(tcpfd, flags=zmq.POLLIN)
clients = Clients()
+ activesubs = set()
try:
+ towait = set()
while True:
+ neededsubs = clients.subs()
+ for imei in neededsubs - activesubs:
+ zsub.setsockopt(
+ zmq.SUBSCRIBE,
+ topic(GPS_POSITIONING.PROTO, True, imei),
+ )
+ zsub.setsockopt(
+ zmq.SUBSCRIBE,
+ topic(WIFI_POSITIONING.PROTO, False, imei),
+ )
+ for imei in activesubs - neededsubs:
+ zsub.setsockopt(
+ zmq.UNSUBSCRIBE,
+ topic(GPS_POSITIONING.PROTO, True, imei),
+ )
+ zsub.setsockopt(
+ zmq.UNSUBSCRIBE,
+ topic(WIFI_POSITIONING.PROTO, False, imei),
+ )
+ activesubs = neededsubs
+ log.debug("Subscribed to: %s", activesubs)
tosend = []
topoll = []
tostop = []
- events = poller.poll(1000)
+ towrite = set()
+ events = poller.poll()
for sk, fl in events:
if sk is zsub:
while True:
try:
- msg = zsub.recv(zmq.NOBLOCK)
- tosend.append(LocEvt(msg))
+ zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
+ msg = parse_message(zmsg.packet, zmsg.is_incoming)
+ log.debug("Got %s with %s", zmsg, msg)
+ tosend.append(
+ {
+ "imei": zmsg.imei,
+ "timestamp": str(
+ datetime.fromtimestamp(
+ zmsg.when
+ ).astimezone(tz=timezone.utc)
+ ),
+ "longitude": msg.longitude,
+ "latitude": msg.latitude,
+ }
+ )
except zmq.Again:
break
elif sk == tcpfd:
elif fl & zmq.POLLIN:
received = clients.recv(sk)
if received is None:
- log.debug(
- "Client gone from fd %d", sk
- )
+ log.debug("Client gone from fd %d", sk)
tostop.append(sk)
+ towait.discard(fd)
else:
for msg in received:
log.debug("Received from %d: %s", sk, msg)
+ if msg.get("type", None) == "subscribe":
+ imeis = msg.get("imei")
+ numback = msg.get("backlog", 5)
+ for imei in imeis:
+ tosend.extend(backlog(imei, numback))
+ towrite.add(sk)
+ elif fl & zmq.POLLOUT:
+ log.debug("Write now open for fd %d", sk)
+ towrite.add(sk)
+ towait.discard(sk)
else:
log.debug("Stray event: %s on socket %s", fl, sk)
# poll queue consumed, make changes now
poller.unregister(fd)
clients.stop(fd)
for zmsg in tosend:
- log.debug("Sending to the client: %s", zmsg)
- clients.send(zmsg)
+ log.debug("Sending to the clients: %s", zmsg)
+ towrite |= clients.send(zmsg)
for clntsock, clntaddr in topoll:
fd = clients.add(clntsock, clntaddr)
poller.register(fd, flags=zmq.POLLIN)
- # TODO: Handle write overruns (register for POLLOUT)
+ # Deal with actually writing the data out
+ trywrite = towrite - towait
+ morewait = clients.write(trywrite)
+ log.debug(
+ "towait %s, tried %s, still busy %s",
+ towait,
+ trywrite,
+ morewait,
+ )
+ for fd in morewait - trywrite: # new fds waiting for write
+ poller.modify(fd, flags=zmq.POLLIN | zmq.POLLOUT)
+ for fd in trywrite - morewait: # no longer waiting for write
+ poller.modify(fd, flags=zmq.POLLIN)
+ towait &= trywrite
+ towait |= morewait
except KeyboardInterrupt:
pass