""" sqlite event store """
from datetime import datetime
-from json import dumps
-from sqlite3 import connect, OperationalError
-from typing import Any, Dict, List, Tuple
+from json import dumps, loads
+from sqlite3 import connect, OperationalError, Row
+from typing import Any, Dict, List, Optional, Tuple
__all__ = "fetch", "initdb", "stow", "stowloc"
latitude real,
longitude real,
remainder text
+)""",
+ """create table if not exists pmodmap (
+ imei text not null unique,
+ pmod text not null,
+ tstamp real not null default (unixepoch())
)""",
)
def initdb(dbname: str) -> None:
global DB
DB = connect(dbname)
+ DB.row_factory = Row
+ need_populate_pmodmap = False
try:
+ DB.execute("select count(pmod) from pmodmap")
+ try:
+ DB.execute("select count(tstamp) from pmodmap")
+ except OperationalError:
+ need_populate_pmodmap = True
+ DB.execute("alter table pmodmap rename to old_pmodmap")
+ except OperationalError:
+ pass # DB was empty
+ for stmt in SCHEMA:
+ DB.execute(stmt)
+ if need_populate_pmodmap:
DB.execute(
- """alter table events add column
- is_incoming int not null default TRUE"""
+ """insert into pmodmap(imei, pmod)
+ select imei, pmod from old_pmodmap"""
)
- except OperationalError:
- for stmt in SCHEMA:
- DB.execute(stmt)
+ DB.execute("drop table old_pmodmap")
+ DB.commit()
def stow(**kwargs: Any) -> None:
DB.commit()
-def fetch(
- imei: str, matchlist: List[Tuple[bool, str]], backlog: int
-) -> List[Tuple[bool, float, str, bytes]]:
- # matchlist is a list of tuples (is_incoming, proto)
- # returns a list of tuples (is_incoming, timestamp, packet)
+def stowpmod(imei: str, pmod: str) -> None:
assert DB is not None
- selector = " or ".join(
- (f"(is_incoming = ? and proto = ?)" for _ in range(len(matchlist)))
+ DB.execute(
+ """insert or replace into pmodmap
+ (imei, pmod) values (:imei, :pmod)
+ """,
+ {"imei": imei, "pmod": pmod},
)
+ DB.commit()
+
+
+def fetch(imei: str, backlog: int) -> List[Dict[str, Any]]:
+ assert DB is not None
cur = DB.cursor()
cur.execute(
- f"""select is_incoming, tstamp, proto, packet from events
- where ({selector}) and imei = ?
- order by tstamp desc limit ?""",
- tuple(item for sublist in matchlist for item in sublist)
- + (imei, backlog),
+ """select imei, devtime, accuracy, latitude, longitude, remainder
+ from reports where imei = ?
+ order by devtime desc limit ?""",
+ (imei, backlog),
)
- result = list(cur)
+ result = []
+ for row in cur:
+ dic = dict(row)
+ remainder = loads(dic.pop("remainder"))
+ dic.update(remainder)
+ result.append(dic)
cur.close()
return list(reversed(result))
+
+
+def fetchpmod(imei: str) -> Optional[Any]:
+ assert DB is not None
+ ret = None
+ cur = DB.cursor()
+ cur.execute(
+ """select pmod from pmodmap where imei = ?
+ and tstamp > unixepoch() - 3600.0""",
+ (imei,),
+ )
+ result = cur.fetchone()
+ if result:
+ ret = result[0]
+ cur.close()
+ return ret