-from configparser import NoOptionError
+from configparser import ConfigParser, NoOptionError
import csv
from logging import getLogger
import requests
from sqlite3 import connect
-from zlib import decompressobj, MAX_WBITS
+from typing import IO, Optional
+from zlib import decompressobj, MAX_WBITS, _Decompress
from . import common
and yelds them as strings.
"""
- def __init__(self, zstream):
+ def __init__(self, zstream: IO[bytes]) -> None:
self.zstream = zstream
- self.decoder = decompressobj(16 + MAX_WBITS)
+ self.decoder: Optional[_Decompress] = decompressobj(16 + MAX_WBITS)
self.outdata = b""
self.line = b""
- def read(self, n=None):
+ def read(self, n: int = 1024) -> bytes:
if self.decoder is None:
return b""
while len(self.outdata) < n:
return data
return b""
- def __next__(self):
+ def __next__(self) -> str:
while True:
splittry = self.line.split(b"\n", maxsplit=1)
if len(splittry) > 1:
self.line = rest
return line.decode("utf-8")
- def __iter__(self):
+ def __iter__(self) -> "unzipped":
return self
-def main(conf):
+def main(conf: ConfigParser) -> None:
try:
url = conf.get("opencellid", "downloadurl")
mcc = "<unspecified>"