1 """ Common housekeeping for tests that rely on daemons """
3 from configparser import ConfigParser, SectionProxy
4 from contextlib import closing, ExitStack
5 from http.server import HTTPServer, SimpleHTTPRequestHandler
6 from importlib import import_module
7 from logging import DEBUG, StreamHandler
8 from multiprocessing import Process
9 from os import kill, unlink
10 from signal import SIGINT
20 from sys import exit, stderr
21 from tempfile import mkstemp
22 from time import sleep
23 from typing import Optional
24 from unittest import TestCase
29 class TestWithServers(TestCase):
31 self, *args: str, httpd: bool = False, verbose: bool = False
34 with ExitStack() as stack:
35 for _ in range(NUMPORTS):
36 sk = stack.enter_context(closing(socket(AF_INET6, SOCK_DGRAM)))
38 sk.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
39 freeports.append(sk.getsockname()[1])
40 _, self.tmpfilebase = mkstemp()
41 self.conf = ConfigParser()
42 self.conf["collector"] = {
43 "port": str(freeports[0]),
44 "publishurl": "ipc://" + self.tmpfilebase + ".pub",
45 "listenurl": "ipc://" + self.tmpfilebase + ".pul",
46 "protocols": "zx303proto",
48 self.conf["storage"] = {
49 "dbfn": self.tmpfilebase + ".storage.sqlite",
51 self.conf["opencellid"] = {
52 "dbfn": self.tmpfilebase + ".opencellid.sqlite",
53 "downloadurl": f"http://localhost:{freeports[2]}/test/262.csv.gz",
55 self.conf["lookaside"] = {
56 "backend": "opencellid",
58 self.conf["wsgateway"] = {
59 "port": str(freeports[1]),
63 if srvname == "collector":
64 kwargs = {"handle_hibernate": False}
67 cls = import_module("loctrkd." + srvname, package=".")
69 cls.log.addHandler(StreamHandler(stderr))
70 cls.log.setLevel(DEBUG)
71 p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
73 self.children.append((srvname, p))
75 server = HTTPServer(("", freeports[2]), SimpleHTTPRequestHandler)
77 def run(server: HTTPServer) -> None:
79 server.serve_forever()
80 except KeyboardInterrupt:
81 # TODO: this still leaves an unclosed socket in the server
84 p = Process(target=run, args=(server,))
86 self.children.append(("httpd", p))
89 def tearDown(self) -> None:
90 for srvname, p in self.children:
97 f"{srvname} terminated with return code {p.exitcode}",
104 ".opencellid.sqlite",
107 unlink(self.tmpfilebase + sfx)
112 def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
116 sock.recv(4096, MSG_DONTWAIT)
117 except BlockingIOError: