1 """ Common housekeeping for tests that rely on daemons """
3 from configparser import ConfigParser, SectionProxy
4 from contextlib import closing, ExitStack
5 from http.server import HTTPServer, SimpleHTTPRequestHandler
6 from importlib import import_module
7 from logging import DEBUG, StreamHandler
8 from multiprocessing import Process
9 from os import kill, unlink
10 from signal import SIGINT
20 from sys import exit, stderr
21 from tempfile import mkstemp
22 from time import sleep
23 from typing import Optional
24 from unittest import TestCase
26 from loctrkd.common import init_protocols
31 class TestWithServers(TestCase):
33 self, *args: str, httpd: bool = False, verbose: bool = False
36 with ExitStack() as stack:
37 for _ in range(NUMPORTS):
38 sk = stack.enter_context(closing(socket(AF_INET6, SOCK_DGRAM)))
40 sk.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
41 freeports.append(sk.getsockname()[1])
42 _, self.tmpfilebase = mkstemp()
43 self.conf = ConfigParser()
44 self.conf["common"] = {
45 "protocols": "zx303proto",
47 self.conf["collector"] = {
48 "port": str(freeports[0]),
49 "publishurl": "ipc://" + self.tmpfilebase + ".pub",
50 "listenurl": "ipc://" + self.tmpfilebase + ".pul",
52 self.conf["storage"] = {
53 "dbfn": self.tmpfilebase + ".storage.sqlite",
56 self.conf["opencellid"] = {
57 "dbfn": self.tmpfilebase + ".opencellid.sqlite",
58 "downloadurl": f"http://localhost:{freeports[2]}/test/262.csv.gz",
60 self.conf["rectifier"] = {
61 "lookaside": "opencellid",
62 "publishurl": "ipc://" + self.tmpfilebase + ".rect.pub",
64 self.conf["wsgateway"] = {
65 "port": str(freeports[1]),
67 init_protocols(self.conf)
70 if srvname == "collector":
71 kwargs = {"handle_hibernate": False}
74 cls = import_module("loctrkd." + srvname, package=".")
76 cls.log.addHandler(StreamHandler(stderr))
77 cls.log.setLevel(DEBUG)
78 p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
80 self.children.append((srvname, p))
82 server = HTTPServer(("", freeports[2]), SimpleHTTPRequestHandler)
84 def run(server: HTTPServer) -> None:
86 server.serve_forever()
87 except KeyboardInterrupt:
88 # TODO: this still leaves unclosed socket in the server
91 p = Process(target=run, args=(server,))
93 self.children.append(("httpd", p))
96 def tearDown(self) -> None:
97 for srvname, p in self.children:
104 f"{srvname} terminated with return code {p.exitcode}",
112 ".opencellid.sqlite",
115 unlink(self.tmpfilebase + sfx)
120 def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
124 sock.recv(4096, MSG_DONTWAIT)
125 except BlockingIOError: