"proto_handled",
"parse_message",
"probe_buffer",
- "proto_name",
"DecodeError",
"Respond",
)
### Parser/Constructor ###
+class classproperty:
+ def __init__(self, f: Callable[[Any], str]) -> None:
+ self.f = f
+
+ def __get__(self, obj: Any, owner: Any) -> str:
+ return self.f(owner)
+
+
class DecodeError(Exception):
def __init__(self, e: Exception, **kwargs: Any) -> None:
super().__init__(e)
# Overridden in subclasses, otherwise command verb only
return ""
- @property
- def PROTO(self) -> str:
+ @classproperty
+ def PROTO(cls: "BeeSurePkt") -> str:
+ """Name of the class without possible .In / .Out suffix"""
+ proto: str
try:
- proto, _ = self.__class__.__name__.split(".")
+ proto, _ = cls.__name__.split(".")
except ValueError:
- proto = self.__class__.__name__
+ proto = cls.__name__
return proto
+ @classmethod
+ def proto_name(cls) -> str:
+ """Name of the command as used externally"""
+ return PROTO_PREFIX + cls.PROTO[:16]
+
@property
def packed(self) -> bytes:
data = self.encode()
return proto.startswith(PROTO_PREFIX)
-def proto_name(obj: Union[Type[BeeSurePkt], BeeSurePkt]) -> str:
- return PROTO_PREFIX + (
- obj.__class__.__name__ if isinstance(obj, BeeSurePkt) else obj.__name__
- )
-
-
def proto_of_message(packet: bytes) -> str:
return PROTO_PREFIX + packet[20:-1].split(b",")[0].decode()
def exposed_protos() -> List[Tuple[str, bool]]:
return [
- (proto_name(cls)[:16], False)
+ (cls.proto_name(), False)
for cls in CLASSES.values()
if hasattr(cls, "rectified")
]
return [item for pmod in pmods for item in pmod.exposed_protos()]
-class Report(SimpleNamespace):
+class Report:
TYPE: str
+ def __repr__(self) -> str:
+ return (
+ self.__class__.__name__
+ + "("
+ + ", ".join([f"{k}={v}" for k, v in self.__dict__.items()])
+ + ")"
+ )
+
@property
def json(self) -> str:
self.type = self.TYPE
speed: float,
direction: float,
latitude: float,
- longitude: float
+ longitude: float,
) -> None:
- super().__init__(
- devtime=devtime,
- battery_percentage=battery_percentage,
- accuracy=accuracy,
- altitude=altitude,
- speed=speed,
- direction=direction,
- latitude=latitude,
- longitude=longitude,
- )
+ self.devtime = devtime
+ self.battery_percentage = battery_percentage
+ self.accuracy = accuracy
+ self.altitude = altitude
+ self.speed = speed
+ self.direction = direction
+ self.latitude = latitude
+ self.longitude = longitude
class HintReport(Report):
mcc: int,
mnc: int,
gsm_cells: List[Tuple[int, int, int]],
- wifi_aps: List[Tuple[str, str, int]]
+ wifi_aps: List[Tuple[str, str, int]],
) -> None:
- super().__init__(
- devtime=devtime,
- battery_percentage=battery_percentage,
- mcc=mcc,
- mnc=mnc,
- gsm_cells=gsm_cells,
- wifi_aps=wifi_aps,
- )
+ self.devtime = devtime
+ self.battery_percentage = battery_percentage
+ self.mcc = mcc
+ self.mnc = mnc
+ self.gsm_cells = gsm_cells
+ self.wifi_aps = wifi_aps
class StatusReport(Report):
TYPE = "status"
def __init__(self, *, battery_percentage: int) -> None:
- super().__init__(battery_percentage=battery_percentage)
+ self.battery_percentage = battery_percentage
return result["location"]["lat"], result["location"]["lng"]
else:
raise ValueError("google geolocation: " + str(result))
-
-
-if __name__.endswith("__main__"):
- from datetime import datetime, timezone
- from sqlite3 import connect
- import sys
- from .zx303proto import *
- from .zx303proto import WIFI_POSITIONING, WIFI_OFFLINE_POSITIONING
-
- db = connect(sys.argv[1])
- c = db.cursor()
- c.execute(
- """select tstamp, packet from events
- where proto in (?, ?)""",
- (proto_name(WIFI_POSITIONING), proto_name(WIFI_OFFLINE_POSITIONING)),
- )
- init({"googlemaps": {"accesstoken": sys.argv[2]}})
- count = 0
- for timestamp, packet in c:
- obj = parse_message(packet)
- print(obj)
- avlat, avlon = lookup(obj.mcc, obj.mnc, obj.gsm_cells, obj.wifi_aps)
- print(
- "{} {:+#010.8g},{:+#010.8g}".format(
- datetime.fromtimestamp(timestamp), avlat, avlon
- )
- )
- count += 1
- if count > 10:
- break
lc.execute("""detach database mem""")
lc.close()
return avlat, avlon
-
-
-if __name__.endswith("__main__"):
- from datetime import datetime, timezone
- import sys
- from .zx303proto import *
- from .zx303proto import WIFI_POSITIONING, WIFI_OFFLINE_POSITIONING
-
- db = connect(sys.argv[1])
- c = db.cursor()
- c.execute(
- """select tstamp, packet from events
- where proto in (?, ?)""",
- (proto_name(WIFI_POSITIONING), proto_name(WIFI_OFFLINE_POSITIONING)),
- )
- init({"opencellid": {"dbfn": sys.argv[2]}})
- for timestamp, packet in c:
- obj = parse_message(packet)
- avlat, avlon = lookup(obj.mcc, obj.mnc, obj.gsm_cells, obj.wifi_aps)
- print(
- "{} {:+#010.8g},{:+#010.8g}".format(
- datetime.fromtimestamp(timestamp), avlat, avlon
- )
- )
IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
+ @classmethod
+ def proto_name(cls) -> str:
+ ...
+
class In:
def __init__(self, *args: Any, **kwargs: Any) -> None:
...
from logging import getLogger
from os import umask
from struct import pack
+from typing import cast, List, Tuple
import zmq
from . import common
log = getLogger("loctrkd/rectifier")
+class QryModule:
+ @staticmethod
+ def init(conf: ConfigParser) -> None:
+ ...
+
+ @staticmethod
+ def shut() -> None:
+ ...
+
+ @staticmethod
+ def lookup(
+ mcc: int,
+ mnc: int,
+ gsm_cells: List[Tuple[int, int, int]],
+ wifi_aps: List[Tuple[str, int]],
+ ) -> Tuple[float, float]:
+ ...
+
+
def runserver(conf: ConfigParser) -> None:
- qry = import_module("." + conf.get("rectifier", "lookaside"), __package__)
+ qry = cast(
+ QryModule,
+ import_module("." + conf.get("rectifier", "lookaside"), __package__),
+ )
qry.init(conf)
proto_needanswer = dict(common.exposed_protos())
# Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
elif isinstance(rect, HintReport):
try:
lat, lon = qry.lookup(
- rect.mcc, rect.mnc, rect.gsm_cells, rect.wifi_aps
+ rect.mcc,
+ rect.mnc,
+ rect.gsm_cells,
+ list((mac, strng) for _, mac, strng in rect.wifi_aps),
)
log.debug(
"Approximated lat=%s, lon=%s for %s", lat, lon, rect
zsub = zctx.socket(zmq.SUB) # type: ignore
zsub.connect(conf.get("collector", "publishurl"))
for proto in (
- proto_name(STATUS),
- proto_name(SETUP),
- proto_name(POSITION_UPLOAD_INTERVAL),
+ STATUS.proto_name(),
+ SETUP.proto_name(),
+ POSITION_UPLOAD_INTERVAL.proto_name(),
):
zsub.setsockopt(zmq.SUBSCRIBE, topic(proto))
zpush = zctx.socket(zmq.PUSH) # type: ignore
"proto_handled",
"parse_message",
"probe_buffer",
- "proto_name",
"DecodeError",
"Respond",
)
-PROTO_PREFIX = "ZX:"
+PROTO_PREFIX: str = "ZX:"
### Deframer ###
# Overridden in subclasses, otherwise make empty payload
return b""
+ @classmethod
+ def proto_name(cls) -> str:
+ """Name of the command as used externally"""
+ return (PROTO_PREFIX + cls.__name__)[:16]
+
@property
def packed(self) -> bytes:
payload = self.encode()
return proto.startswith(PROTO_PREFIX)
-def proto_name(obj: Union[Type[GPS303Pkt], GPS303Pkt]) -> str:
- return PROTO_PREFIX + (
- obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__
- )
-
-
def proto_of_message(packet: bytes) -> str:
- return proto_name(CLASSES.get(packet[1], UNKNOWN))
+ return CLASSES.get(packet[1], UNKNOWN).proto_name()
def imei_from_packet(packet: bytes) -> Optional[str]:
def exposed_protos() -> List[Tuple[str, bool]]:
return [
- (proto_name(cls)[:16], cls.RESPOND is Respond.EXT)
+ (cls.proto_name(), cls.RESPOND is Respond.EXT)
for cls in CLASSES.values()
if hasattr(cls, "rectified")
]