]> average.org Git - loctrkd.git/blobdiff - gps303/gps303proto.py
test: include lookaside and termconfig in the loop
[loctrkd.git] / gps303 / gps303proto.py
index da5796f988eeb281f4fa2e85c64e55e12c7e44f1..0a222d05634930679995e6e4807d939d35104346 100755 (executable)
@@ -18,7 +18,17 @@ from datetime import datetime, timezone
 from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
 from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
-from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    List,
+    Optional,
+    Tuple,
+    Type,
+    TYPE_CHECKING,
+    Union,
+)
 
 __all__ = (
     "class_by_prefix",
 
 __all__ = (
     "class_by_prefix",
@@ -75,12 +85,26 @@ class DecodeError(Exception):
             setattr(self, k, v)
 
 
             setattr(self, k, v)
 
 
+def maybe(typ: type) -> Callable[[Any], Any]:
+    return lambda x: None if x is None else typ(x)
+
+
 def intx(x: Union[str, int]) -> int:
     if isinstance(x, str):
         x = int(x, 0)
     return x
 
 
 def intx(x: Union[str, int]) -> int:
     if isinstance(x, str):
         x = int(x, 0)
     return x
 
 
+def boolx(x: Union[str, bool]) -> bool:
+    if isinstance(x, str):
+        if x.upper() in ("ON", "TRUE", "1"):
+            return True
+        if x.upper() in ("OFF", "FALSE", "0"):
+            return False
+        raise ValueError(str(x) + " could not be parsed as a Boolean")
+    return x
+
+
 def hhmm(x: str) -> str:
     """Check for the string that represents hours and minutes"""
     if not isinstance(x, str) or len(x) != 4:
 def hhmm(x: str) -> str:
     """Check for the string that represents hours and minutes"""
     if not isinstance(x, str) or len(x) != 4:
@@ -92,17 +116,55 @@ def hhmm(x: str) -> str:
     return x
 
 
     return x
 
 
+def hhmmhhmm(x: str) -> str:
+    """Check for the string that represents hours and minutes twice"""
+    if not isinstance(x, str) or len(x) != 8:
+        raise ValueError(str(x) + " is not an eight-character string")
+    return hhmm(x[:4]) + hhmm(x[4:])
+
+
 def l3str(x: Union[str, List[str]]) -> List[str]:
     if isinstance(x, str):
         lx = x.split(",")
 def l3str(x: Union[str, List[str]]) -> List[str]:
     if isinstance(x, str):
         lx = x.split(",")
+    else:
+        lx = x
     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
         raise ValueError(str(lx) + " is not a list of three strings")
     return lx
 
 
     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
         raise ValueError(str(lx) + " is not a list of three strings")
     return lx
 
 
+def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
+    def alrmspec(sub: str) -> Tuple[int, str]:
+        if len(sub) != 7:
+            raise ValueError(sub + " does not represent day and time")
+        return (
+            {
+                "MON": 1,
+                "TUE": 2,
+                "WED": 3,
+                "THU": 4,
+                "FRI": 5,
+                "SAT": 6,
+                "SUN": 7,
+            }[sub[:3].upper()],
+            sub[3:],
+        )
+
+    if isinstance(x, str):
+        lx = [alrmspec(sub) for sub in x.split(",")]
+    else:
+        lx = x
+    lx.extend([(0, "0000") for _ in range(3 - len(lx))])
+    if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
+        raise ValueError(str(lx) + " is a wrong alarms specification")
+    return [(d, hhmm(tm)) for d, tm in lx]
+
+
 def l3int(x: Union[str, List[int]]) -> List[int]:
     if isinstance(x, str):
         lx = [int(el) for el in x.split(",")]
 def l3int(x: Union[str, List[int]]) -> List[int]:
     if isinstance(x, str):
         lx = [int(el) for el in x.split(",")]
+    else:
+        lx = x
     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
         raise ValueError(str(lx) + " is not a list of three integers")
     return lx
     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
         raise ValueError(str(lx) + " is not a list of three integers")
     return lx
@@ -120,37 +182,40 @@ class MetaPkt(type):
     respectively.
     """
 
     respectively.
     """
 
+    if TYPE_CHECKING:
+
+        def __getattr__(self, name: str) -> Any:
+            pass
+
+        def __setattr__(self, name: str, value: Any) -> None:
+            pass
+
     def __new__(
     def __new__(
-        cls, name: str, bases: Tuple[type, ...], attrs: Dict[str, Any]
+        cls: Type["MetaPkt"],
+        name: str,
+        bases: Tuple[type, ...],
+        attrs: Dict[str, Any],
     ) -> "MetaPkt":
         newcls = super().__new__(cls, name, bases, attrs)
     ) -> "MetaPkt":
         newcls = super().__new__(cls, name, bases, attrs)
-        setattr(
-            newcls,
-            "In",
-            super().__new__(
-                cls,
-                name + ".In",
-                (newcls,) + bases,
-                {
-                    "KWARGS": newcls.IN_KWARGS,
-                    "decode": newcls.in_decode,
-                    "encode": newcls.in_encode,
-                },
-            ),
+        newcls.In = super().__new__(
+            cls,
+            name + ".In",
+            (newcls,) + bases,
+            {
+                "KWARGS": newcls.IN_KWARGS,
+                "decode": newcls.in_decode,
+                "encode": newcls.in_encode,
+            },
         )
         )
-        setattr(
-            newcls,
-            "Out",
-            super().__new__(
-                cls,
-                name + ".Out",
-                (newcls,) + bases,
-                {
-                    "KWARGS": newcls.OUT_KWARGS,
-                    "decode": newcls.out_decode,
-                    "encode": newcls.out_encode,
-                },
-            ),
+        newcls.Out = super().__new__(
+            cls,
+            name + ".Out",
+            (newcls,) + bases,
+            {
+                "KWARGS": newcls.OUT_KWARGS,
+                "decode": newcls.out_decode,
+                "encode": newcls.out_encode,
+            },
         )
         return newcls
 
         )
         return newcls
 
@@ -170,6 +235,14 @@ class GPS303Pkt(metaclass=MetaPkt):
     In: Type["GPS303Pkt"]
     Out: Type["GPS303Pkt"]
 
     In: Type["GPS303Pkt"]
     Out: Type["GPS303Pkt"]
 
+    if TYPE_CHECKING:
+
+        def __getattr__(self, name: str) -> Any:
+            pass
+
+        def __setattr__(self, name: str, value: Any) -> None:
+            pass
+
     def __init__(self, *args: Any, **kwargs: Any):
         """
         Construct the object _either_ from (length, payload),
     def __init__(self, *args: Any, **kwargs: Any):
         """
         Construct the object _either_ from (length, payload),
@@ -230,7 +303,7 @@ class GPS303Pkt(metaclass=MetaPkt):
     @property
     def packed(self) -> bytes:
         payload = self.encode()
     @property
     def packed(self) -> bytes:
         payload = self.encode()
-        length = len(payload) + 1
+        length = getattr(self, "length", len(payload) + 1)
         return pack("BB", length, self.PROTO) + payload
 
 
         return pack("BB", length, self.PROTO) + payload
 
 
@@ -242,10 +315,16 @@ class LOGIN(GPS303Pkt):
     PROTO = 0x01
     RESPOND = Respond.INL
     # Default response for ACK, can also respond with STOP_UPLOAD
     PROTO = 0x01
     RESPOND = Respond.INL
     # Default response for ACK, can also respond with STOP_UPLOAD
+    IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
 
     def in_decode(self, length: int, payload: bytes) -> None:
 
     def in_decode(self, length: int, payload: bytes) -> None:
-        self.imei = payload[:-1].hex()
-        self.ver = unpack("B", payload[-1:])[0]
+        self.imei = payload[:8].ljust(8, b"\0").hex()
+        self.ver = payload[8]
+
+    def in_encode(self) -> bytes:
+        return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
+            "B", self.ver
+        )
 
 
 class SUPERVISION(GPS303Pkt):
 
 
 class SUPERVISION(GPS303Pkt):
@@ -305,6 +384,13 @@ class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
 class STATUS(GPS303Pkt):
     PROTO = 0x13
     RESPOND = Respond.EXT
 class STATUS(GPS303Pkt):
     PROTO = 0x13
     RESPOND = Respond.EXT
+    IN_KWARGS = (
+        ("batt", int, 100),
+        ("ver", int, 0),
+        ("timezone", int, 0),
+        ("intvl", int, 0),
+        ("signal", maybe(int), None),
+    )
     OUT_KWARGS = (("upload_interval", int, 25),)
 
     def in_decode(self, length: int, payload: bytes) -> None:
     OUT_KWARGS = (("upload_interval", int, 25),)
 
     def in_decode(self, length: int, payload: bytes) -> None:
@@ -316,6 +402,11 @@ class STATUS(GPS303Pkt):
         else:
             self.signal = None
 
         else:
             self.signal = None
 
+    def in_encode(self) -> bytes:
+        return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
+            b"" if self.signal is None else pack("B", self.signal)
+        )
+
     def out_encode(self) -> bytes:  # Set interval in minutes
         return pack("B", self.upload_interval)
 
     def out_encode(self) -> bytes:  # Set interval in minutes
         return pack("B", self.upload_interval)
 
