from types import SimpleNamespace
from .protomodule import ProtoClass
+from .common import (
+ CoordReport,
+ HintReport,
+ StatusReport,
+ Report,
+)
__all__ = (
"Stream",
(self, "pedometer", int),
(self, "tubmling_times", int),
(self, "device_status", lambda x: int(x, 16)),
- (self, "base_stations_number", int),
+ (self, "gsm_cells_number", int),
(self, "connect_base_station_number", int),
(self, "mcc", int),
(self, "mnc", int),
setattr(obj, attr, func(val)) # type: ignore
rest_args = args[20:]
# (area_id, cell_id, strength)*
- self.base_stations = [
- tuple(int(el) for el in rest_args[i * 3 : 3 + i * 3])
- for i in range(self.base_stations_number)
+ self.gsm_cells: List[Tuple[int, int, int]] = [
+ tuple(int(el) for el in rest_args[i * 3 : 3 + i * 3]) # type: ignore
+ for i in range(self.gsm_cells_number)
]
- rest_args = rest_args[3 * self.base_stations_number :]
+ rest_args = rest_args[3 * self.gsm_cells_number :]
self.wifi_aps_number = int(rest_args[0])
# (SSID, MAC, strength)*
self.wifi_aps = [
self.latitude = p.lat * p.nors
self.longitude = p.lon * p.eorw
- def rectified(self) -> SimpleNamespace: # JSON-able dict
+ def rectified(self) -> Report:
if self.gps_valid:
- return SimpleNamespace(
- type="location",
+ return CoordReport(
devtime=str(self.devtime),
battery_percentage=self.battery_percentage,
accuracy=self.positioning_accuracy,
longitude=self.longitude,
)
else:
- return SimpleNamespace(
- type="approximate_location",
+ return HintReport(
devtime=str(self.devtime),
battery_percentage=self.battery_percentage,
mcc=self.mcc,
mnc=self.mnc,
- base_stations=self.base_stations,
+ gsm_cells=self.gsm_cells,
wifi_aps=self.wifi_aps,
)
def exposed_protos() -> List[Tuple[str, bool]]:
return [
- (proto_name(cls), False)
+ (proto_name(cls)[:16], False)
for cls in CLASSES.values()
if hasattr(cls, "rectified")
]
from configparser import ConfigParser
from importlib import import_module
from getopt import getopt
+from json import dumps
from logging import Formatter, getLogger, Logger, StreamHandler, DEBUG, INFO
from logging.handlers import SysLogHandler
from pkg_resources import get_distribution, DistributionNotFound
from sys import argv, stderr, stdout
from typing import Any, cast, Dict, List, Optional, Tuple, Union
+from types import SimpleNamespace
from .protomodule import ProtoModule
def exposed_protos() -> List[Tuple[str, bool]]:
return [item for pmod in pmods for item in pmod.exposed_protos()]
+
+
+class Report(SimpleNamespace):
+ TYPE: str
+
+ @property
+ def json(self) -> str:
+ self.type = self.TYPE
+ return dumps(self.__dict__)
+
+
+class CoordReport(Report):
+ TYPE = "location"
+
+ def __init__(
+ self,
+ *,
+ devtime: str,
+ battery_percentage: int,
+ accuracy: float,
+ altitude: float,
+ speed: float,
+ direction: float,
+ latitude: float,
+ longitude: float
+ ) -> None:
+ super().__init__(
+ devtime=devtime,
+ battery_percentage=battery_percentage,
+ accuracy=accuracy,
+ altitude=altitude,
+ speed=speed,
+ direction=direction,
+ latitude=latitude,
+ longitude=longitude,
+ )
+
+
+class HintReport(Report):
+ TYPE = "approximate_location"
+
+ def __init__(
+ self,
+ *,
+ devtime: str,
+ battery_percentage: int,
+ mcc: int,
+ mnc: int,
+ gsm_cells: List[Tuple[int, int, int]],
+ wifi_aps: List[Tuple[str, str, int]]
+ ) -> None:
+ super().__init__(
+ devtime=devtime,
+ battery_percentage=battery_percentage,
+ mcc=mcc,
+ mnc=mnc,
+ gsm_cells=gsm_cells,
+ wifi_aps=wifi_aps,
+ )
+
+
+class StatusReport(Report):
+ TYPE = "status"
+
+ def __init__(self, *, battery_percentage: int) -> None:
+ super().__init__(battery_percentage=battery_percentage)
import zmq
from . import common
-from .zmsg import Bcast, Report, Resp, topic
+from .common import CoordReport, HintReport, StatusReport, Report
+from .zmsg import Bcast, Rept, Resp, topic
log = getLogger("loctrkd/rectifier")
datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
msg,
)
- rect = msg.rectified()
+ rect: Report = msg.rectified()
log.debug("rectified: %s", rect)
- if rect.type == "approximate_location":
+ if isinstance(rect, (CoordReport, StatusReport)):
+ zpub.send(Rept(imei=zmsg.imei, payload=rect.json).packed)
+ elif isinstance(rect, HintReport):
try:
lat, lon = qry.lookup(
- rect.mcc, rect.mnc, rect.base_stations, rect.wifi_aps
- )
- resp = Resp(
- imei=zmsg.imei,
- when=zmsg.when, # not the current time, but the original!
- packet=msg.Out(latitude=lat, longitude=lon).packed,
+ rect.mcc, rect.mnc, rect.gsm_cells, rect.wifi_aps
)
log.debug(
- "Response for lat=%s, lon=%s: %s", lat, lon, resp
+ "Approximated lat=%s, lon=%s for %s", lat, lon, rect
+ )
+ if proto_needanswer.get(zmsg.proto, False):
+ resp = Resp(
+ imei=zmsg.imei,
+ when=zmsg.when, # not the current time, but the original!
+ packet=msg.Out(latitude=lat, longitude=lon).packed,
+ )
+ log.debug("Sending reponse %s", resp)
+ zpush.send(resp.packed)
+ zpub.send(
+ Rept(
+ imei=zmsg.imei,
+ payload=CoordReport(
+ devtime=rect.devtime,
+ battery_percentage=rect.battery_percentage,
+ accuracy=-1,
+ altitude=-1,
+ speed=-1,
+ direction=-1,
+ latitude=lat,
+ longitude=lon,
+ ).json,
+ ).packed
)
- zpush.send(resp.packed)
except Exception as e:
- log.warning("Lookup for %s resulted in %s", msg, e)
+ log.warning(
+ "Lookup for %s rectified as %s resulted in %s",
+ msg,
+ rect,
+ e,
+ )
except KeyboardInterrupt:
zsub.close()
self.packet = buffer[24:]
-class Report(_Zmsg):
+class Rept(_Zmsg):
"""Broadcast Zzmq message with "rectified" proto-agnostic json data"""
KWARGS = (("imei", None), ("payload", ""))
Union,
)
+from .common import CoordReport, HintReport, StatusReport
from .protomodule import ProtoClass
__all__ = (
ttup = (tup[0] % 100,) + tup[1:6]
return pack("BBBBBB", *ttup)
- def rectified(self) -> SimpleNamespace: # JSON-able dict
- return SimpleNamespace(
- type="location",
+ def rectified(self) -> CoordReport: # JSON-able dict
+ return CoordReport(
devtime=str(self.devtime),
+ battery_percentage=-1,
+ accuracy=-1.0,
+ altitude=-1.0,
speed=self.speed,
direction=self.heading,
latitude=self.latitude,
def out_encode(self) -> bytes: # Set interval in minutes
return pack("B", self.upload_interval)
+ def rectified(self) -> StatusReport:
+ return StatusReport(battery_percentage=self.batt)
+
class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
PROTO = 0x14
]
)
- def rectified(self) -> SimpleNamespace: # JSON-able dict
- return SimpleNamespace(
- type="approximate_location",
+ def rectified(self) -> HintReport:
+ return HintReport(
devtime=str(self.devtime),
+ battery_percentage=-1,
mcc=self.mcc,
mnc=self.mnc,
- base_stations=self.gsm_cells,
- wifi_aps=self.wifi_aps,
+ gsm_cells=self.gsm_cells,
+ wifi_aps=[("<UNKNOWN>", mac, sig) for mac, sig in self.wifi_aps],
)
def exposed_protos() -> List[Tuple[str, bool]]:
return [
- (proto_name(cls), cls.RESPOND is Respond.EXT)
+ (proto_name(cls)[:16], cls.RESPOND is Respond.EXT)
for cls in CLASSES.values()
if hasattr(cls, "rectified")
]