]> average.org Git - loctrkd.git/blobdiff - gps303/gps303proto.py
test: use IPv4 because github actions
[loctrkd.git] / gps303 / gps303proto.py
index da5796f988eeb281f4fa2e85c64e55e12c7e44f1..0d02b082c18688777aa9fa4a7bbbbc0ae3727276 100755 (executable)
@@ -18,7 +18,17 @@ from datetime import datetime, timezone
 from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
 from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
-from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    List,
+    Optional,
+    Tuple,
+    Type,
+    TYPE_CHECKING,
+    Union,
+)
 
 __all__ = (
     "class_by_prefix",
 
 __all__ = (
     "class_by_prefix",
@@ -75,12 +85,26 @@ class DecodeError(Exception):
             setattr(self, k, v)
 
 
             setattr(self, k, v)
 
 
+def maybe_int(x: Optional[int]) -> Optional[int]:
+    return None if x is None else int(x)
+
+
 def intx(x: Union[str, int]) -> int:
     if isinstance(x, str):
         x = int(x, 0)
     return x
 
 
 def intx(x: Union[str, int]) -> int:
     if isinstance(x, str):
         x = int(x, 0)
     return x
 
 
+def boolx(x: Union[str, bool]) -> bool:
+    if isinstance(x, str):
+        if x.upper() in ("ON", "TRUE", "1"):
+            return True
+        if x.upper() in ("OFF", "FALSE", "0"):
+            return False
+        raise ValueError(str(x) + " could not be parsed as a Boolean")
+    return x
+
+
 def hhmm(x: str) -> str:
     """Check for the string that represents hours and minutes"""
     if not isinstance(x, str) or len(x) != 4:
 def hhmm(x: str) -> str:
     """Check for the string that represents hours and minutes"""
     if not isinstance(x, str) or len(x) != 4:
@@ -92,17 +116,55 @@ def hhmm(x: str) -> str:
     return x
 
 
     return x
 
 
+def hhmmhhmm(x: str) -> str:
+    """Check for the string that represents hours and minutes twice"""
+    if not isinstance(x, str) or len(x) != 8:
+        raise ValueError(str(x) + " is not an eight-character string")
+    return hhmm(x[:4]) + hhmm(x[4:])
+
+
 def l3str(x: Union[str, List[str]]) -> List[str]:
     if isinstance(x, str):
         lx = x.split(",")
 def l3str(x: Union[str, List[str]]) -> List[str]:
     if isinstance(x, str):
         lx = x.split(",")
+    else:
+        lx = x
     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
         raise ValueError(str(lx) + " is not a list of three strings")
     return lx
 
 
     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
         raise ValueError(str(lx) + " is not a list of three strings")
     return lx
 
 
+def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
+    def alrmspec(sub: str) -> Tuple[int, str]:
+        if len(sub) != 7:
+            raise ValueError(sub + " does not represent day and time")
+        return (
+            {
+                "MON": 1,
+                "TUE": 2,
+                "WED": 3,
+                "THU": 4,
+                "FRI": 5,
+                "SAT": 6,
+                "SUN": 7,
+            }[sub[:3].upper()],
+            sub[3:],
+        )
+
+    if isinstance(x, str):
+        lx = [alrmspec(sub) for sub in x.split(",")]
+    else:
+        lx = x
+    lx.extend([(0, "0000") for _ in range(3 - len(lx))])
+    if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
+        raise ValueError(str(lx) + " is a wrong alarms specification")
+    return [(d, hhmm(tm)) for d, tm in lx]
+
+
 def l3int(x: Union[str, List[int]]) -> List[int]:
     if isinstance(x, str):
         lx = [int(el) for el in x.split(",")]
 def l3int(x: Union[str, List[int]]) -> List[int]:
     if isinstance(x, str):
         lx = [int(el) for el in x.split(",")]
+    else:
+        lx = x
     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
         raise ValueError(str(lx) + " is not a list of three integers")
     return lx
     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
         raise ValueError(str(lx) + " is not a list of three integers")
     return lx
@@ -120,37 +182,40 @@ class MetaPkt(type):
     respectively.
     """
 
     respectively.
     """
 
+    if TYPE_CHECKING:
+
+        def __getattr__(self, name: str) -> Any:
+            pass
+
+        def __setattr__(self, name: str, value: Any) -> None:
+            pass
+
     def __new__(
     def __new__(
-        cls, name: str, bases: Tuple[type, ...], attrs: Dict[str, Any]
+        cls: Type["MetaPkt"],
+        name: str,
+        bases: Tuple[type, ...],
+        attrs: Dict[str, Any],
     ) -> "MetaPkt":
         newcls = super().__new__(cls, name, bases, attrs)
     ) -> "MetaPkt":
         newcls = super().__new__(cls, name, bases, attrs)
-        setattr(
-            newcls,
-            "In",
-            super().__new__(
-                cls,
-                name + ".In",
-                (newcls,) + bases,
-                {
-                    "KWARGS": newcls.IN_KWARGS,
-                    "decode": newcls.in_decode,
-                    "encode": newcls.in_encode,
-                },
-            ),
+        newcls.In = super().__new__(
+            cls,
+            name + ".In",
+            (newcls,) + bases,
+            {
+                "KWARGS": newcls.IN_KWARGS,
+                "decode": newcls.in_decode,
+                "encode": newcls.in_encode,
+            },
         )
         )
-        setattr(
-            newcls,
-            "Out",
-            super().__new__(
-                cls,
-                name + ".Out",
-                (newcls,) + bases,
-                {
-                    "KWARGS": newcls.OUT_KWARGS,
-                    "decode": newcls.out_decode,
-                    "encode": newcls.out_encode,
-                },
-            ),
+        newcls.Out = super().__new__(
+            cls,
+            name + ".Out",
+            (newcls,) + bases,
+            {
+                "KWARGS": newcls.OUT_KWARGS,
+                "decode": newcls.out_decode,
+                "encode": newcls.out_encode,
+            },
         )
         return newcls
 
         )
         return newcls
 
@@ -170,6 +235,14 @@ class GPS303Pkt(metaclass=MetaPkt):
     In: Type["GPS303Pkt"]
     Out: Type["GPS303Pkt"]
 
     In: Type["GPS303Pkt"]
     Out: Type["GPS303Pkt"]
 
+    if TYPE_CHECKING:
+
+        def __getattr__(self, name: str) -> Any:
+            pass
+
+        def __setattr__(self, name: str, value: Any) -> None:
+            pass
+
     def __init__(self, *args: Any, **kwargs: Any):
         """
         Construct the object _either_ from (length, payload),
     def __init__(self, *args: Any, **kwargs: Any):
         """
         Construct the object _either_ from (length, payload),
@@ -242,10 +315,16 @@ class LOGIN(GPS303Pkt):
     PROTO = 0x01
     RESPOND = Respond.INL
     # Default response for ACK, can also respond with STOP_UPLOAD
     PROTO = 0x01
     RESPOND = Respond.INL
     # Default response for ACK, can also respond with STOP_UPLOAD
+    IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
 
     def in_decode(self, length: int, payload: bytes) -> None:
 
     def in_decode(self, length: int, payload: bytes) -> None:
-        self.imei = payload[:-1].hex()
-        self.ver = unpack("B", payload[-1:])[0]
+        self.imei = payload[:8].ljust(8, b"\0").hex()
+        self.ver = payload[8]
+
+    def in_encode(self) -> bytes:
+        return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
+            "B", self.ver
+        )
 
 
 class SUPERVISION(GPS303Pkt):
 
 
 class SUPERVISION(GPS303Pkt):
@@ -305,6 +384,13 @@ class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
 class STATUS(GPS303Pkt):
     PROTO = 0x13
     RESPOND = Respond.EXT
 class STATUS(GPS303Pkt):
     PROTO = 0x13
     RESPOND = Respond.EXT
+    IN_KWARGS = (
+        ("batt", int, 100),
+        ("ver", int, 0),
+        ("timezone", int, 0),
+        ("intvl", int, 0),
+        ("signal", maybe_int, None),
+    )
     OUT_KWARGS = (("upload_interval", int, 25),)
 
     def in_decode(self, length: int, payload: bytes) -> None:
     OUT_KWARGS = (("upload_interval", int, 25),)
 
     def in_decode(self, length: int, payload: bytes) -> None:
@@ -316,6 +402,13 @@ class STATUS(GPS303Pkt):
         else:
             self.signal = None
 
         else:
             self.signal = None
 
+    def in_encode(self) -> bytes:
+        return (
+            pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + b""
+            if self.signal is None
+            else pack("B", self.signal)
+        )
+
     def out_encode(self) -> bytes:  # Set interval in minutes
         return pack("B", self.upload_interval)
 
     def out_encode(self) -> bytes:  # Set interval in minutes
         return pack("B", self.upload_interval)
 
@@ -323,6 +416,9 @@ class STATUS(GPS303Pkt):
 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
     PROTO = 0x14
 
 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
     PROTO = 0x14
 
+    def in_encode(self) -> bytes:
+        return b""
+
 
 class RESET(GPS303Pkt):
     # Device sends when it got reset SMS
 
 class RESET(GPS303Pkt):
     # Device sends when it got reset SMS
@@ -391,17 +487,28 @@ class PROHIBIT_LBS(GPS303Pkt):
 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
     PROTO = 0x34
 
 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
     PROTO = 0x34
 
-    # Data is in packed decimal
-    # 00/01 - GPS on/off
-    # 00/01 - Don't set / Set upload period
-    # HHMMHHMM - Upload period
-    # 00/01 - LBS on/off
-    # 00/01 - Don't set / Set time of boot
-    # HHMM  - Time of boot
-    # 00/01 - Don't set / Set time of shutdown
-    # HHMM  - Time of shutdown
+    OUT_KWARGS = (
+        ("gps_off", boolx, False),  # Clarify the meaning of 0/1
+        ("gps_interval_set", boolx, False),
+        ("gps_interval", hhmmhhmm, "00000000"),
+        ("lbs_off", boolx, False),  # Clarify the meaning of 0/1
+        ("boot_time_set", boolx, False),
+        ("boot_time", hhmm, "0000"),
+        ("shut_time_set", boolx, False),
+        ("shut_time", hhmm, "0000"),
+    )
+
     def out_encode(self) -> bytes:
     def out_encode(self) -> bytes:
-        return b""  # TODO
+        return (
+            pack("B", self.gps_off)
+            + pack("B", self.gps_interval_set)
+            + bytes.fromhex(self.gps_interval)
+            + pack("B", self.lbs_off)
+            + pack("B", self.boot_time_set)
+            + bytes.fromhex(self.boot_time)
+            + pack("B", self.shut_time_set)
+            + bytes.fromhex(self.shut_time)
+        )
 
 
 class _SET_PHONE(GPS303Pkt):
 
 
 class _SET_PHONE(GPS303Pkt):
@@ -492,12 +599,15 @@ class DEVICE(GPS303Pkt):
 
 class ALARM_CLOCK(GPS303Pkt):
     PROTO = 0x50
 
 class ALARM_CLOCK(GPS303Pkt):
     PROTO = 0x50
+    OUT_KWARGS: Tuple[
+        Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
+    ] = (
+        ("alarms", l3alarms, []),
+    )
 
     def out_encode(self) -> bytes:
 
     def out_encode(self) -> bytes:
-        # TODO implement parsing kwargs
-        alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
         return b"".join(
         return b"".join(
-            pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
+            pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
         )
 
 
         )
 
 
@@ -644,7 +754,9 @@ if True:  # just to indent the code, sorry!
             PROTOS[cls.__name__] = cls.PROTO
 
 
             PROTOS[cls.__name__] = cls.PROTO
 
 
-def class_by_prefix(prefix: str) -> Union[type, List[Tuple[str, int]]]:
+def class_by_prefix(
+    prefix: str,
+) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
     lst = [
         (name, proto)
         for name, proto in PROTOS.items()
     lst = [
         (name, proto)
         for name, proto in PROTOS.items()
@@ -678,7 +790,7 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
     length, proto = unpack("BB", packet[:2])
     payload = packet[2:]
     if proto not in CLASSES:
     length, proto = unpack("BB", packet[:2])
     payload = packet[2:]
     if proto not in CLASSES:
-        cause: Union[DecodeError, ValueError] = ValueError(
+        cause: Union[DecodeError, ValueError, IndexError] = ValueError(
             f"Proto {proto} is unknown"
         )
     else:
             f"Proto {proto} is unknown"
         )
     else:
@@ -687,7 +799,7 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
                 return CLASSES[proto].In(length, payload)
             else:
                 return CLASSES[proto].Out(length, payload)
                 return CLASSES[proto].In(length, payload)
             else:
                 return CLASSES[proto].Out(length, payload)
-        except DecodeError as e:
+        except (DecodeError, ValueError, IndexError) as e:
             cause = e
     if is_incoming:
         retobj = UNKNOWN.In(length, payload)
             cause = e
     if is_incoming:
         retobj = UNKNOWN.In(length, payload)