1 """ Websocket Gateway """
3 from configparser import ConfigParser
4 from datetime import datetime, timezone
5 from json import dumps, loads
6 from logging import getLogger
7 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
9 from typing import Any, cast, Dict, List, Optional, Set, Tuple
10 from wsproto import ConnectionType, WSConnection
11 from wsproto.events import (
20 from wsproto.utilities import RemoteProtocolError
24 from .evstore import initdb, fetch
25 from .zx303proto import (
32 from .zmsg import Bcast, topic
34 log = getLogger("loctrkd/wsgateway")
# Collect stored location reports for one device; runserver() calls this to
# replay history to a client that has just subscribed.
#   imei    -- device identifier to look up
#   numback -- presumably the number of past entries wanted -- TODO confirm
# Returns a list of dicts (see the annotation); the keys visible here are
# "longitude" and "latitude".
# NOTE(review): several lines of this function are not part of this excerpt;
# how `imei` and `numback` reach `fetch`, and the remaining dict keys, cannot
# be seen here -- confirm against the full file.
38 def backlog(imei: str, numback: int) -> List[Dict[str, Any]]:
# Each row coming back from the event store unpacks into
# (is_incoming, timestamp, packet).
40 for is_incoming, timestamp, packet in fetch(
# Two (flag, protocol name) pairs are requested: GPS_POSITIONING with True
# and WIFI_POSITIONING with False; presumably the flag is `is_incoming`.
43 (True, proto_name(GPS_POSITIONING)),
44 (False, proto_name(WIFI_POSITIONING)),
# Decode the stored raw packet back into a protocol message object.
48 msg = parse_message(packet, is_incoming=is_incoming)
# `fromtimestamp` without `tz` yields a naive local time, which `astimezone`
# then converts; the target zone is given on a line not shown here.
54 datetime.fromtimestamp(timestamp).astimezone(
58 "longitude": msg.longitude,
59 "latitude": msg.latitude,
# GPS fixes are told apart from the other requested (WiFi) messages here.
61 if isinstance(msg, GPS_POSITIONING)
# Fallback used when incoming bytes are not valid websocket traffic: treat
# them as a plain HTTP request and build the raw HTTP response to send back.
#   data -- bytes read from the client socket
#   fd   -- the socket's file descriptor, used for log messages
#   e    -- the exception that triggered the fallback (not referenced on any
#           line visible in this excerpt)
# Returns the response as bytes. The responses visible below are: the HTML
# file contents with Content-Type/Content-Length headers, "500 No data
# configured", "500 File not found" and "400 Bad request".
# NOTE(review): the conditions that select between these responses are on
# lines not shown here. `htmlfile` is neither a parameter nor a visible local,
# so it is presumably a module-level name set by runserver() -- confirm.
68 def try_http(data: bytes, fd: int, e: Exception) -> bytes:
# Decoding uses the default codec (UTF-8); header lines are CRLF separated.
71 lines = data.decode().split("\r\n")
# Request line -> method, resource, protocol version. Unpacking raises
# ValueError unless there are exactly three space-separated fields; any
# handler for that is not visible in this excerpt.
74 op, resource, proto = request.split(" ")
76 "HTTP %s for %s, proto %s from fd %d, headers: %s",
# Adjacent literals are concatenated first, so `.encode()` covers the whole
# response text.
86 f"{proto} 500 No data configured\r\n"
87 f"Content-Type: text/plain\r\n\r\n"
88 f"HTML data not configured on the server\r\n".encode()
# The page is read in binary mode and appended to the encoded headers as is.
92 with open(htmlfile, "rb") as fl:
# NOTE(review): `length` is not used on any visible line; the Content-Length
# header below recomputes len(htmldata).
94 length = len(htmldata)
97 f"Content-Type: text/html; charset=utf-8\r\n"
98 f"Content-Length: {len(htmldata):d}\r\n\r\n"
99 ).encode("utf-8") + htmldata
102 f"{proto} 500 File not found\r\n"
103 f"Content-Type: text/plain\r\n\r\n"
104 f"HTML file could not be opened\r\n".encode()
108 f"{proto} 400 Bad request\r\n"
109 "Content-Type: text/plain\r\n\r\n"
110 "Bad request\r\n".encode()
# Input that could not be handled as HTTP either is only logged.
113 log.warning("Unparseable data from fd %d: %s", fd, data)
118 """Websocket connection to the client"""
# Set up per-connection state for an accepted client socket.
#   sock -- the connected socket
#   addr -- the peer address as (host, port)
# NOTE(review): other attributes used by the methods of this class (`sock`,
# `ws_data`, `ready`) are presumably initialised on lines not shown here.
120 def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
# wsproto state machine acting as the server side of the connection.
123 self.ws = WSConnection(ConnectionType.SERVER)
# IMEIs this client is subscribed to; replaced when a "subscribe" message
# arrives (see recv()).
126 self.imeis: Set[str] = set()
# Log that this client's socket is being closed.
# NOTE(review): the call that actually closes the socket is presumably on a
# line not shown in this excerpt.
128 def close(self) -> None:
129 log.debug("Closing fd %d", self.sock.fileno())
# Read pending bytes from the socket, feed them to the wsproto state machine
# and react to the events it produces. Replies (handshake acceptance, close
# and ping responses) are appended to `self.ws_data`; a "subscribe" text
# message replaces `self.imeis`.
# Returns an optional list of dicts per the annotation.
# NOTE(review): the `return` statements are not visible in this excerpt;
# runserver() logs "Client gone" after calling this and iterates the result
# as parsed messages -- presumably None means the connection is finished.
132 def recv(self) -> Optional[List[Dict[str, Any]]]:
# At most 4096 bytes are read per call.
134 data = self.sock.recv(4096)
137 "Reading from fd %d: %s",
# Passing None to wsproto's receive_data() signals that the peer is gone.
141 self.ws.receive_data(None)
143 if not data: # Client has closed connection
145 "EOF reading from fd %d",
148 self.ws.receive_data(None)
151 self.ws.receive_data(data)
# Bytes that wsproto rejects are retried as a plain HTTP request.
152 except RemoteProtocolError as e:
154 "Websocket error on fd %d, try plain http (%s)",
# Note that this overwrites, rather than extends, the pending output buffer.
158 self.ws_data = try_http(data, self.sock.fileno(), e)
159 # this `write` is a hack - writing _ought_ to be done at the
160 # stage when all other writes are performed. But I could not
161 # arrange it so in a logical way. Let it stay this way. The
162 # whole http server affair is a hack anyway.
164 log.debug("Sending HTTP response to %d", self.sock.fileno())
# Drain every event produced by the data fed above.
168 for event in self.ws.events():
# Handshake request: accept the websocket upgrade.
169 if isinstance(event, Request):
170 log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
171 # self.ws_data += self.ws.send(event.response()) # Why not?!
172 self.ws_data += self.ws.send(AcceptConnection())
# Close and ping events are answered with the reply wsproto builds for them.
174 elif isinstance(event, (CloseConnection, Ping)):
175 log.debug("%s on fd %d", event, self.sock.fileno())
176 self.ws_data += self.ws.send(event.response())
177 elif isinstance(event, TextMessage):
178 log.debug("%s on fd %d", event, self.sock.fileno())
# NOTE(review): `loads` raises on malformed JSON and no handler is visible
# in this excerpt -- confirm that a bad client message cannot stop the server.
179 msg = loads(event.data)
# A "subscribe" message replaces the whole subscription set; a missing "imei"
# key results in an empty set.
181 if msg.get("type", None) == "subscribe":
182 self.imeis = set(msg.get("imei", []))
184 "subs list on fd %s is %s",
# Any other event type is only logged.
189 log.warning("%s on fd %d", event, self.sock.fileno())
# Report whether this client is subscribed to `imei`; the query is also
# written to the log.
192 def wants(self, imei: str) -> bool:
194 "wants %s? set is %s on fd %d",
199 return imei in self.imeis
# Queue `message` for delivery to this client. Nothing is written to the
# socket here: the encoded frame is only appended to `self.ws_data`, which
# write() sends out later.
#   message -- dict with at least an "imei" key (KeyError otherwise, once the
#              connection is ready)
201 def send(self, message: Dict[str, Any]) -> None:
# Skip clients whose connection is not `ready` or that are not subscribed to
# this device.
202 if self.ready and message["imei"] in self.imeis:
# The dict is serialised to JSON and wrapped in a wsproto Message event;
# ws.send() returns the bytes to put on the wire.
203 self.ws_data += self.ws.send(Message(data=dumps(message)))
# Try to push buffered output to the socket.
# Returns True while unsent bytes remain in `self.ws_data`, False once the
# buffer is empty.
205 def write(self) -> bool:
# send() may accept only part of the buffer; keep the unsent tail.
208 sent = self.sock.send(self.ws_data)
209 self.ws_data = self.ws_data[sent:]
212 "Sending to fd %d: %s",
217 return bool(self.ws_data)
# Start with an empty registry of connected clients.
221 def __init__(self) -> None:
# Maps a socket file descriptor to the Client object serving it.
222 self.by_fd: Dict[int, Client] = {}
# Register a newly accepted connection, keyed by its socket's fd.
#   clntsock -- the accepted socket
#   clntaddr -- the peer address as (host, port)
# Returns an int; runserver() registers the returned value with the poller,
# so it is presumably the fd (the `return` line is not shown in this excerpt).
224 def add(self, clntsock: socket, clntaddr: Tuple[str, int]) -> int:
225 fd = clntsock.fileno()
226 log.info("Start serving fd %d from %s", fd, clntaddr)
227 self.by_fd[fd] = Client(clntsock, clntaddr)
# Stop serving the client registered under `fd`.
# Raises KeyError when `fd` is not in the registry.
# NOTE(review): closing the client and removing it from `by_fd` presumably
# happen on lines not shown in this excerpt.
230 def stop(self, fd: int) -> None:
231 clnt = self.by_fd[fd]
232 log.info("Stop serving fd %d", clnt.sock.fileno())
# Handle readable data for the client registered under `fd`.
# Raises KeyError when `fd` is not in the registry.
# NOTE(review): the line that produces the return value is not shown here;
# presumably it delegates to the client's own recv().
236 def recv(self, fd: int) -> Optional[List[Dict[str, Any]]]:
237 clnt = self.by_fd[fd]
# Offer `msg` to every registered client that is subscribed to its "imei".
# Returns a set of ints; runserver() merges it into its `towrite` set, so it
# is presumably the fds that now have output pending (the lines building the
# set are not shown in this excerpt).
240 def send(self, msg: Dict[str, Any]) -> Set[int]:
242 for fd, clnt in self.by_fd.items():
243 if clnt.wants(msg["imei"]):
# Attempt to flush pending output for each fd in `towrite`.
# Returns a set of ints; runserver() names it `morewait`, so it is presumably
# the fds whose buffers could not be emptied (the lines building the set are
# not shown in this excerpt).
248 def write(self, towrite: Set[int]) -> Set[int]:
# `.get` yields None for fds that are no longer registered; those are skipped
# by the truthiness test below.
250 for fd, clnt in [(fd, self.by_fd.get(fd)) for fd in towrite]:
# Client.write() returns True while unsent data remains.
251 if clnt and clnt.write():
# Gather subscription information across all registered clients.
# Returns a set of strings; runserver() treats its members as IMEIs, so it is
# presumably the union of every client's `imeis` (the accumulating lines are
# not shown in this excerpt).
255 def subs(self) -> Set[str]:
257 for clnt in self.by_fd.values():
# Main loop of the websocket gateway.
# Opens the event store, connects a ZeroMQ SUB socket to the collector's
# publish URL, listens for TCP clients on the configured port, and then polls
# all of these: broadcast messages are turned into dicts for subscribed
# websocket clients, new connections are accepted, and client sockets are read
# from and written to.
#   conf -- parsed configuration; the sections read here are "storage",
#           "wsgateway" and "collector"
# NOTE(review): many lines of this function are not part of this excerpt
# (loop headers, branch conditions, the initialisation of `clients`, `topoll`,
# `tosend`, `towrite`); comments below describe only what is visible.
262 def runserver(conf: ConfigParser) -> None:
265 initdb(conf.get("storage", "dbfn"))
# NOTE(review): try_http() reads `htmlfile` as a free name, so this assignment
# presumably targets a module-level variable via a `global` declaration on a
# line not shown -- confirm.
266 htmlfile = conf.get("wsgateway", "htmlfile", fallback=None)
267 # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
268 zctx = zmq.Context() # type: ignore
269 zsub = zctx.socket(zmq.SUB) # type: ignore
270 zsub.connect(conf.get("collector", "publishurl"))
# Non-blocking IPv6 TCP listener with SO_REUSEADDR, bound with an empty host
# string (presumably the wildcard address) to the configured port.
271 tcpl = socket(AF_INET6, SOCK_STREAM)
272 tcpl.setblocking(False)
273 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
274 tcpl.bind(("", conf.getint("wsgateway", "port")))
# The poller watches the ZeroMQ socket object and raw socket fds alike.
276 tcpfd = tcpl.fileno()
277 poller = zmq.Poller() # type: ignore
278 poller.register(zsub, flags=zmq.POLLIN)
279 poller.register(tcpfd, flags=zmq.POLLIN)
# IMEIs whose topics are currently subscribed on `zsub`.
281 activesubs: Set[str] = set()
# NOTE(review): presumably the fds that are waiting for POLLOUT -- confirm.
283 towait: Set[int] = set()
# Reconcile ZeroMQ topic subscriptions with what the clients want now.
285 neededsubs = clients.subs()
# Newly needed IMEIs: three topics each (GPS_POSITIONING, WIFI_POSITIONING,
# STATUS); the calls consuming these topics are not shown, presumably
# subscribe operations.
286 for imei in neededsubs - activesubs:
289 topic(proto_name(GPS_POSITIONING), True, imei),
293 topic(proto_name(WIFI_POSITIONING), False, imei),
297 topic(proto_name(STATUS), True, imei),
# IMEIs nobody wants any more: the same three topics, presumably unsubscribed.
299 for imei in activesubs - neededsubs:
302 topic(proto_name(GPS_POSITIONING), True, imei),
306 topic(proto_name(WIFI_POSITIONING), False, imei),
310 topic(proto_name(STATUS), True, imei),
312 activesubs = neededsubs
313 log.debug("Subscribed to: %s", activesubs)
# No timeout is given, so this blocks until at least one event is ready.
318 events = poller.poll()
319 for sk, fl in events:
# Broadcast from the collector: unwrap it and parse the contained packet.
323 zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
324 msg = parse_message(zmsg.packet, zmsg.is_incoming)
325 log.debug("Got %s with %s", zmsg, msg)
326 if isinstance(msg, STATUS):
# `fromtimestamp` without `tz` gives naive local time; `astimezone` then
# turns it into an aware UTC datetime.
332 datetime.fromtimestamp(
334 ).astimezone(tz=timezone.utc)
345 datetime.fromtimestamp(
347 ).astimezone(tz=timezone.utc)
349 "longitude": msg.longitude,
350 "latitude": msg.latitude,
# New TCP connection: accept now, but postpone registering it until the
# current batch of poll events has been processed.
359 clntsock, clntaddr = tcpl.accept()
360 topoll.append((clntsock, clntaddr))
# Readable client socket.
361 elif fl & zmq.POLLIN:
362 received = clients.recv(sk)
364 log.debug("Client gone from fd %d", sk)
368 for wsmsg in received:
369 log.debug("Received from %d: %s", sk, wsmsg)
# On subscription, history obtained from backlog() is queued for sending;
# the client may ask for a "backlog" size, default 5.
370 if wsmsg.get("type", None) == "subscribe":
371 # Have to live w/o typechecking from json
372 imeis = cast(List[str], wsmsg.get("imei"))
373 numback: int = wsmsg.get("backlog", 5)
375 tosend.extend(backlog(imei, numback))
# Client socket became writable again.
377 elif fl & zmq.POLLOUT:
378 log.debug("Write now open for fd %d", sk)
382 log.debug("Stray event: %s on socket %s", fl, sk)
383 # poll queue consumed, make changes now
385 poller.unregister(fd) # type: ignore
388 log.debug("Sending to the clients: %s", wsmsg)
389 towrite |= clients.send(wsmsg)
# Connections accepted during this round start being polled for input.
390 for clntsock, clntaddr in topoll:
391 fd = clients.add(clntsock, clntaddr)
392 poller.register(fd, flags=zmq.POLLIN)
393 # Deal with actually writing the data out
# Only fds that are not already waiting for POLLOUT are written to directly.
394 trywrite = towrite - towait
395 morewait = clients.write(trywrite)
397 "towait %s, tried %s, still busy %s",
# NOTE(review): the visible loop of the registry's write() only iterates over
# the fds it was given; if its result is a subset of `trywrite`, then
# `morewait - trywrite` is always empty and POLLOUT is never requested --
# verify against the full file (perhaps `morewait - towait` was intended).
402 for fd in morewait - trywrite: # new fds waiting for write
403 poller.modify(fd, flags=zmq.POLLIN | zmq.POLLOUT) # type: ignore
404 for fd in trywrite - morewait: # no longer waiting for write
405 poller.modify(fd, flags=zmq.POLLIN) # type: ignore
# Ctrl-C ends the loop; the ZeroMQ context is destroyed on the way out.
408 except KeyboardInterrupt:
410 zctx.destroy() # type: ignore
# Entry point when the module is executed. `endswith` also matches dotted
# names that end in "__main__", not only the exact string "__main__".
414 if __name__.endswith("__main__"):
# `common.init(log)` presumably returns the parsed configuration, which
# runserver() expects as a ConfigParser -- TODO confirm.
415 runserver(common.init(log))