1 """ Common housekeeping for tests that rely on daemons """
3 from configparser import ConfigParser, SectionProxy
4 from contextlib import closing, ExitStack
5 from importlib import import_module
6 from multiprocessing import Process
7 from os import kill, unlink
8 from signal import SIGINT
18 from tempfile import mkstemp
19 from time import sleep
20 from typing import Optional
21 from unittest import TestCase
26 class TestWithServers(TestCase):
27 def setUp(self, *args: str, httpd: bool = False) -> None:
29 with ExitStack() as stack:
30 for _ in range(NUMPORTS):
31 sk = stack.enter_context(closing(socket(AF_INET6, SOCK_DGRAM)))
33 sk.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
34 freeports.append(sk.getsockname()[1])
35 _, self.tmpfilebase = mkstemp()
36 self.conf = ConfigParser()
37 self.conf["collector"] = {
38 "port": str(freeports[0]),
39 "publishurl": "ipc://" + self.tmpfilebase + ".pub",
40 "listenurl": "ipc://" + self.tmpfilebase + ".pul",
42 self.conf["storage"] = {
43 "dbfn": self.tmpfilebase + ".storage.sqlite",
45 self.conf["opencellid"] = {
46 "dbfn": self.tmpfilebase + ".opencellid.sqlite",
48 self.conf["lookaside"] = {
49 "backend": "opencellid",
51 self.conf["wsgateway"] = {
52 "port": str(freeports[1]),
56 if srvname == "collector":
57 kwargs = {"handle_hibernate": False}
60 cls = import_module("gps303." + srvname, package=".")
61 p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
63 self.children.append((srvname, p))
68 def tearDown(self) -> None:
69 for srvname, p in self.children:
76 srvname + " terminated with non-zero return code",
86 unlink(self.tmpfilebase + sfx)
91 def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
95 sock.recv(4096, MSG_DONTWAIT)
96 except BlockingIOError: