+"""
+Google Maps location service lookaside backend
+"""
+
+from configparser import ConfigParser
import googlemaps as gmaps
-from typing import Any, Dict, List, Tuple
+from typing import Any, Callable, Dict, List, Tuple
gclient = None
-def init(conf: Dict[str, Any]) -> None:
+def init(conf: ConfigParser) -> None:
global gclient
with open(conf["googlemaps"]["accesstoken"], encoding="ascii") as fl:
token = fl.read().rstrip()
return
-def lookup(
+def _lookup(
mcc: int,
mnc: int,
gsm_cells: List[Tuple[int, int, int]],
wifi_aps: List[Tuple[str, int]],
-) -> Tuple[float, float]:
+) -> Any:
assert gclient is not None
kwargs = {
"home_mobile_country_code": mcc,
{"macAddress": mac, "signalStrength": sig} for mac, sig in wifi_aps
],
}
- result = gclient.geolocate(**kwargs)
+ return gclient.geolocate(**kwargs)
+
+
+def lookup(
+ mcc: int,
+ mnc: int,
+ gsm_cells: List[Tuple[int, int, int]],
+ wifi_aps: List[Tuple[str, int]],
+) -> Tuple[float, float]:
+ result = _lookup(mcc, mnc, gsm_cells, wifi_aps)
if "location" in result:
return result["location"]["lat"], result["location"]["lng"]
else:
if __name__.endswith("__main__"):
- from datetime import datetime, timezone
- from sqlite3 import connect
- import sys
- from .zx303proto import *
- from .zx303proto import WIFI_POSITIONING, WIFI_OFFLINE_POSITIONING
+ from getopt import getopt
+ from json import loads
+ from logging import getLogger
+ from sys import argv
+ from . import common
- db = connect(sys.argv[1])
- c = db.cursor()
- c.execute(
- """select tstamp, packet from events
- where proto in (?, ?)""",
- (proto_name(WIFI_POSITIONING), proto_name(WIFI_OFFLINE_POSITIONING)),
- )
- init({"googlemaps": {"accesstoken": sys.argv[2]}})
- count = 0
- for timestamp, packet in c:
- obj = parse_message(packet)
- print(obj)
- avlat, avlon = lookup(obj.mcc, obj.mnc, obj.gsm_cells, obj.wifi_aps)
- print(
- "{} {:+#010.8g},{:+#010.8g}".format(
- datetime.fromtimestamp(timestamp), avlat, avlon
- )
- )
- count += 1
- if count > 10:
- break
+ def cell_list(s: str) -> List[Tuple[int, int, int]]:
+ return [(int(ac), int(ci), int(sg)) for [ac, ci, sg] in loads(s)]
+
+ def ap_list(s: str) -> List[Tuple[str, int]]:
+ return [(mac, int(sg)) for [mac, sg] in loads(s)]
+
+ log = getLogger("loctrkd/googlemaps")
+ opts, args = getopt(argv[1:], "c:d")
+ conf = common.init(log, opts=opts)
+ init(conf)
+ parms = {}
+ needed: Dict[str, Callable[[Any], Any]] = {
+ "mcc": int,
+ "mnc": int,
+ "gsm_cells": cell_list,
+ "wifi_aps": ap_list,
+ }
+ parms = {k: needed.pop(k)(v) for k, v in [arg.split("=") for arg in args]}
+ if needed:
+ raise ValueError(f"still needed: {needed}")
+ print(_lookup(**parms))