from configparser import ConfigParser
from datetime import datetime, timezone
+from importlib import import_module
from json import dumps, loads
from logging import getLogger
from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
import zmq
from . import common
-from .evstore import initdb, fetch
-from .zx303proto import (
- GPS_POSITIONING,
- STATUS,
- WIFI_POSITIONING,
- parse_message,
- proto_name,
-)
-from .zmsg import Bcast, topic
+from .evstore import initdb, fetch, fetchpmod
+from .protomodule import ProtoModule
+from .zmsg import Rept, Resp, rtopic
log = getLogger("loctrkd/wsgateway")
+
htmlfile = None
def backlog(imei: str, numback: int) -> List[Dict[str, Any]]:
result = []
- for is_incoming, timestamp, packet in fetch(
- imei,
- [
- (True, proto_name(GPS_POSITIONING)),
- (False, proto_name(WIFI_POSITIONING)),
- ],
- numback,
- ):
- msg = parse_message(packet, is_incoming=is_incoming)
- result.append(
- {
- "type": "location",
- "imei": imei,
- "timestamp": str(
- datetime.fromtimestamp(timestamp).astimezone(
- tz=timezone.utc
- )
- ),
- "longitude": msg.longitude,
- "latitude": msg.latitude,
- "accuracy": "gps"
- if isinstance(msg, GPS_POSITIONING)
- else "approximate",
- }
- )
+ for report in fetch(imei, numback):
+ report["type"] = "location"
+ timestamp = report.pop("devtime")
+ report["timestamp"] = timestamp
+ result.append(report)
return result
self.ready = False
self.imeis: Set[str] = set()
+ def __str__(self) -> str:
+ return f"{self.__class__.__name__}(fd={self.sock.fileno()}, addr={self.addr})"
+
def close(self) -> None:
log.debug("Closing fd %d", self.sock.fileno())
self.sock.close()
clnt.close()
del self.by_fd[fd]
- def recv(self, fd: int) -> Optional[List[Dict[str, Any]]]:
+ def recv(self, fd: int) -> Tuple[Client, Optional[List[Dict[str, Any]]]]:
clnt = self.by_fd[fd]
- return clnt.recv()
+ return (clnt, clnt.recv())
- def send(self, msg: Dict[str, Any]) -> Set[int]:
+ def send(self, clnt: Optional[Client], msg: Dict[str, Any]) -> Set[int]:
towrite = set()
- for fd, clnt in self.by_fd.items():
- if clnt.wants(msg["imei"]):
+ if clnt is None:
+ for fd, cl in self.by_fd.items():
+ if cl.wants(msg["imei"]):
+ cl.send(msg)
+ towrite.add(fd)
+ else:
+ fd = clnt.sock.fileno()
+ if self.by_fd.get(fd, None) == clnt:
clnt.send(msg)
towrite.add(fd)
+ else:
+ log.info(
+ "Trying to send %s to client at %d, not in service",
+ msg,
+ fd,
+ )
return towrite
def write(self, towrite: Set[int]) -> Set[int]:
return result
+def sendcmd(zpush: Any, wsmsg: Dict[str, Any]) -> Dict[str, Any]:
+ imei = wsmsg.pop("imei", None)
+ cmd = wsmsg.pop("type", None)
+ if imei is None or cmd is None:
+ log.info("Unhandled message %s %s %s", cmd, imei, wsmsg)
+ return {
+ "type": "cmdresult",
+ "imei": imei,
+ "result": "Did not get imei or cmd",
+ }
+ pmod = fetchpmod(imei)
+ if pmod is None:
+ log.info("Uknown type of recipient for %s %s %s", cmd, imei, wsmsg)
+ return {
+ "type": "cmdresult",
+ "imei": imei,
+ "result": "Type of the terminal is unknown",
+ }
+ tmsg = common.make_response(pmod, cmd, imei, **wsmsg)
+ if tmsg is None:
+ log.info("Could not make packet for %s %s %s", cmd, imei, wsmsg)
+ return {
+ "type": "cmdresult",
+ "imei": imei,
+ "result": f"{cmd} unimplemented for terminal protocol {pmod}",
+ }
+ resp = Resp(imei=imei, when=time(), packet=tmsg.packed)
+ log.debug("Response: %s", resp)
+ zpush.send(resp.packed)
+ return {
+ "type": "cmdresult",
+ "imei": imei,
+ "result": f"{cmd} sent to {imei}",
+ }
+
+
def runserver(conf: ConfigParser) -> None:
global htmlfile
-
initdb(conf.get("storage", "dbfn"))
htmlfile = conf.get("wsgateway", "htmlfile", fallback=None)
# Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
zctx = zmq.Context() # type: ignore
zsub = zctx.socket(zmq.SUB) # type: ignore
- zsub.connect(conf.get("collector", "publishurl"))
+ zsub.connect(conf.get("rectifier", "publishurl"))
+ zpush = zctx.socket(zmq.PUSH) # type: ignore
+ zpush.connect(conf.get("collector", "listenurl"))
tcpl = socket(AF_INET6, SOCK_STREAM)
tcpl.setblocking(False)
tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
while True:
neededsubs = clients.subs()
for imei in neededsubs - activesubs:
- zsub.setsockopt(
- zmq.SUBSCRIBE,
- topic(proto_name(GPS_POSITIONING), True, imei),
- )
- zsub.setsockopt(
- zmq.SUBSCRIBE,
- topic(proto_name(WIFI_POSITIONING), False, imei),
- )
- zsub.setsockopt(
- zmq.SUBSCRIBE,
- topic(proto_name(STATUS), True, imei),
- )
+ zsub.setsockopt(zmq.SUBSCRIBE, rtopic(imei))
for imei in activesubs - neededsubs:
- zsub.setsockopt(
- zmq.UNSUBSCRIBE,
- topic(proto_name(GPS_POSITIONING), True, imei),
- )
- zsub.setsockopt(
- zmq.UNSUBSCRIBE,
- topic(proto_name(WIFI_POSITIONING), False, imei),
- )
- zsub.setsockopt(
- zmq.UNSUBSCRIBE,
- topic(proto_name(STATUS), True, imei),
- )
+ zsub.setsockopt(zmq.UNSUBSCRIBE, rtopic(imei))
activesubs = neededsubs
log.debug("Subscribed to: %s", activesubs)
- tosend = []
+ tosend: List[Tuple[Optional[Client], Dict[str, Any]]] = []
topoll = []
tostop = []
towrite = set()
if sk is zsub:
while True:
try:
- zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
- msg = parse_message(zmsg.packet, zmsg.is_incoming)
- log.debug("Got %s with %s", zmsg, msg)
- if isinstance(msg, STATUS):
- tosend.append(
- {
- "type": "status",
- "imei": zmsg.imei,
- "timestamp": str(
- datetime.fromtimestamp(
- zmsg.when
- ).astimezone(tz=timezone.utc)
- ),
- "battery": msg.batt,
- }
- )
- else:
- tosend.append(
- {
- "type": "location",
- "imei": zmsg.imei,
- "timestamp": str(
- datetime.fromtimestamp(
- zmsg.when
- ).astimezone(tz=timezone.utc)
- ),
- "longitude": msg.longitude,
- "latitude": msg.latitude,
- "accuracy": "gps"
- if zmsg.is_incoming
- else "approximate",
- }
- )
+ zmsg = Rept(zsub.recv(zmq.NOBLOCK))
+ msg = loads(zmsg.payload)
+ msg["imei"] = zmsg.imei
+ log.debug("Got %s, sending %s", zmsg, msg)
+ tosend.append((None, msg))
except zmq.Again:
break
elif sk == tcpfd:
clntsock, clntaddr = tcpl.accept()
topoll.append((clntsock, clntaddr))
elif fl & zmq.POLLIN:
- received = clients.recv(sk)
+ clnt, received = clients.recv(sk)
if received is None:
log.debug("Client gone from fd %d", sk)
tostop.append(sk)
imeis = cast(List[str], wsmsg.get("imei"))
numback: int = wsmsg.get("backlog", 5)
for imei in imeis:
- tosend.extend(backlog(imei, numback))
+ tosend.extend(
+ [
+ (clnt, msg)
+ for msg in backlog(imei, numback)
+ ]
+ )
+ else:
+ tosend.append((clnt, sendcmd(zpush, wsmsg)))
towrite.add(sk)
elif fl & zmq.POLLOUT:
log.debug("Write now open for fd %d", sk)
for fd in tostop:
poller.unregister(fd) # type: ignore
clients.stop(fd)
- for wsmsg in tosend:
- log.debug("Sending to the clients: %s", wsmsg)
- towrite |= clients.send(wsmsg)
+ for towhom, wsmsg in tosend:
+ log.debug("Sending to the client %s: %s", towhom, wsmsg)
+ towrite |= clients.send(towhom, wsmsg)
for clntsock, clntaddr in topoll:
fd = clients.add(clntsock, clntaddr)
poller.register(fd, flags=zmq.POLLIN)