]> average.org Git - loctrkd.git/commitdiff
typing: make zmsg.py typecheck
authorEugene Crosser <crosser@average.org>
Fri, 27 May 2022 20:51:09 +0000 (22:51 +0200)
committerEugene Crosser <crosser@average.org>
Fri, 27 May 2022 21:18:40 +0000 (23:18 +0200)
gps303/gps303proto.py
gps303/storage.py
gps303/zmsg.py

index 8036d65e92a1a2e789b16b68774817c7a303a13c..b70cacf1eb328571171272657c92c97d0f04b2f1 100755 (executable)
@@ -18,6 +18,7 @@ from datetime import datetime, timezone
 from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
+from typing import Any, Callable, Tuple
 
 __all__ = (
     "class_by_prefix",
@@ -153,8 +154,8 @@ class Respond(Enum):
 class GPS303Pkt(metaclass=MetaPkt):
     RESPOND = Respond.NON  # Do not send anything back by default
     PROTO: int
-    IN_KWARGS = ()
-    OUT_KWARGS = ()
+    IN_KWARGS: Tuple[Tuple[str, Callable, Any], ...] = ()
+    OUT_KWARGS: Tuple[Tuple[str, Callable, Any], ...] = ()
 
     def __init__(self, *args, **kwargs):
         """
index 5368efc575e9910bae5da767c0d1adcede239624..14afc49e1fde494f5aebcf96bb2582f572fe1de7 100644 (file)
@@ -32,14 +32,9 @@ def runserver(conf):
                 datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
                 zmsg.packet.hex(),
             )
-            if zmsg.peeraddr is not None:
-                addr, port = zmsg.peeraddr
-                peeraddr = str((str(addr), port))
-            else:
-                peeraddr = None
             stow(
                 is_incoming=zmsg.is_incoming,
-                peeraddr=peeraddr,
+                peeraddr=str(zmsg.peeraddr),
                 when=zmsg.when,
                 imei=zmsg.imei,
                 proto=proto_of_message(zmsg.packet),
index 1fe18123c441739a9891b1433ec8bc276b5c876a..73a4f801aa273079540ef9a83441b5d166feecbb 100644 (file)
@@ -2,36 +2,47 @@
 
 import ipaddress as ip
 from struct import pack, unpack
+from typing import Any, cast, Optional, Tuple, Type, Union
 
 __all__ = "Bcast", "Resp", "topic"
 
 
-def pack_peer(peeraddr):
-    try:
-        if peeraddr is None:
-            saddr = "::"
-            port = 0
-        else:
-            saddr, port, _x, _y = peeraddr
-        addr = ip.ip_address(saddr)
-    except ValueError:
+def pack_peer(
+    peeraddr: Union[None, Tuple[str, int], Tuple[str, int, Any, Any]]
+) -> bytes:
+    if peeraddr is None:
+        addr: Union[ip.IPv4Address, ip.IPv6Address] = ip.IPv6Address(0)
+        port = 0
+    elif len(peeraddr) == 2:
+        peeraddr = cast(Tuple[str, int], peeraddr)
         saddr, port = peeraddr
-        a4 = ip.ip_address(saddr)
-        addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + a4.packed)
+        addr = ip.ip_address(saddr)
+    elif len(peeraddr) == 4:
+        peeraddr = cast(Tuple[str, int, Any, Any], peeraddr)
+        saddr, port, _x, _y = peeraddr
+        addr = ip.ip_address(saddr)
+    if isinstance(addr, ip.IPv4Address):
+        addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + addr.packed)
     return addr.packed + pack("!H", port)
 
 
-def unpack_peer(buffer):
+def unpack_peer(
+    buffer: bytes,
+) -> Tuple[str, int]:
     a6 = ip.IPv6Address(buffer[:16])
     port = unpack("!H", buffer[16:])[0]
-    addr = a6.ipv4_mapped
-    if addr is None:
-        addr = a6
-    return (addr, port)
+    a4 = a6.ipv4_mapped
+    if a4 is not None:
+        return (str(a4), port)
+    elif a6 == ip.IPv6Address("::"):
+        return ("", 0)
+    return (str(a6), port)
 
 
 class _Zmsg:
-    def __init__(self, *args, **kwargs):
+    KWARGS: Tuple[Tuple[str, Any], ...]
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
         if len(args) == 1:
             self.decode(args[0])
         elif bool(kwargs):
@@ -46,7 +57,7 @@ class _Zmsg:
                 + str(kwargs)
             )
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return "{}({})".format(
             self.__class__.__name__,
             ", ".join(
@@ -62,24 +73,28 @@ class _Zmsg:
             ),
         )
 
-    def __eq__(self, other):
-        return all(
-            [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS]
-        )
+    def __eq__(self, other: object) -> bool:
+        if isinstance(other, self.__class__):
+            return all(
+                [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS]
+            )
+        return NotImplemented
 
-    def decode(self, buffer):
+    def decode(self, buffer: bytes) -> None:
         raise NotImplementedError(
             self.__class__.__name__ + "must implement `decode()` method"
         )
 
     @property
-    def packed(self):
+    def packed(self) -> bytes:
         raise NotImplementedError(
             self.__class__.__name__ + "must implement `packed()` property"
         )
 
 
-def topic(proto, is_incoming=True, imei=None):
+def topic(
+    proto: int, is_incoming: bool = True, imei: Optional[str] = None
+) -> bytes:
     return pack("BB", is_incoming, proto) + (
         b"" if imei is None else pack("16s", imei.encode())
     )
@@ -98,7 +113,7 @@ class Bcast(_Zmsg):
     )
 
     @property
-    def packed(self):
+    def packed(self) -> bytes:
         return (
             pack(
                 "BB16s",
@@ -117,10 +132,10 @@ class Bcast(_Zmsg):
             + self.packet
         )
 
-    def decode(self, buffer):
+    def decode(self, buffer: bytes) -> None:
         self.is_incoming = bool(buffer[0])
         self.proto = buffer[1]
-        self.imei = buffer[2:18].decode()
+        self.imei: Optional[str] = buffer[2:18].decode()
         if self.imei == "0000000000000000":
             self.imei = None
         self.when = unpack("!d", buffer[18:26])[0]
@@ -134,7 +149,7 @@ class Resp(_Zmsg):
     KWARGS = (("imei", None), ("when", None), ("packet", b""))
 
     @property
-    def packed(self):
+    def packed(self) -> bytes:
         return (
             pack(
                 "16s",
@@ -150,7 +165,7 @@ class Resp(_Zmsg):
             + self.packet
         )
 
-    def decode(self, buffer):
+    def decode(self, buffer: bytes) -> None:
         self.imei = buffer[:16].decode()
         self.when = unpack("!d", buffer[16:24])[0]
         self.packet = buffer[24:]