[collector]
port = 4303
publishurl = ipc:///tmp/collected
-listenurl = ipc:///responses
+listenurl = ipc:///tmp/responses
-[daemon]
-port = 4303
+[storage]
dbfn = gps303.sqlite
[opencellid]
log.setLevel(DEBUG if "-d" in opts else INFO)
log.info("starting with options: %s", opts)
- initdb(conf.get("daemon", "dbfn"))
+ initdb(conf.get("storage", "dbfn"))
set_config(conf)
ctlsock = socket(AF_INET, SOCK_STREAM)
ctlsock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
- ctlsock.bind(("", conf.getint("daemon", "port")))
+ ctlsock.bind(("", conf.getint("collector", "port")))
ctlsock.listen(5)
ctlfd = ctlsock.fileno()
pollset = poll()
""" TCP server that communicates with terminals """
from getopt import getopt
-from logging import getLogger, StreamHandler, DEBUG, INFO
+from logging import getLogger
from logging.handlers import SysLogHandler
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from time import time
-import sys
+from struct import pack
import zmq
-from .config import readconfig
-from .gps303proto import parse_message, HIBERNATION, LOGIN, set_config
-
-CONF = "/etc/gps303.conf"
+from . import common
+from .gps303proto import HIBERNATION, LOGIN, parse_message, proto_of_message
log = getLogger("gps303/collector")
class Bcast:
"""Zmq message to broadcast what was received from the terminal"""
+
def __init__(self, imei, msg):
- self.as_bytes = imei.encode() + msg.to_packet()
+ self.as_bytes = (
+ pack("B", proto_of_message(msg))
+ + ("0000000000000000" if imei is None else imei).encode()
+ + msg
+ )
class Resp:
"""Zmq message received from a third party to send to the terminal"""
- def __init__(self, msg):
- self.imei = msg[:16].decode()
- self.payload = msg[16:]
+
+ def __init__(self, *args, **kwargs):
+ if not kwargs and len(args) == 1 and isinstance(args[0], bytes):
+ self.imei = msg[:16].decode()
+ self.payload = msg[16:]
+ elif len(args) == 0:
+ self.imei = kwargs["imei"]
+ self.payload = kwargs["payload"]
class Client:
"""Connected socket to the terminal plus buffer and metadata"""
+
def __init__(self, sock, addr):
self.sock = sock
self.addr = addr
self.imei = None
def recv(self):
- """ Read from the socket and parse complete messages """
+ """Read from the socket and parse complete messages"""
try:
segment = self.sock.recv(4096)
except OSError:
- log.warning("Reading from fd %d (IMEI %s): %s",
- self.sock.fileno(), self.imei, e)
+ log.warning(
+ "Reading from fd %d (IMEI %s): %s",
+ self.sock.fileno(),
+ self.imei,
+ e,
+ )
return None
if not segment: # Terminal has closed connection
- log.info("EOF reading from fd %d (IMEI %s)",
- self.sock.fileno(), self.imei)
+ log.info(
+ "EOF reading from fd %d (IMEI %s)",
+ self.sock.fileno(),
+ self.imei,
+ )
return None
when = time()
self.buffer += segment
if framestart == -1: # No frames, return whatever we have
break
if framestart > 0: # Should not happen, report
- log.warning("Undecodable data \"%s\" from fd %d (IMEI %s)",
- self.buffer[:framestart].hex(), self.sock.fileno(), self.imei)
+ log.warning(
+ 'Undecodable data "%s" from fd %d (IMEI %s)',
+ self.buffer[:framestart].hex(),
+ self.sock.fileno(),
+ self.imei,
+ )
self.buffer = self.buffer[framestart:]
# At this point, buffer starts with a packet
frameend = self.buffer.find(b"\r\n", 4)
if frameend == -1: # Incomplete frame, return what we have
break
- msg = parse_message(self.buffer[2:frameend])
- self.buffer = self.buffer[frameend+2:]
- if isinstance(msg, LOGIN):
- self.imei = msg.imei
- log.info("LOGIN from fd %d: IMEI %s",
- self.sock.fileno(), self.imei)
- msgs.append(msg)
+ packet = self.buffer[2:frameend]
+ self.buffer = self.buffer[frameend + 2 :]
+ if proto_of_message(packet) == LOGIN.PROTO:
+ self.imei = parse_message(packet).imei
+ log.info(
+ "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
+ )
+ msgs.append(packet)
return msgs
def send(self, buffer):
try:
self.sock.send(b"xx" + buffer + b"\r\n")
except OSError as e:
- log.error("Sending to fd %d (IMEI %s): %s",
- self.sock.fileno, self.imei, e)
+ log.error(
+ "Sending to fd %d (IMEI %s): %s",
+ self.sock.fileno,
+ self.imei,
+ e,
+ )
+
class Clients:
def __init__(self):
def recv(self, fd):
clnt = self.by_fd[fd]
msgs = clnt.recv()
+ if msgs is None:
+ return None
result = []
for msg in msgs:
- if isinstance(msg, LOGIN):
+ if proto_of_message(msg) == LOGIN.PROTO: # Could do blindly...
self.by_imei[clnt.imei] = clnt
result.append((clnt.imei, msg))
return result
self.by_imei[resp.imei].send(resp.payload)
-def runserver(opts, conf):
+def runserver(conf):
zctx = zmq.Context()
zpub = zctx.socket(zmq.PUB)
zpub.bind(conf.get("collector", "publishurl"))
else:
for imei, msg in clients.recv(sk):
zpub.send(Bcast(imei, msg).as_bytes)
- if msg is None or isinstance(msg, HIBERNATION):
- log.debug("HIBERNATION from fd %d", sk)
+ if (
+ msg is None
+ or proto_of_message(msg) == HIBERNATION.PROTO
+ ):
+ log.debug(
+ "HIBERNATION from fd %d (IMEI %s)", sk, imei
+ )
tostop.append(sk)
+ elif proto_of_message(msg) == LOGIN.PROTO:
+ clients.response(Resp(imei=imei, payload=LOGIN.response()))
# poll queue consumed, make changes now
for fd in tostop:
poller.unregister(fd)
if __name__.endswith("__main__"):
- opts, _ = getopt(sys.argv[1:], "c:d")
- opts = dict(opts)
- conf = readconfig(opts["-c"] if "-c" in opts else CONF)
- if sys.stdout.isatty():
- log.addHandler(StreamHandler(sys.stderr))
- else:
- log.addHandler(SysLogHandler(address="/dev/log"))
- log.setLevel(DEBUG if "-d" in opts else INFO)
- log.info("starting with options: %s", opts)
- runserver(opts, conf)
+ runserver(common.init(log))
--- /dev/null
+""" Common housekeeping for all daemons """
+
+from configparser import ConfigParser
+from getopt import getopt
+from logging import getLogger, StreamHandler, DEBUG, INFO
+from sys import argv, stderr, stdout
+
+CONF = "/etc/gps303.conf"
+PORT = 4303
+DBFN = "/var/lib/gps303/gps303.sqlite"
+
+def init(log):
+ opts, _ = getopt(argv[1:], "c:d")
+ opts = dict(opts)
+ conf = readconfig(opts["-c"] if "-c" in opts else CONF)
+ if stdout.isatty():
+ log.addHandler(StreamHandler(stderr))
+ else:
+ log.addHandler(SysLogHandler(address="/dev/log"))
+ log.setLevel(DEBUG if "-d" in opts else INFO)
+ log.info("starting with options: %s", opts)
+ return conf
+
+def readconfig(fname):
+ config = ConfigParser()
+ config["collector"] = {
+ "port": PORT,
+ }
+ config["storage"] = {
+ "dbfn": DBFN,
+ }
+ config["device"] = {}
+ #_print_config(config)
+ #print("now reading", fname)
+ config.read(fname)
+ #_print_config(config)
+ return config
+
+if __name__ == "__main__":
+ from sys import argv
+
+ def _print_config(conf):
+ for section in conf.sections():
+ print("section", section)
+ for option in conf.options(section):
+ print(" ", option, conf[section][option])
+
+ conf = readconfig(argv[1])
+ _print_config(conf)
+ print("binaryswitch", int(conf.get("device", "binaryswitch"), 0))
+++ /dev/null
-from configparser import ConfigParser
-
-PORT = 4303
-DBFN = "/var/lib/gps303/gps303.sqlite"
-
-def readconfig(fname):
- config = ConfigParser()
- config["daemon"] = {
- "port": PORT,
- "dbfn": DBFN,
- }
- config["device"] = {}
- #_print_config(config)
- #print("now reading", fname)
- config.read(fname)
- #_print_config(config)
- return config
-
-def _print_config(conf):
- for section in conf.sections():
- print("section", section)
- for option in conf.options(section):
- print(" ", option, conf[section][option])
-
-if __name__ == "__main__":
- from sys import argv
- conf = readconfig(argv[1])
- _print_config(conf)
- print("binaryswitch", int(conf.get("device", "binaryswitch"), 0))
-from logging import getLogger
from sqlite3 import connect
__all__ = ("initdb", "stow")
-log = getLogger("gps303")
-
DB = None
SCHEMA = """create table if not exists events (
def initdb(dbname):
global DB
- log.info('Using Sqlite3 database "%s"', dbname)
DB = connect(dbname)
DB.execute(SCHEMA)
def to_packet(self):
return pack("BB", self.length, self.PROTO) + self.payload
- def response(self, *args):
+ @classmethod
+ def response(cls, *args):
if len(args) == 0:
return None
assert len(args) == 1 and isinstance(args[0], bytes)
length = len(payload) + 1
if length > 6:
length -= 6
- return pack("BB", length, self.PROTO) + payload
+ return pack("BB", length, cls.PROTO) + payload
class UNKNOWN(GPS303Pkt):
self.ver = unpack("B", payload[-1:])[0]
return self
- def response(self):
+ @classmethod
+ def response(cls):
return super().response(b"")
class SUPERVISION(GPS303Pkt): # Server sends supervision number status
PROTO = 0x05
- def response(self, supnum=0):
+ @classmethod
+ def response(cls, supnum=0):
# 1: The device automatically answers Pickup effect
# 2: Automatically Answering Two-way Calls
# 3: Ring manually answer the two-way call
self.gps_nb_sat = payload[6] & 0x0F
lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
- flip_lon = bool(flags & 0b0000100000000000) # bit 4
- flip_lat = not bool(flags & 0b0000010000000000) # bit 5
- self.heading = flags & 0b0000001111111111 # bits 6 - last
+ flip_lon = bool(flags & 0b0000100000000000) # bit 4
+ flip_lat = not bool(flags & 0b0000010000000000) # bit 5
+ self.heading = flags & 0b0000001111111111 # bits 6 - last
self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
self.speed = speed
def proto_of_message(packet):
- return unpack("B", packet[1:2])
+ return unpack("B", packet[1:2])[0]
def make_object(length, proto, payload):
--- /dev/null
+""" Store zmq broadcasts to sqlite """
+
+from getopt import getopt
+from logging import getLogger
+from logging.handlers import SysLogHandler
+import sys
+from time import time
+import zmq
+
+from . import common
+from .evstore import initdb, stow
+from .gps303proto import parse_message
+
+log = getLogger("gps303/storage")
+
+def runserver(conf):
+ dbname = conf.get("storage", "dbfn")
+ log.info('Using Sqlite3 database "%s"', dbname)
+ initdb(dbname)
+ zctx = zmq.Context()
+ zsub = zctx.socket(zmq.SUB)
+ zsub.connect(conf.get("collector", "publishurl"))
+ zsub.setsockopt(zmq.SUBSCRIBE, b"")
+
+ try:
+ while True:
+ zmsg = zsub.recv()
+ imei = zmsg[1:17].decode()
+ packet = zmsg[17:]
+ msg = parse_message(packet)
+ log.debug("From IMEI %s: %s", imei, msg)
+ stow("", time(), imei, msg.length, msg.PROTO, msg.payload)
+ except KeyboardInterrupt:
+ pass
+
+
+if __name__.endswith("__main__"):
+ runserver(common.init(log))