""" sqlite event store """
-from sqlite3 import connect, OperationalError
-from typing import Any, List, Tuple
+from datetime import datetime
+from json import dumps, loads
+from sqlite3 import connect, OperationalError, Row
+from typing import Any, Dict, List, Optional, Tuple
-__all__ = "fetch", "initdb", "stow"
+__all__ = "fetch", "initdb", "stow", "stowloc"
DB = None
-SCHEMA = """create table if not exists events (
+SCHEMA = (
+ """create table if not exists events (
tstamp real not null,
imei text,
peeraddr text not null,
is_incoming int not null default TRUE,
proto text not null,
packet blob
-)"""
+)""",
+ """create table if not exists reports (
+ imei text,
+ devtime text not null,
+ accuracy real,
+ latitude real,
+ longitude real,
+ remainder text
+)""",
+ """create table if not exists pmodmap (
+ imei text not null unique,
+ pmod text not null
+)""",
+)
def initdb(dbname: str) -> None:
global DB
DB = connect(dbname)
- try:
- DB.execute(
- """alter table events add column
- is_incoming int not null default TRUE"""
- )
- except OperationalError:
- DB.execute(SCHEMA)
+ DB.row_factory = Row
+ for stmt in SCHEMA:
+ DB.execute(stmt)
def stow(**kwargs: Any) -> None:
DB.commit()
-def fetch(
- imei: str, matchlist: List[Tuple[bool, str]], backlog: int
-) -> List[Tuple[bool, float, bytes]]:
- # matchlist is a list of tuples (is_incoming, proto)
- # returns a list of tuples (is_incoming, timestamp, packet)
+def stowloc(**kwargs: Dict[str, Any]) -> None:
assert DB is not None
- selector = " or ".join(
- (f"(is_incoming = ? and proto = ?)" for _ in range(len(matchlist)))
+ parms = {
+ k: kwargs.pop(k) if k in kwargs else v
+ for k, v in (
+ ("imei", None),
+ ("devtime", str(datetime.now())),
+ ("accuracy", None),
+ ("latitude", None),
+ ("longitude", None),
+ )
+ }
+ parms["remainder"] = dumps(kwargs)
+ DB.execute(
+ """insert or ignore into reports
+ (imei, devtime, accuracy, latitude, longitude, remainder)
+ values
+ (:imei, :devtime, :accuracy, :latitude, :longitude, :remainder)
+ """,
+ parms,
)
+ DB.commit()
+
+
+def stowpmod(imei: str, pmod: str) -> None:
+ assert DB is not None
+ DB.execute(
+ """insert or replace into pmodmap
+ (imei, pmod) values (:imei, :pmod)
+ """,
+ {"imei": imei, "pmod": pmod},
+ )
+ DB.commit()
+
+
+def fetch(imei: str, backlog: int) -> List[Dict[str, Any]]:
+ assert DB is not None
cur = DB.cursor()
cur.execute(
- f"""select is_incoming, tstamp, packet from events
- where ({selector}) and imei = ?
- order by tstamp desc limit ?""",
- tuple(item for sublist in matchlist for item in sublist)
- + (imei, backlog),
+ """select imei, devtime, accuracy, latitude, longitude, remainder
+ from reports where imei = ?
+ order by devtime desc limit ?""",
+ (imei, backlog),
)
- result = list(cur)
+ result = []
+ for row in cur:
+ dic = dict(row)
+ remainder = loads(dic.pop("remainder"))
+ dic.update(remainder)
+ result.append(dic)
cur.close()
return list(reversed(result))
+
+
+def fetchpmod(imei: str) -> Optional[Any]:
+ assert DB is not None
+ ret = None
+ cur = DB.cursor()
+ cur.execute("select pmod from pmodmap where imei = ?", (imei,))
+ result = cur.fetchone()
+ if result:
+ ret = result[0]
+ cur.close()
+ return ret