Lookaside backend to query local opencellid database
"""
+from configparser import ConfigParser
from sqlite3 import connect
from typing import Any, Dict, List, Tuple
ldb = None
-def init(conf: Dict[str, Any]) -> None:
+def init(conf: ConfigParser) -> None:
global ldb
ldb = connect(conf["opencellid"]["dbfn"])
+ ldb.execute("create temp table seen (locac int, cellid int, signal int)")
def shut() -> None:
def lookup(
mcc: int, mnc: int, gsm_cells: List[Tuple[int, int, int]], __: Any
-) -> Tuple[float, float]:
+) -> Tuple[float, float, float]:
assert ldb is not None
lc = ldb.cursor()
- lc.execute("""attach database ":memory:" as mem""")
- lc.execute("create table mem.seen (locac int, cellid int, signal int)")
lc.executemany(
- """insert into mem.seen (locac, cellid, signal)
- values (?, ?, ?)""",
+ "insert into seen (locac, cellid, signal) values (?, ?, ?)",
gsm_cells,
)
ldb.commit()
lc.execute(
"""select c.lat, c.lon, s.signal
- from main.cells c, mem.seen s
+ from cells c, seen s
where c.mcc = ?
+ and c.net = ?
and c.area = s.locac
and c.cell = s.cellid""",
- (mcc,),
+ (mcc, mnc),
)
data = list(lc.fetchall())
+ # This should be faster than dropping and recreating the temp table
+ # https://www.sqlite.org/lang_delete.html#the_truncate_optimization
+ lc.execute("delete from seen")
+ lc.close()
if not data:
- return 0.0, 0.0
+ raise ValueError("No location data found in opencellid")
sumsig = sum([1 / sig for _, _, sig in data])
nsigs = [1 / sig / sumsig for _, _, sig in data]
avlat = sum([lat * nsig for (lat, _, _), nsig in zip(data, nsigs)])
avlon = sum([lon * nsig for (_, lon, _), nsig in zip(data, nsigs)])
- # lc.execute("drop table mem.seen")
- lc.execute("""detach database mem""")
- lc.close()
- return avlat, avlon
-
-
-if __name__.endswith("__main__"):
- from datetime import datetime, timezone
- import sys
- from .zx303proto import *
-
- db = connect(sys.argv[1])
- c = db.cursor()
- c.execute(
- """select tstamp, packet from events
- where proto in (?, ?)""",
- (proto_name(WIFI_POSITIONING), proto_name(WIFI_OFFLINE_POSITIONING)),
- )
- init({"opencellid": {"dbfn": sys.argv[2]}})
- for timestamp, packet in c:
- obj = parse_message(packet)
- avlat, avlon = lookup(obj.mcc, obj.mnc, obj.gsm_cells, obj.wifi_aps)
- print(
- "{} {:+#010.8g},{:+#010.8g}".format(
- datetime.fromtimestamp(timestamp), avlat, avlon
- )
- )
+ return avlat, avlon, 99.9 # TODO estimate accuracy for real