1 """ Websocket Gateway """
3 from datetime import datetime, timezone
4 from json import dumps, loads
5 from logging import getLogger
6 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
8 from wsproto import ConnectionType, WSConnection
9 from wsproto.events import (
17 from wsproto.utilities import RemoteProtocolError
21 from .backlog import blinit, backlog
22 from .gps303proto import (
27 from .zmsg import Bcast, topic
# Module-level logger; the functions and methods in this file log through it.
log = getLogger("gps303/wsgateway")
33 def try_http(data, fd, e):
36 lines = data.decode().split("\r\n")
39 op, resource, proto = request.split(" ")
41 "HTTP %s for %s, proto %s from fd %d, headers: %s",
49 pos = resource.index("?")
50 resource = resource[:pos]
56 f"{proto} 500 No data configured\r\n"
57 f"Content-Type: text/plain\r\n\r\n"
58 f"HTML data not configure on the server\r\n".encode()
62 with open(htmlfile, "rb") as fl:
64 length = len(htmldata)
67 f"Content-Type: text/html; charset=utf-8\r\n"
68 f"Content-Length: {len(htmldata):d}\r\n\r\n"
69 ).encode("utf-8") + htmldata
72 f"{proto} 500 File not found\r\n"
73 f"Content-Type: text/plain\r\n\r\n"
74 f"HTML file could not be opened\r\n".encode()
78 f"{proto} 404 File not found\r\n"
79 f"Content-Type: text/plain\r\n\r\n"
80 f'We can only serve "/"\r\n'.encode()
84 f"{proto} 400 Bad request\r\n"
85 "Content-Type: text/plain\r\n\r\n"
86 "Bad request\r\n".encode()
89 log.warning("Unparseable data from fd %d: %s", fd, data)
94 """Websocket connection to the client"""
96 def __init__(self, sock, addr):
99 self.ws = WSConnection(ConnectionType.SERVER)
105 log.debug("Closing fd %d", self.sock.fileno())
110 data = self.sock.recv(4096)
113 "Reading from fd %d: %s",
117 self.ws.receive_data(None)
119 if not data: # Client has closed connection
121 "EOF reading from fd %d",
124 self.ws.receive_data(None)
127 self.ws.receive_data(data)
128 except RemoteProtocolError as e:
130 "Websocket error on fd %d, try plain http (%s)",
134 self.ws_data = try_http(data, self.sock.fileno(), e)
135 self.write() # TODO this is a hack
136 log.debug("Sending HTTP response to %d", self.sock.fileno())
140 for event in self.ws.events():
141 if isinstance(event, Request):
142 log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
143 # self.ws_data += self.ws.send(event.response()) # Why not?!
144 self.ws_data += self.ws.send(AcceptConnection())
146 elif isinstance(event, (CloseConnection, Ping)):
147 log.debug("%s on fd %d", event, self.sock.fileno())
148 self.ws_data += self.ws.send(event.response())
149 elif isinstance(event, TextMessage):
150 # TODO: save imei "subscription"
151 log.debug("%s on fd %d", event, self.sock.fileno())
152 msg = loads(event.data)
154 if msg.get("type", None) == "subscribe":
155 self.imeis = set(msg.get("imei", []))
157 "subs list on fd %s is %s",
162 log.warning("%s on fd %d", event, self.sock.fileno())
165 def wants(self, imei):
167 "wants %s? set is %s on fd %d",
172 return True # TODO: check subscriptions
def send(self, message):
    """Queue `message` for this client, serialised as JSON.

    Nothing happens unless `self.ready` is truthy and `message["imei"]`
    is a member of `self.imeis`. Otherwise the message is dumped to JSON,
    wrapped in a websocket `Message` event, and whatever `self.ws.send()`
    returns for it is appended to the outgoing buffer `self.ws_data`.
    """
    if not self.ready:
        return
    if message["imei"] not in self.imeis:
        return
    payload = Message(data=dumps(message))
    self.ws_data += self.ws.send(payload)
181 sent = self.sock.send(self.ws_data)
182 self.ws_data = self.ws_data[sent:]
185 "Sending to fd %d: %s",
190 return bool(self.ws_data)
197 def add(self, clntsock, clntaddr):
198 fd = clntsock.fileno()
199 log.info("Start serving fd %d from %s", fd, clntaddr)
200 self.by_fd[fd] = Client(clntsock, clntaddr)
204 clnt = self.by_fd[fd]
205 log.info("Stop serving fd %d", clnt.sock.fileno())
210 clnt = self.by_fd[fd]
215 for fd, clnt in self.by_fd.items():
216 if clnt.wants(msg["imei"]):
221 def write(self, towrite):
223 for fd, clnt in [(fd, self.by_fd.get(fd)) for fd in towrite]:
230 for clnt in self.by_fd.values():
238 blinit(conf.get("storage", "dbfn"))
239 htmlfile = conf.get("wsgateway", "htmlfile")
241 zsub = zctx.socket(zmq.SUB)
242 zsub.connect(conf.get("collector", "publishurl"))
243 tcpl = socket(AF_INET6, SOCK_STREAM)
244 tcpl.setblocking(False)
245 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
246 tcpl.bind(("", conf.getint("wsgateway", "port")))
248 tcpfd = tcpl.fileno()
249 poller = zmq.Poller()
250 poller.register(zsub, flags=zmq.POLLIN)
251 poller.register(tcpfd, flags=zmq.POLLIN)
257 neededsubs = clients.subs()
258 for imei in neededsubs - activesubs:
261 topic(GPS_POSITIONING.PROTO, True, imei),
265 topic(WIFI_POSITIONING.PROTO, False, imei),
267 for imei in activesubs - neededsubs:
270 topic(GPS_POSITIONING.PROTO, True, imei),
274 topic(WIFI_POSITIONING.PROTO, False, imei),
276 activesubs = neededsubs
277 log.debug("Subscribed to: %s", activesubs)
282 events = poller.poll()
283 for sk, fl in events:
287 zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
288 msg = parse_message(zmsg.packet, zmsg.is_incoming)
289 log.debug("Got %s with %s", zmsg, msg)
290 if isinstance(msg, GPS_POSITIONING):
294 "timestamp": str(msg.devtime),
295 "longitude": msg.longitude,
296 "latitude": msg.latitude,
299 elif isinstance(msg, WIFI_POSITIONING):
304 datetime.fromtimestamp(
306 ).astimezone(tz=timezone.utc)
308 "longitude": msg.longitude,
309 "latitude": msg.latitude,
315 clntsock, clntaddr = tcpl.accept()
316 topoll.append((clntsock, clntaddr))
317 elif fl & zmq.POLLIN:
318 received = clients.recv(sk)
320 log.debug("Client gone from fd %d", sk)
325 log.debug("Received from %d: %s", sk, msg)
326 if msg.get("type", None) == "subscribe":
327 imei = msg.get("imei")
328 numback = msg.get("backlog", 5)
330 tosend.extend(backlog(elem, numback))
332 elif fl & zmq.POLLOUT:
333 log.debug("Write now open for fd %d", sk)
337 log.debug("Stray event: %s on socket %s", fl, sk)
338 # poll queue consumed, make changes now
340 poller.unregister(fd)
343 log.debug("Sending to the clients: %s", zmsg)
344 towrite |= clients.send(zmsg)
345 for clntsock, clntaddr in topoll:
346 fd = clients.add(clntsock, clntaddr)
347 poller.register(fd, flags=zmq.POLLIN)
348 # Deal with actually writing the data out
349 trywrite = towrite - towait
350 morewait = clients.write(trywrite)
352 "towait %s, tried %s, still busy %s",
357 for fd in morewait - trywrite: # new fds waiting for write
358 poller.modify(fd, flags=zmq.POLLIN | zmq.POLLOUT)
359 for fd in trywrite - morewait: # no longer waiting for write
360 poller.modify(fd, flags=zmq.POLLIN)
363 except KeyboardInterrupt:
# Script entry point.
# NOTE(review): `endswith` also matches a dotted module name whose last
# component is "__main__", not only the plain "__main__" -- presumably
# intentional; confirm.
if __name__.endswith("__main__"):
    # Pass the result of common.init(log) straight to runserver().
    runserver(common.init(log))