X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fcollector.py;h=385285494d7f0782bfc3c8ab20eff1ece22bbf7e;hb=f85690956cf13e342ac02dea298fe876c4163c95;hp=345d0b99a0de7048fc75f8a1a515007d2233318d;hpb=2c60838ac44f02802480eeca626040231014caaa;p=loctrkd.git diff --git a/gps303/collector.py b/gps303/collector.py index 345d0b9..3852854 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -1,36 +1,49 @@ """ TCP server that communicates with terminals """ -from getopt import getopt -from logging import getLogger, StreamHandler, DEBUG, INFO -from logging.handlers import SysLogHandler +from logging import getLogger from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR from time import time -import sys +from struct import pack import zmq -from .config import readconfig -from .gps303proto import parse_message, HIBERNATION, LOGIN, set_config - -CONF = "/etc/gps303.conf" +from . import common +from .gps303proto import ( + HIBERNATION, + LOGIN, + inline_response, + parse_message, + proto_of_message, +) log = getLogger("gps303/collector") class Bcast: """Zmq message to broadcast what was received from the terminal""" + def __init__(self, imei, msg): - self.as_bytes = imei.encode() + msg.to_packet() + self.as_bytes = ( + pack("B", proto_of_message(msg)) + + ("0000000000000000" if imei is None else imei).encode() + + msg + ) class Resp: """Zmq message received from a third party to send to the terminal""" - def __init__(self, msg): - self.imei = msg[:16].decode() - self.payload = msg[16:] + + def __init__(self, *args, **kwargs): + if not kwargs and len(args) == 1 and isinstance(args[0], bytes): + self.imei = msg[:16].decode() + self.payload = msg[16:] + elif len(args) == 0: + self.imei = kwargs["imei"] + self.payload = kwargs["payload"] class Client: """Connected socket to the terminal plus buffer and metadata""" + def __init__(self, sock, addr): self.sock = sock self.addr = addr @@ -44,16 +57,23 @@ class Client: self.imei = None def recv(self): - """ Read from the socket and parse complete messages """ + """Read from the socket and parse complete messages""" try: segment = self.sock.recv(4096) except OSError: - log.warning("Reading from fd %d (IMEI %s): %s", - self.sock.fileno(), self.imei, e) + log.warning( + "Reading from fd %d (IMEI %s): %s", + self.sock.fileno(), + self.imei, + e, + ) return None if not segment: # Terminal has closed connection - log.info("EOF reading from fd %d (IMEI %s)", - self.sock.fileno(), self.imei) + log.info( + "EOF reading from fd %d (IMEI %s)", + self.sock.fileno(), + self.imei, + ) return None when = time() self.buffer += segment @@ -63,28 +83,38 @@ class Client: if framestart == -1: # No frames, return whatever we have break if framestart > 0: # Should not happen, report - log.warning("Undecodable data \"%s\" from fd %d (IMEI %s)", - self.buffer[:framestart].hex(), self.sock.fileno(), self.imei) + log.warning( + 'Undecodable data "%s" from fd %d (IMEI %s)', + self.buffer[:framestart].hex(), + self.sock.fileno(), + self.imei, + ) self.buffer = self.buffer[framestart:] # At this point, buffer starts with a packet frameend = self.buffer.find(b"\r\n", 4) if frameend == -1: # Incomplete frame, return what we have break - msg = parse_message(self.buffer[2:frameend]) - self.buffer = self.buffer[frameend+2:] - if isinstance(msg, LOGIN): - self.imei = msg.imei - log.info("LOGIN from fd %d: IMEI %s", - self.sock.fileno(), self.imei) - msgs.append(msg) + packet = self.buffer[2:frameend] + self.buffer = self.buffer[frameend + 2 :] + if proto_of_message(packet) == LOGIN.PROTO: + self.imei = parse_message(packet).imei + log.info( + "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei + ) + msgs.append(packet) return msgs def send(self, buffer): try: self.sock.send(b"xx" + buffer + b"\r\n") except OSError as e: - log.error("Sending to fd %d (IMEI %s): %s", - self.sock.fileno, self.imei, e) + log.error( + "Sending to fd %d (IMEI %s): %s", + self.sock.fileno, + self.imei, + e, + ) + class Clients: def __init__(self): @@ -107,9 +137,11 @@ class Clients: def recv(self, fd): clnt = self.by_fd[fd] msgs = clnt.recv() + if msgs is None: + return None result = [] for msg in msgs: - if isinstance(msg, LOGIN): + if proto_of_message(msg) == LOGIN.PROTO: # Could do blindly... self.by_imei[clnt.imei] = clnt result.append((clnt.imei, msg)) return result @@ -119,7 +151,7 @@ class Clients: self.by_imei[resp.imei].send(resp.payload) -def runserver(opts, conf): +def runserver(conf): zctx = zmq.Context() zpub = zctx.socket(zmq.PUB) zpub.bind(conf.get("collector", "publishurl")) @@ -152,11 +184,27 @@ def runserver(opts, conf): clntsock, clntaddr = tcpl.accept() topoll.append((clntsock, clntaddr)) else: - for imei, msg in clients.recv(sk): - zpub.send(Bcast(imei, msg).as_bytes) - if msg is None or isinstance(msg, HIBERNATION): - log.debug("HIBERNATION from fd %d", sk) - tostop.append(sk) + received = clients.recv(sk) + if received is None: + log.debug( + "Terminal gone from fd %d (IMEI %s)", sk, imei + ) + tostop.append(sk) + else: + for imei, msg in received: + zpub.send(Bcast(imei, msg).as_bytes) + if proto_of_message(msg) == HIBERNATION.PROTO: + log.debug( + "HIBERNATION from fd %d (IMEI %s)", + sk, + imei, + ) + tostop.append(sk) + respmsg = inline_response(msg) + if respmsg is not None: + clients.response( + Resp(imei=imei, payload=respmsg) + ) # poll queue consumed, make changes now for fd in tostop: poller.unregister(fd) @@ -171,13 +219,4 @@ def runserver(opts, conf): if __name__.endswith("__main__"): - opts, _ = getopt(sys.argv[1:], "c:d") - opts = dict(opts) - conf = readconfig(opts["-c"] if "-c" in opts else CONF) - if sys.stdout.isatty(): - log.addHandler(StreamHandler(sys.stderr)) - else: - log.addHandler(SysLogHandler(address="/dev/log")) - log.setLevel(DEBUG if "-d" in opts else INFO) - log.info("starting with options: %s", opts) - runserver(opts, conf) + runserver(common.init(log))