)
from struct import pack
from time import time
-from typing import Any, cast, Dict, List, Optional, Set, Tuple, Union
+from typing import Any, Dict, List, Optional, Set, Tuple, Union
import zmq
from . import common
+from .protomodule import ProtoModule
from .zmsg import Bcast, Resp
log = getLogger("loctrkd/collector")
MAXBUFFER: int = 4096
-class ProtoModule:
- class Stream:
- def recv(self, segment: bytes) -> List[Union[bytes, str]]:
- ...
-
- def close(self) -> bytes:
- ...
-
- @staticmethod
- def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
- ...
-
- @staticmethod
- def probe_buffer(buffer: bytes) -> bool:
- ...
-
- @staticmethod
- def parse_message(packet: bytes, is_incoming: bool = True) -> Any:
- ...
-
- @staticmethod
- def inline_response(packet: bytes) -> Optional[bytes]:
- ...
-
- @staticmethod
- def is_goodbye_packet(packet: bytes) -> bool:
- ...
-
- @staticmethod
- def imei_from_packet(packet: bytes) -> Optional[str]:
- ...
-
- @staticmethod
- def proto_of_message(packet: bytes) -> str:
- ...
-
-
-pmods: List[ProtoModule] = []
-
-
class Client:
"""Connected socket to the terminal plus buffer and metadata"""
)
return None
if self.stream is None:
- for pmod in pmods:
- if pmod.probe_buffer(segment):
- self.pmod = pmod
- self.stream = pmod.Stream()
- break
+ self.pmod = common.probe_pmod(segment)
+ if self.pmod is not None:
+ self.stream = self.pmod.Stream()
if self.stream is None:
log.info(
"unrecognizable %d bytes of data %s from fd %d",
def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
- global pmods
- pmods = [
- cast(ProtoModule, import_module("." + modnm, __package__))
- for modnm in conf.get("collector", "protocols").split(",")
- ]
# Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
zctx = zmq.Context() # type: ignore
zpub = zctx.socket(zmq.PUB) # type: ignore