""" Websocket Gateway """
-from json import loads
+from datetime import datetime, timezone
+from json import dumps, loads
from logging import getLogger
from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from time import time
import zmq
from . import common
-from .backlog import blinit, backlog
+from .evstore import initdb, fetch
from .gps303proto import (
GPS_POSITIONING,
+ STATUS,
WIFI_POSITIONING,
parse_message,
)
htmlfile = None
+def backlog(imei, numback):
+ result = []
+ for is_incoming, timestamp, packet in fetch(
+ imei,
+ ((True, GPS_POSITIONING.PROTO), (False, WIFI_POSITIONING.PROTO)),
+ numback,
+ ):
+ msg = parse_message(packet, is_incoming=is_incoming)
+ result.append(
+ {
+ "type": "location",
+ "imei": imei,
+ "timestamp": str(
+ datetime.fromtimestamp(timestamp).astimezone(
+ tz=timezone.utc
+ )
+ ),
+ "longitude": msg.longitude,
+ "latitude": msg.latitude,
+ "accuracy": "gps"
+ if isinstance(msg, GPS_POSITIONING)
+ else "approximate",
+ }
+ )
+ return result
+
+
def try_http(data, fd, e):
global htmlfile
try:
fd,
headers,
)
- try:
- pos = resource.index("?")
- resource = resource[:pos]
- except ValueError:
- pass
if op == "GET":
if htmlfile is None:
return (
f"{proto} 500 No data configured\r\n"
f"Content-Type: text/plain\r\n\r\n"
- f"HTML data not configure on the server\r\n".encode()
+ f"HTML data not configured on the server\r\n".encode()
)
- elif resource == "/":
+ else:
try:
with open(htmlfile, "rb") as fl:
htmldata = fl.read()
f"Content-Type: text/plain\r\n\r\n"
f"HTML file could not be opened\r\n".encode()
)
- else:
- return (
- f"{proto} 404 File not found\r\n"
- f"Content-Type: text/plain\r\n\r\n"
- f'We can only serve "/"\r\n'.encode()
- )
else:
return (
f"{proto} 400 Bad request\r\n"
log.debug("%s on fd %d", event, self.sock.fileno())
self.ws_data += self.ws.send(event.response())
elif isinstance(event, TextMessage):
- # TODO: save imei "subscription"
log.debug("%s on fd %d", event, self.sock.fileno())
msg = loads(event.data)
msgs.append(msg)
self.imeis,
self.sock.fileno(),
)
- return True # TODO: check subscriptions
+ return imei in self.imeis
def send(self, message):
- if self.ready and message.imei in self.imeis:
- self.ws_data += self.ws.send(Message(data=message.json))
+ if self.ready and message["imei"] in self.imeis:
+ self.ws_data += self.ws.send(Message(data=dumps(message)))
def write(self):
if self.ws_data:
def send(self, msg):
towrite = set()
for fd, clnt in self.by_fd.items():
- if clnt.wants(msg.imei):
+ if clnt.wants(msg["imei"]):
clnt.send(msg)
towrite.add(fd)
return towrite
def runserver(conf):
global htmlfile
- blinit(conf.get("storage", "dbfn"))
+ initdb(conf.get("storage", "dbfn"))
htmlfile = conf.get("wsgateway", "htmlfile")
zctx = zmq.Context()
zsub = zctx.socket(zmq.SUB)
for imei in neededsubs - activesubs:
zsub.setsockopt(
zmq.SUBSCRIBE,
- topic(GPS_POSITIONING.PROTO, True),
+ topic(GPS_POSITIONING.PROTO, True, imei),
+ )
+ zsub.setsockopt(
+ zmq.SUBSCRIBE,
+ topic(WIFI_POSITIONING.PROTO, False, imei),
)
zsub.setsockopt(
zmq.SUBSCRIBE,
- topic(WIFI_POSITIONING.PROTO, False),
+ topic(STATUS.PROTO, True, imei),
)
for imei in activesubs - neededsubs:
zsub.setsockopt(
zmq.UNSUBSCRIBE,
- topic(GPS_POSITIONING.PROTO, True),
+ topic(GPS_POSITIONING.PROTO, True, imei),
+ )
+ zsub.setsockopt(
+ zmq.UNSUBSCRIBE,
+ topic(WIFI_POSITIONING.PROTO, False, imei),
)
zsub.setsockopt(
zmq.UNSUBSCRIBE,
- topic(WIFI_POSITIONING.PROTO, False),
+ topic(STATUS.PROTO, True, imei),
)
activesubs = neededsubs
log.debug("Subscribed to: %s", activesubs)
while True:
try:
zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
- msg = parse_message(zmsg.packet)
- tosend.append(zmsg)
- log.debug("Got %s", zmsg)
+ msg = parse_message(zmsg.packet, zmsg.is_incoming)
+ log.debug("Got %s with %s", zmsg, msg)
+ if isinstance(msg, STATUS):
+ tosend.append(
+ {
+ "type": "status",
+ "imei": zmsg.imei,
+ "timestamp": str(
+ datetime.fromtimestamp(
+ zmsg.when
+ ).astimezone(tz=timezone.utc)
+ ),
+ "battery": msg.batt,
+ }
+ )
+ else:
+ tosend.append(
+ {
+ "type": "location",
+ "imei": zmsg.imei,
+ "timestamp": str(
+ datetime.fromtimestamp(
+ zmsg.when
+ ).astimezone(tz=timezone.utc)
+ ),
+ "longitude": msg.longitude,
+ "latitude": msg.latitude,
+ "accuracy": "gps"
+ if zmsg.is_incoming
+ else "approximate",
+ }
+ )
except zmq.Again:
break
elif sk == tcpfd:
for msg in received:
log.debug("Received from %d: %s", sk, msg)
if msg.get("type", None) == "subscribe":
- imei = msg.get("imei")
+ imeis = msg.get("imei")
numback = msg.get("backlog", 5)
- for elem in imei:
- tosend.extend(backlog(elem, numback))
+ for imei in imeis:
+ tosend.extend(backlog(imei, numback))
towrite.add(sk)
elif fl & zmq.POLLOUT:
log.debug("Write now open for fd %d", sk)