-
-__all__ = "Bcast", "Resp"
-
-def pack_peer(peeraddr):
- saddr, port = peeraddr
- addr = ip.ip_address(saddr)
- return pack("B", addr.version) + (addr.packed + b"\0\0\0\0\0\0\0\0\0\0\0\0")[:16] + pack("!H", port)
-
-def unpack_peer(buffer):
- version = buffer[0]
- if version not in (4, 6):
- return None
- if version == 4:
- addr = ip.IPv4Address(buffer[1:5])
- else:
- addr = ip.IPv6Address(buffer[1:17])
- port = unpack("!H", buffer[17:19])[0]
- return (addr, port)
+from typing import Any, cast, Optional, Tuple, Type, Union
+
+__all__ = "Bcast", "Resp", "topic"
+
+
+def pack_peer(
+ peeraddr: Union[None, Tuple[str, int], Tuple[str, int, Any, Any]]
+) -> bytes:
+ if peeraddr is None:
+ addr: Union[ip.IPv4Address, ip.IPv6Address] = ip.IPv6Address(0)
+ port = 0
+ elif len(peeraddr) == 2:
+ peeraddr = cast(Tuple[str, int], peeraddr)
+ saddr, port = peeraddr
+ addr = ip.ip_address(saddr)
+ elif len(peeraddr) == 4:
+ peeraddr = cast(Tuple[str, int, Any, Any], peeraddr)
+ saddr, port, _x, _y = peeraddr
+ addr = ip.ip_address(saddr)
+ if isinstance(addr, ip.IPv4Address):
+ addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + addr.packed)
+ return addr.packed + pack("!H", port)
+
+
+def unpack_peer(
+ buffer: bytes,
+) -> Tuple[str, int]:
+ a6 = ip.IPv6Address(buffer[:16])
+ port = unpack("!H", buffer[16:])[0]
+ a4 = a6.ipv4_mapped
+ if a4 is not None:
+ return (str(a4), port)
+ elif a6 == ip.IPv6Address("::"):
+ return ("", 0)
+ return (str(a6), port)