1 """ Websocket Gateway """
3 from configparser import ConfigParser
4 from datetime import datetime, timezone
5 from importlib import import_module
6 from json import dumps, loads
7 from logging import getLogger
8 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
10 from typing import Any, cast, Dict, List, Optional, Set, Tuple
11 from wsproto import ConnectionType, WSConnection
12 from wsproto.events import (
21 from wsproto.utilities import RemoteProtocolError
25 from .evstore import initdb, fetch
26 from .zmsg import Bcast, topic
28 log = getLogger("loctrkd/wsgateway")
33 def parse_message(packet: bytes, is_incoming: bool = True) -> Any:
37 def exposed_protos() -> List[Tuple[str, bool]]:
41 def proto_handled(proto: str) -> bool:
46 pmods: List[ProtoModule] = []
47 selector: List[Tuple[bool, str]] = []
50 def backlog(imei: str, numback: int) -> List[Dict[str, Any]]:
52 for is_incoming, timestamp, proto, packet in fetch(
58 if pmod.proto_handled(proto):
59 msg = pmod.parse_message(packet, is_incoming=is_incoming)
65 datetime.fromtimestamp(timestamp).astimezone(
69 "longitude": msg.longitude,
70 "latitude": msg.latitude,
72 if True # TODO isinstance(msg, GPS_POSITIONING)
79 def try_http(data: bytes, fd: int, e: Exception) -> bytes:
82 lines = data.decode().split("\r\n")
85 op, resource, proto = request.split(" ")
87 "HTTP %s for %s, proto %s from fd %d, headers: %s",
97 f"{proto} 500 No data configured\r\n"
98 f"Content-Type: text/plain\r\n\r\n"
99 f"HTML data not configured on the server\r\n".encode()
103 with open(htmlfile, "rb") as fl:
105 length = len(htmldata)
107 f"{proto} 200 Ok\r\n"
108 f"Content-Type: text/html; charset=utf-8\r\n"
109 f"Content-Length: {len(htmldata):d}\r\n\r\n"
110 ).encode("utf-8") + htmldata
113 f"{proto} 500 File not found\r\n"
114 f"Content-Type: text/plain\r\n\r\n"
115 f"HTML file could not be opened\r\n".encode()
119 f"{proto} 400 Bad request\r\n"
120 "Content-Type: text/plain\r\n\r\n"
121 "Bad request\r\n".encode()
124 log.warning("Unparseable data from fd %d: %s", fd, data)
129 """Websocket connection to the client"""
131 def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
134 self.ws = WSConnection(ConnectionType.SERVER)
137 self.imeis: Set[str] = set()
139 def close(self) -> None:
140 log.debug("Closing fd %d", self.sock.fileno())
143 def recv(self) -> Optional[List[Dict[str, Any]]]:
145 data = self.sock.recv(4096)
148 "Reading from fd %d: %s",
152 self.ws.receive_data(None)
154 if not data: # Client has closed connection
156 "EOF reading from fd %d",
159 self.ws.receive_data(None)
162 self.ws.receive_data(data)
163 except RemoteProtocolError as e:
165 "Websocket error on fd %d, try plain http (%s)",
169 self.ws_data = try_http(data, self.sock.fileno(), e)
170 # this `write` is a hack - writing _ought_ to be done at the
171 # stage when all other writes are performed. But I could not
172 # arrange it so in a logical way. Let it stay this way. The
173 # whole http server affair is a hack anyway.
175 log.debug("Sending HTTP response to %d", self.sock.fileno())
179 for event in self.ws.events():
180 if isinstance(event, Request):
181 log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
182 # self.ws_data += self.ws.send(event.response()) # Why not?!
183 self.ws_data += self.ws.send(AcceptConnection())
185 elif isinstance(event, (CloseConnection, Ping)):
186 log.debug("%s on fd %d", event, self.sock.fileno())
187 self.ws_data += self.ws.send(event.response())
188 elif isinstance(event, TextMessage):
189 log.debug("%s on fd %d", event, self.sock.fileno())
190 msg = loads(event.data)
192 if msg.get("type", None) == "subscribe":
193 self.imeis = set(msg.get("imei", []))
195 "subs list on fd %s is %s",
200 log.warning("%s on fd %d", event, self.sock.fileno())
203 def wants(self, imei: str) -> bool:
205 "wants %s? set is %s on fd %d",
210 return imei in self.imeis
212 def send(self, message: Dict[str, Any]) -> None:
213 if self.ready and message["imei"] in self.imeis:
214 self.ws_data += self.ws.send(Message(data=dumps(message)))
216 def write(self) -> bool:
219 sent = self.sock.send(self.ws_data)
220 self.ws_data = self.ws_data[sent:]
223 "Sending to fd %d: %s",
228 return bool(self.ws_data)
232 def __init__(self) -> None:
233 self.by_fd: Dict[int, Client] = {}
235 def add(self, clntsock: socket, clntaddr: Tuple[str, int]) -> int:
236 fd = clntsock.fileno()
237 log.info("Start serving fd %d from %s", fd, clntaddr)
238 self.by_fd[fd] = Client(clntsock, clntaddr)
241 def stop(self, fd: int) -> None:
242 clnt = self.by_fd[fd]
243 log.info("Stop serving fd %d", clnt.sock.fileno())
247 def recv(self, fd: int) -> Optional[List[Dict[str, Any]]]:
248 clnt = self.by_fd[fd]
251 def send(self, msg: Dict[str, Any]) -> Set[int]:
253 for fd, clnt in self.by_fd.items():
254 if clnt.wants(msg["imei"]):
259 def write(self, towrite: Set[int]) -> Set[int]:
261 for fd, clnt in [(fd, self.by_fd.get(fd)) for fd in towrite]:
262 if clnt and clnt.write():
266 def subs(self) -> Set[str]:
268 for clnt in self.by_fd.values():
273 def runserver(conf: ConfigParser) -> None:
274 global htmlfile, pmods, selector
276 cast(ProtoModule, import_module("." + modnm, __package__))
277 for modnm in conf.get("collector", "protocols").split(",")
280 for proto, is_incoming in pmod.exposed_protos():
281 if proto != "ZX:STATUS": # TODO make it better
282 selector.append((is_incoming, proto))
283 initdb(conf.get("storage", "dbfn"))
284 htmlfile = conf.get("wsgateway", "htmlfile", fallback=None)
285 # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
286 zctx = zmq.Context() # type: ignore
287 zsub = zctx.socket(zmq.SUB) # type: ignore
288 zsub.connect(conf.get("collector", "publishurl"))
289 tcpl = socket(AF_INET6, SOCK_STREAM)
290 tcpl.setblocking(False)
291 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
292 tcpl.bind(("", conf.getint("wsgateway", "port")))
294 tcpfd = tcpl.fileno()
295 poller = zmq.Poller() # type: ignore
296 poller.register(zsub, flags=zmq.POLLIN)
297 poller.register(tcpfd, flags=zmq.POLLIN)
299 activesubs: Set[str] = set()
301 towait: Set[int] = set()
303 neededsubs = clients.subs()
305 for proto, is_incoming in pmod.exposed_protos():
306 for imei in neededsubs - activesubs:
309 topic(proto, is_incoming, imei),
311 for imei in activesubs - neededsubs:
314 topic(proto, is_incoming, imei),
316 activesubs = neededsubs
317 log.debug("Subscribed to: %s", activesubs)
322 events = poller.poll()
323 for sk, fl in events:
327 zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
329 if pmod.proto_handled(zmsg.proto):
330 msg = pmod.parse_message(
331 zmsg.packet, zmsg.is_incoming
333 log.debug("Got %s with %s", zmsg, msg)
334 if zmsg.proto == "ZX:STATUS":
340 datetime.fromtimestamp(
342 ).astimezone(tz=timezone.utc)
353 datetime.fromtimestamp(
355 ).astimezone(tz=timezone.utc)
357 "longitude": msg.longitude,
358 "latitude": msg.latitude,
367 clntsock, clntaddr = tcpl.accept()
368 topoll.append((clntsock, clntaddr))
369 elif fl & zmq.POLLIN:
370 received = clients.recv(sk)
372 log.debug("Client gone from fd %d", sk)
376 for wsmsg in received:
377 log.debug("Received from %d: %s", sk, wsmsg)
378 if wsmsg.get("type", None) == "subscribe":
379 # Have to live w/o typeckeding from json
380 imeis = cast(List[str], wsmsg.get("imei"))
381 numback: int = wsmsg.get("backlog", 5)
383 tosend.extend(backlog(imei, numback))
385 elif fl & zmq.POLLOUT:
386 log.debug("Write now open for fd %d", sk)
390 log.debug("Stray event: %s on socket %s", fl, sk)
391 # poll queue consumed, make changes now
393 poller.unregister(fd) # type: ignore
396 log.debug("Sending to the clients: %s", wsmsg)
397 towrite |= clients.send(wsmsg)
398 for clntsock, clntaddr in topoll:
399 fd = clients.add(clntsock, clntaddr)
400 poller.register(fd, flags=zmq.POLLIN)
401 # Deal with actually writing the data out
402 trywrite = towrite - towait
403 morewait = clients.write(trywrite)
405 "towait %s, tried %s, still busy %s",
410 for fd in morewait - trywrite: # new fds waiting for write
411 poller.modify(fd, flags=zmq.POLLIN | zmq.POLLOUT) # type: ignore
412 for fd in trywrite - morewait: # no longer waiting for write
413 poller.modify(fd, flags=zmq.POLLIN) # type: ignore
416 except KeyboardInterrupt:
418 zctx.destroy() # type: ignore
422 if __name__.endswith("__main__"):
423 runserver(common.init(log))