2 Lookaside backend to query local opencellid database
5 from sqlite3 import connect
6 from typing import Any, Dict, List, Tuple
8 __all__ = "init", "lookup"
def init(conf: Dict[str, Any]) -> None:
    """Open the local opencellid SQLite database.

    The connection is stored in the module-level name ``ldb``, which
    ``lookup`` asserts is not ``None`` before querying.

    Args:
        conf: Configuration mapping. ``conf["opencellid"]["dbfn"]`` is
            passed to ``sqlite3.connect`` as the database to open.

    Raises:
        KeyError: If ``"opencellid"`` or ``"dbfn"`` is missing from *conf*.
    """
    # Without this declaration the assignment below would bind a
    # function-local name and the connection would be lost on return.
    global ldb
    ldb = connect(conf["opencellid"]["dbfn"])
24 mcc: int, mnc: int, gsm_cells: List[Tuple[int, int, int]], __: Any
25 ) -> Tuple[float, float]:
# Estimate a position as a weighted average of the coordinates of the
# observed cells that are found in the local cell database.
# NOTE(review): this listing has gaps in its own numbering, so the `def`
# line, the cursor setup, parts of both SQL statements and the `return`
# are not visible here; the comments below cover the visible lines only.
#
# The connection is a module-level name, presumably set by init() - confirm.
# Being an `assert`, this guard disappears when Python runs with -O.
26 assert ldb is not None
# Stage the observations in a throw-away in-memory schema named `mem` so
# they can be joined against the main database in SQL.
28 lc.execute("""attach database ":memory:" as mem""")
29 lc.execute("create table mem.seen (locac int, cellid int, signal int)")
# Insert statement for the observations (presumably the gsm_cells tuples -
# TODO confirm; the call that executes it is in the missing lines).
31 """insert into mem.seen (locac, cellid, signal)
37 """select c.lat, c.lon, s.signal
38 from main.cells c, mem.seen s
41 and c.cell = s.cellid""",
# Each row is (lat, lon, signal) for an observed cell matched in main.cells.
44 data = list(lc.fetchall())
# Weight every row by 1/signal and normalise the weights so they sum to 1.
# NOTE(review): a signal value of 0 raises ZeroDivisionError in the lines
# shown; confirm whether such rows are filtered out elsewhere.
47 sumsig = sum([1 / sig for _, _, sig in data])
48 nsigs = [1 / sig / sumsig for _, _, sig in data]
# Weighted mean of latitude and of longitude.
49 avlat = sum([lat * nsig for (lat, _, _), nsig in zip(data, nsigs)])
50 avlon = sum([lon * nsig for (_, lon, _), nsig in zip(data, nsigs)])
# Detach `mem` so that the next call can attach a fresh in-memory schema
# under the same name.
# NOTE(review): no try/finally is visible; if a statement above raises,
# `mem` would stay attached - confirm the next ATTACH does not then fail.
51 # lc.execute("drop table mem.seen")
52 lc.execute("""detach database mem""")
57 if __name__.endswith("__main__"):
58 from datetime import datetime, timezone
60 from .gps303proto import *
62 db = connect(sys.argv[1])
65 """select tstamp, packet from events
66 where proto in (?, ?)""",
67 (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO),
69 init({"opencellid": {"dbfn": sys.argv[2]}})
70 for timestamp, packet in c:
71 obj = parse_message(packet)
72 avlat, avlon = lookup(obj.mcc, obj.mnc, obj.gsm_cells, obj.wifi_aps)
74 "{} {:+#010.8g},{:+#010.8g}".format(
75 datetime.fromtimestamp(timestamp), avlat, avlon