def fetch(
imei: str, matchlist: List[Tuple[bool, str]], backlog: int
-) -> List[Tuple[bool, float, bytes]]:
+) -> List[Tuple[bool, float, str, bytes]]:
# matchlist is a list of tuples (is_incoming, proto)
# returns a list of tuples (is_incoming, timestamp, packet)
assert DB is not None
)
cur = DB.cursor()
cur.execute(
- f"""select is_incoming, tstamp, packet from events
+ f"""select is_incoming, tstamp, proto, packet from events
where ({selector}) and imei = ?
order by tstamp desc limit ?""",
tuple(item for sublist in matchlist for item in sublist)
from configparser import ConfigParser
from datetime import datetime, timezone
+from importlib import import_module
from json import dumps, loads
from logging import getLogger
from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from . import common
from .evstore import initdb, fetch
-from .zx303proto import (
- GPS_POSITIONING,
- STATUS,
- WIFI_POSITIONING,
- parse_message,
- proto_name,
-)
from .zmsg import Bcast, topic
log = getLogger("loctrkd/wsgateway")
+
+
+class ProtoModule:
+ @staticmethod
+ def parse_message(packet: bytes, is_incoming: bool = True) -> Any:
+ ...
+
+ @staticmethod
+ def exposed_protos() -> List[Tuple[str, bool]]:
+ ...
+
+ @staticmethod
+ def proto_handled(proto: str) -> bool:
+ ...
+
+
htmlfile = None
+pmods: List[ProtoModule] = []
+selector: List[Tuple[bool, str]] = []
def backlog(imei: str, numback: int) -> List[Dict[str, Any]]:
result = []
- for is_incoming, timestamp, packet in fetch(
+ for is_incoming, timestamp, proto, packet in fetch(
imei,
- [
- (True, proto_name(GPS_POSITIONING)),
- (False, proto_name(WIFI_POSITIONING)),
- ],
+ selector,
numback,
):
- msg = parse_message(packet, is_incoming=is_incoming)
+ for pmod in pmods:
+ if pmod.proto_handled(proto):
+ msg = pmod.parse_message(packet, is_incoming=is_incoming)
result.append(
{
"type": "location",
"longitude": msg.longitude,
"latitude": msg.latitude,
"accuracy": "gps"
- if isinstance(msg, GPS_POSITIONING)
+ if True # TODO isinstance(msg, GPS_POSITIONING)
else "approximate",
}
)
def runserver(conf: ConfigParser) -> None:
- global htmlfile
-
+ global htmlfile, pmods, selector
+ pmods = [
+ cast(ProtoModule, import_module("." + modnm, __package__))
+ for modnm in conf.get("collector", "protocols").split(",")
+ ]
+ for pmod in pmods:
+ for proto, is_incoming in pmod.exposed_protos():
+ if proto != "ZX:STATUS": # TODO make it better
+ selector.append((is_incoming, proto))
initdb(conf.get("storage", "dbfn"))
htmlfile = conf.get("wsgateway", "htmlfile", fallback=None)
# Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
towait: Set[int] = set()
while True:
neededsubs = clients.subs()
- for imei in neededsubs - activesubs:
- zsub.setsockopt(
- zmq.SUBSCRIBE,
- topic(proto_name(GPS_POSITIONING), True, imei),
- )
- zsub.setsockopt(
- zmq.SUBSCRIBE,
- topic(proto_name(WIFI_POSITIONING), False, imei),
- )
- zsub.setsockopt(
- zmq.SUBSCRIBE,
- topic(proto_name(STATUS), True, imei),
- )
- for imei in activesubs - neededsubs:
- zsub.setsockopt(
- zmq.UNSUBSCRIBE,
- topic(proto_name(GPS_POSITIONING), True, imei),
- )
- zsub.setsockopt(
- zmq.UNSUBSCRIBE,
- topic(proto_name(WIFI_POSITIONING), False, imei),
- )
- zsub.setsockopt(
- zmq.UNSUBSCRIBE,
- topic(proto_name(STATUS), True, imei),
- )
+ for pmod in pmods:
+ for proto, is_incoming in pmod.exposed_protos():
+ for imei in neededsubs - activesubs:
+ zsub.setsockopt(
+ zmq.SUBSCRIBE,
+ topic(proto, is_incoming, imei),
+ )
+ for imei in activesubs - neededsubs:
+ zsub.setsockopt(
+ zmq.UNSUBSCRIBE,
+ topic(proto, is_incoming, imei),
+ )
activesubs = neededsubs
log.debug("Subscribed to: %s", activesubs)
tosend = []
while True:
try:
zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
- msg = parse_message(zmsg.packet, zmsg.is_incoming)
+ for pmod in pmods:
+ if pmod.proto_handled(zmsg.proto):
+ msg = pmod.parse_message(
+ zmsg.packet, zmsg.is_incoming
+ )
log.debug("Got %s with %s", zmsg, msg)
- if isinstance(msg, STATUS):
+ if zmsg.proto == "ZX:STATUS":
tosend.append(
{
"type": "status",