from configparser import ConfigParser, SectionProxy
from contextlib import closing, ExitStack
+from http.server import HTTPServer, SimpleHTTPRequestHandler
from importlib import import_module
+from logging import DEBUG, StreamHandler
from multiprocessing import Process
from os import kill, unlink
from signal import SIGINT
socket,
SocketType,
)
+from sys import exit, stderr
from tempfile import mkstemp
from time import sleep
from typing import Optional
class TestWithServers(TestCase):
- def setUp(self, *args: str, httpd: bool = False) -> None:
+ def setUp(
+ self, *args: str, httpd: bool = False, verbose: bool = False
+ ) -> None:
freeports = []
with ExitStack() as stack:
for _ in range(NUMPORTS):
"port": str(freeports[0]),
"publishurl": "ipc://" + self.tmpfilebase + ".pub",
"listenurl": "ipc://" + self.tmpfilebase + ".pul",
+ "protocols": "zx303proto",
}
self.conf["storage"] = {
"dbfn": self.tmpfilebase + ".storage.sqlite",
}
self.conf["opencellid"] = {
"dbfn": self.tmpfilebase + ".opencellid.sqlite",
+ "downloadurl": f"http://localhost:{freeports[2]}/test/262.csv.gz",
}
self.conf["lookaside"] = {
"backend": "opencellid",
kwargs = {"handle_hibernate": False}
else:
kwargs = {}
- cls = import_module("gps303." + srvname, package=".")
+ cls = import_module("loctrkd." + srvname, package=".")
+ if verbose:
+ cls.log.addHandler(StreamHandler(stderr))
+ cls.log.setLevel(DEBUG)
p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
p.start()
self.children.append((srvname, p))
if httpd:
- pass
+ server = HTTPServer(("", freeports[2]), SimpleHTTPRequestHandler)
+
+ def run(server: HTTPServer) -> None:
+ try:
+ server.serve_forever()
+ except KeyboardInterrupt:
+ # TODO: this still leaves unclosed socket in the server
+ server.shutdown()
+
+ p = Process(target=run, args=(server,))
+ p.start()
+ self.children.append(("httpd", p))
sleep(1)
def tearDown(self) -> None:
self.assertEqual(
p.exitcode,
0,
- srvname + " terminated with non-zero return code",
+ f"{srvname} terminated with return code {p.exitcode}",
)
for sfx in (
"",