]> average.org Git - loctrkd.git/blobdiff - loctrkd/common.py
beesure: consider valid if enough satellites
[loctrkd.git] / loctrkd / common.py
index 02b520f620d7d82582df949361b572327de283ac..941a93e008dbf793cad604d0df1fd86a81809b63 100644 (file)
@@ -3,11 +3,13 @@
 from configparser import ConfigParser
 from importlib import import_module
 from getopt import getopt
 from configparser import ConfigParser
 from importlib import import_module
 from getopt import getopt
+from json import dumps
 from logging import Formatter, getLogger, Logger, StreamHandler, DEBUG, INFO
 from logging.handlers import SysLogHandler
 from pkg_resources import get_distribution, DistributionNotFound
 from sys import argv, stderr, stdout
 from typing import Any, cast, Dict, List, Optional, Tuple, Union
 from logging import Formatter, getLogger, Logger, StreamHandler, DEBUG, INFO
 from logging.handlers import SysLogHandler
 from pkg_resources import get_distribution, DistributionNotFound
 from sys import argv, stderr, stdout
 from typing import Any, cast, Dict, List, Optional, Tuple, Union
+from types import SimpleNamespace
 
 from .protomodule import ProtoModule
 
 
 from .protomodule import ProtoModule
 
@@ -67,3 +69,84 @@ def pmod_for_proto(proto: str) -> Optional[ProtoModule]:
         if pmod.proto_handled(proto):
             return pmod
     return None
         if pmod.proto_handled(proto):
             return pmod
     return None
+
+
+def parse_message(proto: str, packet: bytes, is_incoming: bool = True) -> Any:
+    pmod = pmod_for_proto(proto)
+    return pmod.parse_message(packet, is_incoming) if pmod else None
+
+
+def exposed_protos() -> List[Tuple[str, bool]]:
+    return [item for pmod in pmods for item in pmod.exposed_protos()]
+
+
+class Report:
+    TYPE: str
+
+    def __repr__(self) -> str:
+        return (
+            self.__class__.__name__
+            + "("
+            + ", ".join(
+                [f"{k}={v.__repr__()}" for k, v in self.__dict__.items()]
+            )
+            + ")"
+        )
+
+    @property
+    def json(self) -> str:
+        self.type = self.TYPE
+        return dumps(self.__dict__)
+
+
+class CoordReport(Report):
+    TYPE = "location"
+
+    def __init__(
+        self,
+        *,
+        devtime: str,
+        battery_percentage: Optional[int],
+        accuracy: Optional[float],
+        altitude: Optional[float],
+        speed: Optional[float],
+        direction: Optional[float],
+        latitude: float,
+        longitude: float,
+    ) -> None:
+        self.devtime = devtime
+        self.battery_percentage = battery_percentage
+        self.accuracy = accuracy
+        self.altitude = altitude
+        self.speed = speed
+        self.direction = direction
+        self.latitude = latitude
+        self.longitude = longitude
+
+
+class HintReport(Report):
+    TYPE = "approximate_location"
+
+    def __init__(
+        self,
+        *,
+        devtime: str,
+        battery_percentage: Optional[int],
+        mcc: int,
+        mnc: int,
+        gsm_cells: List[Tuple[int, int, int]],
+        wifi_aps: List[Tuple[str, str, int]],
+    ) -> None:
+        self.devtime = devtime
+        self.battery_percentage = battery_percentage
+        self.mcc = mcc
+        self.mnc = mnc
+        self.gsm_cells = gsm_cells
+        self.wifi_aps = wifi_aps
+
+
+class StatusReport(Report):
+    TYPE = "status"
+
+    def __init__(self, *, battery_percentage: int) -> None:
+        self.battery_percentage = battery_percentage