2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
18 from inspect import isclass
19 from logging import getLogger
20 from struct import pack, unpack
35 "GPS_OFFLINE_POSITIONING",
40 "WIFI_OFFLINE_POSITIONING",
45 "SYNCHRONOUS_WHITELIST",
51 "CHARGER_DISCONNECTED",
53 "POSITION_UPLOAD_INTERVAL",
56 log = getLogger("gps303")
63 def __init__(self, *args, **kwargs):
65 for k, v in kwargs.items():
69 return "{}({})".format(
70 self.__class__.__name__,
74 'bytes.fromhex("{}")'.format(v.hex())
75 if isinstance(v, bytes)
78 for k, v in self.__dict__.items()
79 if not k.startswith("_")
84 def from_packet(cls, length, payload):
85 return cls(payload=payload, length=length)
88 return pack("BB", self.length, self.PROTO) + self.payload
91 def make_packet(cls, payload):
92 assert isinstance(payload, bytes)
93 length = len(payload) + 1
96 return pack("BB", length, cls.PROTO) + payload
99 def inline_response(cls, packet):
101 return cls.make_packet(b"")
106 class UNKNOWN(GPS303Pkt):
107 PROTO = 256 # > 255 is impossible in real packets
111 class LOGIN(GPS303Pkt):
115 def from_packet(cls, length, payload):
116 self = super().from_packet(length, payload)
117 self.imei = payload[:-1].hex()
118 self.ver = unpack("B", payload[-1:])[0]
122 class SUPERVISION(GPS303Pkt): # Server sends supervision number status
126 def response(self, supnum=0):
127 # 1: The device automatically answers Pickup effect
128 # 2: Automatically Answering Two-way Calls
129 # 3: Ring manually answer the two-way call
130 return self.make_packet(pack("B", supnum))
133 class HEARTBEAT(GPS303Pkt):
137 class _GPS_POSITIONING(GPS303Pkt):
139 def from_packet(cls, length, payload):
140 self = super().from_packet(length, payload)
141 self.dtime = payload[:6]
142 if self.dtime == b"\0\0\0\0\0\0":
145 self.devtime = datetime(
146 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
148 self.gps_data_length = payload[6] >> 4
149 self.gps_nb_sat = payload[6] & 0x0F
150 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
151 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
152 flip_lon = bool(flags & 0b0000100000000000) # bit 4
153 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
154 self.heading = flags & 0b0000001111111111 # bits 6 - last
155 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
156 self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
162 def inline_response(cls, packet):
163 return cls.make_packet(packet[2:8])
166 class GPS_POSITIONING(_GPS_POSITIONING):
170 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
174 class STATUS(GPS303Pkt):
179 def from_packet(cls, length, payload):
180 self = super().from_packet(length, payload)
181 if len(payload) == 5:
188 ) = unpack("BBBBB", payload)
189 elif len(payload) == 4:
190 self.batt, self.ver, self.timezone, self.intvl = unpack(
196 def response(self, upload_interval=25): # Set interval in minutes
197 return self.make_packet(pack("B", upload_interval))
200 class HIBERNATION(GPS303Pkt):
204 class RESET(GPS303Pkt): # Device sends when it got reset SMS
208 def response(self): # Server can send to initiate factory reset
209 return self.make_packet(b"")
212 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
216 def response(self, number=3): # Number of whitelist entries
217 return self.make_packet(pack("B", number))
220 class _WIFI_POSITIONING(GPS303Pkt):
222 def from_packet(cls, length, payload):
223 self = super().from_packet(length, payload)
224 self.dtime = payload[:6]
225 if self.dtime == b"\0\0\0\0\0\0":
228 self.devtime = datetime.strptime(
229 self.dtime.hex(), "%y%m%d%H%M%S"
230 ).astimezone(tz=timezone.utc)
232 for i in range(self.length): # length has special meaning here
233 slice = payload[6 + i * 7 : 13 + i * 7]
234 self.wifi_aps.append(
235 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
237 gsm_slice = payload[6 + self.length * 7 :]
238 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
240 for i in range(ncells):
241 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
242 locac, cellid, sigstr = unpack(
243 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
245 self.gsm_cells.append((locac, cellid, -sigstr))
249 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
253 def inline_response(cls, packet):
254 return cls.make_packet(packet[2:8])
257 class TIME(GPS303Pkt):
261 def inline_response(cls, packet):
263 "!BBHBBBBB", 7, cls.PROTO, *datetime.utcnow().timetuple()[:6]
267 class PROHIBIT_LBS(GPS303Pkt):
271 def response(self, status=1): # Server sent, 0-off, 1-on
272 return self.make_packet(pack("B", status))
275 class MOM_PHONE(GPS303Pkt):
279 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
283 class STOP_ALARM(GPS303Pkt):
287 class SETUP(GPS303Pkt):
293 uploadIntervalSeconds=0x0300,
294 binarySwitch=0b00110001,
301 phoneNumbers=["", "", ""],
304 return pack("!I", x)[1:]
308 pack("!H", uploadIntervalSeconds),
309 pack("B", binarySwitch),
311 + [pack3b(el) for el in alarms]
313 pack("B", dndTimeSwitch),
315 + [pack3b(el) for el in dndTimes]
317 pack("B", gpsTimeSwitch),
318 pack("!H", gpsTimeStart),
319 pack("!H", gpsTimeStop),
321 + [b";".join([el.encode() for el in phoneNumbers])]
323 return self.make_packet(payload)
326 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
330 class RESTORE_PASSWORD(GPS303Pkt):
334 class WIFI_POSITIONING(_WIFI_POSITIONING):
338 def response(self, lat=None, lon=None):
339 if lat is None or lon is None:
342 payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
345 return self.make_packet(payload)
348 class MANUAL_POSITIONING(GPS303Pkt):
352 class BATTERY_CHARGE(GPS303Pkt):
356 class CHARGER_CONNECTED(GPS303Pkt):
360 class CHARGER_DISCONNECTED(GPS303Pkt):
364 class VIBRATION_RECEIVED(GPS303Pkt):
368 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
373 def from_packet(cls, length, payload):
374 self = super().from_packet(length, payload)
375 self.interval = unpack("!H", payload[:2])
378 def response(self, interval=10):
379 return self.make_packet(pack("!H", interval))
382 class SOS_ALARM(GPS303Pkt):
386 # Build dicts protocol number -> class and class name -> protocol number
389 if True: # just to indent the code, sorry!
392 for name, cls in globals().items()
394 and issubclass(cls, GPS303Pkt)
395 and not name.startswith("_")
397 if hasattr(cls, "PROTO"):
398 CLASSES[cls.PROTO] = cls
399 PROTOS[cls.__name__] = cls.PROTO
402 def proto_by_name(name):
403 return PROTOS.get(name, -1)
406 def proto_of_message(packet):
407 return unpack("B", packet[1:2])[0]
410 def inline_response(packet):
411 proto = proto_of_message(packet)
413 return CLASSES[proto].inline_response(packet)
418 def make_object(length, proto, payload):
420 return CLASSES[proto].from_packet(length, payload)
422 retobj = UNKNOWN.from_packet(length, payload)
423 retobj.PROTO = proto # Override class attr with object attr
427 def parse_message(packet):
428 length, proto = unpack("BB", packet[:2])
430 adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case
432 proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
434 and len(payload) + adjust != length
437 "With proto %d length is %d but payload length is %d+%d",
443 return make_object(length, proto, payload)
446 def handle_packet(packet): # DEPRECATED
447 if len(packet) < 6 or packet[:2] != b"xx" or packet[-2:] != b"\r\n":
448 return UNKNOWN.from_packet(len(packet), packet)
449 return parse_message(packet[2:-2])
452 def make_response(msg, **kwargs): # DEPRECATED
453 inframe = msg.response(**kwargs)
454 return None if inframe is None else b"xx" + inframe + b"\r\n"