2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
2. Ad-hoc choice of framing of messages (that are transferred over
   the TCP stream) makes it vulnerable to coincidental appearance
   of framing bytes in the middle of the message. Most of the time
   the server will receive one message in one TCP segment (i.e. in
   one `recv()` operation), but relying on that would break things
   if the path has a lower MTU than the size of a message.
17 from datetime import datetime, timezone
18 from inspect import isclass
19 from logging import getLogger
20 from struct import pack, unpack
35 "GPS_OFFLINE_POSITIONING",
40 "WIFI_OFFLINE_POSITIONING",
45 "SYNCHRONOUS_WHITELIST",
51 "CHARGER_DISCONNECTED",
53 "POSITION_UPLOAD_INTERVAL",
56 log = getLogger("gps303")
63 def __init__(self, *args, **kwargs):
65 for k, v in kwargs.items():
69 return "{}({})".format(
70 self.__class__.__name__,
74 'bytes.fromhex("{}")'.format(v.hex())
75 if isinstance(v, bytes)
78 for k, v in self.__dict__.items()
79 if not k.startswith("_")
84 def from_packet(cls, length, payload):
85 return cls(payload=payload, length=length)
88 return pack("BB", self.length, self.PROTO) + self.payload
91 def make_packet(cls, payload):
92 assert isinstance(payload, bytes)
93 length = len(payload) + 1
96 return pack("BB", length, cls.PROTO) + payload
99 def inline_response(cls, packet):
101 return cls.make_packet(b"")
106 class UNKNOWN(GPS303Pkt):
107 PROTO = 256 # > 255 is impossible in real packets
111 class LOGIN(GPS303Pkt):
115 def from_packet(cls, length, payload):
116 self = super().from_packet(length, payload)
117 self.imei = payload[:-1].hex()
118 self.ver = unpack("B", payload[-1:])[0]
122 class SUPERVISION(GPS303Pkt): # Server sends supervision number status
def response(self, supnum=0):
    """Build the reply packet carrying a one-byte supervision number.

    Values as listed by the original author:
      1: The device automatically answers Pickup effect
      2: Automatically Answering Two-way Calls
      3: Ring manually answer the two-way call

    :param supnum: value packed as a single unsigned byte.
    :return: whatever ``self.make_packet`` returns for that payload.
    """
    status_byte = pack("B", supnum)
    return self.make_packet(status_byte)
133 class HEARTBEAT(GPS303Pkt):
class _GPS_POSITIONING(GPS303Pkt):
    """Shared parser for packets that carry a GPS fix.

    Base class of GPS_POSITIONING and GPS_OFFLINE_POSITIONING.
    """

    @classmethod
    def from_packet(cls, length, payload):
        """Decode a GPS positioning payload into attributes.

        Sets ``dtime`` (raw 6 bytes), ``devtime``, ``gps_data_length``,
        ``gps_nb_sat``, ``gps_is_valid``, ``heading``, ``latitude``,
        ``longitude``, ``speed`` and ``flags`` on the returned object.

        :raises struct.error: if the payload is too short to unpack.
        """
        self = super().from_packet(length, payload)
        self.dtime = payload[:6]
        if self.dtime == b"\0\0\0\0\0\0":
            # An all-zero timestamp is treated as "no device time".
            self.devtime = None
        else:
            # NOTE(review): the first byte is passed to datetime() as
            # the year verbatim; if the device sends a two-digit year
            # this produces e.g. year 22 - confirm against the spec.
            self.devtime = datetime(
                *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
            )
        # One byte: high nibble is the data length, low nibble the
        # number of satellites.
        self.gps_data_length = payload[6] >> 4
        self.gps_nb_sat = payload[6] & 0x0F
        lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
        self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
        flip_lon = bool(flags & 0b0000100000000000)  # bit 4
        flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
        self.heading = flags & 0b0000001111111111  # bits 6 - last
        # Raw coordinates are divided by 30000 * 60 to get degrees.
        # The flip flags may only change the sign, never the magnitude.
        self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
        self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
        # Keep the remaining unpacked fields available to callers.
        self.speed = speed
        self.flags = flags
        return self

    @classmethod
    def inline_response(cls, packet):
        """Build the immediate reply, echoing bytes 2..7 of *packet*.

        Presumably those six bytes are the device timestamp (they
        follow the length and protocol bytes) - verify against caller.
        """
        return cls.make_packet(packet[2:8])
166 class GPS_POSITIONING(_GPS_POSITIONING):
170 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
174 class STATUS(GPS303Pkt):
179 def from_packet(cls, length, payload):
180 self = super().from_packet(length, payload)
181 if len(payload) == 5:
188 ) = unpack("BBBBB", payload)
189 elif len(payload) == 4:
190 self.batt, self.ver, self.timezone, self.intvl = unpack(
def response(self, upload_interval=25):  # Set interval in minutes
    """Build the reply packet carrying the upload interval.

    :param upload_interval: interval (minutes, per the original
        comment), packed as a single unsigned byte.
    :return: whatever ``self.make_packet`` returns for that payload.
    """
    interval_byte = pack("B", upload_interval)
    return self.make_packet(interval_byte)
200 class HIBERNATION(GPS303Pkt):
204 class RESET(GPS303Pkt): # Device sends when it got reset SMS
def response(self):  # Server can send to initiate factory reset
    """Build the empty-payload packet for this message type."""
    no_payload = b""
    return self.make_packet(no_payload)
class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiate sync (0x58)
def response(self, number=3):  # Number of whitelist entries
    """Build the packet announcing how many whitelist entries follow.

    :param number: entry count, packed as a single unsigned byte.
    :return: whatever ``self.make_packet`` returns for that payload.
    """
    count_byte = pack("B", number)
    return self.make_packet(count_byte)
class _WIFI_POSITIONING(GPS303Pkt):
    """Shared parser for packets listing visible WiFi APs and GSM cells.

    Base class of WIFI_POSITIONING and WIFI_OFFLINE_POSITIONING.
    """

    @classmethod
    def from_packet(cls, length, payload):
        """Decode a WiFi/GSM positioning payload into attributes.

        Sets ``dtime`` (raw 6 bytes), ``devtime``, ``wifi_aps`` (list of
        ``(mac, -signal)`` tuples), ``mcc``, ``mnc`` and ``gsm_cells``
        (list of ``(locac, cellid, -signal)`` tuples).

        :raises struct.error: if the GSM part is too short to unpack.
        """
        self = super().from_packet(length, payload)
        self.dtime = payload[:6]
        if self.dtime == b"\0\0\0\0\0\0":
            # An all-zero timestamp is treated as "no device time".
            self.devtime = None
        else:
            # The timestamp is read from the hex digits (packed decimal).
            # strptime() returns a naive datetime; tag it as UTC rather
            # than calling astimezone(), which would first interpret the
            # value in the server's local time zone.
            self.devtime = datetime.strptime(
                self.dtime.hex(), "%y%m%d%H%M%S"
            ).replace(tzinfo=timezone.utc)
        self.wifi_aps = []
        for i in range(self.length):  # length has special meaning here
            # Each access point record is 7 bytes: 6 of MAC, 1 of signal.
            ap = payload[6 + i * 7 : 13 + i * 7]
            self.wifi_aps.append(
                (":".join([format(b, "02X") for b in ap[:6]]), -ap[6])
            )
        gsm_slice = payload[6 + self.length * 7 :]
        ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
        self.gsm_cells = []
        for i in range(ncells):
            # Each cell record is 5 bytes: two 16-bit ids and a signal.
            locac, cellid, sigstr = unpack(
                "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
            )
            self.gsm_cells.append((locac, cellid, -sigstr))
        return self
249 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
253 def inline_response(cls, packet):
254 return cls.make_packet(packet[2:8])
257 class TIME(GPS303Pkt):
@classmethod
def inline_response(cls, packet):
    """Build the immediate reply carrying the current UTC time.

    The result is a length byte of 7, the protocol byte, a 16-bit
    big-endian year, then month, day, hour, minute and second as
    single bytes. *packet* is not inspected.
    """
    # datetime.utcnow() is deprecated; an aware "now" in UTC yields the
    # same calendar fields.
    now = datetime.now(timezone.utc)
    return pack(
        "!BBHBBBBB", 7, cls.PROTO, *now.timetuple()[:6]
    )
267 class PROHIBIT_LBS(GPS303Pkt):
def response(self, status=1):  # Server sent, 0-off, 1-on
    """Build the packet carrying the one-byte on/off status.

    :param status: 0 for off, 1 for on (per the original comment).
    :return: whatever ``self.make_packet`` returns for that payload.
    """
    status_byte = pack("B", status)
    return self.make_packet(status_byte)
275 class LBS_SWITCH_TIMES(GPS303Pkt):
280 return self.make_packet(b"")
283 class REMOTE_MONITOR_PHONE(GPS303Pkt):
288 class SOS_PHONE(GPS303Pkt):
293 class DAD_PHONE(GPS303Pkt):
298 class MOM_PHONE(GPS303Pkt):
303 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
307 class STOP_ALARM(GPS303Pkt):
311 class SETUP(GPS303Pkt):
317 uploadIntervalSeconds=0x0300,
318 binarySwitch=0b00110001,
325 phoneNumbers=["", "", ""],
328 return pack("!I", x)[1:]
332 pack("!H", uploadIntervalSeconds),
333 pack("B", binarySwitch),
335 + [pack3b(el) for el in alarms]
337 pack("B", dndTimeSwitch),
339 + [pack3b(el) for el in dndTimes]
341 pack("B", gpsTimeSwitch),
342 pack("!H", gpsTimeStart),
343 pack("!H", gpsTimeStop),
345 + [b";".join([el.encode() for el in phoneNumbers])]
347 return self.make_packet(payload)
350 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
354 class RESTORE_PASSWORD(GPS303Pkt):
358 class WIFI_POSITIONING(_WIFI_POSITIONING):
362 def response(self, lat=None, lon=None):
363 if lat is None or lon is None:
366 payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
369 return self.make_packet(payload)
372 class MANUAL_POSITIONING(GPS303Pkt):
377 class BATTERY_CHARGE(GPS303Pkt):
381 class CHARGER_CONNECTED(GPS303Pkt):
385 class CHARGER_DISCONNECTED(GPS303Pkt):
389 class VIBRATION_RECEIVED(GPS303Pkt):
393 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
@classmethod
def from_packet(cls, length, payload):
    """Decode the payload and expose the interval as an integer.

    The first two payload bytes are read as a big-endian unsigned
    16-bit value and stored in ``interval``.

    :raises struct.error: if the payload is shorter than two bytes.
    """
    self = super().from_packet(length, payload)
    # unpack() always returns a tuple; take its single element so that
    # ``interval`` is an int, matching what response() accepts.
    self.interval = unpack("!H", payload[:2])[0]
    return self
def response(self, interval=10):
    """Build the reply packet carrying the upload interval.

    :param interval: value packed as a big-endian unsigned 16-bit int.
    :return: whatever ``self.make_packet`` returns for that payload.
    """
    interval_bytes = pack("!H", interval)
    return self.make_packet(interval_bytes)
407 class SOS_ALARM(GPS303Pkt):
411 # Build dicts protocol number -> class and class name -> protocol number
414 if True: # just to indent the code, sorry!
417 for name, cls in globals().items()
419 and issubclass(cls, GPS303Pkt)
420 and not name.startswith("_")
422 if hasattr(cls, "PROTO"):
423 CLASSES[cls.PROTO] = cls
424 PROTOS[cls.__name__] = cls.PROTO
def proto_by_name(name):
    """Return the protocol number registered for class *name*, or -1."""
    try:
        return PROTOS[name]
    except KeyError:
        return -1
def proto_of_message(packet):
    """Return the protocol number held in the second byte of *packet*.

    :raises struct.error: if *packet* has fewer than two bytes.
    """
    (proto,) = unpack("B", packet[1:2])
    return proto
def inline_response(packet):
    """Return the immediate reply for *packet*, or None.

    The protocol number is taken from the packet and looked up in
    CLASSES. An unregistered protocol number yields None instead of
    raising KeyError.
    """
    handler = CLASSES.get(proto_of_message(packet))
    if handler is None:
        return None
    return handler.inline_response(packet)
def make_object(length, proto, payload):
    """Instantiate the packet class registered for *proto*.

    When *proto* is not in CLASSES, an UNKNOWN object is built instead
    and its ``PROTO`` is set to the number actually received.
    """
    handler = CLASSES.get(proto)
    if handler is not None:
        return handler.from_packet(length, payload)
    retobj = UNKNOWN.from_packet(length, payload)
    retobj.PROTO = proto  # Override class attr with object attr
    return retobj
452 def parse_message(packet):
453 length, proto = unpack("BB", packet[:2])
455 adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case
457 proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
459 and len(payload) + adjust != length
462 "With proto %d length is %d but payload length is %d+%d",
468 return make_object(length, proto, payload)
def handle_packet(packet):  # DEPRECATED
    """Strip the ``xx`` ... ``\\r\\n`` framing and parse the message.

    A packet that is shorter than six bytes, or lacks either framing
    marker, is wrapped whole into an UNKNOWN object.
    """
    well_framed = (
        len(packet) >= 6
        and packet[:2] == b"xx"
        and packet[-2:] == b"\r\n"
    )
    if well_framed:
        return parse_message(packet[2:-2])
    return UNKNOWN.from_packet(len(packet), packet)
def make_response(msg, **kwargs):  # DEPRECATED
    """Return the framed response for *msg*, or None if it has none.

    ``msg.response(**kwargs)`` supplies the unframed bytes, which are
    wrapped between ``xx`` and ``\\r\\n``.
    """
    inframe = msg.response(**kwargs)
    if inframe is None:
        return None
    return b"xx" + inframe + b"\r\n"