2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from logging import getLogger
21 from struct import pack, unpack
38 "GPS_OFFLINE_POSITIONING",
43 "WIFI_OFFLINE_POSITIONING",
48 "SYNCHRONOUS_WHITELIST",
54 "CHARGER_DISCONNECTED",
56 "POSITION_UPLOAD_INTERVAL",
59 log = getLogger("gps303")
63 IN = 0 # Incoming, no response needed
64 INLINE = 2 # Birirectional, use `inline_response()`
65 EXT = 3 # Birirectional, use external responder
66 OUT = 4 # Outgoing, should not appear on input
71 DIR = Dir.IN # Do not send anything back by default
73 def __init__(self, *args, **kwargs):
75 for k, v in kwargs.items():
79 return "{}({})".format(
80 self.__class__.__name__,
84 'bytes.fromhex("{}")'.format(v.hex())
85 if isinstance(v, bytes)
88 for k, v in self.__dict__.items()
89 if not k.startswith("_")
94 def from_packet(cls, length, payload):
95 return cls(payload=payload, length=length)
98 return pack("BB", self.length, self.PROTO) + self.payload
101 def make_packet(cls, payload):
102 assert isinstance(payload, bytes)
103 length = len(payload) + 1 # plus proto byte
106 return pack("BB", length, cls.PROTO) + payload
109 def inline_response(cls, packet):
110 if cls.DIR is Dir.INLINE:
111 return cls.make_packet(b"")
116 class UNKNOWN(GPS303Pkt):
117 PROTO = 256 # > 255 is impossible in real packets
120 class LOGIN(GPS303Pkt):
123 # Default response for ACK, can also respond with STOP_UPLOAD
126 def from_packet(cls, length, payload):
127 self = super().from_packet(length, payload)
128 self.imei = payload[:-1].hex()
129 self.ver = unpack("B", payload[-1:])[0]
133 class SUPERVISION(GPS303Pkt):
138 def response(cls, status=0):
139 # 1: The device automatically answers Pickup effect
140 # 2: Automatically Answering Two-way Calls
141 # 3: Ring manually answer the two-way call
142 return cls.make_packet(pack("B", status))
145 class HEARTBEAT(GPS303Pkt):
150 class _GPS_POSITIONING(GPS303Pkt):
154 def from_packet(cls, length, payload):
155 self = super().from_packet(length, payload)
156 self.dtime = payload[:6]
157 if self.dtime == b"\0\0\0\0\0\0":
160 self.devtime = datetime(
161 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
163 self.gps_data_length = payload[6] >> 4
164 self.gps_nb_sat = payload[6] & 0x0F
165 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
166 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
167 flip_lon = bool(flags & 0b0000100000000000) # bit 4
168 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
169 self.heading = flags & 0b0000001111111111 # bits 6 - last
170 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
171 self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
177 def inline_response(cls, packet):
178 tup = datetime.utcnow().timetuple()
179 ttup = (tup[0] % 100,) + tup[1:6]
180 return cls.make_packet(pack("BBBBBB", *ttup))
183 class GPS_POSITIONING(_GPS_POSITIONING):
187 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
191 class STATUS(GPS303Pkt):
196 def from_packet(cls, length, payload):
197 self = super().from_packet(length, payload)
198 if len(payload) == 5:
205 ) = unpack("BBBBB", payload)
206 elif len(payload) == 4:
207 self.batt, self.ver, self.timezone, self.intvl = unpack(
214 def response(cls, upload_interval=25): # Set interval in minutes
215 return cls.make_packet(pack("B", upload_interval))
218 class HIBERNATION(GPS303Pkt):
223 def response(cls): # Server can send to send devicee to sleep
224 return cls.make_packet(b"")
227 class RESET(GPS303Pkt): # Device sends when it got reset SMS
231 def response(cls): # Server can send to initiate factory reset
232 return cls.make_packet(b"")
235 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
240 def response(cls, number=3): # Number of whitelist entries
241 return cls.make_packet(pack("B", number))
244 class _WIFI_POSITIONING(GPS303Pkt):
246 def from_packet(cls, length, payload):
247 self = super().from_packet(length, payload)
248 self.dtime = payload[:6]
249 if self.dtime == b"\0\0\0\0\0\0":
252 self.devtime = datetime.strptime(
253 self.dtime.hex(), "%y%m%d%H%M%S"
254 ).astimezone(tz=timezone.utc)
256 for i in range(self.length): # length has special meaning here
257 slice = payload[6 + i * 7 : 13 + i * 7]
258 self.wifi_aps.append(
259 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
261 gsm_slice = payload[6 + self.length * 7 :]
262 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
264 for i in range(ncells):
265 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
266 locac, cellid, sigstr = unpack(
267 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
269 self.gsm_cells.append((locac, cellid, -sigstr))
273 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
278 def inline_response(cls, packet):
279 return cls.make_packet(
280 bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
284 class TIME(GPS303Pkt):
289 def inline_response(cls, packet):
290 return cls.make_packet(
291 pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
295 class PROHIBIT_LBS(GPS303Pkt):
300 def response(cls, status=1): # Server sent, 0-off, 1-on
301 return cls.make_packet(pack("B", status))
304 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
310 # Data is in packed decimal
312 # 00/01 - Don't set / Set upload period
313 # HHMMHHMM - Upload period
315 # 00/01 - Don't set / Set time of boot
316 # HHMM - Time of boot
317 # 00/01 - Don't set / Set time of shutdown
318 # HHMM - Time of shutdown
319 return cls.make_packet(b"") # TODO
322 class _SET_PHONE(GPS303Pkt):
326 def response(cls, phone):
327 return cls.make_packet(phone.encode())
330 class REMOTE_MONITOR_PHONE(_SET_PHONE):
334 class SOS_PHONE(_SET_PHONE):
338 class DAD_PHONE(_SET_PHONE):
342 class MOM_PHONE(_SET_PHONE):
346 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
352 return cls.make_packet(b"")
355 class GPS_OFF_PERIOD(GPS303Pkt):
360 def response(cls, onoff=0, fm="0000", to="2359"):
361 return cls.make_packet(
362 pack("B", onoff) + bytes.fromhex(fm) + bytes.fromhex(to)
366 class DND_PERIOD(GPS303Pkt):
372 cls, onoff=0, week=3, fm1="0000", to1="2359", fm2="0000", to2="2359"
374 return cls.make_packet(
384 class RESTART_SHUTDOWN(GPS303Pkt):
389 def response(cls, flag=2):
392 return cls.make_packet(pack("B", flag))
395 class DEVICE(GPS303Pkt):
400 def response(cls, flag=0):
401 # 0 - Stop looking for equipment
402 # 1 - Start looking for equipment
403 return cls.make_packet(pack("B", flag))
406 class ALARM_CLOCK(GPS303Pkt):
411 def response(cls, alarms=((0, "0000"), (0, "0000"), (0, "0000"))):
413 cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
417 class STOP_ALARM(GPS303Pkt):
421 def from_packet(cls, length, payload):
422 self = super().from_packet(length, payload)
423 self.flag = payload[0]
427 class SETUP(GPS303Pkt):
434 uploadintervalseconds=0x0300,
435 binaryswitch=0b00110001,
442 phonenumbers=["", "", ""],
445 return pack("!I", x)[1:]
449 pack("!H", uploadintervalseconds),
450 pack("B", binaryswitch),
452 + [pack3b(el) for el in alarms]
454 pack("B", dndtimeswitch),
456 + [pack3b(el) for el in dndtimes]
458 pack("B", gpstimeswitch),
459 pack("!H", gpstimestart),
460 pack("!H", gpstimestop),
462 + [b";".join([el.encode() for el in phonenumbers])]
464 return cls.make_packet(payload)
467 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
471 class RESTORE_PASSWORD(GPS303Pkt):
475 class WIFI_POSITIONING(_WIFI_POSITIONING):
480 def response(cls, lat=None, lon=None):
481 if lat is None or lon is None:
484 payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
487 return cls.make_packet(payload)
490 class MANUAL_POSITIONING(GPS303Pkt):
495 def from_packet(cls, length, payload):
496 self = super().from_packet(length, payload)
497 self.flag = payload[0] if len(payload) > 0 else None
502 4: "LBS search > 3 times",
503 5: "Same LBS and WiFi data",
504 6: "LBS prohibited, WiFi absent",
505 7: "GPS spacing < 50 m",
506 }.get(self.flag, "Unknown")
511 return cls.make_packet(b"")
514 class BATTERY_CHARGE(GPS303Pkt):
518 class CHARGER_CONNECTED(GPS303Pkt):
522 class CHARGER_DISCONNECTED(GPS303Pkt):
526 class VIBRATION_RECEIVED(GPS303Pkt):
530 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
535 def from_packet(cls, length, payload):
536 self = super().from_packet(length, payload)
537 self.interval = unpack("!H", payload[:2])
541 def response(cls, interval=10):
542 return cls.make_packet(pack("!H", interval))
545 class SOS_ALARM(GPS303Pkt):
549 # Build dicts protocol number -> class and class name -> protocol number
552 if True: # just to indent the code, sorry!
555 for name, cls in globals().items()
557 and issubclass(cls, GPS303Pkt)
558 and not name.startswith("_")
560 if hasattr(cls, "PROTO"):
561 CLASSES[cls.PROTO] = cls
562 PROTOS[cls.__name__] = cls.PROTO
565 def class_by_prefix(prefix):
568 for name, proto in PROTOS.items()
569 if name.upper().startswith(prefix.upper())
574 return CLASSES[proto]
577 def proto_by_name(name):
578 return PROTOS.get(name, -1)
581 def proto_of_message(packet):
582 return unpack("B", packet[1:2])[0]
585 def inline_response(packet):
586 proto = proto_of_message(packet)
588 return CLASSES[proto].inline_response(packet)
593 def make_object(length, proto, payload):
595 return CLASSES[proto].from_packet(length, payload)
597 retobj = UNKNOWN.from_packet(length, payload)
598 retobj.PROTO = proto # Override class attr with object attr
602 def parse_message(packet):
603 length, proto = unpack("BB", packet[:2])
605 adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case
607 proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
609 and len(payload) + adjust != length
612 "With proto %d length is %d but payload length is %d+%d",
618 return make_object(length, proto, payload)
621 def handle_packet(packet): # DEPRECATED
622 if len(packet) < 6 or packet[:2] != b"xx" or packet[-2:] != b"\r\n":
623 return UNKNOWN.from_packet(len(packet), packet)
624 return parse_message(packet[2:-2])
627 def make_response(msg, **kwargs): # DEPRECATED
628 inframe = msg.response(**kwargs)
629 return None if inframe is None else b"xx" + inframe + b"\r\n"