1 """ Common housekeeping for tests that rely on daemons """
3 from configparser import ConfigParser, SectionProxy
4 from contextlib import closing
5 from importlib import import_module
6 from multiprocessing import Process
7 from os import kill, unlink
8 from signal import SIGINT
18 from tempfile import mkstemp
19 from time import sleep
20 from typing import Optional
21 from unittest import TestCase
# TestCase base class whose setUp() builds a throw-away configuration and
# creates one multiprocessing.Process per requested server module, and whose
# tearDown() walks those processes again and unlinks temporary files.
# NOTE(review): this view of the class is incomplete (several statements are
# not visible here), so the comments below describe only the visible lines.
24 class TestWithServers(TestCase):
# setUp takes extra positional string arguments (*args); the use of `srvname`
# further down suggests they are server module names - TODO confirm against
# the loop header, which is not visible here.
25 def setUp(self, *args: str) -> None:
# Two IPv6 UDP sockets are opened under closing(); for each of them the
# SO_REUSEADDR option is set and the port number reported by getsockname()
# is appended to `freeports`.
# NOTE(review): no bind() call is visible here - presumably the sockets are
# bound to an ephemeral port before getsockname() is consulted; confirm.
26 with closing(socket(AF_INET6, SOCK_DGRAM)) as sock1, closing(
27 socket(AF_INET6, SOCK_DGRAM)
30 for sock in sock1, sock2:
32 sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
33 freeports.append(sock.getsockname()[1])
# mkstemp() returns (fd, path); only the path is kept and used as a common
# prefix for every per-test file name and ipc:// URL built below.
# NOTE(review): the returned file descriptor is bound to `_` and is not
# closed on this line - check whether it leaks for the life of the test.
34 _, self.tmpfilebase = mkstemp()
# In-memory configuration handed to every server process. The first collected
# port goes to the "collector" section, the second to "wsgateway"; database
# file names and IPC endpoints are derived from the temp-file prefix.
35 self.conf = ConfigParser()
36 self.conf["collector"] = {
37 "port": str(freeports[0]),
38 "publishurl": "ipc://" + self.tmpfilebase + ".pub",
39 "listenurl": "ipc://" + self.tmpfilebase + ".pul",
41 self.conf["storage"] = {
42 "dbfn": self.tmpfilebase + ".storage.sqlite",
44 self.conf["opencellid"] = {
45 "dbfn": self.tmpfilebase + ".opencellid.sqlite",
47 self.conf["lookaside"] = {
48 "backend": "opencellid",
50 self.conf["wsgateway"] = {
51 "port": str(freeports[1]),
# Per-server start-up: the "collector" server gets the extra keyword argument
# handle_hibernate=False. `srvname` is bound by a statement not visible here.
55 if srvname == "collector":
56 kwargs = {"handle_hibernate": False}
# The server module is looked up by name under the "gps303" package and its
# runserver callable becomes the target of a separate process, which receives
# the configuration built above plus the per-server kwargs.
# NOTE(review): the module name passed is absolute, so the `package="."`
# argument appears to have no effect (importlib only uses it to resolve
# relative names) - confirm before removing.
59 cls = import_module("gps303." + srvname, package=".")
60 p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
# Remember (name, process) pairs so tearDown() can iterate over them.
# NOTE(review): no p.start() call is visible here - presumably the process
# is started before being recorded; confirm.
62 self.children.append((srvname, p))
# tearDown iterates over the (name, process) pairs recorded by setUp() and
# removes files whose names start with the temp-file prefix.
65 def tearDown(self) -> None:
66 for srvname, p in self.children:
# Failure message naming the server; it is the trailing argument of a call
# whose opening is not visible here (presumably an assertion on the process
# exit code - TODO confirm).
73 srvname + " terminated with non-zero return code",
# Remove the file named by the temp-file prefix plus suffix `sfx`; the
# statement that binds `sfx` is not visible here.
83 unlink(self.tmpfilebase + sfx)
88 def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
92 sock.recv(4096, MSG_DONTWAIT)
93 except BlockingIOError: