from typing import Any, cast, List, Tuple
from . import common
+from .protomodule import ProtoModule
log = getLogger("loctrkd/qry")
-class ProtoModule:
- @staticmethod
- def proto_handled(proto: str) -> bool:
- ...
-
- @staticmethod
- def parse_message(packet: bytes, is_incoming: bool = True) -> Any:
- ...
-
-
pmods: List[ProtoModule] = []
global pmods
pmods = [
cast(ProtoModule, import_module("." + modnm, __package__))
- for modnm in conf.get("collector", "protocols").split(",")
+ for modnm in conf.get("common", "protocols").split(",")
]
db = connect(conf.get("storage", "dbfn"))
c = db.cursor()
else:
proto = ""
selector = ""
+ dopts = dict(opts)
+ if len(args) > 1 and "-o" in dopts:
+ attr = args[1]
+ fn = dopts["-o"]
+ else:
+ attr = ""
+ fn = ""
c.execute(
"""select tstamp, imei, peeraddr, is_incoming, proto, packet
peeraddr,
msg,
)
+ if fn and hasattr(msg, attr):
+ with open(fn, "wb") as fl: # TODO support multiple files
+ fl.write(getattr(msg, attr))
if __name__.endswith("__main__"):
- opts, args = getopt(argv[1:], "c:d")
+ opts, args = getopt(argv[1:], "o:c:d")
main(common.init(log, opts=opts), opts, args)