1 """ TCP server that communicates with terminals """
3 from configparser import ConfigParser
4 from importlib import import_module
5 from logging import getLogger
15 from struct import pack
17 from typing import Any, Dict, List, Optional, Set, Tuple, Union
21 from .protomodule import ProtoModule
22 from .zmsg import Bcast, Resp
# Module-level logger shared by everything in this file; it is registered
# under the name "loctrkd/collector".
24 log = getLogger("loctrkd/collector")
30 """Connected socket to the terminal plus buffer and metadata"""
# Set up per-connection state for one terminal socket.
#   sock -- the connected socket (other methods use it as self.sock)
#   addr -- the peer address (other methods use it as self.addr)
# NOTE(review): the statements that store sock/addr are not part of this
# excerpt -- confirm they exist in the full file.
32 def __init__(self, sock: socket, addr: Any) -> None:
# Protocol module, stream parser and IMEI are all unknown for a new
# connection, so they start as None. pmod and stream are filled in by
# recv() once the first data has been probed.
35 self.pmod: Optional[ProtoModule] = None
36 self.stream: Optional[ProtoModule.Stream] = None
37 self.imei: Optional[str] = None
# Close this connection, logging the fd and IMEI first.
39 def close(self) -> None:
40 log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
# Closing the stream parser returns a bytes-like `rest`; the log call below
# reports its length and a hex dump of at most its first 64 bytes.
# NOTE(review): self.stream is Optional; the guard around this call and the
# closing of the socket itself are not visible in this excerpt -- confirm.
43 rest = self.stream.close()
48 "%d bytes in buffer on close: %s", len(rest), rest[:64].hex()
# NOTE(review): several lines of this method are missing from this excerpt
# (error handling, the origin of `when`, the return statements). The
# comments below describe only what the visible lines establish.
51 def recv(self) -> Optional[List[Tuple[float, Any, bytes]]]:
52 """Read from the socket and parse complete messages"""
# One read of at most MAXBUFFER bytes per call.
54 segment = self.sock.recv(MAXBUFFER)
57 "Reading from fd %d (IMEI %s): %s",
# recv() returning an empty bytes object means the peer closed the socket.
63 if not segment: # Terminal has closed connection
65 "EOF reading from fd %d (IMEI %s)",
# First data on this connection: probe the payload to choose a protocol
# module, and create that module's stream parser if one was found.
70 if self.stream is None:
71 self.pmod = common.probe_pmod(segment)
72 if self.pmod is not None:
73 self.stream = self.pmod.Stream()
# Still no stream: no protocol module matched, the data is reported as
# unrecognizable.
74 if self.stream is None:
76 "unrecognizable %d bytes of data %s from fd %d",
# Feed the new bytes to the stream parser. Elements that are `bytes` are
# collected as (when, peer address, packet) tuples; any other element is
# only logged -- presumably a parser diagnostic, TODO confirm.
84 for elem in self.stream.recv(segment):
85 if isinstance(elem, bytes):
86 msgs.append((when, self.addr, elem))
89 "%s from fd %d (IMEI %s)",
# Frame `buffer` with this connection's protocol module (passing the IMEI)
# and write the result to the socket.
96 def send(self, buffer: bytes) -> None:
# A protocol must already have been detected for this connection.
# NOTE(review): assert statements are removed under `python -O`; an explicit
# check would keep this guard in optimized runs.
97 assert self.stream is not None and self.pmod is not None
# NOTE(review): socket.send() may transmit only part of the data and its
# return value is not used on this line; socket.sendall() would avoid a
# silently truncated frame. The surrounding error handling is not visible
# in this excerpt -- confirm before changing.
99 self.sock.send(self.pmod.enframe(buffer, imei=self.imei))
102 "Sending to fd %d (IMEI %s): %s",
def __init__(self) -> None:
    """Start with an empty registry.

    Two indexes are kept: ``by_fd`` maps a socket file descriptor to its
    client, ``by_imei`` maps a terminal IMEI to its client.
    """
    self.by_imei: Dict[str, Client] = dict()
    self.by_fd: Dict[int, Client] = dict()
