1 """ Websocket Gateway """
3 from datetime import datetime, timezone
4 from json import dumps, loads
5 from logging import getLogger
6 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
8 from wsproto import ConnectionType, WSConnection
9 from wsproto.events import (
17 from wsproto.utilities import RemoteProtocolError
21 from .evstore import initdb, fetch
22 from .gps303proto import (
28 from .zmsg import Bcast, topic
30 log = getLogger("gps303/wsgateway")
# backlog(imei, numback): walk stored records obtained from fetch() and decode
# each stored packet with parse_message().
# NOTE(review): this listing omits several lines of the function (the embedded
# original numbering skips), so the remaining fetch() arguments, the shape of
# the produced entries and the return statement are not visible here.
34 def backlog(imei, numback):
# Every record yielded by fetch() unpacks into (is_incoming, timestamp, packet).
36 for is_incoming, timestamp, packet in fetch(
# Pairs of (flag, protocol id) selecting which stored messages to read;
# presumably the flag is the is_incoming direction - verify against evstore.fetch.
38 ((True, GPS_POSITIONING.PROTO), (False, WIFI_POSITIONING.PROTO)),
41 msg = parse_message(packet, is_incoming=is_incoming)
# fromtimestamp() without a tz argument yields a naive local-time datetime;
# astimezone() then converts it (target tz is on a line not shown - presumably UTC).
47 datetime.fromtimestamp(timestamp).astimezone(
51 "longitude": msg.longitude,
52 "latitude": msg.latitude,
# Distinguishes GPS fixes from the other fetched message type; the expression
# this condition belongs to is not visible in this listing.
54 if isinstance(msg, GPS_POSITIONING)
# try_http(data, fd, e): interpret `data` as a plain HTTP request and build raw
# HTTP response bytes. The visible caller invokes it after wsproto raised
# RemoteProtocolError and stores the result in its outgoing buffer.
# NOTE(review): the branching conditions, the try/except structure and the
# return statements are on lines not included in this listing.
61 def try_http(data, fd, e):
# Decode the received bytes with the default codec and split on CRLF.
64 lines = data.decode().split("\r\n")
# The request line must consist of exactly three space-separated fields;
# any other count raises ValueError at this unpacking.
67 op, resource, proto = request.split(" ")
69 "HTTP %s for %s, proto %s from fd %d, headers: %s",
# Response body used when no HTML data is configured (the guarding condition is
# not shown). The status line echoes the protocol token sent by the client.
79 f"{proto} 500 No data configured\r\n"
80 f"Content-Type: text/plain\r\n\r\n"
81 f"HTML data not configured on the server\r\n".encode()
# `htmlfile` is not a parameter of this function; presumably the module-level
# name assigned from the "wsgateway" config section elsewhere in the file.
85 with open(htmlfile, "rb") as fl:
# NOTE(review): `length` appears unused - the Content-Length header visible
# below recomputes len(htmldata).
87 length = len(htmldata)
90 f"Content-Type: text/html; charset=utf-8\r\n"
91 f"Content-Length: {len(htmldata):d}\r\n\r\n"
# Headers are encoded as UTF-8 and the file content is appended as raw bytes.
92 ).encode("utf-8") + htmldata
# Response used when the HTML file could not be opened (handler line not shown).
95 f"{proto} 500 File not found\r\n"
96 f"Content-Type: text/plain\r\n\r\n"
97 f"HTML file could not be opened\r\n".encode()
# Response for a request that is not accepted (condition not shown).
101 f"{proto} 400 Bad request\r\n"
102 "Content-Type: text/plain\r\n\r\n"
103 "Bad request\r\n".encode()
# Input that could not be handled as HTTP is only logged here.
106 log.warning("Unparseable data from fd %d: %s", fd, data)
111 """Websocket connection to the client"""
# Set up per-client state for an accepted socket `sock` with peer address `addr`.
# NOTE(review): other attribute assignments of this constructor are on lines
# not included in this listing.
113 def __init__(self, sock, addr):
# wsproto connection object in the server role; it handles the websocket
# protocol for this client.
116 self.ws = WSConnection(ConnectionType.SERVER)
122 log.debug("Closing fd %d", self.sock.fileno())
127 data = self.sock.recv(4096)
130 "Reading from fd %d: %s",
134 self.ws.receive_data(None)
136 if not data: # Client has closed connection
138 "EOF reading from fd %d",
141 self.ws.receive_data(None)
144 self.ws.receive_data(data)
145 except RemoteProtocolError as e:
147 "Websocket error on fd %d, try plain http (%s)",
151 self.ws_data = try_http(data, self.sock.fileno(), e)
152 self.write() # TODO this is a hack
153 log.debug("Sending HTTP response to %d", self.sock.fileno())
157 for event in self.ws.events():
158 if isinstance(event, Request):
159 log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
160 # self.ws_data += self.ws.send(event.response()) # Why not?!
161 self.ws_data += self.ws.send(AcceptConnection())
163 elif isinstance(event, (CloseConnection, Ping)):
164 log.debug("%s on fd %d", event, self.sock.fileno())
165 self.ws_data += self.ws.send(event.response())
166 elif isinstance(event, TextMessage):
167 log.debug("%s on fd %d", event, self.sock.fileno())
168 msg = loads(event.data)
170 if msg.get("type", None) == "subscribe":
171 self.imeis = set(msg.get("imei", []))
173 "subs list on fd %s is %s",
178 log.warning("%s on fd %d", event, self.sock.fileno())
# Return True when `imei` is in this client's subscription set (self.imeis).
181 def wants(self, imei):
# Format string of a log call whose remaining lines are not shown in this listing.
183 "wants %s? set is %s on fd %d",
188 return imei in self.imeis
# Queue `message` for this client: when the client is ready and subscribed to
# message["imei"], serialize the message with json.dumps, pass it through
# wsproto and append the resulting bytes to the outgoing buffer self.ws_data.
# Nothing is written to the socket here. Raises KeyError if the client is
# ready and `message` has no "imei" key.
190 def send(self, message):
191 if self.ready and message["imei"] in self.imeis:
192 self.ws_data += self.ws.send(Message(data=dumps(message)))
197 sent = self.sock.send(self.ws_data)
198 self.ws_data = self.ws_data[sent:]
201 "Sending to fd %d: %s",
206 return bool(self.ws_data)
# Start tracking a newly accepted client: wrap the socket in a Client object
# and store it in self.by_fd under the socket's file descriptor.
# NOTE(review): the return statement is not visible in this listing; the caller
# in the serving loop uses the result as the fd to register with the poller.
213 def add(self, clntsock, clntaddr):
214 fd = clntsock.fileno()
215 log.info("Start serving fd %d from %s", fd, clntaddr)
216 self.by_fd[fd] = Client(clntsock, clntaddr)
220 clnt = self.by_fd[fd]
221 log.info("Stop serving fd %d", clnt.sock.fileno())
226 clnt = self.by_fd[fd]
231 for fd, clnt in self.by_fd.items():
232 if clnt.wants(msg["imei"]):
237 def write(self, towrite):
239 for fd, clnt in [(fd, self.by_fd.get(fd)) for fd in towrite]:
246 for clnt in self.by_fd.values():
254 initdb(conf.get("storage", "dbfn"))
255 htmlfile = conf.get("wsgateway", "htmlfile")
257 zsub = zctx.socket(zmq.SUB)
258 zsub.connect(conf.get("collector", "publishurl"))
259 tcpl = socket(AF_INET6, SOCK_STREAM)
260 tcpl.setblocking(False)
261 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
262 tcpl.bind(("", conf.getint("wsgateway", "port")))
264 tcpfd = tcpl.fileno()
265 poller = zmq.Poller()
266 poller.register(zsub, flags=zmq.POLLIN)
267 poller.register(tcpfd, flags=zmq.POLLIN)
273 neededsubs = clients.subs()
274 for imei in neededsubs - activesubs:
277 topic(GPS_POSITIONING.PROTO, True, imei),
281 topic(WIFI_POSITIONING.PROTO, False, imei),
285 topic(STATUS.PROTO, True, imei),
287 for imei in activesubs - neededsubs:
290 topic(GPS_POSITIONING.PROTO, True, imei),
294 topic(WIFI_POSITIONING.PROTO, False, imei),
298 topic(STATUS.PROTO, True, imei),
300 activesubs = neededsubs
301 log.debug("Subscribed to: %s", activesubs)
306 events = poller.poll()
307 for sk, fl in events:
311 zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
312 msg = parse_message(zmsg.packet, zmsg.is_incoming)
313 log.debug("Got %s with %s", zmsg, msg)
314 if isinstance(msg, STATUS):
320 datetime.fromtimestamp(
322 ).astimezone(tz=timezone.utc)
333 datetime.fromtimestamp(
335 ).astimezone(tz=timezone.utc)
337 "longitude": msg.longitude,
338 "latitude": msg.latitude,
347 clntsock, clntaddr = tcpl.accept()
348 topoll.append((clntsock, clntaddr))
349 elif fl & zmq.POLLIN:
350 received = clients.recv(sk)
352 log.debug("Client gone from fd %d", sk)
357 log.debug("Received from %d: %s", sk, msg)
358 if msg.get("type", None) == "subscribe":
359 imeis = msg.get("imei")
360 numback = msg.get("backlog", 5)
362 tosend.extend(backlog(imei, numback))
364 elif fl & zmq.POLLOUT:
365 log.debug("Write now open for fd %d", sk)
369 log.debug("Stray event: %s on socket %s", fl, sk)
370 # poll queue consumed, make changes now
372 poller.unregister(fd)
375 log.debug("Sending to the clients: %s", zmsg)
376 towrite |= clients.send(zmsg)
377 for clntsock, clntaddr in topoll:
378 fd = clients.add(clntsock, clntaddr)
379 poller.register(fd, flags=zmq.POLLIN)
380 # Deal with actually writing the data out
381 trywrite = towrite - towait
382 morewait = clients.write(trywrite)
384 "towait %s, tried %s, still busy %s",
389 for fd in morewait - trywrite: # new fds waiting for write
390 poller.modify(fd, flags=zmq.POLLIN | zmq.POLLOUT)
391 for fd in trywrite - morewait: # no longer waiting for write
392 poller.modify(fd, flags=zmq.POLLIN)
395 except KeyboardInterrupt:
# Entry point. endswith() matches the plain "__main__" as well as any dotted
# module name that ends in "__main__".
399 if __name__.endswith("__main__"):
# common.init(log) produces the configuration object handed to runserver();
# NOTE(review): the import of `common` is not visible in this listing.
400 runserver(common.init(log))