--- /dev/null
+""" Common housekeeping for tests that rely on daemons """
+
+from configparser import ConfigParser, SectionProxy
+from contextlib import closing
+from importlib import import_module
+from multiprocessing import Process
+from os import kill, unlink
+from signal import SIGINT
+from socket import AF_INET6, SOCK_DGRAM, SOL_SOCKET, SO_REUSEADDR, socket
+from tempfile import mkstemp
+from time import sleep
+from unittest import TestCase
+
+
+class TestWithServers(TestCase):
+ def setUp(self, *args: str) -> None:
+ with closing(socket(AF_INET6, SOCK_DGRAM)) as sock:
+ sock.bind(("", 0))
+ sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
+ freeport = sock.getsockname()[1]
+ _, self.tmpfilebase = mkstemp()
+ self.conf = ConfigParser()
+ self.conf["collector"] = {
+ "port": str(freeport),
+ "publishurl": "ipc://" + self.tmpfilebase + ".pub",
+ "listenurl": "ipc://" + self.tmpfilebase + ".pul",
+ }
+ self.children = []
+ for srvname in args:
+ if srvname == "collector":
+ kwargs = {"handle_hibernate": False}
+ else:
+ kwargs = {}
+ cls = import_module("gps303." + srvname, package=".")
+ p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
+ p.start()
+ self.children.append((srvname, p))
+ sleep(1)
+
+ def tearDown(self) -> None:
+ sleep(1)
+ for srvname, p in self.children:
+ if p.pid is not None:
+ kill(p.pid, SIGINT)
+ p.join()
+ print(srvname, "terminated with return code", p.exitcode)
+ for sfx in (".pub", ".pul"):
+ unlink(self.tmpfilebase + sfx)
""" Send junk to the collector """
from random import Random
-from socket import getaddrinfo, socket, AF_INET6, SOCK_STREAM
-from unittest import TestCase
+from socket import getaddrinfo, socket, AF_INET6, MSG_DONTWAIT, SOCK_STREAM
+import unittest
+from .common import TestWithServers
REPEAT: int = 1000000
-class Fuzz(TestCase):
- def setUp(self) -> None:
+class Fuzz(TestWithServers):
+ def setUp(self, *args: str) -> None:
+ super().setUp("collector")
self.rnd = Random()
for fam, typ, pro, cnm, skadr in getaddrinfo(
"::1",
- 4303,
+ self.conf.getint("collector", "port"),
family=AF_INET6,
type=SOCK_STREAM,
):
def tearDown(self) -> None:
self.sock.close()
+ print("finished fuzzing")
+ super().tearDown()
def test_fuzz(self) -> None:
for _ in range(REPEAT):
size = self.rnd.randint(1, 5000)
buf = self.rnd.randbytes(size)
self.sock.send(buf)
+ try:
+ self.sock.recv(4096, MSG_DONTWAIT)
+ except BlockingIOError:
+ pass
+
+
+if __name__ == "__main__":
+ unittest.main()