X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fgps303proto.py;h=b0259a49631e1573af22825fad081ece6030d28f;hb=e3a66f35e7850bfbf507b0f50dcd6d365dc0afe4;hp=27733a3fa48f246212d7d86e33a0106348ec651c;hpb=ba20de1b9666bd56818d5f951b5a9d2fc98af133;p=loctrkd.git diff --git a/gps303/gps303proto.py b/gps303/gps303proto.py index 27733a3..b0259a4 100755 --- a/gps303/gps303proto.py +++ b/gps303/gps303proto.py @@ -22,12 +22,11 @@ from struct import pack, unpack __all__ = ( "class_by_prefix", - "handle_packet", "inline_response", "make_object", - "make_response", "parse_message", "proto_by_name", + "Dir", "GPS303Pkt", "UNKNOWN", "LOGIN", @@ -67,7 +66,7 @@ class Dir(Enum): class GPS303Pkt: PROTO: int - DIR = Dir.INLINE # Most packets anticipate simple acknowledgement + DIR = Dir.IN # Do not send anything back by default def __init__(self, *args, **kwargs): assert len(args) == 0 @@ -114,11 +113,11 @@ class GPS303Pkt: class UNKNOWN(GPS303Pkt): PROTO = 256 # > 255 is impossible in real packets - DIR = Dir.IN class LOGIN(GPS303Pkt): PROTO = 0x01 + DIR = Dir.INLINE # Default response for ACK, can also respond with STOP_UPLOAD @classmethod @@ -143,9 +142,12 @@ class SUPERVISION(GPS303Pkt): class HEARTBEAT(GPS303Pkt): PROTO = 0x08 + DIR = Dir.INLINE class _GPS_POSITIONING(GPS303Pkt): + DIR = Dir.INLINE + @classmethod def from_packet(cls, length, payload): self = super().from_packet(length, payload) @@ -213,7 +215,7 @@ class STATUS(GPS303Pkt): class HIBERNATION(GPS303Pkt): PROTO = 0x14 - DIR = Dir.EXT + DIR = Dir.INLINE @classmethod def response(cls): # Server can send to send devicee to sleep @@ -222,7 +224,6 @@ class HIBERNATION(GPS303Pkt): class RESET(GPS303Pkt): # Device sends when it got reset SMS PROTO = 0x15 - DIR = Dir.EXT @classmethod def response(cls): # Server can send to initiate factory reset @@ -269,6 +270,7 @@ class _WIFI_POSITIONING(GPS303Pkt): class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING): PROTO = 0x17 + DIR = Dir.INLINE @classmethod def inline_response(cls, packet): @@ -279,6 +281,7 @@ class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING): class TIME(GPS303Pkt): PROTO = 0x30 + DIR = Dir.INLINE @classmethod def inline_response(cls, packet): @@ -416,6 +419,7 @@ class STOP_ALARM(GPS303Pkt): def from_packet(cls, length, payload): self = super().from_packet(length, payload) self.flag = payload[0] + return self class SETUP(GPS303Pkt): @@ -483,12 +487,12 @@ class WIFI_POSITIONING(_WIFI_POSITIONING): class MANUAL_POSITIONING(GPS303Pkt): PROTO = 0x80 - DIR = Dir.EXT + DIR = Dir.OUT @classmethod def from_packet(cls, length, payload): self = super().from_packet(length, payload) - self.flag = payload[0] + self.flag = payload[0] if len(payload) > 0 else None self.reason = { 1: "Incorrect time", 2: "LBS less", @@ -498,6 +502,7 @@ class MANUAL_POSITIONING(GPS303Pkt): 6: "LBS prohibited, WiFi absent", 7: "GPS spacing < 50 m", }.get(self.flag, "Unknown") + return self @classmethod def response(cls): @@ -556,8 +561,11 @@ if True: # just to indent the code, sorry! def class_by_prefix(prefix): - lst = [(name, proto) for name, proto in PROTOS.items() - if name.upper().startswith(prefix.upper())] + lst = [ + (name, proto) + for name, proto in PROTOS.items() + if name.upper().startswith(prefix.upper()) + ] if len(lst) != 1: return lst _, proto = lst[0] @@ -606,14 +614,3 @@ def parse_message(packet): adjust, ) return make_object(length, proto, payload) - - -def handle_packet(packet): # DEPRECATED - if len(packet) < 6 or packet[:2] != b"xx" or packet[-2:] != b"\r\n": - return UNKNOWN.from_packet(len(packet), packet) - return parse_message(packet[2:-2]) - - -def make_response(msg, **kwargs): # DEPRECATED - inframe = msg.response(**kwargs) - return None if inframe is None else b"xx" + inframe + b"\r\n"