1 """ Websocket Gateway """
3 from logging import getLogger
4 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
6 from wsproto import ConnectionType, WSConnection
7 from wsproto.events import (
15 from wsproto.utilities import RemoteProtocolError
19 from .zmsg import LocEvt
21 log = getLogger("gps303/wsgateway")
25 def try_http(data, fd, e):
28 lines = data.decode().split("\r\n")
31 op, resource, proto = request.split(" ")
33 "HTTP %s for %s, proto %s from fd %d, headers: %s",
41 pos = resource.index("?")
42 resource = resource[:pos]
48 f"{proto} 500 No data configured\r\n"
49 f"Content-Type: text/plain\r\n\r\n"
50 f"HTML data not configure on the server\r\n".encode()
54 with open(htmlfile, "rb") as fl:
56 length = len(htmldata)
59 f"Content-Type: text/html; charset=utf-8\r\n"
60 f"Content-Length: {len(htmldata):d}\r\n\r\n"
61 ).encode("utf-8") + htmldata
64 f"{proto} 500 File not found\r\n"
65 f"Content-Type: text/plain\r\n\r\n"
66 f'HTML file could not be opened\r\n'.encode()
70 f"{proto} 404 File not found\r\n"
71 f"Content-Type: text/plain\r\n\r\n"
72 f'We can only serve "/"\r\n'.encode()
76 f"{proto} 400 Bad request\r\n"
77 "Content-Type: text/plain\r\n\r\n"
78 "Bad request\r\n".encode()
81 log.warning("Unparseable data from fd %d: %s", fd, data)
86 """Websocket connection to the client"""
88 def __init__(self, sock, addr):
91 self.ws = WSConnection(ConnectionType.SERVER)
95 log.debug("Closing fd %d", self.sock.fileno())
100 data = self.sock.recv(4096)
103 "Reading from fd %d: %s",
107 self.ws.receive_data(None)
109 if not data: # Client has closed connection
111 "EOF reading from fd %d",
114 self.ws.receive_data(None)
117 self.ws.receive_data(data)
118 except RemoteProtocolError as e:
120 "Websocket error on fd %d, try plain http (%s)",
124 self.ws_data = try_http(data, self.sock.fileno(), e)
125 log.debug("Sending HTTP response to %d", self.sock.fileno())
129 for event in self.ws.events():
130 if isinstance(event, Request):
131 log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
132 # self.ws_data += self.ws.send(event.response()) # Why not?!
133 self.ws_data += self.ws.send(AcceptConnection())
134 elif isinstance(event, (CloseConnection, Ping)):
135 log.debug("%s on fd %d", event, self.sock.fileno())
136 self.ws_data += self.ws.send(event.response())
137 elif isinstance(event, TextMessage):
138 # TODO: save imei "subscription"
139 log.debug("%s on fd %d", event, self.sock.fileno())
140 msgs.append(event.data)
142 log.warning("%s on fd %d", event, self.sock.fileno())
143 if self.ws_data: # Temp hack
147 def wants(self, imei):
148 return True # TODO: check subscriptions
150 def send(self, message):
151 # TODO: filter only wanted imei got from the client
152 self.ws_data += self.ws.send(Message(data=message.json))
157 sent = self.sock.send(self.ws_data)
158 self.ws_data = self.ws_data[sent:]
161 "Sending to fd %d: %s",
166 return bool(self.ws_data)
173 def add(self, clntsock, clntaddr):
174 fd = clntsock.fileno()
175 log.info("Start serving fd %d from %s", fd, clntaddr)
176 self.by_fd[fd] = Client(clntsock, clntaddr)
180 clnt = self.by_fd[fd]
181 log.info("Stop serving fd %d", clnt.sock.fileno())
186 clnt = self.by_fd[fd]
192 log.debug("Received: %s", msg)
197 for fd, clnt in self.by_fd.items():
198 if clnt.wants(msg.imei):
203 def write(self, towrite):
205 for fd, clnt in [(fd, self.by_fd.get(fd)) for fd in towrite]:
214 htmlfile = conf.get("wsgateway", "htmlfile")
216 zsub = zctx.socket(zmq.SUB)
217 zsub.connect(conf.get("lookaside", "publishurl"))
218 zsub.setsockopt(zmq.SUBSCRIBE, b"")
219 tcpl = socket(AF_INET6, SOCK_STREAM)
220 tcpl.setblocking(False)
221 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
222 tcpl.bind(("", conf.getint("wsgateway", "port")))
224 tcpfd = tcpl.fileno()
225 poller = zmq.Poller()
226 poller.register(zsub, flags=zmq.POLLIN)
227 poller.register(tcpfd, flags=zmq.POLLIN)
236 events = poller.poll()
237 for sk, fl in events:
241 zmsg = LocEvt(zsub.recv(zmq.NOBLOCK))
246 clntsock, clntaddr = tcpl.accept()
247 topoll.append((clntsock, clntaddr))
248 elif fl & zmq.POLLIN:
249 received = clients.recv(sk)
251 log.debug("Client gone from fd %d", sk)
255 log.debug("Received from %d: %s", sk, msg)
256 elif fl & zmq.POLLOUT:
257 log.debug("Write now open for fd %d", sk)
261 log.debug("Stray event: %s on socket %s", fl, sk)
262 # poll queue consumed, make changes now
264 poller.unregister(fd)
267 log.debug("Sending to the clients: %s", zmsg)
268 towrite |= clients.send(zmsg)
269 for clntsock, clntaddr in topoll:
270 fd = clients.add(clntsock, clntaddr)
271 poller.register(fd, flags=zmq.POLLIN)
272 # Deal with actually writing the data out
273 trywrite = towrite - towait
274 morewait = clients.write(trywrite)
276 "towait %s, tried %s, still busy %s",
281 for fd in morewait - trywrite: # new fds waiting for write
282 poller.modify(fd, flags=zmq.POLLIN | zmq.POLLOUT)
283 for fd in trywrite - morewait: # no longer waiting for write
284 poller.modify(fd, flags=zmq.POLLIN)
287 except KeyboardInterrupt:
291 if __name__.endswith("__main__"):
292 runserver(common.init(log))