@@ -323,6 +414,9 @@ class STATUS(GPS303Pkt):
 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
     PROTO = 0x14
 
 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
     PROTO = 0x14
 
+    def in_encode(self) -> bytes:
+        return b""
+
 
 class RESET(GPS303Pkt):
     # Device sends when it got reset SMS
 
 class RESET(GPS303Pkt):
     # Device sends when it got reset SMS
@@ -339,6 +433,15 @@ class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
 
 
 class _WIFI_POSITIONING(GPS303Pkt):
 
 
 class _WIFI_POSITIONING(GPS303Pkt):
+    IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
+        # IN_KWARGS = (
+        ("dtime", bytes, b"\0\0\0\0\0\0"),
+        ("wifi_aps", list, []),
+        ("mcc", int, 0),
+        ("mnc", int, 0),
+        ("gsm_cells", list, []),
+    )
+
     def in_decode(self, length: int, payload: bytes) -> None:
         self.dtime = payload[:6]
         if self.dtime == b"\0\0\0\0\0\0":
     def in_decode(self, length: int, payload: bytes) -> None:
         self.dtime = payload[:6]
         if self.dtime == b"\0\0\0\0\0\0":
@@ -363,6 +466,28 @@ class _WIFI_POSITIONING(GPS303Pkt):
             )
             self.gsm_cells.append((locac, cellid, -sigstr))
 
             )
             self.gsm_cells.append((locac, cellid, -sigstr))
 
