""" Common housekeeping for all daemons """
-from configparser import ConfigParser, SectionProxy
+from configparser import ConfigParser
+from importlib import import_module
from getopt import getopt
+from json import dumps
from logging import Formatter, getLogger, Logger, StreamHandler, DEBUG, INFO
from logging.handlers import SysLogHandler
from pkg_resources import get_distribution, DistributionNotFound
from sys import argv, stderr, stdout
-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import Any, cast, Dict, List, Optional, Tuple, Union
+from types import SimpleNamespace
+
+from .protomodule import ProtoModule
CONF = "/etc/loctrkd.conf"
-PORT = 4303
-DBFN = "/var/lib/loctrkd/loctrkd.sqlite"
+pmods: List[ProtoModule] = []
try:
version = get_distribution("loctrkd").version
version = "<local>"
+def init_protocols(conf: ConfigParser) -> None:
+ global pmods
+ pmods = [
+ cast(ProtoModule, import_module("." + modnm, __package__))
+ for modnm in conf.get("common", "protocols").split(",")
+ ]
+
+
def init(
log: Logger, opts: Optional[List[Tuple[str, str]]] = None
) -> ConfigParser:
if opts is None:
opts, _ = getopt(argv[1:], "c:d")
dopts = dict(opts)
- conf = readconfig(dopts["-c"] if "-c" in dopts else CONF)
+ conf = ConfigParser()
+ conf.read(dopts["-c"] if "-c" in dopts else CONF)
log.setLevel(DEBUG if "-d" in dopts else INFO)
if stdout.isatty():
fhdl = StreamHandler(stderr)
)
log.addHandler(lhdl)
log.info("%s starting with options: %s", version, dopts)
+ init_protocols(conf)
return conf
-def readconfig(fname: str) -> ConfigParser:
- config = ConfigParser()
- config["collector"] = {
- "port": str(PORT),
- }
- config["storage"] = {
- "dbfn": DBFN,
- }
- config["termconfig"] = {}
- config.read(fname)
- return config
-
-
-def normconf(section: SectionProxy) -> Dict[str, Any]:
- result: Dict[str, Any] = {}
- for key, val in section.items():
- vals = val.split("\n")
- if len(vals) > 1 and vals[0] == "":
- vals = vals[1:]
- lst: List[Union[str, int]] = []
- for el in vals:
- try:
- lst.append(int(el, 0))
- except ValueError:
- if el[0] == '"' and el[-1] == '"':
- el = el.strip('"').rstrip('"')
- lst.append(el)
- if not (
- all([isinstance(x, int) for x in lst])
- or all([isinstance(x, str) for x in lst])
- ):
- raise ValueError(
- "Values of %s - %s are of different type", key, vals
+def probe_pmod(segment: bytes) -> Optional[ProtoModule]:
+ for pmod in pmods:
+ if pmod.probe_buffer(segment):
+ return pmod
+ return None
+
+
+def pmod_for_proto(proto: str) -> Optional[ProtoModule]:
+ for pmod in pmods:
+ if pmod.proto_handled(proto):
+ return pmod
+ return None
+
+
+def parse_message(proto: str, packet: bytes, is_incoming: bool = True) -> Any:
+ pmod = pmod_for_proto(proto)
+ return pmod.parse_message(packet, is_incoming) if pmod else None
+
+
+def exposed_protos() -> List[Tuple[str, bool]]:
+ return [item for pmod in pmods for item in pmod.exposed_protos()]
+
+
+class Report:
+ TYPE: str
+
+ def __repr__(self) -> str:
+ return (
+ self.__class__.__name__
+ + "("
+ + ", ".join(
+ [f"{k}={v.__repr__()}" for k, v in self.__dict__.items()]
)
- if len(lst) == 1:
- result[key] = lst[0]
- else:
- result[key] = lst
- return result
+ + ")"
+ )
+
+ @property
+ def json(self) -> str:
+ self.type = self.TYPE
+ return dumps(self.__dict__)
+
+
+class CoordReport(Report):
+ TYPE = "location"
+
+ def __init__(
+ self,
+ *,
+ devtime: str,
+ battery_percentage: int,
+ accuracy: float,
+ altitude: float,
+ speed: float,
+ direction: float,
+ latitude: float,
+ longitude: float,
+ ) -> None:
+ self.devtime = devtime
+ self.battery_percentage = battery_percentage
+ self.accuracy = accuracy
+ self.altitude = altitude
+ self.speed = speed
+ self.direction = direction
+ self.latitude = latitude
+ self.longitude = longitude
+
+
+class HintReport(Report):
+ TYPE = "approximate_location"
+ def __init__(
+ self,
+ *,
+ devtime: str,
+ battery_percentage: int,
+ mcc: int,
+ mnc: int,
+ gsm_cells: List[Tuple[int, int, int]],
+ wifi_aps: List[Tuple[str, str, int]],
+ ) -> None:
+ self.devtime = devtime
+ self.battery_percentage = battery_percentage
+ self.mcc = mcc
+ self.mnc = mnc
+ self.gsm_cells = gsm_cells
+ self.wifi_aps = wifi_aps
-if __name__ == "__main__":
- from sys import argv
- def _print_config(conf: ConfigParser) -> None:
- for section in conf.sections():
- print("section", section)
- for option in conf.options(section):
- print(" ", option, conf[section][option])
+class StatusReport(Report):
+ TYPE = "status"
- conf = readconfig(argv[1])
- _print_config(conf)
- print(normconf(conf["termconfig"]))
+ def __init__(self, *, battery_percentage: int) -> None:
+ self.battery_percentage = battery_percentage