setattr(self, k, v)
-def maybe_int(x: Optional[int]) -> Optional[int]:
- return None if x is None else int(x)
+def maybe(typ: type) -> Callable[[Any], Any]:
+ return lambda x: None if x is None else typ(x)
def intx(x: Union[str, int]) -> int:
@property
def packed(self) -> bytes:
payload = self.encode()
- length = len(payload) + 1
+ length = getattr(self, "length", len(payload) + 1)
return pack("BB", length, self.PROTO) + payload
("ver", int, 0),
("timezone", int, 0),
("intvl", int, 0),
- ("signal", maybe_int, None),
+ ("signal", maybe(int), None),
)
OUT_KWARGS = (("upload_interval", int, 25),)
self.signal = None
def in_encode(self) -> bytes:
- return (
- pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + b""
- if self.signal is None
- else pack("B", self.signal)
+ return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
+ b"" if self.signal is None else pack("B", self.signal)
)
def out_encode(self) -> bytes: # Set interval in minutes
class _WIFI_POSITIONING(GPS303Pkt):
+ IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
+ # IN_KWARGS = (
+ ("dtime", bytes, b"\0\0\0\0\0\0"),
+ ("wifi_aps", list, []),
+ ("mcc", int, 0),
+ ("mnc", int, 0),
+ ("gsm_cells", list, []),
+ )
+
def in_decode(self, length: int, payload: bytes) -> None:
self.dtime = payload[:6]
if self.dtime == b"\0\0\0\0\0\0":
)
self.gsm_cells.append((locac, cellid, -sigstr))
+ def in_encode(self) -> bytes:
+ self.length = len(self.wifi_aps)
+ return b"".join(
+ [
+ self.dtime,
+ b"".join(
+ [
+ bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
+ + pack("B", -sigstr)
+ for mac, sigstr in self.wifi_aps
+ ]
+ ),
+ pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
+ b"".join(
+ [
+ pack("!HHB", locac, cellid, -sigstr)
+ for locac, cellid, sigstr in self.gsm_cells
+ ]
+ ),
+ ]
+ )
+
class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
PROTO = 0x17
+ [b";".join([el.encode() for el in self.phonenumbers])]
)
+ def in_encode(self) -> bytes:
+ return b""
+
class SYNCHRONOUS_WHITELIST(GPS303Pkt):
PROTO = 0x58
import unittest
from .common import send_and_drain, TestWithServers
from gps303.gps303proto import *
+from gps303.ocid_dload import SCHEMA
class Storage(TestWithServers):
def setUp(self, *args: str, **kwargs: Any) -> None:
super().setUp("collector", "storage", "lookaside", "termconfig")
+ with connect(self.conf.get("opencellid", "dbfn")) as ldb:
+ ldb.execute(SCHEMA)
+ ldb.executemany(
+ """insert into cells
+ (radio, mcc, net, area, cell, unit, lon, lat, range,
+ samples, changeable, created, updated, averageSignal)
+ values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
+ (
+ (
+ "GSM",
+ 262,
+ 3,
+ 24420,
+ 16594,
+ -1,
+ 12.681939,
+ 53.52603,
+ 22733,
+ 1999,
+ 1,
+ 1556575612,
+ 1653387028,
+ 0,
+ ),
+ (
+ "GSM",
+ 262,
+ 3,
+ 24420,
+ 36243,
+ -1,
+ 12.66442,
+ 53.527534,
+ 21679,
+ 1980,
+ 1,
+ 1540870608,
+ 1653387028,
+ 0,
+ ),
+ (
+ "GSM",
+ 262,
+ 3,
+ 24420,
+ 17012,
+ -1,
+ 12.741093,
+ 53.529854,
+ 23463,
+ 874,
+ 1,
+ 1563404603,
+ 1653268184,
+ 0,
+ ),
+ (
+ "GSM",
+ 262,
+ 3,
+ 24420,
+ 26741,
+ -1,
+ 12.658822,
+ 53.530832,
+ 18809,
+ 1687,
+ 1,
+ 1539939964,
+ 1653265176,
+ 0,
+ ),
+ (
+ "GSM",
+ 262,
+ 2,
+ 24420,
+ 36243,
+ -1,
+ 12.61111,
+ 53.536626,
+ 1000,
+ 4,
+ 1,
+ 1623218739,
+ 1652696033,
+ 0,
+ ),
+ (
+ "GSM",
+ 262,
+ 1,
+ 24420,
+ 36243,
+ -1,
+ 12.611135,
+ 53.536636,
+ 1000,
+ 3,
+ 1,
+ 1568587946,
+ 1628827437,
+ 0,
+ ),
+ (
+ "GSM",
+ 262,
+ 2,
+ 24420,
+ 17012,
+ -1,
+ 12.829655,
+ 53.536654,
+ 1000,
+ 2,
+ 1,
+ 1609913384,
+ 1612934718,
+ 0,
+ ),
+ (
+ "GSM",
+ 262,
+ 3,
+ 24000,
+ 35471,
+ -1,
+ 11.505135,
+ 53.554216,
+ 11174,
+ 829,
+ 1,
+ 1544494558,
+ 1651063300,
+ 0,
+ ),
+ (
+ "GSM",
+ 262,
+ 3,
+ 24420,
+ 37156,
+ -1,
+ 11.918188,
+ 53.870522,
+ 1000,
+ 1,
+ 1,
+ 1550199983,
+ 1550199983,
+ 0,
+ ),
+ ),
+ )
+ ldb.commit()
for fam, typ, pro, cnm, skadr in getaddrinfo(
"127.0.0.1",
self.conf.getint("collector", "port"),
def test_storage(self) -> None:
for buf in (
LOGIN.In(imei="9999123456780000", ver=9).packed,
- STATUS.In().packed,
+ WIFI_POSITIONING.In(
+ mnc=3,
+ mcc=262,
+ wifi_aps=[
+ ("02:03:04:05:06:07", -89),
+ ("92:93:94:95:96:97", -70),
+ ],
+ gsm_cells=[
+ (24420, 27178, -90),
+ (24420, 36243, -78),
+ (24420, 17012, -44),
+ ],
+ ).packed,
+ SETUP.In().packed,
+ STATUS.In(signal=87).packed,
HIBERNATION.In().packed,
):
send_and_drain(self.sock, b"xx" + buf + b"\r\n")
"select is_incoming, packet from events"
):
msg = parse_message(packet, is_incoming=is_incoming)
- # print(msg)
+ print(msg)
got.add(type(msg))
self.assertEqual(
- got, {LOGIN.Out, HIBERNATION.In, LOGIN.In, STATUS.Out, STATUS.In}
+ got,
+ {
+ LOGIN.Out,
+ HIBERNATION.In,
+ LOGIN.In,
+ SETUP.In,
+ SETUP.Out,
+ STATUS.Out,
+ STATUS.In,
+ WIFI_POSITIONING.In,
+ WIFI_POSITIONING.Out,
+ },
)