dbfn = /var/lib/gps303/gps303.sqlite
[lookaside]
-# "opencellid" is the only implemented backend for the time being
+# "opencellid" and "googlemaps" can be here. Both require an access token,
+# though googlemaps is only online, while opencellid backend looks up a
+# local database, that can be updated once a week or once a month.
backend = opencellid
[opencellid]
--- /dev/null
+import googlemaps as gmaps
+from sqlite3 import connect
+
+gclient = None
+
+
+def init(conf):
+ global gclient
+ with open(conf["googlemaps"]["accesstoken"], encoding="ascii") as fl:
+ token = fl.read().rstrip()
+ gclient = gmaps.Client(key=token)
+
+
+def lookup(mcc, mnc, gsm_cells, wifi_aps):
+ kwargs = {
+ "home_mobile_country_code": mcc,
+ "home_mobile_network_code": mnc,
+ "radio_type": "gsm",
+ "carrier": "O2",
+ "consider_ip": False,
+ "cell_towers": [
+ {
+ "locationAreaCode": loc,
+ "cellId": cellid,
+ "signalStrength": sig,
+ }
+ for loc, cellid, sig in gsm_cells
+ ],
+ "wifi_access_points": [
+ {"macAddress": mac, "signalStrength": sig} for mac, sig in wifi_aps
+ ],
+ }
+ result = gclient.geolocate(**kwargs)
+ if "location" in result:
+ return result["location"]["lat"], result["location"]["lng"]
+ else:
+ raise ValueError("google geolocation: " + str(result))
+
+
+if __name__.endswith("__main__"):
+ from datetime import datetime, timezone
+ import sys
+ from .gps303proto import *
+
+ db = connect(sys.argv[1])
+ c = db.cursor()
+ c.execute(
+ """select tstamp, packet from events
+ where proto in (?, ?)""",
+ (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO),
+ )
+ init({"googlemaps": {"accesstoken": sys.argv[2]}})
+ count = 0
+ for timestamp, packet in c:
+ obj = parse_message(packet)
+ print(obj)
+ avlat, avlon = lookup(obj.mcc, obj.mnc, obj.gsm_cells, obj.wifi_aps)
+ print(
+ "{} {:+#010.8g},{:+#010.8g}".format(
+ datetime.fromtimestamp(timestamp), avlat, avlon
+ )
+ )
+ count += 1
+ if count > 10:
+ break
def runserver(conf):
- if conf.get("lookaside", "backend") == "opencellid":
- qry = import_module(".opencellid", __package__)
- else:
- raise NotImplementedError(
- "Lookaside only implements opencellid backend"
- )
+ qry = import_module("." + conf.get("lookaside", "backend"), __package__)
qry.init(conf)
zctx = zmq.Context()
zsub = zctx.socket(zmq.SUB)
datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
msg,
)
- lat, lon = qry.lookup(msg.mcc, msg.gsm_cells, msg.wifi_aps)
- resp = Resp(
- imei=zmsg.imei,
- when=zmsg.when, # not the current time, but the original!
- packet=msg.Out(latitude=lat, longitude=lon).packed,
- )
- log.debug("Response for lat=%s, lon=%s: %s", lat, lon, resp)
- zpush.send(resp.packed)
+ try:
+ lat, lon = qry.lookup(msg.mcc, msg.mnc, msg.gsm_cells, msg.wifi_aps)
+ resp = Resp(
+ imei=zmsg.imei,
+ when=zmsg.when, # not the current time, but the original!
+ packet=msg.Out(latitude=lat, longitude=lon).packed,
+ )
+ log.debug("Response for lat=%s, lon=%s: %s", lat, lon, resp)
+ zpush.send(resp.packed)
+ except Exception as e:
+ log.warning("Lookup for %s resulted in %s", msg, e)
except KeyboardInterrupt:
pass
for tstamp, packet in c:
msg = parse_message(packet)
if isinstance(msg, (WIFI_POSITIONING, WIFI_OFFLINE_POSITIONING)):
- lat, lon = ocid.lookup(msg.mcc, msg.gsm_cells, msg.wifi_aps)
+ lat, lon = ocid.lookup(msg.mcc, msg.mnc, msg.gsm_cells, msg.wifi_aps)
if lat is None or lon is None:
continue
elif isinstance(msg, (GPS_POSITIONING, GPS_OFFLINE_POSITIONING)):
ldb = connect(conf["opencellid"]["dbfn"])
-def lookup(mcc, gsm_cells, _):
+def lookup(mcc, mnc, gsm_cells, __):
lc = ldb.cursor()
lc.execute("""attach database ":memory:" as mem""")
lc.execute("create table mem.seen (locac int, cellid int, signal int)")
init({"opencellid": {"dbfn": sys.argv[2]}})
for timestamp, packet in c:
obj = parse_message(packet)
- avlat, avlon = lookup(obj.mcc, obj.gsm_cells, obj.wifi_aps)
+ avlat, avlon = lookup(obj.mcc, obj.mnc, obj.gsm_cells, obj.wifi_aps)
print(
"{} {:+#010.8g},{:+#010.8g}".format(
datetime.fromtimestamp(timestamp), avlat, avlon