1 """ TCP server that communicates with terminals """
3 from configparser import ConfigParser
4 from logging import getLogger
14 from struct import pack
16 from typing import Dict, List, Optional, Tuple
20 from .gps303proto import (
29 from .zmsg import Bcast, Resp
# Logger named "gps303/collector"; the classes and functions below log
# through this object.
31 log = getLogger("gps303/collector")
37 """Connected socket to the terminal plus buffer and metadata"""
# Set up the per-connection state for one terminal. Only part of the body is
# visible in this excerpt: a fresh GPS303Conn() is stored as `stream`, and
# `imei` starts out as None.
39 def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
# NOTE(review): close() and recv() read self.sock and self.addr, so the lines
# omitted here presumably store the two arguments -- confirm.
42 self.stream = GPS303Conn()
43 self.imei: Optional[str] = None
# Shut this connection down. Visible steps: log the fd and IMEI, then call
# the stream object's close() and keep whatever it returns as `rest`.
45 def close(self) -> None:
46 log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
# NOTE(review): the socket itself is presumably closed in the line omitted
# between these two -- confirm.
48 rest = self.stream.close()
# The warning reports len(rest) and rest itself; the condition guarding it is
# on a line that is not part of this excerpt.
50 log.warning("%d bytes in buffer on close: %s", len(rest), rest)
# Per the annotation this returns either None or a list of
# (float, peer address, packet bytes) tuples.
52 def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]:
53 """Read from the socket and parse complete messages"""
# A single recv() call, asking for at most MAXBUFFER bytes.
55 segment = self.sock.recv(MAXBUFFER)
58 "Reading from fd %d (IMEI %s): %s",
# An empty read is treated as the peer having closed the connection.
# NOTE(review): what is returned on this path is not visible here -- confirm.
64 if not segment: # Terminal has closed connection
66 "EOF reading from fd %d (IMEI %s)",
# Every packet that the stream object's recv() yields for this segment is
# paired with `when` and the stored peer address. `when` is assigned on a
# line that is not part of this excerpt.
75 (when, self.addr, packet)
76 for packet in self.stream.recv(segment)
# A StreamError is caught here and reported together with the fd and IMEI.
78 except StreamError as e:
80 "%s from fd %d (IMEI %s)", e, self.sock.fileno(), self.imei
# Wrap `buffer` with the stream object's enframe() and write the result to
# the socket.
83 def send(self, buffer: bytes) -> None:
# NOTE(review): socket.send() may transmit only part of the data and its
# return value is not used on this line; check whether the surrounding
# (omitted) lines deal with short writes, or whether sendall() is wanted.
85 self.sock.send(self.stream.enframe(buffer))
88 "Sending to fd %d (IMEI %s): %s",
# Takes the IMEI string for this connection. The body is not part of this
# excerpt; presumably it assigns self.imei -- TODO confirm.
94 def set_imei(self, imei: str) -> None:
# Start with no connections: two empty lookup tables of Client objects, one
# keyed by socket file descriptor and one keyed by IMEI string.
99 def __init__(self) -> None:
100 self.by_fd: Dict[int, Client] = {}
101 self.by_imei: Dict[str, Client] = {}
# Register a newly accepted connection: wrap the socket and peer address in
# a Client and index it in by_fd under the socket's file descriptor.
103 def add(self, clntsock: socket, clntaddr: Tuple[str, int]) -> int:
104 fd = clntsock.fileno()
105 log.info("Start serving fd %d from %s", fd, clntaddr)
106 self.by_fd[fd] = Client(clntsock, clntaddr)
# NOTE(review): the return statement is not part of this excerpt. runserver
# registers the returned value with its poller, so it is presumably `fd` --
# confirm.
# Stop serving the connection registered under `fd`.
# The by_fd lookup is plain dict indexing, so an unknown fd raises KeyError.
109 def stop(self, fd: int) -> None:
110 clnt = self.by_fd[fd]
111 log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
# Remove this client's entry from the IMEI index.
# NOTE(review): if clnt.imei is None or absent from by_imei, this `del`
# raises KeyError unless an omitted line guards it -- confirm.
114 del self.by_imei[clnt.imei]
# The signature starts on lines that are not part of this excerpt. Per the
# return annotation the result is None, or a list of
# (IMEI or None, float, peer address, packet bytes) tuples.
119 ) -> Optional[List[Tuple[Optional[str], float, Tuple[str, int], bytes]]]:
# Plain dict indexing: an fd that is not registered raises KeyError.
120 clnt = self.by_fd[fd]
# NOTE(review): `msgs` is bound on a line not shown; presumably it is the
# result of clnt.recv() -- confirm.
125 for when, peeraddr, packet in msgs:
# Only packets whose protocol value equals LOGIN.PROTO are parsed here.
126 if proto_of_message(packet) == LOGIN.PROTO:
127 msg = parse_message(packet)
128 if isinstance(msg, LOGIN): # Can be unparseable
129 if clnt.imei is None:
132 "LOGIN from fd %d (IMEI %s)",
# Look for a client already indexed under the same IMEI; if there is one,
# the log message below reports it as orphaned.
136 oldclnt = self.by_imei.get(clnt.imei)
137 if oldclnt is not None:
139 "Orphaning fd %d with the same IMEI",
140 oldclnt.sock.fileno(),
# Make this client the by_imei entry for its IMEI.
143 self.by_imei[clnt.imei] = clnt
146 "Login message from %s: %s, but client imei unfilled",
# Each packet is passed on tagged with the client's current imei value.
150 result.append((clnt.imei, when, peeraddr, packet))
152 "Received from %s (IMEI %s): %s",
# Send resp.packet to the client indexed under resp.imei, if there is one.
159 def response(self, resp: Resp) -> None:
160 if resp.imei in self.by_imei:
161 self.by_imei[resp.imei].send(resp.packet)
# NOTE(review): presumably the `else` branch of the test above (the branch
# line itself is not part of this excerpt) -- confirm.
163 log.info("Not connected (IMEI %s)", resp.imei)
# Run the collector: bind the ZeroMQ and TCP endpoints named in the
# "collector" section of `conf` and service them from a single poll loop.
# `handle_hibernate` switches the HIBERNATION branch further down on or off.
166 def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
167 # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
168 zctx = zmq.Context() # type: ignore
169 zpub = zctx.socket(zmq.PUB) # type: ignore
170 zpull = zctx.socket(zmq.PULL) # type: ignore
# The process umask is set to 0o117 before the two binds and the previous
# value is kept in `oldmask`.
# NOTE(review): presumably this limits permissions on ipc:// socket files and
# the old mask is restored on an omitted line -- confirm.
171 oldmask = umask(0o117)
172 zpub.bind(conf.get("collector", "publishurl"))
173 zpull.bind(conf.get("collector", "listenurl"))
# The TCP listener is an AF_INET6 stream socket with SO_REUSEADDR set, bound
# with an empty host string to the configured port.
175 tcpl = socket(AF_INET6, SOCK_STREAM)
176 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
177 tcpl.bind(("", conf.getint("collector", "port")))
# One zmq.Poller watches both the PULL socket and the listener's raw fd for
# readability.
179 tcpfd = tcpl.fileno()
180 poller = zmq.Poller() # type: ignore
181 poller.register(zpull, flags=zmq.POLLIN)
182 poller.register(tcpfd, flags=zmq.POLLIN)
# Wait for events; the argument is the poll timeout (milliseconds in pyzmq).
189 events = poller.poll(1000)
190 for sk, fl in events:
# Receive from the PULL socket with the NOBLOCK flag.
194 msg = zpull.recv(zmq.NOBLOCK)
# New connection: turn on SO_KEEPALIVE and queue the socket in `topoll`; it
# is handed to the registry and the poller after the event loop, below.
200 clntsock, clntaddr = tcpl.accept()
201 clntsock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1)
202 topoll.append((clntsock, clntaddr))
# Any other readable descriptor is treated as a client socket.
203 elif fl & zmq.POLLIN:
204 received = clients.recv(sk)
206 log.debug("Terminal gone from fd %d", sk)
209 for imei, when, peeraddr, packet in received:
210 proto = proto_of_message(packet)
# This branch is taken only when the caller left handle_hibernate enabled.
220 if proto == HIBERNATION.PROTO and handle_hibernate:
222 "HIBERNATION from fd %d (IMEI %s)",
# inline_response() may yield None; a Resp is only built when it does not.
227 respmsg = inline_response(packet)
228 if respmsg is not None:
230 Resp(imei=imei, when=when, packet=respmsg)
233 log.debug("Stray event: %s on socket %s", fl, sk)
234 # poll queue consumed, make changes now
239 proto=proto_of_message(zmsg.packet),
245 log.debug("Sending to the client: %s", zmsg)
246 clients.response(zmsg)
248 poller.unregister(fd) # type: ignore
# Sockets accepted during this pass are registered only now.
250 for clntsock, clntaddr in topoll:
251 fd = clients.add(clntsock, clntaddr)
252 poller.register(fd, flags=zmq.POLLIN)
# Ctrl-C is caught here; the ZeroMQ context is destroyed below.
253 except KeyboardInterrupt:
256 zctx.destroy() # type: ignore
# Script entry point: the value returned by common.init(log) is passed to
# runserver() as its configuration argument.
# NOTE(review): endswith("__main__") also matches any module name that merely
# ends in "__main__"; the usual test is `__name__ == "__main__"` -- confirm
# this is intentional.
260 if __name__.endswith("__main__"):
261 runserver(common.init(log))