]> average.org Git - loctrkd.git/blobdiff - gps303/gps303proto.py
test: message fuzzer and fix found problems
[loctrkd.git] / gps303 / gps303proto.py
index da5796f988eeb281f4fa2e85c64e55e12c7e44f1..2d777d94fa5bfbec0b4c4b15192a0a56f9427954 100755 (executable)
@@ -18,7 +18,17 @@ from datetime import datetime, timezone
 from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
 from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
-from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    List,
+    Optional,
+    Tuple,
+    Type,
+    TYPE_CHECKING,
+    Union,
+)
 
 __all__ = (
     "class_by_prefix",
 
 __all__ = (
     "class_by_prefix",
@@ -81,6 +91,16 @@ def intx(x: Union[str, int]) -> int:
     return x
 
 
     return x
 
 
+def boolx(x: Union[str, bool]) -> bool:
+    if isinstance(x, str):
+        if x.upper() in ("ON", "TRUE", "1"):
+            return True
+        if x.upper() in ("OFF", "FALSE", "0"):
+            return False
+        raise ValueError(str(x) + " could not be parsed as a Boolean")
+    return x
+
+
 def hhmm(x: str) -> str:
     """Check for the string that represents hours and minutes"""
     if not isinstance(x, str) or len(x) != 4:
 def hhmm(x: str) -> str:
     """Check for the string that represents hours and minutes"""
     if not isinstance(x, str) or len(x) != 4:
@@ -92,17 +112,55 @@ def hhmm(x: str) -> str:
     return x
 
 
     return x
 
 
+def hhmmhhmm(x: str) -> str:
+    """Check for the string that represents hours and minutes twice"""
+    if not isinstance(x, str) or len(x) != 8:
+        raise ValueError(str(x) + " is not an eight-character string")
+    return hhmm(x[:4]) + hhmm(x[4:])
+
+
 def l3str(x: Union[str, List[str]]) -> List[str]:
     if isinstance(x, str):
         lx = x.split(",")
 def l3str(x: Union[str, List[str]]) -> List[str]:
     if isinstance(x, str):
         lx = x.split(",")
+    else:
+        lx = x
     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
         raise ValueError(str(lx) + " is not a list of three strings")
     return lx
 
 
     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
         raise ValueError(str(lx) + " is not a list of three strings")
     return lx
 
 
+def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
+    def alrmspec(sub: str) -> Tuple[int, str]:
+        if len(sub) != 7:
+            raise ValueError(sub + " does not represent day and time")
+        return (
+            {
+                "MON": 1,
+                "TUE": 2,
+                "WED": 3,
+                "THU": 4,
+                "FRI": 5,
+                "SAT": 6,
+                "SUN": 7,
+            }[sub[:3].upper()],
+            sub[3:],
+        )
+
+    if isinstance(x, str):
+        lx = [alrmspec(sub) for sub in x.split(",")]
+    else:
+        lx = x
+    lx.extend([(0, "0000") for _ in range(3 - len(lx))])
+    if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
+        raise ValueError(str(lx) + " is a wrong alarms specification")
+    return [(d, hhmm(tm)) for d, tm in lx]
+
+
 def l3int(x: Union[str, List[int]]) -> List[int]:
     if isinstance(x, str):
         lx = [int(el) for el in x.split(",")]
 def l3int(x: Union[str, List[int]]) -> List[int]:
     if isinstance(x, str):
         lx = [int(el) for el in x.split(",")]
+    else:
+        lx = x
     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
         raise ValueError(str(lx) + " is not a list of three integers")
     return lx
     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
         raise ValueError(str(lx) + " is not a list of three integers")
     return lx
@@ -120,37 +178,40 @@ class MetaPkt(type):
     respectively.
     """
 
     respectively.
     """
 
+    if TYPE_CHECKING:
+
+        def __getattr__(self, name: str) -> Any:
+            pass
+
+        def __setattr__(self, name: str, value: Any) -> None:
+            pass
+
     def __new__(
     def __new__(
-        cls, name: str, bases: Tuple[type, ...], attrs: Dict[str, Any]
+        cls: Type["MetaPkt"],
+        name: str,
+        bases: Tuple[type, ...],
+        attrs: Dict[str, Any],
     ) -> "MetaPkt":
         newcls = super().__new__(cls, name, bases, attrs)
     ) -> "MetaPkt":
         newcls = super().__new__(cls, name, bases, attrs)
-        setattr(
-            newcls,
-            "In",
-            super().__new__(
-                cls,
-                name + ".In",
-                (newcls,) + bases,
-                {
-                    "KWARGS": newcls.IN_KWARGS,
-                    "decode": newcls.in_decode,
-                    "encode": newcls.in_encode,
-                },
-            ),
+        newcls.In = super().__new__(
+            cls,
+            name + ".In",
+            (newcls,) + bases,
+            {
+                "KWARGS": newcls.IN_KWARGS,
+                "decode": newcls.in_decode,
+                "encode": newcls.in_encode,
+            },
         )
         )
-        setattr(
-            newcls,
-            "Out",
-            super().__new__(
-                cls,
-                name + ".Out",
-                (newcls,) + bases,
-                {
-                    "KWARGS": newcls.OUT_KWARGS,
-                    "decode": newcls.out_decode,
-                    "encode": newcls.out_encode,
-                },
-            ),
+        newcls.Out = super().__new__(
+            cls,
+            name + ".Out",
+            (newcls,) + bases,
+            {
+                "KWARGS": newcls.OUT_KWARGS,
+                "decode": newcls.out_decode,
+                "encode": newcls.out_encode,
+            },
         )
         return newcls
 
         )
         return newcls
 
@@ -170,6 +231,14 @@ class GPS303Pkt(metaclass=MetaPkt):
     In: Type["GPS303Pkt"]
     Out: Type["GPS303Pkt"]
 
     In: Type["GPS303Pkt"]
     Out: Type["GPS303Pkt"]
 
+    if TYPE_CHECKING:
+
+        def __getattr__(self, name: str) -> Any:
+            pass
+
+        def __setattr__(self, name: str, value: Any) -> None:
+            pass
+
     def __init__(self, *args: Any, **kwargs: Any):
         """
         Construct the object _either_ from (length, payload),
     def __init__(self, *args: Any, **kwargs: Any):
         """
         Construct the object _either_ from (length, payload),
@@ -244,8 +313,8 @@ class LOGIN(GPS303Pkt):
     # Default response for ACK, can also respond with STOP_UPLOAD
 
     def in_decode(self, length: int, payload: bytes) -> None:
     # Default response for ACK, can also respond with STOP_UPLOAD
 
     def in_decode(self, length: int, payload: bytes) -> None:
-        self.imei = payload[:-1].hex()
-        self.ver = unpack("B", payload[-1:])[0]
+        self.imei = payload[:16].hex()
+        self.ver = payload[16]
 
 
 class SUPERVISION(GPS303Pkt):
 
 
 class SUPERVISION(GPS303Pkt):
@@ -391,17 +460,28 @@ class PROHIBIT_LBS(GPS303Pkt):
 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
     PROTO = 0x34
 
 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
     PROTO = 0x34
 
-    # Data is in packed decimal
-    # 00/01 - GPS on/off
-    # 00/01 - Don't set / Set upload period
-    # HHMMHHMM - Upload period
-    # 00/01 - LBS on/off
-    # 00/01 - Don't set / Set time of boot
-    # HHMM  - Time of boot
-    # 00/01 - Don't set / Set time of shutdown
-    # HHMM  - Time of shutdown
+    OUT_KWARGS = (
+        ("gps_off", boolx, False),  # Clarify the meaning of 0/1
+        ("gps_interval_set", boolx, False),
+        ("gps_interval", hhmmhhmm, "00000000"),
+        ("lbs_off", boolx, False),  # Clarify the meaning of 0/1
+        ("boot_time_set", boolx, False),
+        ("boot_time", hhmm, "0000"),
+        ("shut_time_set", boolx, False),
+        ("shut_time", hhmm, "0000"),
+    )
+
     def out_encode(self) -> bytes:
     def out_encode(self) -> bytes:
-        return b""  # TODO
+        return (
+            pack("B", self.gps_off)
+            + pack("B", self.gps_interval_set)
+            + bytes.fromhex(self.gps_interval)
+            + pack("B", self.lbs_off)
+            + pack("B", self.boot_time_set)
+            + bytes.fromhex(self.boot_time)
+            + pack("B", self.shut_time_set)
+            + bytes.fromhex(self.shut_time)
+        )
 
 
 class _SET_PHONE(GPS303Pkt):
 
 
 class _SET_PHONE(GPS303Pkt):
@@ -492,12 +572,15 @@ class DEVICE(GPS303Pkt):
 
 class ALARM_CLOCK(GPS303Pkt):
     PROTO = 0x50
 
 class ALARM_CLOCK(GPS303Pkt):
     PROTO = 0x50
+    OUT_KWARGS: Tuple[
+        Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
+    ] = (
+        ("alarms", l3alarms, []),
+    )
 
     def out_encode(self) -> bytes:
 
     def out_encode(self) -> bytes:
-        # TODO implement parsing kwargs
-        alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
         return b"".join(
         return b"".join(
-            pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
+            pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
         )
 
 
         )
 
 
@@ -644,7 +727,9 @@ if True:  # just to indent the code, sorry!
             PROTOS[cls.__name__] = cls.PROTO
 
 
             PROTOS[cls.__name__] = cls.PROTO
 
 
-def class_by_prefix(prefix: str) -> Union[type, List[Tuple[str, int]]]:
+def class_by_prefix(
+    prefix: str,
+) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
     lst = [
         (name, proto)
         for name, proto in PROTOS.items()
     lst = [
         (name, proto)
         for name, proto in PROTOS.items()
@@ -678,7 +763,7 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
     length, proto = unpack("BB", packet[:2])
     payload = packet[2:]
     if proto not in CLASSES:
     length, proto = unpack("BB", packet[:2])
     payload = packet[2:]
     if proto not in CLASSES:
-        cause: Union[DecodeError, ValueError] = ValueError(
+        cause: Union[DecodeError, ValueError, IndexError] = ValueError(
             f"Proto {proto} is unknown"
         )
     else:
             f"Proto {proto} is unknown"
         )
     else:
@@ -687,7 +772,7 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
                 return CLASSES[proto].In(length, payload)
             else:
                 return CLASSES[proto].Out(length, payload)
                 return CLASSES[proto].In(length, payload)
             else:
                 return CLASSES[proto].Out(length, payload)
-        except DecodeError as e:
+        except (DecodeError, ValueError, IndexError) as e:
             cause = e
     if is_incoming:
         retobj = UNKNOWN.In(length, payload)
             cause = e
     if is_incoming:
         retobj = UNKNOWN.In(length, payload)