1 """ Websocket Gateway """
3 from configparser import ConfigParser
4 from datetime import datetime, timezone
5 from importlib import import_module
6 from json import dumps, loads
7 from logging import getLogger
8 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
10 from typing import Any, cast, Dict, List, Optional, Set, Tuple
11 from wsproto import ConnectionType, WSConnection
12 from wsproto.events import (
21 from wsproto.utilities import RemoteProtocolError
25 from .evstore import initdb, fetch, fetchpmod
26 from .protomodule import ProtoModule
27 from .zmsg import Rept, Resp, rtopic
29 log = getLogger("loctrkd/wsgateway")
34 def backlog(imei: str, numback: int) -> List[Dict[str, Any]]:
36 for report in fetch(imei, numback):
37 report["type"] = "location"
38 timestamp = report.pop("devtime")
39 report["timestamp"] = timestamp
44 def try_http(data: bytes, fd: int, e: Exception) -> bytes:
47 lines = data.decode().split("\r\n")
50 op, resource, proto = request.split(" ")
52 "HTTP %s for %s, proto %s from fd %d, headers: %s",
62 f"{proto} 500 No data configured\r\n"
63 f"Content-Type: text/plain\r\n\r\n"
64 f"HTML data not configured on the server\r\n".encode()
68 with open(htmlfile, "rb") as fl:
70 length = len(htmldata)
73 f"Content-Type: text/html; charset=utf-8\r\n"
74 f"Content-Length: {len(htmldata):d}\r\n\r\n"
75 ).encode("utf-8") + htmldata
78 f"{proto} 500 File not found\r\n"
79 f"Content-Type: text/plain\r\n\r\n"
80 f"HTML file could not be opened\r\n".encode()
84 f"{proto} 400 Bad request\r\n"
85 "Content-Type: text/plain\r\n\r\n"
86 "Bad request\r\n".encode()
89 log.warning("Unparseable data from fd %d: %s", fd, data)
94 """Websocket connection to the client"""
96 def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
99 self.ws = WSConnection(ConnectionType.SERVER)
102 self.imeis: Set[str] = set()
104 def __str__(self) -> str:
105 return f"{self.__class__.__name__}(fd={self.sock.fileno()}, addr={self.addr})"
107 def close(self) -> None:
108 log.debug("Closing fd %d", self.sock.fileno())
111 def recv(self) -> Optional[List[Dict[str, Any]]]:
113 data = self.sock.recv(4096)
116 "Reading from fd %d: %s",
120 self.ws.receive_data(None)
122 if not data: # Client has closed connection
124 "EOF reading from fd %d",
127 self.ws.receive_data(None)
130 self.ws.receive_data(data)
131 except RemoteProtocolError as e:
133 "Websocket error on fd %d, try plain http (%s)",
137 self.ws_data = try_http(data, self.sock.fileno(), e)
138 # this `write` is a hack - writing _ought_ to be done at the
139 # stage when all other writes are performed. But I could not
140 # arrange it so in a logical way. Let it stay this way. The
141 # whole http server affair is a hack anyway.
143 log.debug("Sending HTTP response to %d", self.sock.fileno())
147 for event in self.ws.events():
148 if isinstance(event, Request):
149 log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
150 # self.ws_data += self.ws.send(event.response()) # Why not?!
151 self.ws_data += self.ws.send(AcceptConnection())
153 elif isinstance(event, (CloseConnection, Ping)):
154 log.debug("%s on fd %d", event, self.sock.fileno())
155 self.ws_data += self.ws.send(event.response())
156 elif isinstance(event, TextMessage):
157 log.debug("%s on fd %d", event, self.sock.fileno())
158 msg = loads(event.data)
160 if msg.get("type", None) == "subscribe":
161 self.imeis = set(msg.get("imei", []))
163 "subs list on fd %s is %s",
168 log.warning("%s on fd %d", event, self.sock.fileno())
171 def wants(self, imei: str) -> bool:
173 "wants %s? set is %s on fd %d",
178 return imei in self.imeis
180 def send(self, message: Dict[str, Any]) -> None:
181 if self.ready and message["imei"] in self.imeis:
182 self.ws_data += self.ws.send(Message(data=dumps(message)))
184 def write(self) -> bool:
187 sent = self.sock.send(self.ws_data)
188 self.ws_data = self.ws_data[sent:]
191 "Sending to fd %d: %s",
196 return bool(self.ws_data)
200 def __init__(self) -> None:
201 self.by_fd: Dict[int, Client] = {}
203 def add(self, clntsock: socket, clntaddr: Tuple[str, int]) -> int:
204 fd = clntsock.fileno()
205 log.info("Start serving fd %d from %s", fd, clntaddr)
206 self.by_fd[fd] = Client(clntsock, clntaddr)
209 def stop(self, fd: int) -> None:
210 clnt = self.by_fd[fd]
211 log.info("Stop serving fd %d", clnt.sock.fileno())
215 def recv(self, fd: int) -> Tuple[Client, Optional[List[Dict[str, Any]]]]:
216 clnt = self.by_fd[fd]
217 return (clnt, clnt.recv())
219 def send(self, clnt: Optional[Client], msg: Dict[str, Any]) -> Set[int]:
222 for fd, cl in self.by_fd.items():
223 if cl.wants(msg["imei"]):
227 fd = clnt.sock.fileno()
228 if self.by_fd.get(fd, None) == clnt:
233 "Trying to send %s to client at %d, not in service",
239 def write(self, towrite: Set[int]) -> Set[int]:
241 for fd, clnt in [(fd, self.by_fd.get(fd)) for fd in towrite]:
242 if clnt and clnt.write():
246 def subs(self) -> Set[str]:
248 for clnt in self.by_fd.values():
253 def sendcmd(zpush: Any, wsmsg: Dict[str, Any]) -> Dict[str, Any]:
254 imei = wsmsg.pop("imei", None)
255 cmd = wsmsg.pop("type", None)
256 if imei is None or cmd is None:
257 log.info("Unhandled message %s %s %s", cmd, imei, wsmsg)
261 "result": "Did not get imei or cmd",
263 pmod = fetchpmod(imei)
265 log.info("Uknown type of recipient for %s %s %s", cmd, imei, wsmsg)
269 "result": "Type of the terminal is unknown",
271 tmsg = common.make_response(pmod, cmd, imei, **wsmsg)
273 log.info("Could not make packet for %s %s %s", cmd, imei, wsmsg)
277 "result": f"{cmd} unimplemented for terminal protocol {pmod}",
279 resp = Resp(imei=imei, when=time(), packet=tmsg.packed)
280 log.debug("Response: %s", resp)
281 zpush.send(resp.packed)
285 "result": f"{cmd} sent to {imei}",
289 def runserver(conf: ConfigParser) -> None:
291 initdb(conf.get("storage", "dbfn"))
292 htmlfile = conf.get("wsgateway", "htmlfile", fallback=None)
293 # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
294 zctx = zmq.Context() # type: ignore
295 zsub = zctx.socket(zmq.SUB) # type: ignore
296 zsub.connect(conf.get("rectifier", "publishurl"))
297 zpush = zctx.socket(zmq.PUSH) # type: ignore
298 zpush.connect(conf.get("collector", "listenurl"))
299 tcpl = socket(AF_INET6, SOCK_STREAM)
300 tcpl.setblocking(False)
301 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
302 tcpl.bind(("", conf.getint("wsgateway", "port")))
304 tcpfd = tcpl.fileno()
305 poller = zmq.Poller() # type: ignore
306 poller.register(zsub, flags=zmq.POLLIN)
307 poller.register(tcpfd, flags=zmq.POLLIN)
309 activesubs: Set[str] = set()
311 towait: Set[int] = set()
313 neededsubs = clients.subs()
314 for imei in neededsubs - activesubs:
315 zsub.setsockopt(zmq.SUBSCRIBE, rtopic(imei))
316 for imei in activesubs - neededsubs:
317 zsub.setsockopt(zmq.UNSUBSCRIBE, rtopic(imei))
318 activesubs = neededsubs
319 log.debug("Subscribed to: %s", activesubs)
320 tosend: List[Tuple[Optional[Client], Dict[str, Any]]] = []
324 events = poller.poll()
325 for sk, fl in events:
329 zmsg = Rept(zsub.recv(zmq.NOBLOCK))
330 msg = loads(zmsg.payload)
331 msg["imei"] = zmsg.imei
332 log.debug("Got %s, sending %s", zmsg, msg)
333 tosend.append((None, msg))
337 clntsock, clntaddr = tcpl.accept()
338 topoll.append((clntsock, clntaddr))
339 elif fl & zmq.POLLIN:
340 clnt, received = clients.recv(sk)
342 log.debug("Client gone from fd %d", sk)
346 for wsmsg in received:
347 log.debug("Received from %d: %s", sk, wsmsg)
348 if wsmsg.get("type", None) == "subscribe":
349 # Have to live w/o typeckeding from json
350 imeis = cast(List[str], wsmsg.get("imei"))
351 numback: int = wsmsg.get("backlog", 5)
356 for msg in backlog(imei, numback)
360 tosend.append((clnt, sendcmd(zpush, wsmsg)))
362 elif fl & zmq.POLLOUT:
363 log.debug("Write now open for fd %d", sk)
367 log.debug("Stray event: %s on socket %s", fl, sk)
368 # poll queue consumed, make changes now
370 poller.unregister(fd) # type: ignore
372 for towhom, wsmsg in tosend:
373 log.debug("Sending to the client %s: %s", towhom, wsmsg)
374 towrite |= clients.send(towhom, wsmsg)
375 for clntsock, clntaddr in topoll:
376 fd = clients.add(clntsock, clntaddr)
377 poller.register(fd, flags=zmq.POLLIN)
378 # Deal with actually writing the data out
379 trywrite = towrite - towait
380 morewait = clients.write(trywrite)
382 "towait %s, tried %s, still busy %s",
387 for fd in morewait - trywrite: # new fds waiting for write
388 poller.modify(fd, flags=zmq.POLLIN | zmq.POLLOUT) # type: ignore
389 for fd in trywrite - morewait: # no longer waiting for write
390 poller.modify(fd, flags=zmq.POLLIN) # type: ignore
393 except KeyboardInterrupt:
395 zctx.destroy() # type: ignore
399 if __name__.endswith("__main__"):
400 runserver(common.init(log))