from configparser import ConfigParser, SectionProxy
from contextlib import closing, ExitStack
+from http.server import HTTPServer, SimpleHTTPRequestHandler
from importlib import import_module
+from logging import DEBUG, StreamHandler
from multiprocessing import Process
from os import kill, unlink
from signal import SIGINT
from socket import (
+ AF_INET,
AF_INET6,
+ getaddrinfo,
MSG_DONTWAIT,
SOCK_DGRAM,
+ SOCK_STREAM,
SOL_SOCKET,
SO_REUSEADDR,
socket,
SocketType,
)
+from sys import exit, stderr
+from random import Random
from tempfile import mkstemp
from time import sleep
-from typing import Optional
+from typing import Any, Optional
from unittest import TestCase
+from loctrkd.common import init_protocols
+
NUMPORTS = 3
class TestWithServers(TestCase):
- def setUp(self, *args: str, httpd: bool = False) -> None:
+ def setUp(
+ self, *args: str, httpd: bool = False, verbose: bool = False
+ ) -> None:
freeports = []
with ExitStack() as stack:
for _ in range(NUMPORTS):
freeports.append(sk.getsockname()[1])
_, self.tmpfilebase = mkstemp()
self.conf = ConfigParser()
+ self.conf["common"] = {
+ "protocols": "zx303proto,beesure",
+ }
self.conf["collector"] = {
"port": str(freeports[0]),
"publishurl": "ipc://" + self.tmpfilebase + ".pub",
}
self.conf["storage"] = {
"dbfn": self.tmpfilebase + ".storage.sqlite",
+ "events": "yes",
}
self.conf["opencellid"] = {
"dbfn": self.tmpfilebase + ".opencellid.sqlite",
+ "downloadurl": f"http://localhost:{freeports[2]}/test/262.csv.gz",
}
- self.conf["lookaside"] = {
- "backend": "opencellid",
+ self.conf["rectifier"] = {
+ "lookaside": "opencellid",
+ "publishurl": "ipc://" + self.tmpfilebase + ".rect.pub",
}
self.conf["wsgateway"] = {
"port": str(freeports[1]),
}
+ init_protocols(self.conf)
self.children = []
for srvname in args:
if srvname == "collector":
kwargs = {"handle_hibernate": False}
else:
kwargs = {}
- cls = import_module("gps303." + srvname, package=".")
+ cls = import_module("loctrkd." + srvname, package=".")
+ if verbose:
+ cls.log.addHandler(StreamHandler(stderr))
+ cls.log.setLevel(DEBUG)
p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
p.start()
self.children.append((srvname, p))
if httpd:
- pass
+ server = HTTPServer(("", freeports[2]), SimpleHTTPRequestHandler)
+
+ def run(server: HTTPServer) -> None:
+ try:
+ server.serve_forever()
+ except KeyboardInterrupt:
+ # TODO: this still leaves unclosed socket in the server
+ server.shutdown()
+
+ p = Process(target=run, args=(server,))
+ p.start()
+ self.children.append(("httpd", p))
sleep(1)
def tearDown(self) -> None:
self.assertEqual(
p.exitcode,
0,
- srvname + " terminated with non-zero return code",
+ f"{srvname} terminated with return code {p.exitcode}",
)
for sfx in (
"",
".pub",
+ ".rect.pub",
".pul",
".storage.sqlite",
".opencellid.sqlite",
pass
+class Fuzz(TestWithServers):
+ def setUp(self, *args: str, **kwargs: Any) -> None:
+ super().setUp("collector")
+ self.rnd = Random()
+ for fam, typ, pro, cnm, skadr in getaddrinfo(
+ "127.0.0.1",
+ self.conf.getint("collector", "port"),
+ family=AF_INET,
+ type=SOCK_STREAM,
+ ):
+ break # Just take the first element
+ self.sock = socket(AF_INET, SOCK_STREAM)
+ self.sock.connect(skadr)
+
+ def tearDown(self) -> None:
+ sleep(1) # give collector some time
+ send_and_drain(self.sock, None)
+ self.sock.close()
+ sleep(1) # Let the server close their side
+ super().tearDown()
+
+
def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
if buf is not None:
sock.send(buf)