from configparser import ConfigParser
from datetime import datetime, timezone
from getopt import getopt
+from importlib import import_module
from logging import getLogger
from sys import argv
from time import time
-from typing import List, Tuple
+from typing import Any, cast, List, Tuple, Type, Union
import zmq
from . import common
-from .zx303proto import *
from .zmsg import Bcast, Resp
log = getLogger("loctrkd")
+class ProtoModule:
+ @staticmethod
+ def proto_handled(proto: str) -> bool:
+ ...
+
+ @staticmethod
+ def class_by_prefix(prefix: str) -> Any:
+ ...
+
+
+pmods: List[ProtoModule] = []
+
+
def main(
conf: ConfigParser, opts: List[Tuple[str, str]], args: List[str]
) -> None:
+ global pmods
+ pmods = [
+ cast(ProtoModule, import_module("." + modnm, __package__))
+ for modnm in conf.get("collector", "protocols").split(",")
+ ]
# Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
zctx = zmq.Context() # type: ignore
zpush = zctx.socket(zmq.PUSH) # type: ignore
imei = args[0]
cmd = args[1]
args = args[2:]
- cls = class_by_prefix(cmd)
+ handled = False
+ for pmod in pmods:
+ if pmod.proto_handled(cmd):
+ handled = True
+ break
+ if not handled:
+ raise NotImplementedError(f"No protocol can handle {cmd}")
+ cls = pmod.class_by_prefix(cmd)
if isinstance(cls, list):
raise ValueError("Prefix does not select a single class: " + str(cls))
kwargs = dict([arg.split("=") for arg in args])
return x
+def l3str(x: Union[str, List[str]]) -> List[str]:
+ if isinstance(x, str):
+ lx = x.split(",")
+ else:
+ lx = x
+ if len(lx) != 3 or not all(isinstance(el, str) for el in x):
+ raise ValueError(str(lx) + " is not a list of three strings")
+ return lx
+
+
class MetaPkt(type):
"""
For each class corresponding to a message, automatically create
def out_encode(self) -> str:
# Overridden in subclasses, otherwise command verb only
- return self.PROTO
+ return ""
@property
def packed(self) -> bytes:
- buffer = self.encode().encode()
- return f"[LT*0000000000*{len(buffer):04X}*".encode() + buffer + b"]"
+ data = self.encode()
+ payload = self.PROTO + "," + data if data else self.PROTO
+ return f"[LT*0000000000*{len(payload):04X}*{payload}]".encode()
class UNKNOWN(BeeSurePkt):
RESPOND = Respond.INL
+class CR(BeeSurePkt):
+ PROTO = "CR"
+
+
+class FLOWER(BeeSurePkt):
+ PROTO = "FLOWER"
+ OUT_KWARGS = (("number", int, 1),)
+
+ def out_encode(self) -> str:
+ self.number: int
+ return str(self.number)
+
+
+class POWEROFF(BeeSurePkt):
+ PROTO = "POWEROFF"
+
+
+class RESET(BeeSurePkt):
+ PROTO = "RESET"
+
+
+class SOS(BeeSurePkt):
+ PROTO = "SOS"
+ OUT_KWARGS = (("phonenumbers", l3str, ["", "", ""]),)
+
+ def out_encode(self) -> str:
+ self.phonenumbers: List[str]
+ return ",".join(self.phonenumbers)
+
+
+class _SET_PHONE(BeeSurePkt):
+ OUT_KWARGS = (("phonenumber", str, ""),)
+
+ def out_encode(self) -> str:
+ self.phonenumber: str
+ return self.phonenumber
+
+
+class SOS1(_SET_PHONE):
+ PROTO = "SOS1"
+
+
+class SOS2(_SET_PHONE):
+ PROTO = "SOS2"
+
+
+class SOS3(_SET_PHONE):
+ PROTO = "SOS3"
+
+
# Build dicts protocol number -> class and class name -> protocol number
CLASSES = {}
PROTOS = {}
def class_by_prefix(
prefix: str,
) -> Union[Type[BeeSurePkt], List[Tuple[str, str]]]:
+ if prefix.startswith(PROTO_PREFIX):
+ pname = prefix[len(PROTO_PREFIX) :].upper()
+ else:
+ raise KeyError(pname)
lst = [
(name, proto)
for name, proto in PROTOS.items()
- if name.upper().startswith(prefix.upper())
+ if name.upper().startswith(pname)
]
- if len(lst) != 1:
- return lst
- _, proto = lst[0]
- return CLASSES[proto]
+ for _, proto in lst:
+ if len(lst) == 1: # unique prefix match
+ return CLASSES[proto]
+ if proto == pname: # exact match
+ return CLASSES[proto]
+ return lst
def proto_handled(proto: str) -> bool: