1 """ Websocket Gateway """
3 from configparser import ConfigParser
4 from datetime import datetime, timezone
5 from json import dumps, loads
6 from logging import getLogger
7 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
9 from typing import Any, cast, Dict, List, Optional, Set, Tuple
10 from wsproto import ConnectionType, WSConnection
11 from wsproto.events import (
20 from wsproto.utilities import RemoteProtocolError
24 from .evstore import initdb, fetch
25 from .gps303proto import (
31 from .zmsg import Bcast, topic
33 log = getLogger("gps303/wsgateway")
37 def backlog(imei: str, numback: int) -> List[Dict[str, Any]]:
39 for is_incoming, timestamp, packet in fetch(
41 [(True, GPS_POSITIONING.PROTO), (False, WIFI_POSITIONING.PROTO)],
44 msg = parse_message(packet, is_incoming=is_incoming)
50 datetime.fromtimestamp(timestamp).astimezone(
54 "longitude": msg.longitude,
55 "latitude": msg.latitude,
57 if isinstance(msg, GPS_POSITIONING)
64 def try_http(data: bytes, fd: int, e: Exception) -> bytes:
67 lines = data.decode().split("\r\n")
70 op, resource, proto = request.split(" ")
72 "HTTP %s for %s, proto %s from fd %d, headers: %s",
82 f"{proto} 500 No data configured\r\n"
83 f"Content-Type: text/plain\r\n\r\n"
84 f"HTML data not configured on the server\r\n".encode()
88 with open(htmlfile, "rb") as fl:
90 length = len(htmldata)
93 f"Content-Type: text/html; charset=utf-8\r\n"
94 f"Content-Length: {len(htmldata):d}\r\n\r\n"
95 ).encode("utf-8") + htmldata
98 f"{proto} 500 File not found\r\n"
99 f"Content-Type: text/plain\r\n\r\n"
100 f"HTML file could not be opened\r\n".encode()
104 f"{proto} 400 Bad request\r\n"
105 "Content-Type: text/plain\r\n\r\n"
106 "Bad request\r\n".encode()
109 log.warning("Unparseable data from fd %d: %s", fd, data)
114 """Websocket connection to the client"""
116 def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
119 self.ws = WSConnection(ConnectionType.SERVER)
122 self.imeis: Set[str] = set()
124 def close(self) -> None:
125 log.debug("Closing fd %d", self.sock.fileno())
128 def recv(self) -> Optional[List[Dict[str, Any]]]:
130 data = self.sock.recv(4096)
133 "Reading from fd %d: %s",
137 self.ws.receive_data(None)
139 if not data: # Client has closed connection
141 "EOF reading from fd %d",
144 self.ws.receive_data(None)
147 self.ws.receive_data(data)
148 except RemoteProtocolError as e:
150 "Websocket error on fd %d, try plain http (%s)",
154 self.ws_data = try_http(data, self.sock.fileno(), e)
155 # this `write` is a hack - writing _ought_ to be done at the
156 # stage when all other writes are performed. But I could not
157 # arrange it so in a logical way. Let it stay this way. The
158 # whole http server affair is a hack anyway.
160 log.debug("Sending HTTP response to %d", self.sock.fileno())
164 for event in self.ws.events():
165 if isinstance(event, Request):
166 log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
167 # self.ws_data += self.ws.send(event.response()) # Why not?!
168 self.ws_data += self.ws.send(AcceptConnection())
170 elif isinstance(event, (CloseConnection, Ping)):
171 log.debug("%s on fd %d", event, self.sock.fileno())
172 self.ws_data += self.ws.send(event.response())
173 elif isinstance(event, TextMessage):
174 log.debug("%s on fd %d", event, self.sock.fileno())
175 msg = loads(event.data)
177 if msg.get("type", None) == "subscribe":
178 self.imeis = set(msg.get("imei", []))
180 "subs list on fd %s is %s",
185 log.warning("%s on fd %d", event, self.sock.fileno())
188 def wants(self, imei: str) -> bool:
190 "wants %s? set is %s on fd %d",
195 return imei in self.imeis
def send(self, message: Dict[str, Any]) -> None:
    """Queue `message` for this client when it is ready and subscribed.

    The message is dropped silently unless `self.ready` is truthy and
    `message["imei"]` is a member of `self.imeis`. Otherwise it is
    serialized with `dumps`, wrapped in a `Message` event, passed through
    `self.ws.send()`, and the resulting bytes are appended to
    `self.ws_data`. This method itself does not write to the socket.
    """
    if not self.ready:
        # Checked first, so "imei" is never looked up for an unready client.
        return
    if message["imei"] not in self.imeis:
        return
    payload = dumps(message)
    self.ws_data += self.ws.send(Message(data=payload))
201 def write(self) -> bool:
204 sent = self.sock.send(self.ws_data)
205 self.ws_data = self.ws_data[sent:]
208 "Sending to fd %d: %s",
213 return bool(self.ws_data)
# Create an empty client registry. `by_fd` maps a client socket's file
# descriptor (an int) to the Client object serving that connection.
217 def __init__(self) -> None:
218 self.by_fd: Dict[int, Client] = {}
220 def add(self, clntsock: socket, clntaddr: Tuple[str, int]) -> int:
221 fd = clntsock.fileno()
222 log.info("Start serving fd %d from %s", fd, clntaddr)
223 self.by_fd[fd] = Client(clntsock, clntaddr)
226 def stop(self, fd: int) -> None:
227 clnt = self.by_fd[fd]
228 log.info("Stop serving fd %d", clnt.sock.fileno())
232 def recv(self, fd: int) -> Optional[List[Dict[str, Any]]]:
233 clnt = self.by_fd[fd]
236 def send(self, msg: Dict[str, Any]) -> Set[int]:
238 for fd, clnt in self.by_fd.items():
239 if clnt.wants(msg["imei"]):
244 def write(self, towrite: Set[int]) -> Set[int]:
246 for fd, clnt in [(fd, self.by_fd.get(fd)) for fd in towrite]:
247 if clnt and clnt.write():
251 def subs(self) -> Set[str]:
253 for clnt in self.by_fd.values():
258 def runserver(conf: ConfigParser) -> None:
261 initdb(conf.get("storage", "dbfn"))
262 htmlfile = conf.get("wsgateway", "htmlfile", fallback=None)
263 # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
264 zctx = zmq.Context() # type: ignore
265 zsub = zctx.socket(zmq.SUB) # type: ignore
266 zsub.connect(conf.get("collector", "publishurl"))
267 tcpl = socket(AF_INET6, SOCK_STREAM)
268 tcpl.setblocking(False)
269 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
270 tcpl.bind(("", conf.getint("wsgateway", "port")))
272 tcpfd = tcpl.fileno()
273 poller = zmq.Poller() # type: ignore
274 poller.register(zsub, flags=zmq.POLLIN)
275 poller.register(tcpfd, flags=zmq.POLLIN)
277 activesubs: Set[str] = set()
279 towait: Set[int] = set()
281 neededsubs = clients.subs()
282 for imei in neededsubs - activesubs:
285 topic(GPS_POSITIONING.PROTO, True, imei),
289 topic(WIFI_POSITIONING.PROTO, False, imei),
293 topic(STATUS.PROTO, True, imei),
295 for imei in activesubs - neededsubs:
298 topic(GPS_POSITIONING.PROTO, True, imei),
302 topic(WIFI_POSITIONING.PROTO, False, imei),
306 topic(STATUS.PROTO, True, imei),
308 activesubs = neededsubs
309 log.debug("Subscribed to: %s", activesubs)
314 events = poller.poll()
315 for sk, fl in events:
319 zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
320 msg = parse_message(zmsg.packet, zmsg.is_incoming)
321 log.debug("Got %s with %s", zmsg, msg)
322 if isinstance(msg, STATUS):
328 datetime.fromtimestamp(
330 ).astimezone(tz=timezone.utc)
341 datetime.fromtimestamp(
343 ).astimezone(tz=timezone.utc)
345 "longitude": msg.longitude,
346 "latitude": msg.latitude,
355 clntsock, clntaddr = tcpl.accept()
356 topoll.append((clntsock, clntaddr))
357 elif fl & zmq.POLLIN:
358 received = clients.recv(sk)
360 log.debug("Client gone from fd %d", sk)
364 for wsmsg in received:
365 log.debug("Received from %d: %s", sk, wsmsg)
366 if wsmsg.get("type", None) == "subscribe":
367 # Have to live w/o typechecking from json
368 imeis = cast(List[str], wsmsg.get("imei"))
369 numback: int = wsmsg.get("backlog", 5)
371 tosend.extend(backlog(imei, numback))
373 elif fl & zmq.POLLOUT:
374 log.debug("Write now open for fd %d", sk)
378 log.debug("Stray event: %s on socket %s", fl, sk)
379 # poll queue consumed, make changes now
381 poller.unregister(fd) # type: ignore
384 log.debug("Sending to the clients: %s", wsmsg)
385 towrite |= clients.send(wsmsg)
386 for clntsock, clntaddr in topoll:
387 fd = clients.add(clntsock, clntaddr)
388 poller.register(fd, flags=zmq.POLLIN)
389 # Deal with actually writing the data out
390 trywrite = towrite - towait
391 morewait = clients.write(trywrite)
393 "towait %s, tried %s, still busy %s",
398 for fd in morewait - trywrite: # new fds waiting for write
399 poller.modify(fd, flags=zmq.POLLIN | zmq.POLLOUT) # type: ignore
400 for fd in trywrite - morewait: # no longer waiting for write
401 poller.modify(fd, flags=zmq.POLLIN) # type: ignore
404 except KeyboardInterrupt:
406 zctx.destroy() # type: ignore
# Script entry point. The guard matches any module name that ends with
# "__main__", not only the exact string. The value returned by
# common.init(log) is passed to runserver() as its configuration
# (presumably a ConfigParser, per runserver's signature - confirm in common).
410 if __name__.endswith("__main__"):
411 runserver(common.init(log))