]> average.org Git - loctrkd.git/blobdiff - test/common.py
test: complete fuzzer unittest
[loctrkd.git] / test / common.py
diff --git a/test/common.py b/test/common.py
new file mode 100644 (file)
index 0000000..dccecfc
--- /dev/null
@@ -0,0 +1,48 @@
+""" Common housekeeping for tests that rely on daemons """
+
+from configparser import ConfigParser, SectionProxy
+from contextlib import closing
+from importlib import import_module
+from multiprocessing import Process
+from os import kill, unlink
+from signal import SIGINT
+from socket import AF_INET6, SOCK_DGRAM, SOL_SOCKET, SO_REUSEADDR, socket
+from tempfile import mkstemp
+from time import sleep
+from unittest import TestCase
+
+
+class TestWithServers(TestCase):
+    def setUp(self, *args: str) -> None:
+        with closing(socket(AF_INET6, SOCK_DGRAM)) as sock:
+            sock.bind(("", 0))
+            sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
+            freeport = sock.getsockname()[1]
+        _, self.tmpfilebase = mkstemp()
+        self.conf = ConfigParser()
+        self.conf["collector"] = {
+            "port": str(freeport),
+            "publishurl": "ipc://" + self.tmpfilebase + ".pub",
+            "listenurl": "ipc://" + self.tmpfilebase + ".pul",
+        }
+        self.children = []
+        for srvname in args:
+            if srvname == "collector":
+                kwargs = {"handle_hibernate": False}
+            else:
+                kwargs = {}
+            cls = import_module("gps303." + srvname, package=".")
+            p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
+            p.start()
+            self.children.append((srvname, p))
+        sleep(1)
+
+    def tearDown(self) -> None:
+        sleep(1)
+        for srvname, p in self.children:
+            if p.pid is not None:
+                kill(p.pid, SIGINT)
+            p.join()
+            print(srvname, "terminated with return code", p.exitcode)
+        for sfx in (".pub", ".pul"):
+            unlink(self.tmpfilebase + sfx)