1 """ TCP server that communicates with terminals """
3 from logging import getLogger
4 from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
6 from struct import pack
10 from .gps303proto import (
18 log = getLogger("gps303/collector")
22 """Zmq message to broadcast what was received from the terminal"""
24 def __init__(self, imei, msg):
26 pack("B", proto_of_message(msg))
27 + ("0000000000000000" if imei is None else imei).encode()
33 """Zmq message received from a third party to send to the terminal"""
35 def __init__(self, *args, **kwargs):
36 if not kwargs and len(args) == 1 and isinstance(args[0], bytes):
37 self.imei = msg[:16].decode()
38 self.payload = msg[16:]
40 self.imei = kwargs["imei"]
41 self.payload = kwargs["payload"]
45 """Connected socket to the terminal plus buffer and metadata"""
47 def __init__(self, sock, addr):
54 log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
60 """Read from the socket and parse complete messages"""
62 segment = self.sock.recv(4096)
65 "Reading from fd %d (IMEI %s): %s",
71 if not segment: # Terminal has closed connection
73 "EOF reading from fd %d (IMEI %s)",
79 self.buffer += segment
82 framestart = self.buffer.find(b"xx")
83 if framestart == -1: # No frames, return whatever we have
85 if framestart > 0: # Should not happen, report
87 'Undecodable data "%s" from fd %d (IMEI %s)',
88 self.buffer[:framestart].hex(),
92 self.buffer = self.buffer[framestart:]
93 # At this point, buffer starts with a packet
94 frameend = self.buffer.find(b"\r\n", 4)
95 if frameend == -1: # Incomplete frame, return what we have
97 packet = self.buffer[2:frameend]
98 self.buffer = self.buffer[frameend + 2 :]
99 if proto_of_message(packet) == LOGIN.PROTO:
100 self.imei = parse_message(packet).imei
102 "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
107 def send(self, buffer):
109 self.sock.send(b"xx" + buffer + b"\r\n")
112 "Sending to fd %d (IMEI %s): %s",
124 def add(self, clntsock, clntaddr):
125 fd = clntsock.fileno()
126 self.by_fd[fd] = Client(clntsock, clntaddr)
130 clnt = self.by_fd[fd]
131 log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
134 del self.by_imei[clnt.imei]
138 clnt = self.by_fd[fd]
144 if proto_of_message(msg) == LOGIN.PROTO: # Could do blindly...
145 self.by_imei[clnt.imei] = clnt
146 result.append((clnt.imei, msg))
149 def response(self, resp):
150 if resp.imei in self.by_imei:
151 self.by_imei[resp.imei].send(resp.payload)
156 zpub = zctx.socket(zmq.PUB)
157 zpub.bind(conf.get("collector", "publishurl"))
158 zsub = zctx.socket(zmq.SUB)
159 zsub.connect(conf.get("collector", "listenurl"))
160 tcpl = socket(AF_INET, SOCK_STREAM)
161 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
162 tcpl.bind(("", conf.getint("collector", "port")))
164 tcpfd = tcpl.fileno()
165 poller = zmq.Poller()
166 poller.register(zsub, flags=zmq.POLLIN)
167 poller.register(tcpfd, flags=zmq.POLLIN)
174 events = poller.poll(10)
175 for sk, fl in events:
179 msg = zsub.recv(zmq.NOBLOCK)
180 tosend.append(Resp(msg))
184 clntsock, clntaddr = tcpl.accept()
185 topoll.append((clntsock, clntaddr))
187 received = clients.recv(sk)
190 "Terminal gone from fd %d (IMEI %s)", sk, imei
194 for imei, msg in received:
195 zpub.send(Bcast(imei, msg).as_bytes)
196 if proto_of_message(msg) == HIBERNATION.PROTO:
198 "HIBERNATION from fd %d (IMEI %s)",
203 respmsg = inline_response(msg)
204 if respmsg is not None:
206 Resp(imei=imei, payload=respmsg)
208 # poll queue consumed, make changes now
210 poller.unregister(fd)
213 clients.response(zmsg)
214 for clntsock, clntaddr in topoll:
215 fd = clients.add(clntsock, clntaddr)
217 except KeyboardInterrupt:
221 if __name__.endswith("__main__"):
222 runserver(common.init(log))