""" sqlite event store """
-from sqlite3 import connect
+from sqlite3 import connect, OperationalError
+from typing import Any, List, Tuple
__all__ = "fetch", "initdb", "stow"
tstamp real not null,
imei text,
peeraddr text not null,
+ is_incoming int not null default TRUE,
proto int not null,
packet blob
)"""
-def initdb(dbname):
+def initdb(dbname: str) -> None:
global DB
DB = connect(dbname)
- DB.execute(SCHEMA)
+ try:
+ DB.execute(
+ """alter table events add column
+ is_incoming int not null default TRUE"""
+ )
+ except OperationalError:
+ DB.execute(SCHEMA)
-def stow(**kwargs):
+def stow(**kwargs: Any) -> None:
assert DB is not None
parms = {
k: kwargs[k] if k in kwargs else v
for k, v in (
+ ("is_incoming", True),
("peeraddr", None),
("when", 0.0),
("imei", None),
assert len(kwargs) <= len(parms)
DB.execute(
"""insert or ignore into events
- (tstamp, imei, peeraddr, proto, packet)
+ (tstamp, imei, peeraddr, proto, packet, is_incoming)
values
- (:when, :imei, :peeraddr, :proto, :packet)
+ (:when, :imei, :peeraddr, :proto, :packet, :is_incoming)
""",
parms,
)
DB.commit()
-def fetch(imei, protos, backlog):
+
+def fetch(
+ imei: str, matchlist: List[Tuple[bool, int]], backlog: int
+) -> List[Tuple[bool, float, bytes]]:
+ # matchlist is a list of tuples (is_incoming, proto)
+ # returns a list of tuples (is_incoming, timestamp, packet)
assert DB is not None
- protosel = ", ".join(["?" for _ in range(len(protos))])
+ selector = " or ".join(
+ (f"(is_incoming = ? and proto = ?)" for _ in range(len(matchlist)))
+ )
cur = DB.cursor()
- cur.execute(f"""select packet from events
- where proto in ({protosel}) and imei = ?
+ cur.execute(
+ f"""select is_incoming, tstamp, packet from events
+ where ({selector}) and imei = ?
order by tstamp desc limit ?""",
- protos + (imei, backlog))
- result = [row[0] for row in cur]
+ tuple(item for sublist in matchlist for item in sublist)
+ + (imei, backlog),
+ )
+ result = list(cur)
cur.close()
- return result
+ return list(reversed(result))