1 """ Send junk to the collector """
3 from random import Random
4 from socket import getaddrinfo, socket, AF_INET, SOCK_STREAM
5 from sqlite3 import connect
9 from .common import send_and_drain, TestWithServers
10 from loctrkd.zx303proto import *
11 from loctrkd.zx303proto import (
14 WIFI_OFFLINE_POSITIONING,
20 from loctrkd.ocid_dload import SCHEMA
23 class Storage(TestWithServers):
24 def setUp(self, *args: str, **kwargs: Any) -> None:
26 "collector", "storage", "rectifier", "termconfig", verbose=True
28 with connect(self.conf.get("opencellid", "dbfn")) as ldb:
32 (radio, mcc, net, area, cell, unit, lon, lat, range,
33 samples, changeable, created, updated, averageSignal)
34 values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
183 for fam, typ, pro, cnm, skadr in getaddrinfo(
185 self.conf.getint("collector", "port"),
189 break # Just take the first element
190 self.sock = socket(AF_INET, SOCK_STREAM)
191 self.sock.connect(skadr)
193 def tearDown(self) -> None:
194 sleep(1) # give collector some time
197 def test_storage(self) -> None:
199 LOGIN.In(imei="9999123456780000", ver=9),
204 ("02:03:04:05:06:07", -89),
205 ("92:93:94:95:96:97", -70),
214 STATUS.In(signal=87),
218 send_and_drain(self.sock, b"xx" + msg.packed + b"\r\n")
222 with connect(self.conf.get("storage", "dbfn")) as db:
223 for is_incoming, packet in db.execute(
224 "select is_incoming, packet from events"
226 msg = parse_message(packet, is_incoming=is_incoming)
227 print("Stored:", msg)
240 WIFI_POSITIONING.Out,
245 if __name__ == "__main__":