From 4c173a5448990cd4da398be1bf3479bef17b1048 Mon Sep 17 00:00:00 2001 From: Eugene Crosser Date: Tue, 19 Apr 2022 00:09:44 +0200 Subject: [PATCH] initial storage service --- gps303.conf | 5 +- gps303/__main__.py | 4 +- gps303/collector.py | 106 ++++++++++++++++++++++++++---------------- gps303/common.py | 50 ++++++++++++++++++++ gps303/config.py | 29 ------------ gps303/evstore.py | 4 -- gps303/gps303proto.py | 19 ++++---- gps303/storage.py | 38 +++++++++++++++ 8 files changed, 169 insertions(+), 86 deletions(-) create mode 100644 gps303/common.py delete mode 100644 gps303/config.py create mode 100644 gps303/storage.py diff --git a/gps303.conf b/gps303.conf index cdad9c7..1a4f181 100644 --- a/gps303.conf +++ b/gps303.conf @@ -1,10 +1,9 @@ [collector] port = 4303 publishurl = ipc:///tmp/collected -listenurl = ipc:///responses +listenurl = ipc:///tmp/responses -[daemon] -port = 4303 +[storage] dbfn = gps303.sqlite [opencellid] diff --git a/gps303/__main__.py b/gps303/__main__.py index 51e462c..461c450 100755 --- a/gps303/__main__.py +++ b/gps303/__main__.py @@ -27,12 +27,12 @@ if __name__.endswith("__main__"): log.setLevel(DEBUG if "-d" in opts else INFO) log.info("starting with options: %s", opts) - initdb(conf.get("daemon", "dbfn")) + initdb(conf.get("storage", "dbfn")) set_config(conf) ctlsock = socket(AF_INET, SOCK_STREAM) ctlsock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) - ctlsock.bind(("", conf.getint("daemon", "port"))) + ctlsock.bind(("", conf.getint("collector", "port"))) ctlsock.listen(5) ctlfd = ctlsock.fileno() pollset = poll() diff --git a/gps303/collector.py b/gps303/collector.py index 345d0b9..68c95bf 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -1,36 +1,45 @@ """ TCP server that communicates with terminals """ from getopt import getopt -from logging import getLogger, StreamHandler, DEBUG, INFO +from logging import getLogger from logging.handlers import SysLogHandler from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR from time import time -import sys +from struct import pack import zmq -from .config import readconfig -from .gps303proto import parse_message, HIBERNATION, LOGIN, set_config - -CONF = "/etc/gps303.conf" +from . import common +from .gps303proto import HIBERNATION, LOGIN, parse_message, proto_of_message log = getLogger("gps303/collector") class Bcast: """Zmq message to broadcast what was received from the terminal""" + def __init__(self, imei, msg): - self.as_bytes = imei.encode() + msg.to_packet() + self.as_bytes = ( + pack("B", proto_of_message(msg)) + + ("0000000000000000" if imei is None else imei).encode() + + msg + ) class Resp: """Zmq message received from a third party to send to the terminal""" - def __init__(self, msg): - self.imei = msg[:16].decode() - self.payload = msg[16:] + + def __init__(self, *args, **kwargs): + if not kwargs and len(args) == 1 and isinstance(args[0], bytes): + self.imei = msg[:16].decode() + self.payload = msg[16:] + elif len(args) == 0: + self.imei = kwargs["imei"] + self.payload = kwargs["payload"] class Client: """Connected socket to the terminal plus buffer and metadata""" + def __init__(self, sock, addr): self.sock = sock self.addr = addr @@ -44,16 +53,23 @@ class Client: self.imei = None def recv(self): - """ Read from the socket and parse complete messages """ + """Read from the socket and parse complete messages""" try: segment = self.sock.recv(4096) except OSError: - log.warning("Reading from fd %d (IMEI %s): %s", - self.sock.fileno(), self.imei, e) + log.warning( + "Reading from fd %d (IMEI %s): %s", + self.sock.fileno(), + self.imei, + e, + ) return None if not segment: # Terminal has closed connection - log.info("EOF reading from fd %d (IMEI %s)", - self.sock.fileno(), self.imei) + log.info( + "EOF reading from fd %d (IMEI %s)", + self.sock.fileno(), + self.imei, + ) return None when = time() self.buffer += segment @@ -63,28 +79,38 @@ class Client: if framestart == -1: # No frames, return whatever we have break if framestart > 0: # Should not happen, report - log.warning("Undecodable data \"%s\" from fd %d (IMEI %s)", - self.buffer[:framestart].hex(), self.sock.fileno(), self.imei) + log.warning( + 'Undecodable data "%s" from fd %d (IMEI %s)', + self.buffer[:framestart].hex(), + self.sock.fileno(), + self.imei, + ) self.buffer = self.buffer[framestart:] # At this point, buffer starts with a packet frameend = self.buffer.find(b"\r\n", 4) if frameend == -1: # Incomplete frame, return what we have break - msg = parse_message(self.buffer[2:frameend]) - self.buffer = self.buffer[frameend+2:] - if isinstance(msg, LOGIN): - self.imei = msg.imei - log.info("LOGIN from fd %d: IMEI %s", - self.sock.fileno(), self.imei) - msgs.append(msg) + packet = self.buffer[2:frameend] + self.buffer = self.buffer[frameend + 2 :] + if proto_of_message(packet) == LOGIN.PROTO: + self.imei = parse_message(packet).imei + log.info( + "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei + ) + msgs.append(packet) return msgs def send(self, buffer): try: self.sock.send(b"xx" + buffer + b"\r\n") except OSError as e: - log.error("Sending to fd %d (IMEI %s): %s", - self.sock.fileno, self.imei, e) + log.error( + "Sending to fd %d (IMEI %s): %s", + self.sock.fileno, + self.imei, + e, + ) + class Clients: def __init__(self): @@ -107,9 +133,11 @@ class Clients: def recv(self, fd): clnt = self.by_fd[fd] msgs = clnt.recv() + if msgs is None: + return None result = [] for msg in msgs: - if isinstance(msg, LOGIN): + if proto_of_message(msg) == LOGIN.PROTO: # Could do blindly... self.by_imei[clnt.imei] = clnt result.append((clnt.imei, msg)) return result @@ -119,7 +147,7 @@ class Clients: self.by_imei[resp.imei].send(resp.payload) -def runserver(opts, conf): +def runserver(conf): zctx = zmq.Context() zpub = zctx.socket(zmq.PUB) zpub.bind(conf.get("collector", "publishurl")) @@ -154,9 +182,16 @@ def runserver(opts, conf): else: for imei, msg in clients.recv(sk): zpub.send(Bcast(imei, msg).as_bytes) - if msg is None or isinstance(msg, HIBERNATION): - log.debug("HIBERNATION from fd %d", sk) + if ( + msg is None + or proto_of_message(msg) == HIBERNATION.PROTO + ): + log.debug( + "HIBERNATION from fd %d (IMEI %s)", sk, imei + ) tostop.append(sk) + elif proto_of_message(msg) == LOGIN.PROTO: + clients.response(Resp(imei=imei, payload=LOGIN.response())) # poll queue consumed, make changes now for fd in tostop: poller.unregister(fd) @@ -171,13 +206,4 @@ def runserver(opts, conf): if __name__.endswith("__main__"): - opts, _ = getopt(sys.argv[1:], "c:d") - opts = dict(opts) - conf = readconfig(opts["-c"] if "-c" in opts else CONF) - if sys.stdout.isatty(): - log.addHandler(StreamHandler(sys.stderr)) - else: - log.addHandler(SysLogHandler(address="/dev/log")) - log.setLevel(DEBUG if "-d" in opts else INFO) - log.info("starting with options: %s", opts) - runserver(opts, conf) + runserver(common.init(log)) diff --git a/gps303/common.py b/gps303/common.py new file mode 100644 index 0000000..0e7aac1 --- /dev/null +++ b/gps303/common.py @@ -0,0 +1,50 @@ +""" Common housekeeping for all daemons """ + +from configparser import ConfigParser +from getopt import getopt +from logging import getLogger, StreamHandler, DEBUG, INFO +from sys import argv, stderr, stdout + +CONF = "/etc/gps303.conf" +PORT = 4303 +DBFN = "/var/lib/gps303/gps303.sqlite" + +def init(log): + opts, _ = getopt(argv[1:], "c:d") + opts = dict(opts) + conf = readconfig(opts["-c"] if "-c" in opts else CONF) + if stdout.isatty(): + log.addHandler(StreamHandler(stderr)) + else: + log.addHandler(SysLogHandler(address="/dev/log")) + log.setLevel(DEBUG if "-d" in opts else INFO) + log.info("starting with options: %s", opts) + return conf + +def readconfig(fname): + config = ConfigParser() + config["collector"] = { + "port": PORT, + } + config["storage"] = { + "dbfn": DBFN, + } + config["device"] = {} + #_print_config(config) + #print("now reading", fname) + config.read(fname) + #_print_config(config) + return config + +if __name__ == "__main__": + from sys import argv + + def _print_config(conf): + for section in conf.sections(): + print("section", section) + for option in conf.options(section): + print(" ", option, conf[section][option]) + + conf = readconfig(argv[1]) + _print_config(conf) + print("binaryswitch", int(conf.get("device", "binaryswitch"), 0)) diff --git a/gps303/config.py b/gps303/config.py deleted file mode 100644 index fe4f696..0000000 --- a/gps303/config.py +++ /dev/null @@ -1,29 +0,0 @@ -from configparser import ConfigParser - -PORT = 4303 -DBFN = "/var/lib/gps303/gps303.sqlite" - -def readconfig(fname): - config = ConfigParser() - config["daemon"] = { - "port": PORT, - "dbfn": DBFN, - } - config["device"] = {} - #_print_config(config) - #print("now reading", fname) - config.read(fname) - #_print_config(config) - return config - -def _print_config(conf): - for section in conf.sections(): - print("section", section) - for option in conf.options(section): - print(" ", option, conf[section][option]) - -if __name__ == "__main__": - from sys import argv - conf = readconfig(argv[1]) - _print_config(conf) - print("binaryswitch", int(conf.get("device", "binaryswitch"), 0)) diff --git a/gps303/evstore.py b/gps303/evstore.py index 9bc60d5..b195021 100644 --- a/gps303/evstore.py +++ b/gps303/evstore.py @@ -1,10 +1,7 @@ -from logging import getLogger from sqlite3 import connect __all__ = ("initdb", "stow") -log = getLogger("gps303") - DB = None SCHEMA = """create table if not exists events ( @@ -19,7 +16,6 @@ SCHEMA = """create table if not exists events ( def initdb(dbname): global DB - log.info('Using Sqlite3 database "%s"', dbname) DB = connect(dbname) DB.execute(SCHEMA) diff --git a/gps303/gps303proto.py b/gps303/gps303proto.py index 1280fe1..6fa7655 100755 --- a/gps303/gps303proto.py +++ b/gps303/gps303proto.py @@ -87,7 +87,8 @@ class GPS303Pkt: def to_packet(self): return pack("BB", self.length, self.PROTO) + self.payload - def response(self, *args): + @classmethod + def response(cls, *args): if len(args) == 0: return None assert len(args) == 1 and isinstance(args[0], bytes) @@ -95,7 +96,7 @@ class GPS303Pkt: length = len(payload) + 1 if length > 6: length -= 6 - return pack("BB", length, self.PROTO) + payload + return pack("BB", length, cls.PROTO) + payload class UNKNOWN(GPS303Pkt): @@ -112,14 +113,16 @@ class LOGIN(GPS303Pkt): self.ver = unpack("B", payload[-1:])[0] return self - def response(self): + @classmethod + def response(cls): return super().response(b"") class SUPERVISION(GPS303Pkt): # Server sends supervision number status PROTO = 0x05 - def response(self, supnum=0): + @classmethod + def response(cls, supnum=0): # 1: The device automatically answers Pickup effect # 2: Automatically Answering Two-way Calls # 3: Ring manually answer the two-way call @@ -145,9 +148,9 @@ class _GPS_POSITIONING(GPS303Pkt): self.gps_nb_sat = payload[6] & 0x0F lat, lon, speed, flags = unpack("!IIBH", payload[7:18]) self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3 - flip_lon = bool(flags & 0b0000100000000000) # bit 4 - flip_lat = not bool(flags & 0b0000010000000000) # bit 5 - self.heading = flags & 0b0000001111111111 # bits 6 - last + flip_lon = bool(flags & 0b0000100000000000) # bit 4 + flip_lat = not bool(flags & 0b0000010000000000) # bit 5 + self.heading = flags & 0b0000001111111111 # bits 6 - last self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1) self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1) self.speed = speed @@ -392,7 +395,7 @@ def proto_by_name(name): def proto_of_message(packet): - return unpack("B", packet[1:2]) + return unpack("B", packet[1:2])[0] def make_object(length, proto, payload): diff --git a/gps303/storage.py b/gps303/storage.py new file mode 100644 index 0000000..9da1d6d --- /dev/null +++ b/gps303/storage.py @@ -0,0 +1,38 @@ +""" Store zmq broadcasts to sqlite """ + +from getopt import getopt +from logging import getLogger +from logging.handlers import SysLogHandler +import sys +from time import time +import zmq + +from . import common +from .evstore import initdb, stow +from .gps303proto import parse_message + +log = getLogger("gps303/storage") + +def runserver(conf): + dbname = conf.get("storage", "dbfn") + log.info('Using Sqlite3 database "%s"', dbname) + initdb(dbname) + zctx = zmq.Context() + zsub = zctx.socket(zmq.SUB) + zsub.connect(conf.get("collector", "publishurl")) + zsub.setsockopt(zmq.SUBSCRIBE, b"") + + try: + while True: + zmsg = zsub.recv() + imei = zmsg[1:17].decode() + packet = zmsg[17:] + msg = parse_message(packet) + log.debug("From IMEI %s: %s", imei, msg) + stow("", time(), imei, msg.length, msg.PROTO, msg.payload) + except KeyboardInterrupt: + pass + + +if __name__.endswith("__main__"): + runserver(common.init(log)) -- 2.43.0