sleep(1) # give collector some time
send_and_drain(self.sock, None)
self.sock.close()
- print("finished fuzzing")
+ sleep(1) # Let the server close their side
super().tearDown()
def test_stream(self) -> None:
from random import Random
from socket import getaddrinfo, socket, AF_INET6, SOCK_STREAM
-from sqlite3 import connect
+from sqlite3 import connect, Row
from time import sleep
import unittest
from .common import send_and_drain, TestWithServers
def tearDown(self) -> None:
sleep(1) # give collector some time
- send_and_drain(self.sock, None)
- self.sock.close()
super().tearDown()
def test_storage(self) -> None:
buf = b"xx\r\x01\x03Y3\x90w\x19q\x85\x05\r\n"
send_and_drain(self.sock, buf)
+ self.sock.close()
# TODO: make a proper sequence
+ sleep(1)
+ print("checking database")
with connect(self.conf.get("storage", "dbfn")) as db:
- c = db.cursor()
- c.execute("select * from events")
- events = [dict(row) for row in c]
- print(events)
+ db.row_factory = Row
+ for row in db.execute("select * from events"):
+ print(dict(row))
if __name__ == "__main__":