1 """ sqlite event store """
3 from datetime import datetime
5 from sqlite3 import connect, OperationalError
6 from typing import Any, Dict, List, Tuple
8 __all__ = "fetch", "initdb", "stow", "stowloc"
13 """create table if not exists events (
16 peeraddr text not null,
17 is_incoming int not null default TRUE,
21 """create table if not exists reports (
23 devtime text not null,
32 def initdb(dbname: str) -> None:
37 """alter table events add column
38 is_incoming int not null default TRUE"""
40 except OperationalError:
45 def stow(**kwargs: Any) -> None:
48 k: kwargs[k] if k in kwargs else v
50 ("is_incoming", True),
58 assert len(kwargs) <= len(parms)
60 """insert or ignore into events
61 (tstamp, imei, peeraddr, proto, packet, is_incoming)
63 (:when, :imei, :peeraddr, :proto, :packet, :is_incoming)
70 def stowloc(**kwargs: Dict[str, Any]) -> None:
73 k: kwargs.pop(k) if k in kwargs else v
76 ("devtime", str(datetime.now())),
82 parms["remainder"] = dumps(kwargs)
84 """insert or ignore into reports
85 (imei, devtime, accuracy, latitude, longitude, remainder)
87 (:imei, :devtime, :accuracy, :latitude, :longitude, :remainder)
95 imei: str, matchlist: List[Tuple[bool, str]], backlog: int
96 ) -> List[Tuple[bool, float, str, bytes]]:
97 # matchlist is a list of tuples (is_incoming, proto)
98 # returns a list of tuples (is_incoming, timestamp, packet)
100 selector = " or ".join(
101 (f"(is_incoming = ? and proto = ?)" for _ in range(len(matchlist)))
105 f"""select is_incoming, tstamp, proto, packet from events
106 where ({selector}) and imei = ?
107 order by tstamp desc limit ?""",
108 tuple(item for sublist in matchlist for item in sublist)
113 return list(reversed(result))