1 """ Common housekeeping for tests that rely on daemons """
3 from configparser import ConfigParser, SectionProxy
4 from contextlib import closing, ExitStack
5 from http.server import HTTPServer, SimpleHTTPRequestHandler
6 from importlib import import_module
7 from logging import DEBUG, StreamHandler
8 from multiprocessing import Process
9 from os import kill, unlink
10 from signal import SIGINT
23 from sys import exit, stderr
24 from random import Random
25 from tempfile import mkstemp
26 from time import sleep
27 from typing import Any, Optional
28 from unittest import TestCase
30 from loctrkd.common import init_protocols
# Base TestCase for tests that need loctrkd daemons: the visible code builds
# a ConfigParser that points every service at throwaway ports and at files
# derived from one temporary name, creates a multiprocessing.Process per
# server (plus one for an HTTP server), and records them in self.children
# for tearDown to walk.
# NOTE(review): the numbered statements below are not contiguous; lines
# between them are not shown, so comments describe only what is visible.
35 class TestWithServers(TestCase):
# Signature of setUp. NOTE(review): presumably each positional arg is a
# server module name (see `srvname` below), `httpd` gates the HTTPServer
# section and `verbose` gates the stderr logging -- the guarding
# statements are not visible here; confirm.
37 self, *args: str, httpd: bool = False, verbose: bool = False
# Open NUMPORTS IPv6 datagram sockets under one ExitStack so all of them are
# closed together when the stack exits; the port number each socket reports
# is collected into `freeports`.
40 with ExitStack() as stack:
41 for _ in range(NUMPORTS):
42 sk = stack.enter_context(closing(socket(AF_INET6, SOCK_DGRAM)))
44 sk.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
# getsockname() returns the address tuple; element [1] is the port.
45 freeports.append(sk.getsockname()[1])
# mkstemp() returns (fd, path). Only the path is kept; it is the common
# prefix for every IPC endpoint and database file configured below.
# NOTE(review): the descriptor is bound to `_` and not closed on this line.
46 _, self.tmpfilebase = mkstemp()
47 self.conf = ConfigParser()
48 self.conf["common"] = {
49 "protocols": "zx303proto,beesure",
# collector: listens on the first collected port; both URLs are ipc://
# endpoints named after the temporary file.
51 self.conf["collector"] = {
52 "port": str(freeports[0]),
53 "publishurl": "ipc://" + self.tmpfilebase + ".pub",
54 "listenurl": "ipc://" + self.tmpfilebase + ".pul",
56 self.conf["storage"] = {
57 "dbfn": self.tmpfilebase + ".storage.sqlite",
# opencellid: the download URL targets localhost on the third collected
# port, the same port the HTTPServer further down is bound to.
60 self.conf["opencellid"] = {
61 "dbfn": self.tmpfilebase + ".opencellid.sqlite",
62 "downloadurl": f"http://localhost:{freeports[2]}/test/262.csv.gz",
64 self.conf["rectifier"] = {
65 "lookaside": "opencellid",
66 "publishurl": "ipc://" + self.tmpfilebase + ".rect.pub",
# wsgateway: uses the second collected port.
68 self.conf["wsgateway"] = {
69 "port": str(freeports[1]),
# Hand the assembled configuration to loctrkd.common.init_protocols.
71 init_protocols(self.conf)
# The collector's runserver() is given handle_hibernate=False as a keyword
# argument (via `kwargs` in the Process call below).
74 if srvname == "collector":
75 kwargs = {"handle_hibernate": False}
# Despite the name, `cls` is the imported server *module* loctrkd.<srvname>.
78 cls = import_module("loctrkd." + srvname, package=".")
# Send the module's log output to stderr at DEBUG level.
80 cls.log.addHandler(StreamHandler(stderr))
81 cls.log.setLevel(DEBUG)
# The server's runserver(conf, **kwargs) is the target of a separate process.
82 p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
# Remember (name, process) so tearDown can iterate over the children.
84 self.children.append((srvname, p))
# HTTP server on all interfaces ("" host) at the third collected port,
# serving files through SimpleHTTPRequestHandler.
86 server = HTTPServer(("", freeports[2]), SimpleHTTPRequestHandler)
# Process target: serve until interrupted; KeyboardInterrupt is handled.
# NOTE(review): the handler body is not visible in this listing.
88 def run(server: HTTPServer) -> None:
90 server.serve_forever()
91 except KeyboardInterrupt:
92 # TODO: this still leaves unclosed socket in the server
95 p = Process(target=run, args=(server,))
97 self.children.append(("httpd", p))
# Walk the recorded child processes, then remove the temporary files.
100 def tearDown(self) -> None:
101 for srvname, p in self.children:
# Process.pid is None for a process that has not been started.
102 if p.pid is not None:
# Message text that names the server and its exit code.
108 f"{srvname} terminated with return code {p.exitcode}",
# One of the filename suffixes appended to tmpfilebase for removal.
116 ".opencellid.sqlite",
119 unlink(self.tmpfilebase + sfx)
# Fixture for fuzz-style tests: brings up only the "collector" server through
# TestWithServers.setUp and opens a stream socket to it as self.sock.
# NOTE(review): the numbered statements below are not contiguous; lines
# between them are not shown, so comments describe only what is visible.
124 class Fuzz(TestWithServers):
125 def setUp(self, *args: str, **kwargs: Any) -> None:
# The received *args / **kwargs are not forwarded on this line; the parent
# is always asked for the collector only.
126 super().setUp("collector")
# Resolve an address for the collector's configured port and keep the first
# result's socket address in `skadr`.
# NOTE(review): the host and any family/type arguments passed to
# getaddrinfo are not visible here.
128 for fam, typ, pro, cnm, skadr in getaddrinfo(
130 self.conf.getint("collector", "port"),
134 break # Just take the first element
# The socket is created with hard-coded AF_INET / SOCK_STREAM rather than the
# fam/typ/pro values from getaddrinfo.
# NOTE(review): this presumably relies on the lookup being restricted to
# IPv4 -- confirm against the elided arguments.
135 self.sock = socket(AF_INET, SOCK_STREAM)
136 self.sock.connect(skadr)
138 def tearDown(self) -> None:
139 sleep(1) # give collector some time
# Calls send_and_drain with no payload (buf=None) on the test socket.
140 send_and_drain(self.sock, None)
142 sleep(1) # Let the server close their side
146 def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
150 sock.recv(4096, MSG_DONTWAIT)
151 except BlockingIOError: