from typing import Any
import unittest
from .common import send_and_drain, TestWithServers
-from gps303.gps303proto import *
-from gps303.ocid_dload import SCHEMA
+from loctrkd.zx303proto import *
+from loctrkd.zx303proto import (
+ STATUS,
+ WIFI_POSITIONING,
+ WIFI_OFFLINE_POSITIONING,
+ WIFI_POSITIONING,
+ LOGIN,
+ HIBERNATION,
+ SETUP,
+)
+from loctrkd.ocid_dload import SCHEMA
class Storage(TestWithServers):
def setUp(self, *args: str, **kwargs: Any) -> None:
- super().setUp("collector", "storage", "lookaside", "termconfig")
+ super().setUp(
+ "collector", "storage", "rectifier", "termconfig", verbose=True
+ )
with connect(self.conf.get("opencellid", "dbfn")) as ldb:
ldb.execute(SCHEMA)
ldb.executemany(
super().tearDown()
def test_storage(self) -> None:
- for buf in (
- LOGIN.In(imei="9999123456780000", ver=9).packed,
+ for msg in (
+ LOGIN.In(imei="9999123456780000", ver=9),
WIFI_POSITIONING.In(
mnc=3,
mcc=262,
(24420, 36243, -78),
(24420, 17012, -44),
],
- ).packed,
- SETUP.In().packed,
- STATUS.In(signal=87).packed,
- HIBERNATION.In().packed,
+ ),
+ SETUP.In(),
+ STATUS.In(signal=87),
+ HIBERNATION.In(),
):
- send_and_drain(self.sock, b"xx" + buf + b"\r\n")
- self.sock.close()
+ print("Send:", msg)
+ send_and_drain(self.sock, b"xx" + msg.packed + b"\r\n")
sleep(1)
+ self.sock.close()
got = set()
with connect(self.conf.get("storage", "dbfn")) as db:
for is_incoming, packet in db.execute(
"select is_incoming, packet from events"
):
msg = parse_message(packet, is_incoming=is_incoming)
- print(msg)
+ print("Stored:", msg)
got.add(type(msg))
self.assertEqual(
got,