1 """ Websocket Gateway """
3 from logging import getLogger
4 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
6 from wsproto import ConnectionType, WSConnection
7 from wsproto.events import (
15 from wsproto.utilities import RemoteProtocolError
19 from .zmsg import LocEvt
21 log = getLogger("gps303/wsgateway")
25 def try_http(data, fd, e):
28 lines = data.decode().split("\r\n")
31 op, resource, proto = request.split(" ")
33 "HTTP %s for %s, proto %s from fd %d, headers: %s",
43 f"{proto} 500 No data configured\r\n"
44 f"Content-Type: text/plain\r\n\r\n"
45 f"HTML data not configure on the server\r\n".encode()
48 length = len(htmldata.encode("utf-8"))
51 f"Content-Type: text/html; charset=utf-8\r\n"
52 f"Content-Length: {length:d}\r\n\r\n" + htmldata
56 f"{proto} 404 File not found\r\n"
57 f"Content-Type: text/plain\r\n\r\n"
58 f"We can only serve \"/\"\r\n".encode()
62 f"{proto} 400 Bad request\r\n"
63 "Content-Type: text/plain\r\n\r\n"
64 "Bad request\r\n".encode()
67 log.warning("Unparseable data from fd %d: %s", fd, data)
72 """Websocket connection to the client"""
74 def __init__(self, sock, addr):
77 self.ws = WSConnection(ConnectionType.SERVER)
81 log.debug("Closing fd %d", self.sock.fileno())
86 data = self.sock.recv(4096)
89 "Reading from fd %d: %s",
93 self.ws.receive_data(None)
95 if not data: # Client has closed connection
97 "EOF reading from fd %d",
100 self.ws.receive_data(None)
103 self.ws.receive_data(data)
104 except RemoteProtocolError as e:
106 "Websocket error on fd %d, try plain http (%s)",
110 self.ws_data = try_http(data, self.sock.fileno(), e)
111 log.debug("Sending HTTP response to %d", self.sock.fileno())
115 for event in self.ws.events():
116 if isinstance(event, Request):
117 log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
118 # self.ws_data += self.ws.send(event.response()) # Why not?!
119 self.ws_data += self.ws.send(AcceptConnection())
120 elif isinstance(event, (CloseConnection, Ping)):
121 log.debug("%s on fd %d", event, self.sock.fileno())
122 self.ws_data += self.ws.send(event.response())
123 elif isinstance(event, TextMessage):
124 # TODO: save imei "subscription"
125 log.debug("%s on fd %d", event, self.sock.fileno())
126 msgs.append(event.data)
128 log.warning("%s on fd %d", event, self.sock.fileno())
129 if self.ws_data: # Temp hack
# Tell the caller whether this client should receive messages for `imei`.
# Currently a stub: the argument is ignored and every imei is accepted,
# so the broadcast loop that calls clnt.wants(msg.imei) skips nobody.
133 def wants(self, imei):
134 return True # TODO: check subscriptions
# Queue `message` for delivery to this client: message.json is wrapped in a
# wsproto Message event, and the bytes returned by self.ws.send() are
# appended to the pending output buffer self.ws_data.
136 def send(self, message):
137 # TODO: filter only wanted imei got from the client
138 self.ws_data += self.ws.send(Message(data=message.json))
142 sent = self.sock.send(self.ws_data)
143 self.ws_data = self.ws_data[sent:]
146 "Sending to fd %d: %s",
157 def add(self, clntsock, clntaddr):
158 fd = clntsock.fileno()
159 log.info("Start serving fd %d from %s", fd, clntaddr)
160 self.by_fd[fd] = Client(clntsock, clntaddr)
164 clnt = self.by_fd[fd]
165 log.info("Stop serving fd %d", clnt.sock.fileno())
170 clnt = self.by_fd[fd]
176 log.debug("Received: %s", msg)
180 for clnt in self.by_fd.values():
181 if clnt.wants(msg.imei):
190 conf.get("wsgateway", "htmlfile"), encoding="utf-8"
196 zsub = zctx.socket(zmq.SUB)
197 zsub.connect(conf.get("lookaside", "publishurl"))
198 zsub.setsockopt(zmq.SUBSCRIBE, b"")
199 tcpl = socket(AF_INET6, SOCK_STREAM)
200 tcpl.setblocking(False)
201 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
202 tcpl.bind(("", conf.getint("wsgateway", "port")))
204 tcpfd = tcpl.fileno()
205 poller = zmq.Poller()
206 poller.register(zsub, flags=zmq.POLLIN)
207 poller.register(tcpfd, flags=zmq.POLLIN)
214 events = poller.poll(5000)
215 log.debug("got events: %s", events)
216 for sk, fl in events:
220 zmsg = LocEvt(zsub.recv(zmq.NOBLOCK))
225 clntsock, clntaddr = tcpl.accept()
226 topoll.append((clntsock, clntaddr))
227 elif fl & zmq.POLLIN:
228 received = clients.recv(sk)
230 log.debug("Client gone from fd %d", sk)
234 log.debug("Received from %d: %s", sk, msg)
236 log.debug("Stray event: %s on socket %s", fl, sk)
237 # poll queue consumed, make changes now
239 poller.unregister(fd)
242 log.debug("Sending to the client: %s", zmsg)
244 for clntsock, clntaddr in topoll:
245 fd = clients.add(clntsock, clntaddr)
246 poller.register(fd, flags=zmq.POLLIN)
247 # TODO: Handle write overruns (register for POLLOUT)
248 except KeyboardInterrupt:
# Script entry point: runs when the module's __name__ ends with "__main__",
# handing runserver() whatever common.init(log) returns.
252 if __name__.endswith("__main__"):
253 runserver(common.init(log))