-def readconfig(fname: str) -> ConfigParser:
- config = ConfigParser()
- config["collector"] = {
- "port": str(PORT),
- }
- config["storage"] = {
- "dbfn": DBFN,
- }
- config["termconfig"] = {}
- config.read(fname)
- return config
-
-
-def normconf(section: SectionProxy) -> Dict[str, Any]:
- result: Dict[str, Any] = {}
- for key, val in section.items():
- vals = val.split("\n")
- if len(vals) > 1 and vals[0] == "":
- vals = vals[1:]
- lst: List[Union[str, int]] = []
- for el in vals:
- try:
- lst.append(int(el, 0))
- except ValueError:
- if el[0] == '"' and el[-1] == '"':
- el = el.strip('"').rstrip('"')
- lst.append(el)
- if not (
- all([isinstance(x, int) for x in lst])
- or all([isinstance(x, str) for x in lst])
- ):
- raise ValueError(
- "Values of %s - %s are of different type", key, vals
+def probe_pmod(segment: bytes) -> Optional[ProtoModule]:
+ for pmod in pmods:
+ if pmod.probe_buffer(segment):
+ return pmod
+ return None
+
+
+def pmod_for_proto(proto: str) -> Optional[ProtoModule]:
+ for pmod in pmods:
+ if pmod.proto_handled(proto):
+ return pmod
+ return None
+
+
+def pmod_by_name(pmodname: str) -> Optional[ProtoModule]:
+ for pmod in pmods:
+ if pmod.PMODNAME == pmodname:
+ return pmod
+ return None
+
+
+def make_response(
+ pmodname: str, cmd: str, imei: str, **kwargs: Any
+) -> Optional[ProtoClass.Out]:
+ pmod = pmod_by_name(pmodname)
+ if pmod is None:
+ return None
+ return pmod.make_response(cmd, imei, **kwargs)
+
+
+def parse_message(proto: str, packet: bytes, is_incoming: bool = True) -> Any:
+ pmod = pmod_for_proto(proto)
+ return pmod.parse_message(packet, is_incoming) if pmod else None
+
+
+def exposed_protos() -> List[Tuple[str, bool]]:
+ return [item for pmod in pmods for item in pmod.exposed_protos()]
+
+
+class Report:
+ TYPE: str
+
+ def __repr__(self) -> str:
+ return (
+ self.__class__.__name__
+ + "("
+ + ", ".join(
+ [f"{k}={v.__repr__()}" for k, v in self.__dict__.items()]