2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation), but relying on that would break things
14 if the path has a lower MTU than the size of a message.
17 from datetime import datetime, timezone
18 from inspect import isclass
19 from logging import getLogger
20 from struct import pack, unpack
32 "GPS_OFFLINE_POSITIONING",
37 "WIFI_OFFLINE_POSITIONING",
42 "SYNCHRONOUS_WHITELIST",
48 "CHARGER_DISCONNECTED",
50 "POSITION_UPLOAD_INTERVAL",
53 log = getLogger("gps303")
60 def __init__(self, *args, **kwargs):
62 for k, v in kwargs.items():
66 return "{}({})".format(
67 self.__class__.__name__,
71 'bytes.fromhex("{}")'.format(v.hex())
72 if isinstance(v, bytes)
75 for k, v in self.__dict__.items()
76 if not k.startswith("_")
def from_packet(cls, length, proto, payload):
    """Create a packet object from the already separated frame fields.

    The three values are handed to the class constructor as the keyword
    arguments ``proto``, ``payload`` and ``length``; subclasses call this
    through ``super()`` and then decode ``payload`` further.
    """
    return cls(proto=proto, payload=payload, length=length)
84 def response(self, *args):
87 assert len(args) == 1 and isinstance(args[0], bytes)
89 length = len(payload) + 1
92 return b"xx" + pack("BB", length, self.proto) + payload + b"\r\n"
95 class UNKNOWN(_GT06pkt):
99 class LOGIN(_GT06pkt):
103 def from_packet(cls, length, proto, payload):
104 self = super().from_packet(length, proto, payload)
105 self.imei = payload[:-1].hex()
106 self.ver = unpack("B", payload[-1:])[0]
110 return super().response(b"")
class SUPERVISION(_GT06pkt):  # Server sends supervision number status
    def response(self, supnum=0):
        """Delegate to the base-class ``response()`` with an empty payload.

        NOTE(review): ``supnum`` is accepted but never used, so the payload
        is always empty.  The values listed below look like the intended
        meaning of the argument -- confirm whether it should be sent to the
        device.
        """
        # 1: The device automatically answers Pickup effect
        # 2: Automatically Answering Two-way Calls
        # 3: Ring manually answer the two-way call
        return super().response(b"")
123 class HEARTBEAT(_GT06pkt):
127 class _GPS_POSITIONING(_GT06pkt):
def from_packet(cls, length, proto, payload):
    """Decode a GPS positioning payload into a packet object.

    Payload layout, as read by this method:

    * bytes 0-5   -- device timestamp, six unsigned bytes passed in order
      to ``datetime`` (year, month, day, hour, minute, second); six zero
      bytes mean that no timestamp is available
    * byte 6      -- high nibble: GPS data length, low nibble: number of
      satellites
    * bytes 7-10  -- latitude, big-endian unsigned 32-bit
    * bytes 11-14 -- longitude, big-endian unsigned 32-bit
    * byte 15     -- speed
    * bytes 16-17 -- flags: validity, hemisphere bits and 10-bit heading

    Returns the populated instance.  A payload that is too short raises
    ``IndexError`` or ``struct.error``; timestamp bytes that do not form
    a valid date raise ``ValueError``.
    """
    self = super().from_packet(length, proto, payload)
    self.dtime = payload[:6]
    if self.dtime == b"\0\0\0\0\0\0":
        self.devtime = None
    else:
        # NOTE(review): the first byte is used as the year verbatim.  If
        # the device sends a two-digit year this yields a first-century
        # date -- confirm whether 2000 has to be added.
        self.devtime = datetime(
            *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
        )
    self.gps_data_length = payload[6] >> 4
    self.gps_nb_sat = payload[6] & 0x0F
    # NOTE(review): ``speed`` is unpacked but not stored on the object in
    # the visible code -- confirm whether it should be exposed.
    lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
    self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
    flip_lon = bool(flags & 0b0000100000000000)  # bit 4
    flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
    self.heading = flags & 0b0000001111111111  # bits 6 - last
    # Raw coordinates are divided by 30000 * 60 (presumably 1/30000 of an
    # arc minute -> degrees); the hemisphere flags only select the sign,
    # so both factors must have magnitude 1.
    self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
    self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
    return self
152 return super().response(self.dtime)
155 class GPS_POSITIONING(_GPS_POSITIONING):
159 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
163 class STATUS(_GT06pkt):
167 def from_packet(cls, length, proto, payload):
168 self = super().from_packet(length, proto, payload)
169 if len(payload) == 5:
176 ) = unpack("BBBBB", payload)
177 elif len(payload) == 4:
178 self.batt, self.ver, self.timezone, self.intvl = unpack(
def response(self, upload_interval=25):  # Set interval in minutes
    """Build the reply that carries ``upload_interval`` as one byte.

    The value is packed as a single unsigned byte and passed to the
    base-class ``response()``; ``struct.error`` is raised if it does not
    fit into the range 0..255.
    """
    return super().response(pack("B", upload_interval))
188 class HIBERNATION(_GT06pkt):
class RESET(_GT06pkt):  # Device sends when it got reset SMS
    def response(self):  # Server can send to initiate factory reset
        """Delegate to the base-class ``response()`` with an empty payload."""
        return super().response(b"")
class WHITELIST_TOTAL(_GT06pkt):  # Server sends to initiate sync (0x58)
    def response(self, number=3):  # Number of whitelist entries
        """Build the message carrying ``number`` as a single unsigned byte.

        ``struct.error`` is raised if ``number`` is outside 0..255.
        """
        return super().response(pack("B", number))
206 class _WIFI_POSITIONING(_GT06pkt):
def from_packet(cls, length, proto, payload):
    """Decode a WiFi/LBS positioning payload into a packet object.

    Payload layout, as read by this method:

    * bytes 0-5 -- device timestamp; the hex representation of the six
      bytes is parsed as ``%y%m%d%H%M%S``; six zero bytes mean that no
      timestamp is available
    * ``length`` records of 7 bytes -- 6 bytes of access point MAC address
      followed by one byte of signal strength (stored negated)
    * GSM section -- one byte cell count, two bytes MCC, one byte MNC,
      then per cell 5 bytes: location area code, cell id, signal strength
      (stored negated)

    Sets ``dtime``, ``devtime``, ``wifi_aps``, ``mcc``, ``mnc`` and
    ``gsm_cells`` on the instance and returns it.  A truncated payload
    raises ``IndexError`` or ``struct.error``.
    """
    self = super().from_packet(length, proto, payload)
    self.dtime = payload[:6]
    if self.dtime == b"\0\0\0\0\0\0":
        self.devtime = None
    else:
        # NOTE(review): strptime() returns a naive datetime and
        # astimezone() interprets a naive value as system local time.
        # If the device reports UTC this shifts the result by the
        # server's UTC offset -- confirm which is intended.
        self.devtime = datetime.strptime(
            self.dtime.hex(), "%y%m%d%H%M%S"
        ).astimezone(tz=timezone.utc)
    self.wifi_aps = []
    for i in range(self.length):  # length has special meaning here
        ap_record = payload[6 + i * 7 : 13 + i * 7]
        mac = ":".join([format(b, "02X") for b in ap_record[:6]])
        self.wifi_aps.append((mac, -ap_record[6]))
    gsm_slice = payload[6 + self.length * 7 :]
    ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
    self.gsm_cells = []
    for i in range(ncells):
        locac, cellid, sigstr = unpack(
            "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
        )
        self.gsm_cells.append((locac, cellid, -sigstr))
    return self
235 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
239 return super().response(self.dtime)
242 class TIME(_GT06pkt):
246 payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
247 return super().response(payload)
class PROHIBIT_LBS(_GT06pkt):
    def response(self, status=1):  # Server sent, 0-off, 1-on
        """Build the message carrying ``status`` as a single unsigned byte.

        ``struct.error`` is raised if ``status`` is outside 0..255.
        """
        return super().response(pack("B", status))
257 class MOM_PHONE(_GT06pkt):
261 class STOP_UPLOAD(_GT06pkt): # Server response to LOGIN to thwart the device
265 return super().response(b"")
268 class STOP_ALARM(_GT06pkt):
272 class SETUP(_GT06pkt):
277 uploadIntervalSeconds=0x0300,
278 binarySwitch=0b00110001,
285 phoneNumbers=["", "", ""],
288 return pack("!I", x)[1:]
292 pack("!H", uploadIntervalSeconds),
293 pack("B", binarySwitch),
295 + [pack3b(el) for el in alarms]
297 pack("B", dndTimeSwitch),
299 + [pack3b(el) for el in dndTimes]
301 pack("B", gpsTimeSwitch),
302 pack("!H", gpsTimeStart),
303 pack("!H", gpsTimeStop),
305 + [b";".join([el.encode() for el in phoneNumbers])]
307 return super().response(payload)
310 class SYNCHRONOUS_WHITELIST(_GT06pkt):
314 class RESTORE_PASSWORD(_GT06pkt):
318 class WIFI_POSITIONING(_WIFI_POSITIONING):
321 def response(self, lat=None, lon=None):
322 if lat is None or lon is None:
325 payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
328 return super().response(payload)
331 class MANUAL_POSITIONING(_GT06pkt):
335 class BATTERY_CHARGE(_GT06pkt):
339 class CHARGER_CONNECTED(_GT06pkt):
343 class CHARGER_DISCONNECTED(_GT06pkt):
347 class VIBRATION_RECEIVED(_GT06pkt):
351 class POSITION_UPLOAD_INTERVAL(_GT06pkt):
def from_packet(cls, length, proto, payload):
    """Decode the upload interval from the first two payload bytes.

    The interval is a big-endian unsigned 16-bit integer.  It is stored
    on the instance as a plain ``int`` in ``interval`` so that
    ``response()`` of this class can feed it back to ``pack("!H", ...)``.

    Returns the populated instance; raises ``struct.error`` if the
    payload is shorter than two bytes.
    """
    self = super().from_packet(length, proto, payload)
    # unpack() always returns a tuple, even for a single field; keeping
    # the tuple would make the later pack("!H", self.interval) fail.
    self.interval = unpack("!H", payload[:2])[0]
    return self
361 return super().response(pack("!H", self.interval))
364 class SOS_ALARM(_GT06pkt):
368 # Build a dict protocol number -> class
370 if True: # just to indent the code, sorry!
373 for name, cls in globals().items()
375 and issubclass(cls, _GT06pkt)
376 and not name.startswith("_")
378 if hasattr(cls, "PROTO"):
379 CLASSES[cls.PROTO] = cls
382 def make_object(length, proto, payload):
384 return CLASSES[proto].from_packet(length, proto, payload)
386 return UNKNOWN.from_packet(length, proto, payload)
389 def handle_packet(packet, addr, when):
391 return UNKNOWN.from_packet(0, 0, packet)
393 xx, length, proto = unpack("!2sBB", packet[:4])
395 payload = packet[4:-2]
396 adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case
399 not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
401 and len(payload) + adjust != length
404 "With proto %d length is %d but payload length is %d+%d",
410 if xx != b"xx" or crlf != b"\r\n":
411 return UNKNOWN.from_packet(length, proto, packet) # full packet
413 return make_object(length, proto, payload)
def make_response(msg, **kwargs):
    """Return whatever ``msg.response()`` produces for the given keywords.

    All keyword arguments are forwarded unchanged to the ``response``
    method of ``msg``.
    """
    build = msg.response
    return build(**kwargs)
def set_config(config):
    """Store ``config`` as the ``CONFIG`` attribute of ``_GT06pkt``.

    The attribute is set on the class itself, not on an instance, so it
    is visible through every packet class derived from ``_GT06pkt``.
    """
    setattr(_GT06pkt, "CONFIG", config)