def fds(self) -> Set[int]:
    """Return a fresh set holding every file descriptor in ``by_fd``.

    The result is a copy, so callers may mutate it or compare it with an
    earlier snapshot without touching the registry.
    """
    served: Set[int] = set(self.by_fd)
    return served
# Register a newly accepted connection under its socket file descriptor.
#   clntsock -- the accepted socket
#   clntaddr -- the peer address, used for logging and passed to Client
117 def add(self, clntsock: socket, clntaddr: Any) -> int:
118 fd = clntsock.fileno()
119 log.info("Start serving fd %d from %s", fd, clntaddr)
# Any existing entry under the same fd is overwritten.
120 self.by_fd[fd] = Client(clntsock, clntaddr)
# NOTE(review): the signature promises an int, but the return statement is
# not part of this excerpt -- presumably it returns fd; confirm.
# Stop serving the client registered under `fd`. An fd that is not in the
# registry is only logged.
# NOTE(review): lines are missing from this excerpt between the visible
# statements (early return, closing the client, removal from by_fd) --
# confirm against the full file.
123 def stop(self, fd: int) -> None:
124 if fd not in self.by_fd:
# NOTE(review): "ingore" in the message below is a typo for "ignore".
125 log.debug("Fd %d is not served, ingore stop", fd)
127 clnt = self.by_fd[fd]
128 log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
# Drop the IMEI index entry only while it still refers to this client; a
# newer connection with the same IMEI may have taken the slot over.
# NOTE(review): `==` is an identity test only if Client defines no __eq__;
# `is` would state the intent directly.
130 if clnt.imei and self.by_imei[clnt.imei] == clnt: # could be replaced
131 del self.by_imei[clnt.imei]
136 ) -> Optional[List[Tuple[ProtoModule, Optional[str], float, Any, bytes]]]:
# NOTE(review): this is the tail of a method signature whose `def` line is
# not part of this excerpt, and several body lines are missing too.
# From the visible lines: look up the client for `fd` and turn its messages
# into (protocol module, IMEI, when, peer address, packet) tuples.
137 if fd not in self.by_fd:
138 log.debug("Client at fd %d gone, ingore event", fd)
140 clnt = self.by_fd[fd]
145 for when, peeraddr, packet in msgs:
# NOTE(review): assert is stripped under `python -O`.
146 assert clnt.pmod is not None
# While the client has no IMEI yet, ask the protocol module to extract one
# from the packet; the log line below reports that as a login.
147 if clnt.imei is None:
148 imei = clnt.pmod.imei_from_packet(packet)
150 log.info("LOGIN from fd %d (IMEI %s)", fd, imei)
# Only one client per IMEI is indexed. A client already registered under
# this IMEI is reported as stale (how it is removed is not visible in this
# excerpt), and the current client takes over the by_imei slot.
152 oldclnt = self.by_imei.get(clnt.imei)
153 if oldclnt is not None:
154 oldfd = oldclnt.sock.fileno()
155 log.info("Removing stale connection on fd %d", oldfd)
158 self.by_imei[clnt.imei] = clnt
# Each packet is reported together with the client's protocol module and
# current IMEI.
159 result.append((clnt.pmod, clnt.imei, when, peeraddr, packet))
161 "Received from %s (IMEI %s): %s",
# Deliver `resp.packet` to the client indexed under `resp.imei`, if there
# is one.
# NOTE(review): the return statements and the branch separating the two
# outcomes are not part of this excerpt. By the signature, it presumably
# returns the client's protocol module on success and None otherwise.
168 def response(self, resp: Resp) -> Optional[ProtoModule]:
169 if resp.imei in self.by_imei:
170 clnt = self.by_imei[resp.imei]
171 clnt.send(resp.packet)
174 log.info("Not connected (IMEI %s)", resp.imei)
# Main loop of the collector: serve terminal TCP connections and exchange
# messages with the rest of the system over ZeroMQ.
#   conf -- configuration; the visible code reads "publishurl", "listenurl"
#           and "port" from its [collector] section
#   handle_hibernate -- its use is not visible in this excerpt
# NOTE(review): many lines are missing from this excerpt (the loop header,
# the `try`, creation of `clients`, publishing on zpub, stopping clients).
# The comments below describe only the visible statements.
178 def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
179 # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
180 zctx = zmq.Context() # type: ignore
181 zpub = zctx.socket(zmq.PUB) # type: ignore
182 zpull = zctx.socket(zmq.PULL) # type: ignore
# Mask 0o117 clears execute for owner and group and all access for others
# on anything created while it is in effect -- presumably aimed at socket
# files created by the binds below; restoring `oldmask` is not visible in
# this excerpt.
183 oldmask = umask(0o117)
184 zpub.bind(conf.get("collector", "publishurl"))
185 zpull.bind(conf.get("collector", "listenurl"))
# Listening TCP socket: IPv6 family, bound to the wildcard address on the
# configured port, with SO_REUSEADDR set.
187 tcpl = socket(AF_INET6, SOCK_STREAM)
188 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
189 tcpl.bind(("", conf.getint("collector", "port")))
# The poller watches the PULL socket and the listener's fd for readability.
191 tcpfd = tcpl.fileno()
192 poller = zmq.Poller() # type: ignore
193 poller.register(zpull, flags=zmq.POLLIN)
194 poller.register(tcpfd, flags=zmq.POLLIN)
# Client fds currently registered with the poller.
196 pollingfds: Set[int] = set()
# Work collected while walking the poll results and applied afterwards.
199 tosend: List[Resp] = []
200 toadd: List[Tuple[socket, Any]] = []
# Wait for events; pyzmq's Poller.poll takes its timeout in milliseconds.
201 events = poller.poll(1000)
202 for sk, fl in events:
# Non-blocking read from the PULL socket.
# NOTE(review): zmq.NOBLOCK is the legacy name of zmq.DONTWAIT.
206 msg = zpull.recv(zmq.NOBLOCK)
# New terminal connection: enable TCP keepalive and queue it, registration
# happens after the poll results have been consumed.
212 clntsock, clntaddr = tcpl.accept()
213 clntsock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1)
214 toadd.append((clntsock, clntaddr))
# Any other readable descriptor is treated as a client socket.
215 elif fl & zmq.POLLIN:
216 received = clients.recv(sk)
218 log.debug("Terminal gone from fd %d", sk)
# Handle every packet parsed from this client.
221 for pmod, imei, when, peeraddr, packet in received:
222 proto = pmod.proto_of_message(packet)
233 pmod.is_goodbye_packet(packet)
237 "Goodbye from fd %d (IMEI %s)",
# Some packets are answered directly by the protocol module; such an
# answer is wrapped in a Resp addressed to the same IMEI.
242 respmsg = pmod.inline_response(packet)
243 if respmsg is not None:
245 Resp(imei=imei, when=when, packet=respmsg)
248 log.debug("Stray event: %s on socket %s", fl, sk)
249 # poll queue consumed, make changes now
251 log.debug("Sending to the client: %s", zmsg)
252 rpmod = clients.response(zmsg)
253 if rpmod is not None:
257 proto=rpmod.proto_of_message(zmsg.packet),
# Reconcile poller registrations with the client registry: unregister fds
# that are no longer served, add the queued connections, register the fds
# that are new, then remember the current set for the next iteration.
263 for fd in pollingfds - clients.fds():
264 poller.unregister(fd) # type: ignore
265 for clntsock, clntaddr in toadd:
266 fd = clients.add(clntsock, clntaddr)
267 for fd in clients.fds() - pollingfds:
268 poller.register(fd, flags=zmq.POLLIN)
269 pollingfds = clients.fds()
# Ctrl-C is caught here; the visible cleanup destroys the ZeroMQ context.
270 except KeyboardInterrupt:
273 zctx.destroy() # type: ignore
if __name__.endswith("__main__"):
    # Entry point: common.init() is given the module logger and its result
    # is handed to runserver() as the configuration.
    settings = common.init(log)
    runserver(settings)