""" sqlite event store """
from datetime import datetime
-from json import dumps
-from sqlite3 import connect, OperationalError
+from json import dumps, loads
+from sqlite3 import connect, OperationalError, Row
from typing import Any, Dict, List, Tuple
__all__ = "fetch", "initdb", "stow", "stowloc"
def initdb(dbname: str) -> None:
global DB
DB = connect(dbname)
- try:
- DB.execute(
- """alter table events add column
- is_incoming int not null default TRUE"""
- )
- except OperationalError:
- for stmt in SCHEMA:
- DB.execute(stmt)
+ DB.row_factory = Row
+ for stmt in SCHEMA:
+ DB.execute(stmt)
def stow(**kwargs: Any) -> None:
DB.commit()
-def fetch(
- imei: str, matchlist: List[Tuple[bool, str]], backlog: int
-) -> List[Tuple[bool, float, str, bytes]]:
- # matchlist is a list of tuples (is_incoming, proto)
- # returns a list of tuples (is_incoming, timestamp, packet)
+def fetch(imei: str, backlog: int) -> List[Dict[str, Any]]:
assert DB is not None
- selector = " or ".join(
- (f"(is_incoming = ? and proto = ?)" for _ in range(len(matchlist)))
- )
cur = DB.cursor()
cur.execute(
- f"""select is_incoming, tstamp, proto, packet from events
- where ({selector}) and imei = ?
- order by tstamp desc limit ?""",
- tuple(item for sublist in matchlist for item in sublist)
- + (imei, backlog),
+ """select imei, devtime, accuracy, latitude, longitude, remainder
+ from reports where imei = ?
+ order by devtime desc limit ?""",
+ (imei, backlog),
)
- result = list(cur)
+ result = []
+ for row in cur:
+ dic = dict(row)
+ remainder = loads(dic.pop("remainder"))
+ dic.update(remainder)
+ result.append(dic)
cur.close()
return list(reversed(result))
from . import common
from .evstore import initdb, fetch
from .protomodule import ProtoModule
-from .zmsg import Bcast, topic
+from .zmsg import Rept, rtopic
log = getLogger("loctrkd/wsgateway")
-
htmlfile = None
-pmods: List[ProtoModule] = []
-selector: List[Tuple[bool, str]] = []
def backlog(imei: str, numback: int) -> List[Dict[str, Any]]:
result = []
- for is_incoming, timestamp, proto, packet in fetch(
- imei,
- selector,
- numback,
- ):
- for pmod in pmods:
- if pmod.proto_handled(proto):
- msg = pmod.parse_message(packet, is_incoming=is_incoming)
- result.append(
- {
- "type": "location",
- "imei": imei,
- "timestamp": str(
- datetime.fromtimestamp(timestamp).astimezone(
- tz=timezone.utc
- )
- ),
- "longitude": msg.longitude,
- "latitude": msg.latitude,
- "accuracy": "gps"
- if True # TODO isinstance(msg, GPS_POSITIONING)
- else "approximate",
- }
- )
+ for report in fetch(imei, numback):
+ report["type"] = "location"
+ timestamp = report.pop("devtime")
+ report["timestamp"] = timestamp
+ result.append(report)
return result
def runserver(conf: ConfigParser) -> None:
- global htmlfile, pmods, selector
- pmods = [
- cast(ProtoModule, import_module("." + modnm, __package__))
- for modnm in conf.get("common", "protocols").split(",")
- ]
- for pmod in pmods:
- for proto, is_incoming in pmod.exposed_protos():
- if proto != "ZX:STATUS": # TODO make it better
- selector.append((is_incoming, proto))
+ global htmlfile
initdb(conf.get("storage", "dbfn"))
htmlfile = conf.get("wsgateway", "htmlfile", fallback=None)
# Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
zctx = zmq.Context() # type: ignore
zsub = zctx.socket(zmq.SUB) # type: ignore
- zsub.connect(conf.get("collector", "publishurl"))
+ zsub.connect(conf.get("rectifier", "publishurl"))
tcpl = socket(AF_INET6, SOCK_STREAM)
tcpl.setblocking(False)
tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
towait: Set[int] = set()
while True:
neededsubs = clients.subs()
- for pmod in pmods:
- for proto, is_incoming in pmod.exposed_protos():
- for imei in neededsubs - activesubs:
- zsub.setsockopt(
- zmq.SUBSCRIBE,
- topic(proto, is_incoming, imei),
- )
- for imei in activesubs - neededsubs:
- zsub.setsockopt(
- zmq.UNSUBSCRIBE,
- topic(proto, is_incoming, imei),
- )
+ for imei in neededsubs - activesubs:
+ zsub.setsockopt(zmq.SUBSCRIBE, rtopic(imei))
+ for imei in activesubs - neededsubs:
+ zsub.setsockopt(zmq.UNSUBSCRIBE, rtopic(imei))
activesubs = neededsubs
log.debug("Subscribed to: %s", activesubs)
tosend = []
if sk is zsub:
while True:
try:
- zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
- for pmod in pmods:
- if pmod.proto_handled(zmsg.proto):
- msg = pmod.parse_message(
- zmsg.packet, zmsg.is_incoming
- )
- log.debug("Got %s with %s", zmsg, msg)
- if zmsg.proto == "ZX:STATUS":
- tosend.append(
- {
- "type": "status",
- "imei": zmsg.imei,
- "timestamp": str(
- datetime.fromtimestamp(
- zmsg.when
- ).astimezone(tz=timezone.utc)
- ),
- "battery": msg.batt,
- }
- )
- else:
- tosend.append(
- {
- "type": "location",
- "imei": zmsg.imei,
- "timestamp": str(
- datetime.fromtimestamp(
- zmsg.when
- ).astimezone(tz=timezone.utc)
- ),
- "longitude": msg.longitude,
- "latitude": msg.latitude,
- "accuracy": "gps"
- if zmsg.is_incoming
- else "approximate",
- }
- )
+ zmsg = Rept(zsub.recv(zmq.NOBLOCK))
+ msg = loads(zmsg.payload)
+ msg["imei"] = zmsg.imei
+ log.debug("Got %s, sending %s", zmsg, msg)
+ tosend.append(msg)
except zmq.Again:
break
elif sk == tcpfd: