X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fgps303proto.py;h=dcc335b184fc6fa59249ce2c692f37c54d1587d5;hb=faa8ce9d87530105ec2ad2f4809a9ace581a2ad6;hp=acfaea63c272aa74bc676dcac90765d027c8ddd2;hpb=3f74a195c346809d3075b1351705eb9ad543afd5;p=loctrkd.git diff --git a/gps303/gps303proto.py b/gps303/gps303proto.py index acfaea6..dcc335b 100755 --- a/gps303/gps303proto.py +++ b/gps303/gps303proto.py @@ -21,12 +21,14 @@ from logging import getLogger from struct import pack, unpack __all__ = ( + "class_by_prefix", "handle_packet", "inline_response", "make_object", "make_response", "parse_message", "proto_by_name", + "Dir", "GPS303Pkt", "UNKNOWN", "LOGIN", @@ -66,7 +68,7 @@ class Dir(Enum): class GPS303Pkt: PROTO: int - DIR = Dir.INLINE # Most packets anticipate simple acknowledgement + DIR = Dir.IN # Do not send anything back by default def __init__(self, *args, **kwargs): assert len(args) == 0 @@ -113,11 +115,11 @@ class GPS303Pkt: class UNKNOWN(GPS303Pkt): PROTO = 256 # > 255 is impossible in real packets - DIR = Dir.IN class LOGIN(GPS303Pkt): PROTO = 0x01 + DIR = Dir.INLINE # Default response for ACK, can also respond with STOP_UPLOAD @classmethod @@ -132,18 +134,22 @@ class SUPERVISION(GPS303Pkt): PROTO = 0x05 DIR = Dir.OUT - def response(self, status=0): + @classmethod + def response(cls, status=0): # 1: The device automatically answers Pickup effect # 2: Automatically Answering Two-way Calls # 3: Ring manually answer the two-way call - return self.make_packet(pack("B", status)) + return cls.make_packet(pack("B", status)) class HEARTBEAT(GPS303Pkt): PROTO = 0x08 + DIR = Dir.INLINE class _GPS_POSITIONING(GPS303Pkt): + DIR = Dir.INLINE + @classmethod def from_packet(cls, length, payload): self = super().from_packet(length, payload) @@ -204,32 +210,35 @@ class STATUS(GPS303Pkt): self.signal = None return self - def response(self, upload_interval=25): # Set interval in minutes - return self.make_packet(pack("B", upload_interval)) + @classmethod + def response(cls, upload_interval=25): # Set interval in minutes + return cls.make_packet(pack("B", upload_interval)) class HIBERNATION(GPS303Pkt): PROTO = 0x14 - DIR = Dir.EXT + DIR = Dir.INLINE - def response(self): # Server can send to send devicee to sleep - return self.make_packet(b"") + @classmethod + def response(cls): # Server can send to send devicee to sleep + return cls.make_packet(b"") class RESET(GPS303Pkt): # Device sends when it got reset SMS PROTO = 0x15 - DIR = Dir.EXT - def response(self): # Server can send to initiate factory reset - return self.make_packet(b"") + @classmethod + def response(cls): # Server can send to initiate factory reset + return cls.make_packet(b"") class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58) PROTO = 0x16 DIR = Dir.OUT - def response(self, number=3): # Number of whitelist entries - return self.make_packet(pack("B", number)) + @classmethod + def response(cls, number=3): # Number of whitelist entries + return cls.make_packet(pack("B", number)) class _WIFI_POSITIONING(GPS303Pkt): @@ -263,6 +272,7 @@ class _WIFI_POSITIONING(GPS303Pkt): class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING): PROTO = 0x17 + DIR = Dir.INLINE @classmethod def inline_response(cls, packet): @@ -273,6 +283,7 @@ class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING): class TIME(GPS303Pkt): PROTO = 0x30 + DIR = Dir.INLINE @classmethod def inline_response(cls, packet): @@ -285,15 +296,17 @@ class PROHIBIT_LBS(GPS303Pkt): PROTO = 0x33 DIR = Dir.OUT - def response(self, status=1): # Server sent, 0-off, 1-on - return self.make_packet(pack("B", status)) + @classmethod + def response(cls, status=1): # Server sent, 0-off, 1-on + return cls.make_packet(pack("B", status)) class GPS_LBS_SWITCH_TIMES(GPS303Pkt): PROTO = 0x34 DIR = Dir.OUT - def response(self): + @classmethod + def response(cls): # Data is in packed decimal # 00/01 - GPS on/off # 00/01 - Don't set / Set upload period @@ -303,14 +316,15 @@ class GPS_LBS_SWITCH_TIMES(GPS303Pkt): # HHMM - Time of boot # 00/01 - Don't set / Set time of shutdown # HHMM - Time of shutdown - return self.make_packet(b"") # TODO + return cls.make_packet(b"") # TODO class _SET_PHONE(GPS303Pkt): DIR = Dir.OUT - def response(self, phone): - return self.make_packet(phone.encode()) + @classmethod + def response(cls, phone): + return cls.make_packet(phone.encode()) class REMOTE_MONITOR_PHONE(_SET_PHONE): @@ -333,16 +347,18 @@ class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device PROTO = 0x44 DIR = Dir.OUT - def response(self): - return self.make_packet(b"") + @classmethod + def response(cls): + return cls.make_packet(b"") class GPS_OFF_PERIOD(GPS303Pkt): PROTO = 0x46 DIR = Dir.OUT - def response(self, onoff=0, fm="0000", to="2359"): - return self.make_packet( + @classmethod + def response(cls, onoff=0, fm="0000", to="2359"): + return cls.make_packet( pack("B", onoff) + bytes.fromhex(fm) + bytes.fromhex(to) ) @@ -351,10 +367,11 @@ class DND_PERIOD(GPS303Pkt): PROTO = 0x47 DIR = Dir.OUT + @classmethod def response( - self, onoff=0, week=3, fm1="0000", to1="2359", fm2="0000", to2="2359" + cls, onoff=0, week=3, fm1="0000", to1="2359", fm2="0000", to2="2359" ): - return self.make_packet( + return cls.make_packet( pack("B", onoff) + pack("B", week) + bytes.fromhex(fm1) @@ -368,29 +385,32 @@ class RESTART_SHUTDOWN(GPS303Pkt): PROTO = 0x48 DIR = Dir.OUT - def response(self, flag=0): + @classmethod + def response(cls, flag=2): # 1 - restart # 2 - shutdown - return self.make_packet(pack("B", flag)) + return cls.make_packet(pack("B", flag)) class DEVICE(GPS303Pkt): PROTO = 0x49 DIR = Dir.OUT - def response(self, flag=0): + @classmethod + def response(cls, flag=0): # 0 - Stop looking for equipment # 1 - Start looking for equipment - return self.make_packet(pack("B", flag)) + return cls.make_packet(pack("B", flag)) class ALARM_CLOCK(GPS303Pkt): PROTO = 0x50 DIR = Dir.OUT - def response(self, alarms=((0, "0000"), (0, "0000"), (0, "0000"))): + @classmethod + def response(cls, alarms=((0, "0000"), (0, "0000"), (0, "0000"))): return b"".join( - pack("B", day) + bytes.fromhex(tm) for day, tm in alarms + cls("B", day) + bytes.fromhex(tm) for day, tm in alarms ) @@ -401,14 +421,16 @@ class STOP_ALARM(GPS303Pkt): def from_packet(cls, length, payload): self = super().from_packet(length, payload) self.flag = payload[0] + return self class SETUP(GPS303Pkt): PROTO = 0x57 DIR = Dir.EXT + @classmethod def response( - self, + cls, uploadintervalseconds=0x0300, binaryswitch=0b00110001, alarms=[0, 0, 0], @@ -439,7 +461,7 @@ class SETUP(GPS303Pkt): ] + [b";".join([el.encode() for el in phonenumbers])] ) - return self.make_packet(payload) + return cls.make_packet(payload) class SYNCHRONOUS_WHITELIST(GPS303Pkt): @@ -454,24 +476,25 @@ class WIFI_POSITIONING(_WIFI_POSITIONING): PROTO = 0x69 DIR = Dir.EXT - def response(self, lat=None, lon=None): + @classmethod + def response(cls, lat=None, lon=None): if lat is None or lon is None: payload = b"" else: payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode( "ascii" ) - return self.make_packet(payload) + return cls.make_packet(payload) class MANUAL_POSITIONING(GPS303Pkt): PROTO = 0x80 - DIR = Dir.EXT + DIR = Dir.OUT @classmethod def from_packet(cls, length, payload): self = super().from_packet(length, payload) - self.flag = payload[0] + self.flag = payload[0] if len(payload) > 0 else None self.reason = { 1: "Incorrect time", 2: "LBS less", @@ -481,9 +504,11 @@ class MANUAL_POSITIONING(GPS303Pkt): 6: "LBS prohibited, WiFi absent", 7: "GPS spacing < 50 m", }.get(self.flag, "Unknown") + return self - def response(self): - return self.make_packet(b"") + @classmethod + def response(cls): + return cls.make_packet(b"") class BATTERY_CHARGE(GPS303Pkt): @@ -512,8 +537,9 @@ class POSITION_UPLOAD_INTERVAL(GPS303Pkt): self.interval = unpack("!H", payload[:2]) return self - def response(self, interval=10): - return self.make_packet(pack("!H", interval)) + @classmethod + def response(cls, interval=10): + return cls.make_packet(pack("!H", interval)) class SOS_ALARM(GPS303Pkt): @@ -536,6 +562,18 @@ if True: # just to indent the code, sorry! PROTOS[cls.__name__] = cls.PROTO +def class_by_prefix(prefix): + lst = [ + (name, proto) + for name, proto in PROTOS.items() + if name.upper().startswith(prefix.upper()) + ] + if len(lst) != 1: + return lst + _, proto = lst[0] + return CLASSES[proto] + + def proto_by_name(name): return PROTOS.get(name, -1)