from datetime import datetime, timezone
from enum import Enum
from inspect import isclass
-from struct import pack, unpack
+from struct import error, pack, unpack
__all__ = (
"class_by_prefix",
"inline_response",
"parse_message",
"proto_by_name",
+ "DecodeError",
"Respond",
"GPS303Pkt",
"UNKNOWN",
)
+class DecodeError(Exception):
+ def __init__(self, e, **kwargs):
+ super().__init__(e)
+ for k, v in kwargs.items():
+ setattr(self, k, v)
+
def intx(x):
if isinstance(x, str):
x = int(x, 0)
assert not args or (len(args) == 2 and not kwargs)
if args: # guaranteed to be two arguments at this point
self.length, self.payload = args
- self.decode(self.length, self.payload)
+ try:
+ self.decode(self.length, self.payload)
+ except error as e:
+ raise DecodeError(e, obj=self)
else:
for kw, typ, dfl in self.KWARGS:
setattr(self, kw, typ(kwargs.pop(kw, dfl)))
"""From a packet (without framing bytes) derive the XXX.In object"""
length, proto = unpack("BB", packet[:2])
payload = packet[2:]
- if proto in CLASSES:
- if is_incoming:
- return CLASSES[proto].In(length, payload)
- else:
- return CLASSES[proto].Out(length, payload)
+ if proto not in CLASSES:
+ cause = ValueError(f"Proto {proto} is unknown")
else:
+ try:
+ if is_incoming:
+ return CLASSES[proto].In(length, payload)
+ else:
+ return CLASSES[proto].Out(length, payload)
+ except DecodeError as e:
+ cause = e
+ if is_incoming:
retobj = UNKNOWN.In(length, payload)
- retobj.PROTO = proto # Override class attr with object attr
- return retobj
+ else:
+ retobj = UNKNOWN.Out(length, payload)
+ retobj.PROTO = proto # Override class attr with object attr
+ retobj.cause = cause
+ return retobj