gclient = gmaps.Client(key=token)
+def shut() -> None:
+ return
+
+
def lookup(
mcc: int,
mnc: int,
# Default response for ACK, can also respond with STOP_UPLOAD
def in_decode(self, length: int, payload: bytes) -> None:
- self.imei = payload[:16].hex()
- self.ver = payload[16]
+ self.imei = payload[:8].hex()
+ self.ver = payload[8]
class SUPERVISION(GPS303Pkt):
log.warning("Lookup for %s resulted in %s", msg, e)
except KeyboardInterrupt:
- pass
+ zsub.close()
+ zpush.close()
+ zctx.destroy() # type: ignore
+ qry.shut()
if __name__.endswith("__main__"):
ldb = connect(conf["opencellid"]["dbfn"])
+def shut() -> None:
+ if ldb is not None:
+ ldb.close()
+
+
def lookup(
mcc: int, mnc: int, gsm_cells: List[Tuple[int, int, int]], __: Any
) -> Tuple[float, float]:
packet=zmsg.packet,
)
except KeyboardInterrupt:
- pass
+ zsub.close()
+ zctx.destroy() # type: ignore
if __name__.endswith("__main__"):
zpush.send(resp.packed)
except KeyboardInterrupt:
- pass
+ zsub.close()
+ zpush.close()
+ zctx.destroy() # type: ignore
if __name__.endswith("__main__"):
global htmlfile
initdb(conf.get("storage", "dbfn"))
- htmlfile = conf.get("wsgateway", "htmlfile")
+ htmlfile = conf.get("wsgateway", "htmlfile", fallback=None)
# Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
zctx = zmq.Context() # type: ignore
zsub = zctx.socket(zmq.SUB) # type: ignore
towait &= trywrite
towait |= morewait
except KeyboardInterrupt:
- pass
+ zsub.close()
+ zctx.destroy() # type: ignore
+ tcpl.close()
if __name__.endswith("__main__"):
class TestWithServers(TestCase):
def setUp(self, *args: str) -> None:
- with closing(socket(AF_INET6, SOCK_DGRAM)) as sock:
- sock.bind(("", 0))
- sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
- freeport = sock.getsockname()[1]
+ with closing(socket(AF_INET6, SOCK_DGRAM)) as sock1, closing(
+ socket(AF_INET6, SOCK_DGRAM)
+ ) as sock2:
+ freeports = []
+ for sock in sock1, sock2:
+ sock.bind(("", 0))
+ sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
+ freeports.append(sock.getsockname()[1])
_, self.tmpfilebase = mkstemp()
self.conf = ConfigParser()
self.conf["collector"] = {
- "port": str(freeport),
+ "port": str(freeports[0]),
"publishurl": "ipc://" + self.tmpfilebase + ".pub",
"listenurl": "ipc://" + self.tmpfilebase + ".pul",
}
+ self.conf["storage"] = {
+ "dbfn": self.tmpfilebase + ".storage.sqlite",
+ }
+ self.conf["opencellid"] = {
+ "dbfn": self.tmpfilebase + ".opencellid.sqlite",
+ }
+ self.conf["lookaside"] = {
+ "backend": "opencellid",
+ }
+ self.conf["wsgateway"] = {
+ "port": str(freeports[1]),
+ }
self.children = []
for srvname in args:
if srvname == "collector":
0,
srvname + " terminated with non-zero return code",
)
- for sfx in (".pub", ".pul"):
- unlink(self.tmpfilebase + sfx)
+ for sfx in (
+ "",
+ ".pub",
+ ".pul",
+ ".storage.sqlite",
+ ".opencellid.sqlite",
+ ):
+ try:
+ unlink(self.tmpfilebase + sfx)
+ except OSError:
+ pass
def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
--- /dev/null
+""" Send junk to the collector """
+
+from random import Random
+from socket import getaddrinfo, socket, AF_INET6, SOCK_STREAM
+from sqlite3 import connect
+from time import sleep
+import unittest
+from .common import send_and_drain, TestWithServers
+
+
+class Storage(TestWithServers):
+ def setUp(self, *args: str) -> None:
+ super().setUp("collector", "storage", "lookaside", "termconfig")
+ for fam, typ, pro, cnm, skadr in getaddrinfo(
+ "::1",
+ self.conf.getint("collector", "port"),
+ family=AF_INET6,
+ type=SOCK_STREAM,
+ ):
+ break # Just take the first element
+ self.sock = socket(AF_INET6, SOCK_STREAM)
+ self.sock.connect(skadr)
+
+ def tearDown(self) -> None:
+ sleep(1) # give collector some time
+ send_and_drain(self.sock, None)
+ self.sock.close()
+ super().tearDown()
+
+ def test_storage(self) -> None:
+ buf = b"xx\r\x01\x03Y3\x90w\x19q\x85\x05\r\n"
+ send_and_drain(self.sock, buf)
+ # TODO: make a proper sequence
+ with connect(self.conf.get("storage", "dbfn")) as db:
+ c = db.cursor()
+ c.execute("select * from events")
+ events = [dict(row) for row in c]
+ print(events)
+
+
+if __name__ == "__main__":
+ unittest.main()