X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fcollector.py;h=7ffa7526cbce36f3ff98d4d88570b97ee8c84e94;hb=8be4295a5027349ebbf5242d131c5a942181f7a6;hp=d8ca86c0791a2b8c0cda3dae053153b649c863ce;hpb=4c9597cff07443042f51c2330a050b7ee5bc9bcb;p=loctrkd.git diff --git a/gps303/collector.py b/gps303/collector.py index d8ca86c..7ffa752 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -1,3 +1,5 @@ +""" TCP server that communicates with terminals """ + from getopt import getopt from logging import getLogger, StreamHandler, DEBUG, INFO from logging.handlers import SysLogHandler @@ -7,7 +9,7 @@ import sys import zmq from .config import readconfig -from .GT06mod import handle_packet, make_response, LOGIN, set_config +from .gps303proto import handle_packet, make_response, LOGIN, set_config CONF = "/etc/gps303.conf" @@ -15,42 +17,74 @@ log = getLogger("gps303/collector") class Bcast: + """Zmq message to broadcast what was received from the terminal""" def __init__(self, imei, msg): self.as_bytes = imei.encode() + msg.encode() -class Zmsg: +class Resp: + """Zmq message received from a third party to send to the terminal""" def __init__(self, msg): self.imei = msg[:16].decode() self.payload = msg[16:] class Client: - def __init__(self, clntsock, clntaddr): - self.clntsock = clntsock - self.clntaddr = clntaddr + """Connected socket to the terminal plus buffer and metadata""" + def __init__(self, sock, addr): + self.sock = sock + self.addr = addr self.buffer = b"" self.imei = None def close(self): - self.clntsock.close() + log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei) + self.sock.close() + self.buffer = b"" + self.imei = None def recv(self): - packet = self.clntsock.recv(4096) - if not packet: + """ Read from the socket and parse complete messages """ + try: + segment = self.sock.recv(4096) + except OSError: + log.warning("Reading from fd %d (IMEI %s): %s", + self.sock.fileno(), self.imei, e) + return None + if not segment: # Terminal has closed connection + log.info("EOF reading from fd %d (IMEI %s)", + self.sock.fileno(), self.imei) return None when = time() - self.buffer += packet - # implement framing properly - msg = handle_packet(packet, self.clntaddr, when) - self.buffer = self.buffer[len(packet):] - if isinstance(msg, LOGIN): - self.imei = msg.imei - return msg + self.buffer += segment + msgs = [] + while True: + framestart = self.buffer.find(b"xx") + if framestart == -1: # No frames, return whatever we have + break + if framestart > 0: # Should not happen, report + log.warning("Undecodable data \"%s\" from fd %d (IMEI %s)", + self.buffer[:framestart].hex(), self.sock.fileno(), self.imei) + self.buffer = self.buffer[framestart:] + # At this point, buffer starts with a packet + frameend = self.buffer.find(b"\r\n", 4) + if frameend == -1: # Incomplete frame, return what we have + break + msg = parse_message(self.buffer[:frameend]) + self.buffer = self.buffer[frameend+2:] + if isinstance(msg, LOGIN): + self.imei = msg.imei + log.info("LOGIN from fd %d: IMEI %s", + self.sock.fileno(), self.imei) + msgs.append(msg) + return msgs def send(self, buffer): - self.clntsock.send(buffer) - + try: + self.sock.send(b"xx" + buffer + b"\r\n") + except OSError as e: + log.error("Sending to fd %d (IMEI %s): %s", + self.sock.fileno, self.imei, e) class Clients: def __init__(self): @@ -64,6 +98,7 @@ class Clients: def stop(self, fd): clnt = by_fd[fd] + log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei) clnt.close() if clnt.imei: del self.by_imei[clnt.imei] @@ -71,14 +106,17 @@ class Clients: def recv(self, fd): clnt = by_fd[fd] - msg = clnt.recv() - if isinstance(msg, LOGIN): - self.by_imei[clnt.imei] = clnt - return clnt.imei, msg + msgs = clnt.recv() + result = [] + for msg in msgs: + if isinstance(msg, LOGIN): + self.by_imei[clnt.imei] = clnt + result.append(clnt.imei, msg) + return result - def response(self, zmsg): - if zmsg.imei in self.by_imei: - clnt = self.by_imei[zmsg.imei].send(zmsg.payload) + def response(self, resp): + if resp.imei in self.by_imei: + self.by_imei[resp.imei].send(resp.payload) def runserver(opts, conf): @@ -107,7 +145,7 @@ def runserver(opts, conf): while True: try: msg = zsub.recv(zmq.NOBLOCK) - tosend.append(Zmsg(msg)) + tosend.append(Resp(msg)) except zmq.Again: break elif sk == tcpfd: @@ -117,6 +155,7 @@ def runserver(opts, conf): imei, msg = clients.recv(sk) zpub.send(Bcast(imei, msg).as_bytes) if msg is None or isinstance(msg, HIBERNATION): + log.debug("HIBERNATION from fd %d", sk) tostop.append(sk) # poll queue consumed, make changes now for fd in tostop: