setattr(self, k, v)
+def maybe_int(x: Optional[int]) -> Optional[int]:
+ return None if x is None else int(x)
+
+
def intx(x: Union[str, int]) -> int:
if isinstance(x, str):
x = int(x, 0)
PROTO = 0x01
RESPOND = Respond.INL
# Default response for ACK, can also respond with STOP_UPLOAD
+ IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
def in_decode(self, length: int, payload: bytes) -> None:
- self.imei = payload[:8].hex()
+ self.imei = payload[:8].ljust(8, b"\0").hex()
self.ver = payload[8]
+ def in_encode(self) -> bytes:
+ return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
+ "B", self.ver
+ )
+
class SUPERVISION(GPS303Pkt):
PROTO = 0x05
class STATUS(GPS303Pkt):
PROTO = 0x13
RESPOND = Respond.EXT
+ IN_KWARGS = (
+ ("batt", int, 100),
+ ("ver", int, 0),
+ ("timezone", int, 0),
+ ("intvl", int, 0),
+ ("signal", maybe_int, None),
+ )
OUT_KWARGS = (("upload_interval", int, 25),)
def in_decode(self, length: int, payload: bytes) -> None:
else:
self.signal = None
+ def in_encode(self) -> bytes:
+ return (
+ pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + b""
+ if self.signal is None
+ else pack("B", self.signal)
+ )
+
def out_encode(self) -> bytes: # Set interval in minutes
return pack("B", self.upload_interval)
class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
PROTO = 0x14
+ def in_encode(self) -> bytes:
+ return b""
+
class RESET(GPS303Pkt):
# Device sends when it got reset SMS
from time import sleep
import unittest
from .common import send_and_drain, TestWithServers
+from gps303.gps303proto import *
class Storage(TestWithServers):
super().tearDown()
def test_storage(self) -> None:
- buf = b"xx\r\x01\x03Y3\x90w\x19q\x85\x05\r\n"
- send_and_drain(self.sock, buf)
+ for buf in (
+ LOGIN.In(imei="9999123456780000", ver=9).packed,
+ STATUS.In().packed,
+ HIBERNATION.In().packed,
+ ):
+ send_and_drain(self.sock, b"xx" + buf + b"\r\n")
self.sock.close()
- # TODO: make a proper sequence
sleep(1)
- print("checking database")
+ got = set()
with connect(self.conf.get("storage", "dbfn")) as db:
db.row_factory = Row
- for row in db.execute("select * from events"):
- print(dict(row))
+ for is_incoming, packet in db.execute(
+ "select is_incoming, packet from events"
+ ):
+ msg = parse_message(packet, is_incoming=is_incoming)
+ # print(msg)
+ got.add(type(msg))
+ self.assertEqual(
+ got, {LOGIN.Out, HIBERNATION.In, LOGIN.In, STATUS.Out, STATUS.In}
+ )
if __name__ == "__main__":