X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fgps303proto.py;h=84358fbcb66ee3b750a3dff2e6a4c5cc5b16cf47;hb=2b51d5b0a8f6a6f8fb934473611b9bfddd38cabf;hp=6fa76550db1031ea901ff55d71998d1aeb9fd640;hpb=4c173a5448990cd4da398be1bf3479bef17b1048;p=loctrkd.git diff --git a/gps303/gps303proto.py b/gps303/gps303proto.py index 6fa7655..84358fb 100755 --- a/gps303/gps303proto.py +++ b/gps303/gps303proto.py @@ -21,11 +21,11 @@ from struct import pack, unpack __all__ = ( "handle_packet", + "inline_response", "make_object", "make_response", "parse_message", "proto_by_name", - "set_config", "GPS303Pkt", "UNKNOWN", "LOGIN", @@ -58,7 +58,7 @@ log = getLogger("gps303") class GPS303Pkt: PROTO: int - CONFIG = None + INLINE = True def __init__(self, *args, **kwargs): assert len(args) == 0 @@ -88,19 +88,24 @@ class GPS303Pkt: return pack("BB", self.length, self.PROTO) + self.payload @classmethod - def response(cls, *args): - if len(args) == 0: - return None - assert len(args) == 1 and isinstance(args[0], bytes) - payload = args[0] + def make_packet(cls, payload): + assert isinstance(payload, bytes) length = len(payload) + 1 if length > 6: length -= 6 return pack("BB", length, cls.PROTO) + payload + @classmethod + def inline_response(cls, packet): + if cls.INLINE: + return cls.make_packet(b"") + else: + return None + class UNKNOWN(GPS303Pkt): PROTO = 256 # > 255 is impossible in real packets + INLINE = False class LOGIN(GPS303Pkt): @@ -113,20 +118,16 @@ class LOGIN(GPS303Pkt): self.ver = unpack("B", payload[-1:])[0] return self - @classmethod - def response(cls): - return super().response(b"") - class SUPERVISION(GPS303Pkt): # Server sends supervision number status PROTO = 0x05 + INLINE = False - @classmethod - def response(cls, supnum=0): + def response(self, supnum=0): # 1: The device automatically answers Pickup effect # 2: Automatically Answering Two-way Calls # 3: Ring manually answer the two-way call - return super().response(b"") + return self.make_packet(pack("B", supnum)) class HEARTBEAT(GPS303Pkt): @@ -148,17 +149,18 @@ class _GPS_POSITIONING(GPS303Pkt): self.gps_nb_sat = payload[6] & 0x0F lat, lon, speed, flags = unpack("!IIBH", payload[7:18]) self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3 - flip_lon = bool(flags & 0b0000100000000000) # bit 4 - flip_lat = not bool(flags & 0b0000010000000000) # bit 5 - self.heading = flags & 0b0000001111111111 # bits 6 - last + flip_lon = bool(flags & 0b0000100000000000) # bit 4 + flip_lat = not bool(flags & 0b0000010000000000) # bit 5 + self.heading = flags & 0b0000001111111111 # bits 6 - last self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1) self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1) self.speed = speed self.flags = flags return self - def response(self): - return super().response(self.dtime) + @classmethod + def inline_response(cls, packet): + return cls.make_packet(packet[2:8]) class GPS_POSITIONING(_GPS_POSITIONING): @@ -171,6 +173,7 @@ class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING): class STATUS(GPS303Pkt): PROTO = 0x13 + INLINE = False @classmethod def from_packet(cls, length, payload): @@ -191,7 +194,7 @@ class STATUS(GPS303Pkt): return self def response(self, upload_interval=25): # Set interval in minutes - return super().response(pack("B", upload_interval)) + return self.make_packet(pack("B", upload_interval)) class HIBERNATION(GPS303Pkt): @@ -200,16 +203,18 @@ class HIBERNATION(GPS303Pkt): class RESET(GPS303Pkt): # Device sends when it got reset SMS PROTO = 0x15 + INLINE = False def response(self): # Server can send to initiate factory reset - return super().response(b"") + return self.make_packet(b"") class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58) PROTO = 0x16 + INLINE = False def response(self, number=3): # Number of whitelist entries - return super().response(pack("B", number)) + return self.make_packet(pack("B", number)) class _WIFI_POSITIONING(GPS303Pkt): @@ -244,23 +249,27 @@ class _WIFI_POSITIONING(GPS303Pkt): class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING): PROTO = 0x17 - def response(self): - return super().response(self.dtime) + @classmethod + def inline_response(cls, packet): + return cls.make_packet(packet[2:8]) class TIME(GPS303Pkt): PROTO = 0x30 - def response(self): - payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6]) - return super().response(payload) + @classmethod + def inline_response(cls, packet): + return pack( + "!BBHBBBBB", 7, cls.PROTO, *datetime.utcnow().timetuple()[:6] + ) class PROHIBIT_LBS(GPS303Pkt): PROTO = 0x33 + INLINE = False def response(self, status=1): # Server sent, 0-off, 1-on - return super().response(pack("B", status)) + return self.make_packet(pack("B", status)) class MOM_PHONE(GPS303Pkt): @@ -270,9 +279,6 @@ class MOM_PHONE(GPS303Pkt): class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device PROTO = 0x44 - def response(self): - return super().response(b"") - class STOP_ALARM(GPS303Pkt): PROTO = 0x56 @@ -280,6 +286,7 @@ class STOP_ALARM(GPS303Pkt): class SETUP(GPS303Pkt): PROTO = 0x57 + INLINE = False def response( self, @@ -313,7 +320,7 @@ class SETUP(GPS303Pkt): ] + [b";".join([el.encode() for el in phoneNumbers])] ) - return super().response(payload) + return self.make_packet(payload) class SYNCHRONOUS_WHITELIST(GPS303Pkt): @@ -326,6 +333,7 @@ class RESTORE_PASSWORD(GPS303Pkt): class WIFI_POSITIONING(_WIFI_POSITIONING): PROTO = 0x69 + INLINE = False def response(self, lat=None, lon=None): if lat is None or lon is None: @@ -334,11 +342,12 @@ class WIFI_POSITIONING(_WIFI_POSITIONING): payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode( "ascii" ) - return super().response(payload) + return self.make_packet(payload) class MANUAL_POSITIONING(GPS303Pkt): PROTO = 0x80 + INLINE = False class BATTERY_CHARGE(GPS303Pkt): @@ -359,6 +368,7 @@ class VIBRATION_RECEIVED(GPS303Pkt): class POSITION_UPLOAD_INTERVAL(GPS303Pkt): PROTO = 0x98 + INLINE = False @classmethod def from_packet(cls, length, payload): @@ -366,8 +376,8 @@ class POSITION_UPLOAD_INTERVAL(GPS303Pkt): self.interval = unpack("!H", payload[:2]) return self - def response(self): - return super().response(pack("!H", self.interval)) + def response(self, interval=10): + return self.make_packet(pack("!H", interval)) class SOS_ALARM(GPS303Pkt): @@ -398,6 +408,14 @@ def proto_of_message(packet): return unpack("B", packet[1:2])[0] +def inline_response(packet): + proto = proto_of_message(packet) + if proto in CLASSES: + return CLASSES[proto].inline_response(packet) + else: + return None + + def make_object(length, proto, payload): if proto in CLASSES: return CLASSES[proto].from_packet(length, payload) @@ -412,8 +430,7 @@ def parse_message(packet): payload = packet[2:] adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case if ( - proto - not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO) + proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO) and length > 1 and len(payload) + adjust != length ): @@ -436,7 +453,3 @@ def handle_packet(packet): # DEPRECATED def make_response(msg, **kwargs): # DEPRECATED inframe = msg.response(**kwargs) return None if inframe is None else b"xx" + inframe + b"\r\n" - - -def set_config(config): # Note that we are setting _class_ attribute - GPS303Pkt.CONFIG = config