from contextlib import closing, ExitStack
from http.server import HTTPServer, SimpleHTTPRequestHandler
from importlib import import_module
from contextlib import closing, ExitStack
from http.server import HTTPServer, SimpleHTTPRequestHandler
from importlib import import_module
from multiprocessing import Process
from os import kill, unlink
from signal import SIGINT
from multiprocessing import Process
from os import kill, unlink
from signal import SIGINT
from tempfile import mkstemp
from time import sleep
from typing import Optional
from unittest import TestCase
from tempfile import mkstemp
from time import sleep
from typing import Optional
from unittest import TestCase
- def setUp(self, *args: str, httpd: bool = False) -> None:
+ def setUp(
+ self, *args: str, httpd: bool = False, verbose: bool = False
+ ) -> None:
freeports.append(sk.getsockname()[1])
_, self.tmpfilebase = mkstemp()
self.conf = ConfigParser()
freeports.append(sk.getsockname()[1])
_, self.tmpfilebase = mkstemp()
self.conf = ConfigParser()
self.conf["collector"] = {
"port": str(freeports[0]),
"publishurl": "ipc://" + self.tmpfilebase + ".pub",
self.conf["collector"] = {
"port": str(freeports[0]),
"publishurl": "ipc://" + self.tmpfilebase + ".pub",
"dbfn": self.tmpfilebase + ".opencellid.sqlite",
"downloadurl": f"http://localhost:{freeports[2]}/test/262.csv.gz",
}
"dbfn": self.tmpfilebase + ".opencellid.sqlite",
"downloadurl": f"http://localhost:{freeports[2]}/test/262.csv.gz",
}
- self.conf["lookaside"] = {
- "backend": "opencellid",
+ self.conf["rectifier"] = {
+ "lookaside": "opencellid",
+ "publishurl": "ipc://" + self.tmpfilebase + ".rect.pub",
}
self.conf["wsgateway"] = {
"port": str(freeports[1]),
}
}
self.conf["wsgateway"] = {
"port": str(freeports[1]),
}
- cls = import_module("gps303." + srvname, package=".")
+ cls = import_module("loctrkd." + srvname, package=".")
+ if verbose:
+ cls.log.addHandler(StreamHandler(stderr))
+ cls.log.setLevel(DEBUG)
p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
p.start()
self.children.append((srvname, p))
p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
p.start()
self.children.append((srvname, p))