2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from logging import getLogger
21 from struct import pack, unpack
36 "GPS_OFFLINE_POSITIONING",
41 "WIFI_OFFLINE_POSITIONING",
46 "SYNCHRONOUS_WHITELIST",
52 "CHARGER_DISCONNECTED",
54 "POSITION_UPLOAD_INTERVAL",
57 log = getLogger("gps303")
61 IN = 0 # Incoming, no response needed
62 INLINE = 2 # Birirectional, use `inline_response()`
63 EXT = 3 # Birirectional, use external responder
64 OUT = 4 # Outgoing, should not appear on input
69 DIR = Dir.INLINE # Most packets anticipate simple acknowledgement
71 def __init__(self, *args, **kwargs):
73 for k, v in kwargs.items():
77 return "{}({})".format(
78 self.__class__.__name__,
82 'bytes.fromhex("{}")'.format(v.hex())
83 if isinstance(v, bytes)
86 for k, v in self.__dict__.items()
87 if not k.startswith("_")
92 def from_packet(cls, length, payload):
93 return cls(payload=payload, length=length)
96 return pack("BB", self.length, self.PROTO) + self.payload
99 def make_packet(cls, payload):
100 assert isinstance(payload, bytes)
101 length = len(payload) + 1 # plus proto byte
104 return pack("BB", length, cls.PROTO) + payload
107 def inline_response(cls, packet):
108 if cls.DIR is Dir.INLINE:
109 return cls.make_packet(b"")
114 class UNKNOWN(GPS303Pkt):
115 PROTO = 256 # > 255 is impossible in real packets
119 class LOGIN(GPS303Pkt):
121 # Default response for ACK, can also respond with STOP_UPLOAD
124 def from_packet(cls, length, payload):
125 self = super().from_packet(length, payload)
126 self.imei = payload[:-1].hex()
127 self.ver = unpack("B", payload[-1:])[0]
131 class SUPERVISION(GPS303Pkt):
135 def response(self, status=0):
136 # 1: The device automatically answers Pickup effect
137 # 2: Automatically Answering Two-way Calls
138 # 3: Ring manually answer the two-way call
139 return self.make_packet(pack("B", status))
142 class HEARTBEAT(GPS303Pkt):
146 class _GPS_POSITIONING(GPS303Pkt):
148 def from_packet(cls, length, payload):
149 self = super().from_packet(length, payload)
150 self.dtime = payload[:6]
151 if self.dtime == b"\0\0\0\0\0\0":
154 self.devtime = datetime(
155 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
157 self.gps_data_length = payload[6] >> 4
158 self.gps_nb_sat = payload[6] & 0x0F
159 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
160 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
161 flip_lon = bool(flags & 0b0000100000000000) # bit 4
162 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
163 self.heading = flags & 0b0000001111111111 # bits 6 - last
164 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
165 self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
171 def inline_response(cls, packet):
172 tup = datetime.utcnow().timetuple()
173 ttup = (tup[0] % 100,) + tup[1:6]
174 return cls.make_packet(pack("BBBBBB", *ttup))
177 class GPS_POSITIONING(_GPS_POSITIONING):
181 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
185 class STATUS(GPS303Pkt):
190 def from_packet(cls, length, payload):
191 self = super().from_packet(length, payload)
192 if len(payload) == 5:
199 ) = unpack("BBBBB", payload)
200 elif len(payload) == 4:
201 self.batt, self.ver, self.timezone, self.intvl = unpack(
207 def response(self, upload_interval=25): # Set interval in minutes
208 return self.make_packet(pack("B", upload_interval))
211 class HIBERNATION(GPS303Pkt):
215 def response(self): # Server can send to send devicee to sleep
216 return self.make_packet(b"")
219 class RESET(GPS303Pkt): # Device sends when it got reset SMS
223 def response(self): # Server can send to initiate factory reset
224 return self.make_packet(b"")
227 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
231 def response(self, number=3): # Number of whitelist entries
232 return self.make_packet(pack("B", number))
235 class _WIFI_POSITIONING(GPS303Pkt):
237 def from_packet(cls, length, payload):
238 self = super().from_packet(length, payload)
239 self.dtime = payload[:6]
240 if self.dtime == b"\0\0\0\0\0\0":
243 self.devtime = datetime.strptime(
244 self.dtime.hex(), "%y%m%d%H%M%S"
245 ).astimezone(tz=timezone.utc)
247 for i in range(self.length): # length has special meaning here
248 slice = payload[6 + i * 7 : 13 + i * 7]
249 self.wifi_aps.append(
250 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
252 gsm_slice = payload[6 + self.length * 7 :]
253 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
255 for i in range(ncells):
256 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
257 locac, cellid, sigstr = unpack(
258 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
260 self.gsm_cells.append((locac, cellid, -sigstr))
264 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
268 def inline_response(cls, packet):
269 return cls.make_packet(
270 bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
274 class TIME(GPS303Pkt):
278 def inline_response(cls, packet):
279 return cls.make_packet(
280 pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
284 class PROHIBIT_LBS(GPS303Pkt):
288 def response(self, status=1): # Server sent, 0-off, 1-on
289 return self.make_packet(pack("B", status))
292 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
297 # Data is in packed decimal
299 # 00/01 - Don't set / Set upload period
300 # HHMMHHMM - Upload period
302 # 00/01 - Don't set / Set time of boot
303 # HHMM - Time of boot
304 # 00/01 - Don't set / Set time of shutdown
305 # HHMM - Time of shutdown
306 return self.make_packet(b"") # TODO
309 class _SET_PHONE(GPS303Pkt):
312 def response(self, phone):
313 return self.make_packet(phone.encode())
316 class REMOTE_MONITOR_PHONE(_SET_PHONE):
320 class SOS_PHONE(_SET_PHONE):
324 class DAD_PHONE(_SET_PHONE):
328 class MOM_PHONE(_SET_PHONE):
332 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
337 return self.make_packet(b"")
340 class GPS_OFF_PERIOD(GPS303Pkt):
344 def response(self, onoff=0, fm="0000", to="2359"):
345 return self.make_packet(
346 pack("B", onoff) + bytes.fromhex(fm) + bytes.fromhex(to)
350 class DND_PERIOD(GPS303Pkt):
355 self, onoff=0, week=3, fm1="0000", to1="2359", fm2="0000", to2="2359"
357 return self.make_packet(
367 class RESTART_SHUTDOWN(GPS303Pkt):
371 def response(self, flag=0):
374 return self.make_packet(pack("B", flag))
377 class DEVICE(GPS303Pkt):
381 def response(self, flag=0):
382 # 0 - Stop looking for equipment
383 # 1 - Start looking for equipment
384 return self.make_packet(pack("B", flag))
387 class ALARM_CLOCK(GPS303Pkt):
391 def response(self, alarms=((0, "0000"), (0, "0000"), (0, "0000"))):
393 pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
397 class STOP_ALARM(GPS303Pkt):
401 def from_packet(cls, length, payload):
402 self = super().from_packet(length, payload)
403 self.flag = payload[0]
406 class SETUP(GPS303Pkt):
412 uploadintervalseconds=0x0300,
413 binaryswitch=0b00110001,
420 phonenumbers=["", "", ""],
423 return pack("!I", x)[1:]
427 pack("!H", uploadintervalseconds),
428 pack("B", binaryswitch),
430 + [pack3b(el) for el in alarms]
432 pack("B", dndtimeswitch),
434 + [pack3b(el) for el in dndtimes]
436 pack("B", gpstimeswitch),
437 pack("!H", gpstimestart),
438 pack("!H", gpstimestop),
440 + [b";".join([el.encode() for el in phonenumbers])]
442 return self.make_packet(payload)
445 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
449 class RESTORE_PASSWORD(GPS303Pkt):
453 class WIFI_POSITIONING(_WIFI_POSITIONING):
457 def response(self, lat=None, lon=None):
458 if lat is None or lon is None:
461 payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
464 return self.make_packet(payload)
467 class MANUAL_POSITIONING(GPS303Pkt):
472 def from_packet(cls, length, payload):
473 self = super().from_packet(length, payload)
474 self.flag = payload[0]
479 4: "LBS search > 3 times",
480 5: "Same LBS and WiFi data",
481 6: "LBS prohibited, WiFi absent",
482 7: "GPS spacing < 50 m",
483 }.get(self.flag, "Unknown")
486 return self.make_packet(b"")
489 class BATTERY_CHARGE(GPS303Pkt):
493 class CHARGER_CONNECTED(GPS303Pkt):
497 class CHARGER_DISCONNECTED(GPS303Pkt):
501 class VIBRATION_RECEIVED(GPS303Pkt):
505 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
510 def from_packet(cls, length, payload):
511 self = super().from_packet(length, payload)
512 self.interval = unpack("!H", payload[:2])
515 def response(self, interval=10):
516 return self.make_packet(pack("!H", interval))
519 class SOS_ALARM(GPS303Pkt):
523 # Build dicts protocol number -> class and class name -> protocol number
526 if True: # just to indent the code, sorry!
529 for name, cls in globals().items()
531 and issubclass(cls, GPS303Pkt)
532 and not name.startswith("_")
534 if hasattr(cls, "PROTO"):
535 CLASSES[cls.PROTO] = cls
536 PROTOS[cls.__name__] = cls.PROTO
539 def proto_by_name(name):
540 return PROTOS.get(name, -1)
543 def proto_of_message(packet):
544 return unpack("B", packet[1:2])[0]
547 def inline_response(packet):
548 proto = proto_of_message(packet)
550 return CLASSES[proto].inline_response(packet)
555 def make_object(length, proto, payload):
557 return CLASSES[proto].from_packet(length, payload)
559 retobj = UNKNOWN.from_packet(length, payload)
560 retobj.PROTO = proto # Override class attr with object attr
564 def parse_message(packet):
565 length, proto = unpack("BB", packet[:2])
567 adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case
569 proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
571 and len(payload) + adjust != length
574 "With proto %d length is %d but payload length is %d+%d",
580 return make_object(length, proto, payload)
583 def handle_packet(packet): # DEPRECATED
584 if len(packet) < 6 or packet[:2] != b"xx" or packet[-2:] != b"\r\n":
585 return UNKNOWN.from_packet(len(packet), packet)
586 return parse_message(packet[2:-2])
589 def make_response(msg, **kwargs): # DEPRECATED
590 inframe = msg.response(**kwargs)
591 return None if inframe is None else b"xx" + inframe + b"\r\n"