+""" sqlite event store """
+
from sqlite3 import connect
-__all__ = ("initdb", "stow")
+__all__ = "initdb", "stow"
DB = None
SCHEMA = """create table if not exists events (
- timestamp real not null,
+ tstamp real not null,
imei text,
- clntaddr text not null,
- length int,
+ peeraddr text not null,
proto int not null,
- payload blob
+ packet blob
)"""
DB.execute(SCHEMA)
-def stow(clntaddr, timestamp, imei, length, proto, payload):
+def stow(**kwargs):
assert DB is not None
- parms = dict(
- zip(
- ("clntaddr", "timestamp", "imei", "length", "proto", "payload"),
- (str(clntaddr), timestamp, imei, length, proto, payload),
+ parms = {
+ k: kwargs[k] if k in kwargs else v
+ for k, v in (
+ ("peeraddr", None),
+ ("when", 0.0),
+ ("imei", None),
+ ("proto", -1),
+ ("packet", b""),
)
- )
+ }
+ assert len(kwargs) <= len(parms)
DB.execute(
"""insert or ignore into events
- (timestamp, imei, clntaddr, length, proto, payload)
+ (tstamp, imei, peeraddr, proto, packet)
values
- (:timestamp, :imei, :clntaddr, :length, :proto, :payload)
+ (:when, :imei, :peeraddr, :proto, :packet)
""",
parms,
)
from sqlite3 import connect
import sys
-from .gps303proto import *
+from .gps303proto import parse_message, proto_by_name
db = connect(sys.argv[1])
c = db.cursor()
selector = ""
c.execute(
- "select timestamp, imei, clntaddr, length, proto, payload from events" +
+ "select tstamp, imei, peeraddr, proto, packet from events" +
selector, {"proto": proto}
)
-for timestamp, imei, clntaddr, length, proto, payload in c:
- msg = make_object(length, proto, payload)
+for tstamp, imei, peeraddr, proto, packet in c:
+ msg = parse_message(packet)
print(
- datetime.fromtimestamp(timestamp)
+ datetime.fromtimestamp(tstamp)
.astimezone(tz=timezone.utc)
.isoformat(),
+ imei,
+ peeraddr,
msg,
)
""" Store zmq broadcasts to sqlite """
from datetime import datetime, timezone
-from getopt import getopt
from logging import getLogger
-from logging.handlers import SysLogHandler
-import sys
-from time import time
import zmq
from . import common
from .evstore import initdb, stow
-from .gps303proto import parse_message
+from .gps303proto import proto_of_message
from .zmsg import Bcast
log = getLogger("gps303/storage")
try:
while True:
zmsg = Bcast(zsub.recv())
- msg = parse_message(zmsg.packet)
- log.debug("IMEI %s from %s at %s: %s", zmsg.imei, zmsg.peeraddr, datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc), msg)
+ log.debug(
+ "IMEI %s from %s at %s: %s",
+ zmsg.imei,
+ zmsg.peeraddr,
+ datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
+ zmsg.packet.hex(),
+ )
if zmsg.peeraddr is not None:
addr, port = zmsg.peeraddr
- peeraddr = str(addr), port
+ peeraddr = str((str(addr), port))
else:
peeraddr = None
stow(
- peeraddr,
- zmsg.when,
- zmsg.imei,
- msg.length,
- msg.PROTO,
- msg.payload,
+ peeraddr=peeraddr,
+ when=zmsg.when,
+ imei=zmsg.imei,
+ proto=proto_of_message(zmsg.packet),
+ packet=zmsg.packet,
)
except KeyboardInterrupt:
pass