2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation), but relying on that would break things
14 if the path has a lower MTU than the size of a message.
17 from datetime import datetime, timezone
18 from inspect import isclass
19 from logging import getLogger
20 from struct import pack, unpack
35 "GPS_OFFLINE_POSITIONING",
40 "WIFI_OFFLINE_POSITIONING",
45 "SYNCHRONOUS_WHITELIST",
51 "CHARGER_DISCONNECTED",
53 "POSITION_UPLOAD_INTERVAL",
56 log = getLogger("gps303")
63 def __init__(self, *args, **kwargs):
65 for k, v in kwargs.items():
69 return "{}({})".format(
70 self.__class__.__name__,
74 'bytes.fromhex("{}")'.format(v.hex())
75 if isinstance(v, bytes)
78 for k, v in self.__dict__.items()
79 if not k.startswith("_")
84 def from_packet(cls, length, payload):
85 return cls(payload=payload, length=length)
88 return pack("BB", self.length, self.PROTO) + self.payload
90 def response(self, *args):
93 assert len(args) == 1 and isinstance(args[0], bytes)
95 length = len(payload) + 1
98 return pack("BB", length, self.PROTO) + payload
class UNKNOWN(GPS303Pkt):
    """Placeholder packet type used when no dedicated class applies."""

    # The protocol number travels in a single byte, so 0x100 can never
    # collide with the number of a real packet.
    PROTO = 0x100
105 class LOGIN(GPS303Pkt):
109 def from_packet(cls, length, payload):
110 self = super().from_packet(length, payload)
111 self.imei = payload[:-1].hex()
112 self.ver = unpack("B", payload[-1:])[0]
116 return super().response(b"")
119 class SUPERVISION(GPS303Pkt): # Server sends supervision number status
122 def response(self, supnum=0):
123 # 1: The device automatically answers Pickup effect
124 # 2: Automatically Answering Two-way Calls
125 # 3: Ring manually answer the two-way call
126 return super().response(b"")
129 class HEARTBEAT(GPS303Pkt):
class _GPS_POSITIONING(GPS303Pkt):
    """Shared decoder for packets that carry a GPS fix.

    Payload layout, as read by `from_packet`:

    * bytes 0-5: device time, one byte each for year, month, day, hour,
      minute and second;
    * byte 6: GPS data length (high nibble) and number of satellites
      (low nibble);
    * bytes 7-17: latitude and longitude (unsigned 32 bit), speed (one
      byte) and a 16 bit word with status flags and heading, all in
      network byte order.

    NOTE(review): the excerpt this class was reviewed from had its short
    lines stripped. The `@classmethod` decorator, the all-zero-time
    branch, the `speed` / `flags` attributes, `return self` and the
    signature of `response` are reconstructed - confirm against the full
    file.
    """

    @classmethod
    def from_packet(cls, length, payload):
        """Create the packet object and decode the GPS fix in `payload`.

        Sets `dtime` (the six raw time bytes), `devtime` (timezone-aware
        UTC datetime, or None when the time bytes are all zero),
        `gps_data_length`, `gps_nb_sat`, `gps_is_valid`, `heading`,
        `latitude` and `longitude` (raw value divided by 30000 * 60,
        presumably degrees - TODO confirm), `speed` and `flags`.

        Raises `struct.error` or `IndexError` when `payload` is too short,
        and `ValueError` when the time bytes do not form a valid date.
        """
        self = super().from_packet(length, payload)
        self.dtime = payload[:6]
        if self.dtime == b"\0\0\0\0\0\0":
            self.devtime = None
        else:
            # A single byte cannot hold a full year: the device sends only
            # the last two digits (the WiFi decoder in this file parses
            # the same field with "%y").  Without the century the result
            # would be a date in the first century AD.
            yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
            self.devtime = datetime(
                2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
            )
        self.gps_data_length = payload[6] >> 4
        self.gps_nb_sat = payload[6] & 0x0F
        lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
        self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
        flip_lon = bool(flags & 0b0000100000000000)  # bit 4
        flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
        self.heading = flags & 0b0000001111111111  # bits 6 - last
        # A hemisphere flag may only change the sign of a coordinate,
        # never its magnitude, so both factors are -1 or 1.
        self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
        self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
        self.speed = speed
        self.flags = flags
        return self

    def response(self):
        """Build the reply to the device: the raw time bytes echoed back."""
        return super().response(self.dtime)
161 class GPS_POSITIONING(_GPS_POSITIONING):
165 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
169 class STATUS(GPS303Pkt):
173 def from_packet(cls, length, payload):
174 self = super().from_packet(length, payload)
175 if len(payload) == 5:
182 ) = unpack("BBBBB", payload)
183 elif len(payload) == 4:
184 self.batt, self.ver, self.timezone, self.intvl = unpack(
190 def response(self, upload_interval=25): # Set interval in minutes
191 return super().response(pack("B", upload_interval))
194 class HIBERNATION(GPS303Pkt):
198 class RESET(GPS303Pkt): # Device sends when it got reset SMS
201 def response(self): # Server can send to initiate factory reset
202 return super().response(b"")
205 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiate sync (0x58)
208 def response(self, number=3): # Number of whitelist entries
209 return super().response(pack("B", number))
class _WIFI_POSITIONING(GPS303Pkt):
    """Shared decoder for packets listing visible WiFi APs and GSM cells.

    Payload layout, as read by `from_packet`:

    * bytes 0-5: device time; the hex digits read as yymmddHHMMSS;
    * `length` records of 7 bytes: 6 bytes of access point MAC address
      followed by one byte of signal strength;
    * 4 bytes: number of GSM cells (1 byte), MCC (2 bytes), MNC (1 byte);
    * one 5 byte record per GSM cell: location area code (2 bytes),
      cell id (2 bytes), signal strength (1 byte).

    NOTE(review): the excerpt this class was reviewed from had its short
    lines stripped. The `@classmethod` decorator, the all-zero-time
    branch, the initialisation of the two lists and `return self` are
    reconstructed - confirm against the full file.
    """

    @classmethod
    def from_packet(cls, length, payload):
        """Create the packet object and decode the AP and cell lists.

        Sets `dtime` (the six raw time bytes), `devtime` (timezone-aware
        UTC datetime, or None when the time bytes are all zero),
        `wifi_aps` (list of `(mac, -strength)` tuples), `mcc`, `mnc` and
        `gsm_cells` (list of `(locac, cellid, -strength)` tuples).

        Raises `struct.error` or `IndexError` when `payload` is shorter
        than the record counts promise, and `ValueError` when the time
        bytes do not form a valid date.
        """
        self = super().from_packet(length, payload)
        self.dtime = payload[:6]
        if self.dtime == b"\0\0\0\0\0\0":
            self.devtime = None
        else:
            # strptime() yields a naive datetime.  astimezone() would
            # treat it as the *host's local* time and shift it, so the
            # result depended on the server's timezone; replace() simply
            # labels the value as UTC.
            # NOTE(review): assumes the device clock is UTC, which is what
            # the GPS decoder in this file assumes as well - confirm.
            self.devtime = datetime.strptime(
                self.dtime.hex(), "%y%m%d%H%M%S"
            ).replace(tzinfo=timezone.utc)
        self.wifi_aps = []
        for i in range(self.length):  # length has special meaning here
            ap = payload[6 + i * 7 : 13 + i * 7]
            self.wifi_aps.append(
                (":".join([format(b, "02X") for b in ap[:6]]), -ap[6])
            )
        gsm_slice = payload[6 + self.length * 7 :]
        ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
        self.gsm_cells = []
        for i in range(ncells):
            cell = gsm_slice[4 + i * 5 : 9 + i * 5]
            locac, cellid, sigstr = unpack("!HHB", cell)
            self.gsm_cells.append((locac, cellid, -sigstr))
        return self
241 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
245 return super().response(self.dtime)
248 class TIME(GPS303Pkt):
252 payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
253 return super().response(payload)
256 class PROHIBIT_LBS(GPS303Pkt):
259 def response(self, status=1): # Server sent, 0-off, 1-on
260 return super().response(pack("B", status))
263 class MOM_PHONE(GPS303Pkt):
267 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
271 return super().response(b"")
274 class STOP_ALARM(GPS303Pkt):
278 class SETUP(GPS303Pkt):
283 uploadIntervalSeconds=0x0300,
284 binarySwitch=0b00110001,
291 phoneNumbers=["", "", ""],
294 return pack("!I", x)[1:]
298 pack("!H", uploadIntervalSeconds),
299 pack("B", binarySwitch),
301 + [pack3b(el) for el in alarms]
303 pack("B", dndTimeSwitch),
305 + [pack3b(el) for el in dndTimes]
307 pack("B", gpsTimeSwitch),
308 pack("!H", gpsTimeStart),
309 pack("!H", gpsTimeStop),
311 + [b";".join([el.encode() for el in phoneNumbers])]
313 return super().response(payload)
316 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
320 class RESTORE_PASSWORD(GPS303Pkt):
324 class WIFI_POSITIONING(_WIFI_POSITIONING):
327 def response(self, lat=None, lon=None):
328 if lat is None or lon is None:
331 payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
334 return super().response(payload)
337 class MANUAL_POSITIONING(GPS303Pkt):
341 class BATTERY_CHARGE(GPS303Pkt):
345 class CHARGER_CONNECTED(GPS303Pkt):
349 class CHARGER_DISCONNECTED(GPS303Pkt):
353 class VIBRATION_RECEIVED(GPS303Pkt):
357 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
361 def from_packet(cls, length, payload):
362 self = super().from_packet(length, payload)
363 self.interval = unpack("!H", payload[:2])
367 return super().response(pack("!H", self.interval))
370 class SOS_ALARM(GPS303Pkt):
374 # Build dicts protocol number -> class and class name -> protocol number
377 if True: # just to indent the code, sorry!
380 for name, cls in globals().items()
382 and issubclass(cls, GPS303Pkt)
383 and not name.startswith("_")
385 if hasattr(cls, "PROTO"):
386 CLASSES[cls.PROTO] = cls
387 PROTOS[cls.__name__] = cls.PROTO
def proto_by_name(name):
    """Return the protocol number registered under `name`, or -1 if none."""
    if name in PROTOS:
        return PROTOS[name]
    return -1
def proto_of_message(packet):
    """Return the protocol number of an unframed message as an integer.

    `packet` is the message without its framing bytes: byte 0 is the
    length and byte 1 is the protocol number.

    Raises `struct.error` when `packet` is shorter than two bytes.
    """
    # unpack() returns a one-element tuple; hand out the number itself so
    # that it can be compared with the PROTO attributes of packet classes.
    return unpack("B", packet[1:2])[0]
398 def make_object(length, proto, payload):
400 return CLASSES[proto].from_packet(length, payload)
402 retobj = UNKNOWN.from_packet(length, payload)
403 retobj.PROTO = proto # Override class attr with object attr
407 def parse_message(packet):
408 length, proto = unpack("BB", packet[:2])
410 adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case
413 not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
415 and len(payload) + adjust != length
418 "With proto %d length is %d but payload length is %d+%d",
424 return make_object(length, proto, payload)
def handle_packet(packet):  # DEPRECATED
    """Parse one framed packet.

    A well formed frame is at least six bytes long, starts with b"xx" and
    ends with b"\\r\\n"; its inner part is passed on to `parse_message`.
    Anything else is wrapped, unparsed, into an `UNKNOWN` packet object.
    """
    well_framed = (
        len(packet) >= 6
        and packet[:2] == b"xx"
        and packet[-2:] == b"\r\n"
    )
    if well_framed:
        return parse_message(packet[2:-2])
    return UNKNOWN.from_packet(len(packet), packet)
def make_response(msg, **kwargs):  # DEPRECATED
    """Return the framed response to `msg`, or None if there is none.

    `kwargs` are passed through to `msg.response()`.  When that yields
    bytes, they are wrapped between b"xx" and b"\\r\\n".
    """
    inframe = msg.response(**kwargs)
    if inframe is None:
        return None
    return b"xx" + inframe + b"\r\n"
def set_config(config):  # Note that we are setting _class_ attribute
    """Store `config` as the `CONFIG` attribute of the packet base class."""
    setattr(GPS303Pkt, "CONFIG", config)