)
from struct import pack
from time import time
-from typing import Any, cast, Dict, List, Optional, Set, Tuple, Union
+from typing import Any, Dict, List, Optional, Set, Tuple, Union
import zmq
from . import common
MAXBUFFER: int = 4096
-pmods: List[ProtoModule] = []
-
-
class Client:
"""Connected socket to the terminal plus buffer and metadata"""
)
return None
if self.stream is None:
- for pmod in pmods:
- if pmod.probe_buffer(segment):
- self.pmod = pmod
- self.stream = pmod.Stream()
- break
+ self.pmod = common.probe_pmod(segment)
+ if self.pmod is not None:
+ self.stream = self.pmod.Stream()
if self.stream is None:
log.info(
"unrecognizable %d bytes of data %s from fd %d",
def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
- global pmods
- pmods = [
- cast(ProtoModule, import_module("." + modnm, __package__))
- for modnm in conf.get("collector", "protocols").split(",")
- ]
# Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
zctx = zmq.Context() # type: ignore
zpub = zctx.socket(zmq.PUB) # type: ignore