+    def in_encode(self) -> bytes:
+        self.length = len(self.wifi_aps)
+        return b"".join(
+            [
+                self.dtime,
+                b"".join(
+                    [
+                        bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
+                        + pack("B", -sigstr)
+                        for mac, sigstr in self.wifi_aps
+                    ]
+                ),
+                pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
+                b"".join(
+                    [
+                        pack("!HHB", locac, cellid, -sigstr)
+                        for locac, cellid, sigstr in self.gsm_cells
+                    ]
+                ),
+            ]
+        )
+
 
 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x17
 
 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x17
@@ -391,17 +516,28 @@ class PROHIBIT_LBS(GPS303Pkt):
 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
     PROTO = 0x34
 
 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
     PROTO = 0x34
 
-    # Data is in packed decimal
-    # 00/01 - GPS on/off
-    # 00/01 - Don't set / Set upload period
-    # HHMMHHMM - Upload period
-    # 00/01 - LBS on/off
-    # 00/01 - Don't set / Set time of boot
-    # HHMM  - Time of boot
-    # 00/01 - Don't set / Set time of shutdown
-    # HHMM  - Time of shutdown
+    OUT_KWARGS = (
+        ("gps_off", boolx, False),  # Clarify the meaning of 0/1
+        ("gps_interval_set", boolx, False),
+        ("gps_interval", hhmmhhmm, "00000000"),
+        ("lbs_off", boolx, False),  # Clarify the meaning of 0/1
+        ("boot_time_set", boolx, False),
+        ("boot_time", hhmm, "0000"),
+        ("shut_time_set", boolx, False),
+        ("shut_time", hhmm, "0000"),
+    )
+
     def out_encode(self) -> bytes:
     def out_encode(self) -> bytes:
-        return b""  # TODO
+        return (
+            pack("B", self.gps_off)
+            + pack("B", self.gps_interval_set)
+            + bytes.fromhex(self.gps_interval)
+            + pack("B", self.lbs_off)
+            + pack("B", self.boot_time_set)
+            + bytes.fromhex(self.boot_time)
+            + pack("B", self.shut_time_set)
+            + bytes.fromhex(self.shut_time)
+        )
 
 
 class _SET_PHONE(GPS303Pkt):
 
 
 class _SET_PHONE(GPS303Pkt):
@@ -492,12 +628,15 @@ class DEVICE(GPS303Pkt):
 
 class ALARM_CLOCK(GPS303Pkt):
     PROTO = 0x50
 
 class ALARM_CLOCK(GPS303Pkt):
     PROTO = 0x50
+    OUT_KWARGS: Tuple[
+        Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
+    ] = (
+        ("alarms", l3alarms, []),
+    )
 
     def out_encode(self) -> bytes:
 
     def out_encode(self) -> bytes:
-        # TODO implement parsing kwargs
-        alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
         return b"".join(
         return b"".join(
-            pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
+            pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
         )
 
 
         )
 
 
@@ -545,6 +684,9 @@ class SETUP(GPS303Pkt):
             + [b";".join([el.encode() for el in self.phonenumbers])]
         )
 
             + [b";".join([el.encode() for el in self.phonenumbers])]
         )
 
+    def in_encode(self) -> bytes:
+        return b""
+
 
 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
     PROTO = 0x58
 
 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
     PROTO = 0x58
@@ -644,7 +786,9 @@ if True:  # just to indent the code, sorry!
             PROTOS[cls.__name__] = cls.PROTO
 
 
             PROTOS[cls.__name__] = cls.PROTO
 
 
-def class_by_prefix(prefix: str) -> Union[type, List[Tuple[str, int]]]:
+def class_by_prefix(
+    prefix: str,
+) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
     lst = [
         (name, proto)
         for name, proto in PROTOS.items()
     lst = [
         (name, proto)
         for name, proto in PROTOS.items()
@@ -678,7 +822,7 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
     length, proto = unpack("BB", packet[:2])
     payload = packet[2:]
     if proto not in CLASSES:
     length, proto = unpack("BB", packet[:2])
     payload = packet[2:]
     if proto not in CLASSES:
-        cause: Union[DecodeError, ValueError] = ValueError(
+        cause: Union[DecodeError, ValueError, IndexError] = ValueError(
             f"Proto {proto} is unknown"
         )
     else:
             f"Proto {proto} is unknown"
         )
     else:
@@ -687,7 +831,7 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
                 return CLASSES[proto].In(length, payload)
             else:
                 return CLASSES[proto].Out(length, payload)
                 return CLASSES[proto].In(length, payload)
             else:
                 return CLASSES[proto].Out(length, payload)
-        except DecodeError as e:
+        except (DecodeError, ValueError, IndexError) as e:
             cause = e
     if is_incoming:
         retobj = UNKNOWN.In(length, payload)
             cause = e
     if is_incoming:
         retobj = UNKNOWN.In(length, payload)