from configparser import ConfigParser, SectionProxy
from contextlib import closing, ExitStack
+from http.server import HTTPServer, SimpleHTTPRequestHandler
from importlib import import_module
from multiprocessing import Process
from os import kill, unlink
socket,
SocketType,
)
+from sys import exit
from tempfile import mkstemp
from time import sleep
from typing import Optional
}
self.conf["opencellid"] = {
"dbfn": self.tmpfilebase + ".opencellid.sqlite",
+ "downloadurl": f"http://localhost:{freeports[2]}/test/262.csv.gz",
}
self.conf["lookaside"] = {
"backend": "opencellid",
p.start()
self.children.append((srvname, p))
if httpd:
- pass
+ server = HTTPServer(("", freeports[2]), SimpleHTTPRequestHandler)
+
+ def run(server):
+ try:
+ server.serve_forever()
+ except KeyboardInterrupt:
+ # TODO: this still leaves unclosed socket in the server
+ server.shutdown()
+
+ p = Process(target=run, args=(server,))
+ p.start()
+ self.children.append(("httpd", p))
sleep(1)
def tearDown(self) -> None:
--- /dev/null
+""" Send junk to the collector """
+
+from sqlite3 import connect
+from time import sleep
+from typing import Any
+import unittest
+from .common import send_and_drain, TestWithServers
+from gps303 import ocid_dload
+
+
+class Ocid_Dload(TestWithServers):
+ def setUp(self, *args: str, **kwargs: Any) -> None:
+ super().setUp(httpd=True)
+
+ def tearDown(self) -> None:
+ sleep(1) # give collector some time
+ super().tearDown()
+
+ def test_ocid_dload(self) -> None:
+ ocid_dload.main(self.conf)
+ with connect(self.conf.get("opencellid", "dbfn")) as db:
+ (count,) = db.execute("select count(*) from cells").fetchone()
+ self.assertEqual(count, 163)
+
+
+if __name__ == "__main__":
+ unittest.main()
from random import Random
from socket import getaddrinfo, socket, AF_INET, SOCK_STREAM
-from sqlite3 import connect, Row
+from sqlite3 import connect
from time import sleep
from typing import Any
import unittest
sleep(1)
got = set()
with connect(self.conf.get("storage", "dbfn")) as db:
- db.row_factory = Row
for is_incoming, packet in db.execute(
"select is_incoming, packet from events"
):