2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation), but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from logging import getLogger
21 from struct import pack, unpack
37 "GPS_OFFLINE_POSITIONING",
42 "WIFI_OFFLINE_POSITIONING",
47 "SYNCHRONOUS_WHITELIST",
53 "CHARGER_DISCONNECTED",
55 "POSITION_UPLOAD_INTERVAL",
58 log = getLogger("gps303")
62 IN = 0 # Incoming, no response needed
63 INLINE = 2 # Bidirectional, use `inline_response()`
64 EXT = 3 # Bidirectional, use external responder
65 OUT = 4 # Outgoing, should not appear on input
70 DIR = Dir.INLINE # Most packets anticipate simple acknowledgement
72 def __init__(self, *args, **kwargs):
74 for k, v in kwargs.items():
78 return "{}({})".format(
79 self.__class__.__name__,
83 'bytes.fromhex("{}")'.format(v.hex())
84 if isinstance(v, bytes)
87 for k, v in self.__dict__.items()
88 if not k.startswith("_")
def from_packet(cls, length, payload):
    """Create an instance of `cls` from a message's length and payload.

    Both values are handed to the constructor as keyword arguments
    named `payload` and `length`.
    """
    fields = {"payload": payload, "length": length}
    return cls(**fields)
97 return pack("BB", self.length, self.PROTO) + self.payload
100 def make_packet(cls, payload):
101 assert isinstance(payload, bytes)
102 length = len(payload) + 1 # plus proto byte
105 return pack("BB", length, cls.PROTO) + payload
108 def inline_response(cls, packet):
109 if cls.DIR is Dir.INLINE:
110 return cls.make_packet(b"")
115 class UNKNOWN(GPS303Pkt):
116 PROTO = 256 # > 255 is impossible in real packets
120 class LOGIN(GPS303Pkt):
122 # Default response for ACK, can also respond with STOP_UPLOAD
125 def from_packet(cls, length, payload):
126 self = super().from_packet(length, payload)
127 self.imei = payload[:-1].hex()
128 self.ver = unpack("B", payload[-1:])[0]
132 class SUPERVISION(GPS303Pkt):
def response(self, status=0):
    """Build the outgoing packet carrying a one-byte `status`.

    Status values:
      1: The device automatically answers Pickup effect
      2: Automatically Answering Two-way Calls
      3: Ring manually answer the two-way call
    """
    status_byte = pack("B", status)
    return self.make_packet(status_byte)
143 class HEARTBEAT(GPS303Pkt):
147 class _GPS_POSITIONING(GPS303Pkt):
149 def from_packet(cls, length, payload):
150 self = super().from_packet(length, payload)
151 self.dtime = payload[:6]
152 if self.dtime == b"\0\0\0\0\0\0":
155 self.devtime = datetime(
156 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
# Byte 6 packs two nibbles: GPS data length (high), satellite count (low).
self.gps_data_length = payload[6] >> 4
self.gps_nb_sat = payload[6] & 0x0F
# Big-endian: latitude (u32), longitude (u32), speed (u8), flags (u16).
lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
flip_lon = bool(flags & 0b0000100000000000)  # bit 4
flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
self.heading = flags & 0b0000001111111111  # bits 6 - last
# Raw coordinates are unsigned and divided by 30000 * 60 (presumably
# 1/30000 arc-minute units converted to degrees -- confirm against the
# protocol description). The hemisphere flags may only flip the sign,
# so both factors must be +/-1; any other factor scales the coordinate.
self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
def inline_response(cls, packet):
    """Build the acknowledgement carrying the current UTC time.

    The payload is six bytes: two-digit year, month, day, hour, minute
    and second. `packet` is not inspected.
    """
    # `datetime.utcnow()` is deprecated since Python 3.12; an aware "now"
    # in UTC yields exactly the same calendar fields.
    now = datetime.now(timezone.utc)
    fields = (
        now.year % 100,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
    return cls.make_packet(pack("BBBBBB", *fields))
178 class GPS_POSITIONING(_GPS_POSITIONING):
182 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
186 class STATUS(GPS303Pkt):
191 def from_packet(cls, length, payload):
192 self = super().from_packet(length, payload)
193 if len(payload) == 5:
200 ) = unpack("BBBBB", payload)
201 elif len(payload) == 4:
202 self.batt, self.ver, self.timezone, self.intvl = unpack(
def response(self, upload_interval=25):
    """Build the reply that sets the upload interval, in minutes.

    `upload_interval` is packed as a single unsigned byte.
    """
    interval_byte = pack("B", upload_interval)
    return self.make_packet(interval_byte)
212 class HIBERNATION(GPS303Pkt):
def response(self):
    """Build the empty-payload packet the server can send to put the device to sleep."""
    empty_payload = b""
    return self.make_packet(empty_payload)
220 class RESET(GPS303Pkt): # Device sends when it got reset SMS
def response(self):
    """Build the empty-payload packet the server can send to initiate a factory reset."""
    empty_payload = b""
    return self.make_packet(empty_payload)
228 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiate sync (0x58)
def response(self, number=3):
    """Build the packet announcing `number` whitelist entries (one byte)."""
    count_byte = pack("B", number)
    return self.make_packet(count_byte)
236 class _WIFI_POSITIONING(GPS303Pkt):
238 def from_packet(cls, length, payload):
239 self = super().from_packet(length, payload)
240 self.dtime = payload[:6]
241 if self.dtime == b"\0\0\0\0\0\0":
# The BCD timestamp carries no zone and is treated as UTC, matching the
# UTC time the server hands out in its own responses. Attach the zone
# rather than convert: `astimezone()` on a naive value would first
# assume the server's local timezone and shift the result.
self.devtime = datetime.strptime(
    self.dtime.hex(), "%y%m%d%H%M%S"
).replace(tzinfo=timezone.utc)
248 for i in range(self.length): # length has special meaning here
249 slice = payload[6 + i * 7 : 13 + i * 7]
250 self.wifi_aps.append(
251 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
253 gsm_slice = payload[6 + self.length * 7 :]
254 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
256 for i in range(ncells):
257 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
258 locac, cellid, sigstr = unpack(
259 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
261 self.gsm_cells.append((locac, cellid, -sigstr))
265 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
def inline_response(cls, packet):
    """Build the acknowledgement carrying the current UTC time.

    The payload is six packed-decimal bytes, YYMMDDhhmmss. `packet` is
    not inspected.
    """
    # `datetime.utcnow()` is deprecated since Python 3.12; formatting an
    # aware UTC "now" produces the same digits.
    now = datetime.now(timezone.utc)
    return cls.make_packet(bytes.fromhex(now.strftime("%y%m%d%H%M%S")))
275 class TIME(GPS303Pkt):
def inline_response(cls, packet):
    """Build the reply carrying the current UTC time.

    The payload is seven bytes: big-endian 16-bit year followed by
    month, day, hour, minute and second. `packet` is not inspected.
    """
    # `datetime.utcnow()` is deprecated since Python 3.12; the time tuple
    # of an aware UTC "now" has the same first six fields.
    now = datetime.now(timezone.utc)
    return cls.make_packet(pack("!HBBBBB", *now.timetuple()[:6]))
285 class PROHIBIT_LBS(GPS303Pkt):
def response(self, status=1):
    """Build the server-sent packet with a one-byte `status`: 0 - off, 1 - on."""
    status_byte = pack("B", status)
    return self.make_packet(status_byte)
293 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
298 # Data is in packed decimal
300 # 00/01 - Don't set / Set upload period
301 # HHMMHHMM - Upload period
303 # 00/01 - Don't set / Set time of boot
304 # HHMM - Time of boot
305 # 00/01 - Don't set / Set time of shutdown
306 # HHMM - Time of shutdown
307 return self.make_packet(b"") # TODO
310 class _SET_PHONE(GPS303Pkt):
def response(self, phone):
    """Build the packet whose payload is `phone` encoded to bytes."""
    encoded_number = phone.encode()
    return self.make_packet(encoded_number)
317 class REMOTE_MONITOR_PHONE(_SET_PHONE):
321 class SOS_PHONE(_SET_PHONE):
325 class DAD_PHONE(_SET_PHONE):
329 class MOM_PHONE(_SET_PHONE):
333 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
338 return self.make_packet(b"")
341 class GPS_OFF_PERIOD(GPS303Pkt):
345 def response(self, onoff=0, fm="0000", to="2359"):
346 return self.make_packet(
347 pack("B", onoff) + bytes.fromhex(fm) + bytes.fromhex(to)
351 class DND_PERIOD(GPS303Pkt):
356 self, onoff=0, week=3, fm1="0000", to1="2359", fm2="0000", to2="2359"
358 return self.make_packet(
368 class RESTART_SHUTDOWN(GPS303Pkt):
372 def response(self, flag=0):
375 return self.make_packet(pack("B", flag))
378 class DEVICE(GPS303Pkt):
def response(self, flag=0):
    """Build the packet carrying a one-byte `flag`.

    Flag values:
      0 - Stop looking for equipment
      1 - Start looking for equipment
    """
    flag_byte = pack("B", flag)
    return self.make_packet(flag_byte)
388 class ALARM_CLOCK(GPS303Pkt):
392 def response(self, alarms=((0, "0000"), (0, "0000"), (0, "0000"))):
394 pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
398 class STOP_ALARM(GPS303Pkt):
402 def from_packet(cls, length, payload):
403 self = super().from_packet(length, payload)
404 self.flag = payload[0]
407 class SETUP(GPS303Pkt):
413 uploadintervalseconds=0x0300,
414 binaryswitch=0b00110001,
421 phonenumbers=["", "", ""],
424 return pack("!I", x)[1:]
428 pack("!H", uploadintervalseconds),
429 pack("B", binaryswitch),
431 + [pack3b(el) for el in alarms]
433 pack("B", dndtimeswitch),
435 + [pack3b(el) for el in dndtimes]
437 pack("B", gpstimeswitch),
438 pack("!H", gpstimestart),
439 pack("!H", gpstimestop),
441 + [b";".join([el.encode() for el in phonenumbers])]
443 return self.make_packet(payload)
446 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
450 class RESTORE_PASSWORD(GPS303Pkt):
454 class WIFI_POSITIONING(_WIFI_POSITIONING):
458 def response(self, lat=None, lon=None):
459 if lat is None or lon is None:
462 payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
465 return self.make_packet(payload)
468 class MANUAL_POSITIONING(GPS303Pkt):
473 def from_packet(cls, length, payload):
474 self = super().from_packet(length, payload)
475 self.flag = payload[0]
480 4: "LBS search > 3 times",
481 5: "Same LBS and WiFi data",
482 6: "LBS prohibited, WiFi absent",
483 7: "GPS spacing < 50 m",
484 }.get(self.flag, "Unknown")
487 return self.make_packet(b"")
490 class BATTERY_CHARGE(GPS303Pkt):
494 class CHARGER_CONNECTED(GPS303Pkt):
498 class CHARGER_DISCONNECTED(GPS303Pkt):
502 class VIBRATION_RECEIVED(GPS303Pkt):
506 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
511 def from_packet(cls, length, payload):
512 self = super().from_packet(length, payload)
# `unpack` always returns a tuple; take its only element so `interval`
# is a plain int rather than a 1-tuple.
self.interval = unpack("!H", payload[:2])[0]
def response(self, interval=10):
    """Build the packet carrying `interval` as a big-endian 16-bit value."""
    interval_bytes = pack("!H", interval)
    return self.make_packet(interval_bytes)
520 class SOS_ALARM(GPS303Pkt):
524 # Build dicts protocol number -> class and class name -> protocol number
527 if True: # just to indent the code, sorry!
530 for name, cls in globals().items()
532 and issubclass(cls, GPS303Pkt)
533 and not name.startswith("_")
535 if hasattr(cls, "PROTO"):
536 CLASSES[cls.PROTO] = cls
537 PROTOS[cls.__name__] = cls.PROTO
540 def class_by_prefix(prefix):
541 lst = [(name, proto) for name, proto in PROTOS.items()
542 if name.upper().startswith(prefix.upper())]
546 return CLASSES[proto]
def proto_by_name(name):
    """Return the protocol number registered under `name`, or -1 if none."""
    try:
        return PROTOS[name]
    except KeyError:
        return -1
def proto_of_message(packet):
    """Return the protocol number, i.e. the second byte of `packet`.

    Raises `struct.error` when `packet` has fewer than two bytes.
    """
    (proto,) = unpack("B", packet[1:2])
    return proto
557 def inline_response(packet):
558 proto = proto_of_message(packet)
560 return CLASSES[proto].inline_response(packet)
565 def make_object(length, proto, payload):
567 return CLASSES[proto].from_packet(length, payload)
569 retobj = UNKNOWN.from_packet(length, payload)
570 retobj.PROTO = proto # Override class attr with object attr
574 def parse_message(packet):
575 length, proto = unpack("BB", packet[:2])
577 adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case
579 proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
581 and len(payload) + adjust != length
584 "With proto %d length is %d but payload length is %d+%d",
590 return make_object(length, proto, payload)
def handle_packet(packet):  # DEPRECATED
    """Strip the framing from `packet` and parse the message inside.

    A frame is accepted when it is at least six bytes long, starts with
    b"xx" and ends with CR LF. Anything else is wrapped, unparsed, in an
    `UNKNOWN` object.
    """
    well_framed = (
        len(packet) >= 6
        and packet[:2] == b"xx"
        and packet[-2:] == b"\r\n"
    )
    if well_framed:
        return parse_message(packet[2:-2])
    return UNKNOWN.from_packet(len(packet), packet)
def make_response(msg, **kwargs):  # DEPRECATED
    """Ask `msg` for its response and wrap it in the wire framing.

    Returns `None` when `msg.response(**kwargs)` returns `None`;
    otherwise the response enclosed between b"xx" and CR LF.
    """
    body = msg.response(**kwargs)
    if body is None:
        return None
    return b"xx" + body + b"\r\n"