1 """ Common housekeeping for tests that rely on daemons """
3 from configparser import ConfigParser, SectionProxy
4 from contextlib import closing, ExitStack
5 from http.server import HTTPServer, SimpleHTTPRequestHandler
6 from importlib import import_module
7 from multiprocessing import Process
8 from os import kill, unlink
9 from signal import SIGINT
20 from tempfile import mkstemp
21 from time import sleep
22 from typing import Optional
23 from unittest import TestCase
# Base TestCase for tests that need running daemons: setUp() builds a
# ConfigParser whose ports and file paths are unique to this test and creates
# one Process per requested server (recorded in self.children); tearDown()
# walks self.children and unlinks the temporary files.
28 class TestWithServers(TestCase):
# Prepare configuration and server processes.
#   *args  -- names of server modules; each is imported as "gps303.<name>"
#             and its runserver() becomes the target of a Process.
#   httpd  -- keyword-only flag, default False (presumably enables the
#             HTTPServer set up further down -- confirm against the full file).
29 def setUp(self, *args: str, httpd: bool = False) -> None:
# Open NUMPORTS IPv6 datagram sockets and remember the port number each one
# reports via getsockname(). The sockets are registered on the ExitStack
# through closing(), so they are all closed when the `with` block exits;
# presumably this is how currently-free port numbers are discovered.
31 with ExitStack() as stack:
32 for _ in range(NUMPORTS):
33 sk = stack.enter_context(closing(socket(AF_INET6, SOCK_DGRAM)))
35 sk.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
36 freeports.append(sk.getsockname()[1])
# mkstemp() returns (fd, path); only the path is kept and used as the common
# prefix for every socket/database file name below.
# NOTE(review): the file descriptor is bound to `_` and does not appear to be
# closed -- confirm whether that is intentional.
37 _, self.tmpfilebase = mkstemp()
38 self.conf = ConfigParser()
# collector: listens on the first discovered port; its publish/listen
# endpoints are ipc:// URLs derived from the temp file name.
39 self.conf["collector"] = {
40 "port": str(freeports[0]),
41 "publishurl": "ipc://" + self.tmpfilebase + ".pub",
42 "listenurl": "ipc://" + self.tmpfilebase + ".pul",
# storage: SQLite database file next to the temp file.
44 self.conf["storage"] = {
45 "dbfn": self.tmpfilebase + ".storage.sqlite",
# opencellid: its own SQLite file, plus a download URL on localhost that uses
# the third discovered port (the same port the HTTPServer below binds to).
47 self.conf["opencellid"] = {
48 "dbfn": self.tmpfilebase + ".opencellid.sqlite",
49 "downloadurl": f"http://localhost:{freeports[2]}/test/262.csv.gz",
# lookaside: configured to use the "opencellid" backend.
51 self.conf["lookaside"] = {
52 "backend": "opencellid",
# wsgateway: listens on the second discovered port.
54 self.conf["wsgateway"] = {
55 "port": str(freeports[1]),
# The collector is given handle_hibernate=False as an extra keyword argument
# to runserver().
59 if srvname == "collector":
60 kwargs = {"handle_hibernate": False}
# Import the server module by name and wrap its runserver(conf, **kwargs) in a
# separate process; the (name, process) pair is remembered for tearDown().
63 cls = import_module("gps303." + srvname, package=".")
64 p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
66 self.children.append((srvname, p))
# HTTP server bound to all interfaces on the third discovered port, using the
# stdlib SimpleHTTPRequestHandler.
68 server = HTTPServer(("", freeports[2]), SimpleHTTPRequestHandler)
# Process target: run the HTTP server's request loop; a KeyboardInterrupt
# raised while serving is caught here.
70 def run(server: HTTPServer) -> None:
72 server.serve_forever()
73 except KeyboardInterrupt:
74 # TODO: this still leaves unclosed socket in the server
# The HTTP server gets its own process, tracked under the name "httpd"
# alongside the other children.
77 p = Process(target=run, args=(server,))
79 self.children.append(("httpd", p))
# Clean up after a test: go through every (name, process) pair recorded by
# setUp() and remove the files derived from self.tmpfilebase.
82 def tearDown(self) -> None:
83 for srvname, p in self.children:
# Failure message that names the server whose return code was non-zero.
90 srvname + " terminated with non-zero return code",
# Remove the file at tmpfilebase + sfx, where sfx is a file name suffix.
100 unlink(self.tmpfilebase + sfx)
105 def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
109 sock.recv(4096, MSG_DONTWAIT)
110 except BlockingIOError: