[opencellid]
dbfn = opencellid.sqlite
-[device]
+[termconfig]
+statusIntervalMinutes = 25
uploadIntervalSeconds = 0x0300
binarySwitch = 0b00110001
alarms =
PORT = 4303
DBFN = "/var/lib/gps303/gps303.sqlite"
+
def init(log):
opts, _ = getopt(argv[1:], "c:d")
opts = dict(opts)
log.info("starting with options: %s", opts)
return conf
+
def readconfig(fname):
config = ConfigParser()
config["collector"] = {
config["storage"] = {
"dbfn": DBFN,
}
- config["device"] = {}
- #_print_config(config)
- #print("now reading", fname)
+ config["termconfig"] = {}
config.read(fname)
- #_print_config(config)
return config
+
+def normconf(section):
+ result = {}
+ for key, val in section.items():
+ vals = val.split("\n")
+ if len(vals) > 1 and vals[0] == "":
+ vals = vals[1:]
+ lst = []
+ for el in vals:
+ try:
+ el = int(el, 0)
+ except ValueError:
+ if el[0] == '"' and el[-1] == '"':
+ el = el.strip('"').rstrip('"')
+ lst.append(el)
+ if len(lst) == 1:
+ [lst] = lst
+ result[key] = lst
+ return result
+
+
if __name__ == "__main__":
from sys import argv
conf = readconfig(argv[1])
_print_config(conf)
- print("binaryswitch", int(conf.get("device", "binaryswitch"), 0))
+ print(normconf(conf["termconfig"]))
def response(
self,
- uploadIntervalSeconds=0x0300,
- binarySwitch=0b00110001,
+ uploadintervalseconds=0x0300,
+ binaryswitch=0b00110001,
alarms=[0, 0, 0],
- dndTimeSwitch=0,
- dndTimes=[0, 0, 0],
- gpsTimeSwitch=0,
- gpsTimeStart=0,
- gpsTimeStop=0,
- phoneNumbers=["", "", ""],
+ dndtimeswitch=0,
+ dndtimes=[0, 0, 0],
+ gpstimeswitch=0,
+ gpstimestart=0,
+ gpstimestop=0,
+ phonenumbers=["", "", ""],
):
def pack3b(x):
return pack("!I", x)[1:]
payload = b"".join(
[
- pack("!H", uploadIntervalSeconds),
- pack("B", binarySwitch),
+ pack("!H", uploadintervalseconds),
+ pack("B", binaryswitch),
]
+ [pack3b(el) for el in alarms]
+ [
- pack("B", dndTimeSwitch),
+ pack("B", dndtimeswitch),
]
- + [pack3b(el) for el in dndTimes]
+ + [pack3b(el) for el in dndtimes]
+ [
- pack("B", gpsTimeSwitch),
- pack("!H", gpsTimeStart),
- pack("!H", gpsTimeStop),
+ pack("B", gpstimeswitch),
+ pack("!H", gpstimestart),
+ pack("!H", gpstimestop),
]
- + [b";".join([el.encode() for el in phoneNumbers])]
+ + [b";".join([el.encode() for el in phonenumbers])]
)
return self.make_packet(payload)
import zmq
from . import common
-from .gps303proto import parse_message, proto_by_name
+from .gps303proto import *
from .zmsg import Bcast, Resp
log = getLogger("gps303/termconfig")
def runserver(conf):
+ termconfig = common.normconf(conf["termconfig"])
zctx = zmq.Context()
zsub = zctx.socket(zmq.SUB)
zsub.connect(conf.get("collector", "publishurl"))
datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
msg,
)
- # TODO get data from the config
- resp = Resp(imei=zmsg.imei, packet=msg.response())
+ kwargs = {}
+ if isinstance(msg, STATUS):
+ kwargs = {
+ "upload_interval": termconf.get(
+ "statusintervalminutes", 25
+ )
+ }
+ elif isinstance(msg, SETUP):
+ for key in (
+ "uploadintervalseconds",
+ "binaryswitch",
+ "alarms",
+ "dndtimeswitch",
+ "dndtimes",
+ "gpstimeswitch",
+ "gpstimestart",
+ "gpstimestop",
+ "phonenumbers",
+ ):
+ if key in termconfig:
+ kwargs[key] = termconfig[key]
+ resp = Resp(imei=zmsg.imei, packet=msg.response(**kwargs))
log.debug("Response: %s", resp)
zpush.send(resp.packed)