X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=loctrkd%2Fwsgateway.py;h=c5dcf5a785a03d13e8748bbfb441afec5b21492d;hb=a4a6606a30e8ba743c269083f1922222b0e1e81a;hp=b6d10e84798f89e7b36e322074f9afb115d50489;hpb=9bf81b19b7b790bc2115ac08dc1f3c112aede976;p=loctrkd.git diff --git a/loctrkd/wsgateway.py b/loctrkd/wsgateway.py index b6d10e8..c5dcf5a 100644 --- a/loctrkd/wsgateway.py +++ b/loctrkd/wsgateway.py @@ -22,44 +22,22 @@ from wsproto.utilities import RemoteProtocolError import zmq from . import common -from .evstore import initdb, fetch +from .evstore import initdb, fetch, fetchpmod from .protomodule import ProtoModule -from .zmsg import Bcast, topic +from .zmsg import Rept, Resp, rtopic log = getLogger("loctrkd/wsgateway") - htmlfile = None -pmods: List[ProtoModule] = [] -selector: List[Tuple[bool, str]] = [] def backlog(imei: str, numback: int) -> List[Dict[str, Any]]: result = [] - for is_incoming, timestamp, proto, packet in fetch( - imei, - selector, - numback, - ): - for pmod in pmods: - if pmod.proto_handled(proto): - msg = pmod.parse_message(packet, is_incoming=is_incoming) - result.append( - { - "type": "location", - "imei": imei, - "timestamp": str( - datetime.fromtimestamp(timestamp).astimezone( - tz=timezone.utc - ) - ), - "longitude": msg.longitude, - "latitude": msg.latitude, - "accuracy": "gps" - if True # TODO isinstance(msg, GPS_POSITIONING) - else "approximate", - } - ) + for report in fetch(imei, numback): + report["type"] = "location" + timestamp = report.pop("devtime") + report["timestamp"] = timestamp + result.append(report) return result @@ -123,6 +101,9 @@ class Client: self.ready = False self.imeis: Set[str] = set() + def __str__(self) -> str: + return f"{self.__class__.__name__}(fd={self.sock.fileno()}, addr={self.addr})" + def close(self) -> None: log.debug("Closing fd %d", self.sock.fileno()) self.sock.close() @@ -231,16 +212,28 @@ class Clients: clnt.close() del self.by_fd[fd] - def recv(self, fd: int) -> Optional[List[Dict[str, Any]]]: + def recv(self, fd: int) -> Tuple[Client, Optional[List[Dict[str, Any]]]]: clnt = self.by_fd[fd] - return clnt.recv() + return (clnt, clnt.recv()) - def send(self, msg: Dict[str, Any]) -> Set[int]: + def send(self, clnt: Optional[Client], msg: Dict[str, Any]) -> Set[int]: towrite = set() - for fd, clnt in self.by_fd.items(): - if clnt.wants(msg["imei"]): + if clnt is None: + for fd, cl in self.by_fd.items(): + if cl.wants(msg["imei"]): + cl.send(msg) + towrite.add(fd) + else: + fd = clnt.sock.fileno() + if self.by_fd.get(fd, None) == clnt: clnt.send(msg) towrite.add(fd) + else: + log.info( + "Trying to send %s to client at %d, not in service", + msg, + fd, + ) return towrite def write(self, towrite: Set[int]) -> Set[int]: @@ -257,22 +250,52 @@ class Clients: return result +def sendcmd(zpush: Any, wsmsg: Dict[str, Any]) -> Dict[str, Any]: + imei = wsmsg.pop("imei", None) + cmd = wsmsg.pop("type", None) + if imei is None or cmd is None: + log.info("Unhandled message %s %s %s", cmd, imei, wsmsg) + return { + "type": "cmdresult", + "imei": imei, + "result": "Did not get imei or cmd", + } + pmod = fetchpmod(imei) + if pmod is None: + log.info("Uknown type of recipient for %s %s %s", cmd, imei, wsmsg) + return { + "type": "cmdresult", + "imei": imei, + "result": "Type of the terminal is unknown", + } + tmsg = common.make_response(pmod, cmd, imei, **wsmsg) + if tmsg is None: + log.info("Could not make packet for %s %s %s", cmd, imei, wsmsg) + return { + "type": "cmdresult", + "imei": imei, + "result": f"{cmd} unimplemented for terminal protocol {pmod}", + } + resp = Resp(imei=imei, when=time(), packet=tmsg.packed) + log.debug("Response: %s", resp) + zpush.send(resp.packed) + return { + "type": "cmdresult", + "imei": imei, + "result": f"{cmd} sent to {imei}", + } + + def runserver(conf: ConfigParser) -> None: - global htmlfile, pmods, selector - pmods = [ - cast(ProtoModule, import_module("." + modnm, __package__)) - for modnm in conf.get("common", "protocols").split(",") - ] - for pmod in pmods: - for proto, is_incoming in pmod.exposed_protos(): - if proto != "ZX:STATUS": # TODO make it better - selector.append((is_incoming, proto)) + global htmlfile initdb(conf.get("storage", "dbfn")) htmlfile = conf.get("wsgateway", "htmlfile", fallback=None) # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?! zctx = zmq.Context() # type: ignore zsub = zctx.socket(zmq.SUB) # type: ignore - zsub.connect(conf.get("collector", "publishurl")) + zsub.connect(conf.get("rectifier", "publishurl")) + zpush = zctx.socket(zmq.PUSH) # type: ignore + zpush.connect(conf.get("collector", "listenurl")) tcpl = socket(AF_INET6, SOCK_STREAM) tcpl.setblocking(False) tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) @@ -288,21 +311,13 @@ def runserver(conf: ConfigParser) -> None: towait: Set[int] = set() while True: neededsubs = clients.subs() - for pmod in pmods: - for proto, is_incoming in pmod.exposed_protos(): - for imei in neededsubs - activesubs: - zsub.setsockopt( - zmq.SUBSCRIBE, - topic(proto, is_incoming, imei), - ) - for imei in activesubs - neededsubs: - zsub.setsockopt( - zmq.UNSUBSCRIBE, - topic(proto, is_incoming, imei), - ) + for imei in neededsubs - activesubs: + zsub.setsockopt(zmq.SUBSCRIBE, rtopic(imei)) + for imei in activesubs - neededsubs: + zsub.setsockopt(zmq.UNSUBSCRIBE, rtopic(imei)) activesubs = neededsubs log.debug("Subscribed to: %s", activesubs) - tosend = [] + tosend: List[Tuple[Optional[Client], Dict[str, Any]]] = [] topoll = [] tostop = [] towrite = set() @@ -311,50 +326,18 @@ def runserver(conf: ConfigParser) -> None: if sk is zsub: while True: try: - zmsg = Bcast(zsub.recv(zmq.NOBLOCK)) - for pmod in pmods: - if pmod.proto_handled(zmsg.proto): - msg = pmod.parse_message( - zmsg.packet, zmsg.is_incoming - ) - log.debug("Got %s with %s", zmsg, msg) - if zmsg.proto == "ZX:STATUS": - tosend.append( - { - "type": "status", - "imei": zmsg.imei, - "timestamp": str( - datetime.fromtimestamp( - zmsg.when - ).astimezone(tz=timezone.utc) - ), - "battery": msg.batt, - } - ) - else: - tosend.append( - { - "type": "location", - "imei": zmsg.imei, - "timestamp": str( - datetime.fromtimestamp( - zmsg.when - ).astimezone(tz=timezone.utc) - ), - "longitude": msg.longitude, - "latitude": msg.latitude, - "accuracy": "gps" - if zmsg.is_incoming - else "approximate", - } - ) + zmsg = Rept(zsub.recv(zmq.NOBLOCK)) + msg = loads(zmsg.payload) + msg["imei"] = zmsg.imei + log.debug("Got %s, sending %s", zmsg, msg) + tosend.append((None, msg)) except zmq.Again: break elif sk == tcpfd: clntsock, clntaddr = tcpl.accept() topoll.append((clntsock, clntaddr)) elif fl & zmq.POLLIN: - received = clients.recv(sk) + clnt, received = clients.recv(sk) if received is None: log.debug("Client gone from fd %d", sk) tostop.append(sk) @@ -367,7 +350,14 @@ def runserver(conf: ConfigParser) -> None: imeis = cast(List[str], wsmsg.get("imei")) numback: int = wsmsg.get("backlog", 5) for imei in imeis: - tosend.extend(backlog(imei, numback)) + tosend.extend( + [ + (clnt, msg) + for msg in backlog(imei, numback) + ] + ) + else: + tosend.append((clnt, sendcmd(zpush, wsmsg))) towrite.add(sk) elif fl & zmq.POLLOUT: log.debug("Write now open for fd %d", sk) @@ -379,9 +369,9 @@ def runserver(conf: ConfigParser) -> None: for fd in tostop: poller.unregister(fd) # type: ignore clients.stop(fd) - for wsmsg in tosend: - log.debug("Sending to the clients: %s", wsmsg) - towrite |= clients.send(wsmsg) + for towhom, wsmsg in tosend: + log.debug("Sending to the client %s: %s", towhom, wsmsg) + towrite |= clients.send(towhom, wsmsg) for clntsock, clntaddr in topoll: fd = clients.add(clntsock, clntaddr) poller.register(fd, flags=zmq.POLLIN)