+from configparser import ConfigParser, NoOptionError
import csv
from logging import getLogger
import requests
from sqlite3 import connect
-from zlib import decompressobj, MAX_WBITS
+from typing import IO, Optional
+from zlib import decompressobj, MAX_WBITS, _Decompress
from . import common
RURL = (
"https://opencellid.org/ocid/downloads"
- "?token={token}&type={type}&file={mcc}.csv.gz"
+ "?token={token}&type={dltype}&file={fname}.csv.gz"
)
SCHEMA = """create table if not exists cells (
and yelds them as strings.
"""
- def __init__(self, zstream):
+ def __init__(self, zstream: IO[bytes]) -> None:
self.zstream = zstream
- self.decoder = decompressobj(16 + MAX_WBITS)
+ self.decoder: Optional[_Decompress] = decompressobj(16 + MAX_WBITS)
self.outdata = b""
self.line = b""
- def read(self, n=None):
+ def read(self, n: int = 1024) -> bytes:
if self.decoder is None:
return b""
while len(self.outdata) < n:
return data
return b""
- def __next__(self):
+ def __next__(self) -> str:
while True:
splittry = self.line.split(b"\n", maxsplit=1)
if len(splittry) > 1:
self.line = rest
return line.decode("utf-8")
- def __iter__(self):
+ def __iter__(self) -> "unzipped":
return self
-def main(conf):
+def main(conf: ConfigParser) -> None:
try:
- with open(
- conf.get("opencellid", "downloadtoken"), encoding="ascii"
- ) as fl:
- token = fl.read().strip()
- except FileNotFoundError:
- log.warning("Opencellid access token not configured, cannot download")
- return
-
- mcc = conf.get("opencellid", "downloadmcc")
- url = RURL.format(token=token, type="mcc", mcc=mcc)
- # url = "http://localhost:8000/262.csv.gz" # TESTING
+ url = conf.get("opencellid", "downloadurl")
+ mcc = "<unspecified>"
+ except NoOptionError:
+ try:
+ with open(
+ conf.get("opencellid", "downloadtoken"), encoding="ascii"
+ ) as fl:
+ token = fl.read().strip()
+ except FileNotFoundError:
+ log.warning(
+ "Opencellid access token not configured, cannot download"
+ )
+ return
+ mcc = conf.get("opencellid", "downloadmcc")
+ if mcc == "full":
+ dltype = "full"
+ fname = "cell_towers"
+ else:
+ dltype = "mcc"
+ fname = mcc
+ url = RURL.format(token=token, dltype="mcc", fname=mcc)
dbfn = conf.get("opencellid", "dbfn")
count = 0
with requests.get(url, stream=True) as resp, connect(dbfn) as db:
row,
)
count += 1
- db.execute(DBINDEX)
- log.info("repopulated %s with %d records for MCC %s", dbfn, count, mcc)
+ if count < 1:
+ db.rollback()
+ log.warning("Did not get any data for MCC %s, rollback", mcc)
+ else:
+ db.execute(DBINDEX)
+ db.commit()
+ log.info(
+ "repopulated %s with %d records for MCC %s", dbfn, count, mcc
+ )
if __name__.endswith("__main__"):