2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
18 from inspect import isclass
19 from logging import getLogger
20 from struct import pack, unpack
35 "GPS_OFFLINE_POSITIONING",
40 "WIFI_OFFLINE_POSITIONING",
45 "SYNCHRONOUS_WHITELIST",
51 "CHARGER_DISCONNECTED",
53 "POSITION_UPLOAD_INTERVAL",
56 log = getLogger("gps303")
62 def __init__(self, *args, **kwargs):
64 for k, v in kwargs.items():
68 return "{}({})".format(
69 self.__class__.__name__,
73 'bytes.fromhex("{}")'.format(v.hex())
74 if isinstance(v, bytes)
77 for k, v in self.__dict__.items()
78 if not k.startswith("_")
83 def from_packet(cls, length, payload):
84 return cls(payload=payload, length=length)
87 return pack("BB", self.length, self.PROTO) + self.payload
90 def make_packet(cls, payload):
91 assert isinstance(payload, bytes)
92 length = len(payload) + 1
95 return pack("BB", length, cls.PROTO) + payload
98 def inline_response(cls, packet):
99 return cls.make_packet(b"")
102 class UNKNOWN(GPS303Pkt):
103 PROTO = 256 # > 255 is impossible in real packets
106 def inline_response(cls, packet):
110 class LOGIN(GPS303Pkt):
114 def from_packet(cls, length, payload):
115 self = super().from_packet(length, payload)
116 self.imei = payload[:-1].hex()
117 self.ver = unpack("B", payload[-1:])[0]
121 return self.make_packet(b"")
124 class SUPERVISION(GPS303Pkt): # Server sends supervision number status
127 def response(self, supnum=0):
128 # 1: The device automatically answers Pickup effect
129 # 2: Automatically Answering Two-way Calls
130 # 3: Ring manually answer the two-way call
131 return super().response(b"")
134 class HEARTBEAT(GPS303Pkt):
138 class _GPS_POSITIONING(GPS303Pkt):
140 def from_packet(cls, length, payload):
141 self = super().from_packet(length, payload)
142 self.dtime = payload[:6]
143 if self.dtime == b"\0\0\0\0\0\0":
146 self.devtime = datetime(
147 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
149 self.gps_data_length = payload[6] >> 4
150 self.gps_nb_sat = payload[6] & 0x0F
151 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
152 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
153 flip_lon = bool(flags & 0b0000100000000000) # bit 4
154 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
155 self.heading = flags & 0b0000001111111111 # bits 6 - last
156 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
157 self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
163 def inline_response(cls, packet):
164 return cls.make_packet(packet[2:8])
167 return self.make_packet(self.dtime)
170 class GPS_POSITIONING(_GPS_POSITIONING):
174 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
178 class STATUS(GPS303Pkt):
182 def from_packet(cls, length, payload):
183 self = super().from_packet(length, payload)
184 if len(payload) == 5:
191 ) = unpack("BBBBB", payload)
192 elif len(payload) == 4:
193 self.batt, self.ver, self.timezone, self.intvl = unpack(
200 def inline_response(cls, packet):
203 def response(self, upload_interval=25): # Set interval in minutes
204 return super().response(pack("B", upload_interval))
207 class HIBERNATION(GPS303Pkt):
211 class RESET(GPS303Pkt): # Device sends when it got reset SMS
214 def response(self): # Server can send to initiate factory reset
215 return self.make_packet(b"")
218 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
221 def response(self, number=3): # Number of whitelist entries
222 return super().response(pack("B", number))
225 class _WIFI_POSITIONING(GPS303Pkt):
227 def from_packet(cls, length, payload):
228 self = super().from_packet(length, payload)
229 self.dtime = payload[:6]
230 if self.dtime == b"\0\0\0\0\0\0":
233 self.devtime = datetime.strptime(
234 self.dtime.hex(), "%y%m%d%H%M%S"
235 ).astimezone(tz=timezone.utc)
237 for i in range(self.length): # length has special meaning here
238 slice = payload[6 + i * 7 : 13 + i * 7]
239 self.wifi_aps.append(
240 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
242 gsm_slice = payload[6 + self.length * 7 :]
243 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
245 for i in range(ncells):
246 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
247 locac, cellid, sigstr = unpack(
248 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
250 self.gsm_cells.append((locac, cellid, -sigstr))
254 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
258 def inline_response(cls, packet):
259 return cls.make_packet(packet[2:8])
262 return super().response(self.dtime)
265 class TIME(GPS303Pkt):
269 def inline_response(cls, packet):
271 "!BBHBBBBB", 7, cls.PROTO, *datetime.utcnow().timetuple()[:6]
275 payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
276 return super().response(payload)
279 class PROHIBIT_LBS(GPS303Pkt):
282 def response(self, status=1): # Server sent, 0-off, 1-on
283 return super().response(pack("B", status))
286 class MOM_PHONE(GPS303Pkt):
290 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
294 return super().response(b"")
297 class STOP_ALARM(GPS303Pkt):
301 class SETUP(GPS303Pkt):
306 uploadIntervalSeconds=0x0300,
307 binarySwitch=0b00110001,
314 phoneNumbers=["", "", ""],
317 return pack("!I", x)[1:]
321 pack("!H", uploadIntervalSeconds),
322 pack("B", binarySwitch),
324 + [pack3b(el) for el in alarms]
326 pack("B", dndTimeSwitch),
328 + [pack3b(el) for el in dndTimes]
330 pack("B", gpsTimeSwitch),
331 pack("!H", gpsTimeStart),
332 pack("!H", gpsTimeStop),
334 + [b";".join([el.encode() for el in phoneNumbers])]
336 return super().response(payload)
339 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
343 class RESTORE_PASSWORD(GPS303Pkt):
347 class WIFI_POSITIONING(_WIFI_POSITIONING):
351 def inline_response(cls, packet):
354 def response(self, lat=None, lon=None):
355 if lat is None or lon is None:
358 payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
361 return self.make_packet(payload)
364 class MANUAL_POSITIONING(GPS303Pkt):
368 class BATTERY_CHARGE(GPS303Pkt):
372 class CHARGER_CONNECTED(GPS303Pkt):
376 class CHARGER_DISCONNECTED(GPS303Pkt):
380 class VIBRATION_RECEIVED(GPS303Pkt):
384 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
388 def from_packet(cls, length, payload):
389 self = super().from_packet(length, payload)
390 self.interval = unpack("!H", payload[:2])
394 def inline_response(cls, packet):
395 return cls.make_packet(packet[2:4])
398 return self.make_packet(pack("!H", self.interval))
401 class SOS_ALARM(GPS303Pkt):
405 # Build dicts protocol number -> class and class name -> protocol number
408 if True: # just to indent the code, sorry!
411 for name, cls in globals().items()
413 and issubclass(cls, GPS303Pkt)
414 and not name.startswith("_")
416 if hasattr(cls, "PROTO"):
417 CLASSES[cls.PROTO] = cls
418 PROTOS[cls.__name__] = cls.PROTO
421 def proto_by_name(name):
422 return PROTOS.get(name, -1)
425 def proto_of_message(packet):
426 return unpack("B", packet[1:2])[0]
429 def inline_response(packet):
430 proto = proto_of_message(packet)
432 return CLASSES[proto].inline_response(packet)
437 def make_object(length, proto, payload):
439 return CLASSES[proto].from_packet(length, payload)
441 retobj = UNKNOWN.from_packet(length, payload)
442 retobj.PROTO = proto # Override class attr with object attr
446 def parse_message(packet):
447 length, proto = unpack("BB", packet[:2])
449 adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case
451 proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
453 and len(payload) + adjust != length
456 "With proto %d length is %d but payload length is %d+%d",
462 return make_object(length, proto, payload)
465 def handle_packet(packet): # DEPRECATED
466 if len(packet) < 6 or packet[:2] != b"xx" or packet[-2:] != b"\r\n":
467 return UNKNOWN.from_packet(len(packet), packet)
468 return parse_message(packet[2:-2])
471 def make_response(msg, **kwargs): # DEPRECATED
472 inframe = msg.response(**kwargs)
473 return None if inframe is None else b"xx" + inframe + b"\r\n"