2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from logging import getLogger
21 from struct import pack, unpack
36 "GPS_OFFLINE_POSITIONING",
41 "WIFI_OFFLINE_POSITIONING",
46 "SYNCHRONOUS_WHITELIST",
52 "CHARGER_DISCONNECTED",
54 "POSITION_UPLOAD_INTERVAL",
57 log = getLogger("gps303")
61 IN = 0 # Incoming, no response needed
62 INLINE = 2 # Birirectional, use `inline_response()`
63 EXT = 3 # Birirectional, use external responder
64 OUT = 4 # Outgoing, should not appear on input
69 DIR = Dir.IN # Do not send anything back by default
71 def __init__(self, *args, **kwargs):
73 for k, v in kwargs.items():
77 return "{}({})".format(
78 self.__class__.__name__,
82 'bytes.fromhex("{}")'.format(v.hex())
83 if isinstance(v, bytes)
86 for k, v in self.__dict__.items()
87 if not k.startswith("_")
92 def from_packet(cls, length, payload):
93 return cls(payload=payload, length=length)
96 return pack("BB", self.length, self.PROTO) + self.payload
99 def make_packet(cls, payload):
100 assert isinstance(payload, bytes)
101 length = len(payload) + 1 # plus proto byte
104 return pack("BB", length, cls.PROTO) + payload
107 def inline_response(cls, packet):
108 if cls.DIR is Dir.INLINE:
109 return cls.make_packet(b"")
114 class UNKNOWN(GPS303Pkt):
115 PROTO = 256 # > 255 is impossible in real packets
118 class LOGIN(GPS303Pkt):
121 # Default response for ACK, can also respond with STOP_UPLOAD
124 def from_packet(cls, length, payload):
125 self = super().from_packet(length, payload)
126 self.imei = payload[:-1].hex()
127 self.ver = unpack("B", payload[-1:])[0]
131 class SUPERVISION(GPS303Pkt):
136 def response(cls, status=0):
137 # 1: The device automatically answers Pickup effect
138 # 2: Automatically Answering Two-way Calls
139 # 3: Ring manually answer the two-way call
140 return cls.make_packet(pack("B", status))
143 class HEARTBEAT(GPS303Pkt):
148 class _GPS_POSITIONING(GPS303Pkt):
152 def from_packet(cls, length, payload):
153 self = super().from_packet(length, payload)
154 self.dtime = payload[:6]
155 if self.dtime == b"\0\0\0\0\0\0":
158 self.devtime = datetime(
159 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
161 self.gps_data_length = payload[6] >> 4
162 self.gps_nb_sat = payload[6] & 0x0F
163 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
164 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
165 flip_lon = bool(flags & 0b0000100000000000) # bit 4
166 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
167 self.heading = flags & 0b0000001111111111 # bits 6 - last
168 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
169 self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
175 def inline_response(cls, packet):
176 tup = datetime.utcnow().timetuple()
177 ttup = (tup[0] % 100,) + tup[1:6]
178 return cls.make_packet(pack("BBBBBB", *ttup))
181 class GPS_POSITIONING(_GPS_POSITIONING):
185 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
189 class STATUS(GPS303Pkt):
194 def from_packet(cls, length, payload):
195 self = super().from_packet(length, payload)
196 if len(payload) == 5:
203 ) = unpack("BBBBB", payload)
204 elif len(payload) == 4:
205 self.batt, self.ver, self.timezone, self.intvl = unpack(
212 def response(cls, upload_interval=25): # Set interval in minutes
213 return cls.make_packet(pack("B", upload_interval))
216 class HIBERNATION(GPS303Pkt):
221 def response(cls): # Server can send to send devicee to sleep
222 return cls.make_packet(b"")
225 class RESET(GPS303Pkt): # Device sends when it got reset SMS
229 def response(cls): # Server can send to initiate factory reset
230 return cls.make_packet(b"")
233 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
238 def response(cls, number=3): # Number of whitelist entries
239 return cls.make_packet(pack("B", number))
242 class _WIFI_POSITIONING(GPS303Pkt):
244 def from_packet(cls, length, payload):
245 self = super().from_packet(length, payload)
246 self.dtime = payload[:6]
247 if self.dtime == b"\0\0\0\0\0\0":
250 self.devtime = datetime.strptime(
251 self.dtime.hex(), "%y%m%d%H%M%S"
252 ).astimezone(tz=timezone.utc)
254 for i in range(self.length): # length has special meaning here
255 slice = payload[6 + i * 7 : 13 + i * 7]
256 self.wifi_aps.append(
257 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
259 gsm_slice = payload[6 + self.length * 7 :]
260 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
262 for i in range(ncells):
263 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
264 locac, cellid, sigstr = unpack(
265 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
267 self.gsm_cells.append((locac, cellid, -sigstr))
271 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
276 def inline_response(cls, packet):
277 return cls.make_packet(
278 bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
282 class TIME(GPS303Pkt):
287 def inline_response(cls, packet):
288 return cls.make_packet(
289 pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
293 class PROHIBIT_LBS(GPS303Pkt):
298 def response(cls, status=1): # Server sent, 0-off, 1-on
299 return cls.make_packet(pack("B", status))
302 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
308 # Data is in packed decimal
310 # 00/01 - Don't set / Set upload period
311 # HHMMHHMM - Upload period
313 # 00/01 - Don't set / Set time of boot
314 # HHMM - Time of boot
315 # 00/01 - Don't set / Set time of shutdown
316 # HHMM - Time of shutdown
317 return cls.make_packet(b"") # TODO
320 class _SET_PHONE(GPS303Pkt):
324 def response(cls, phone):
325 return cls.make_packet(phone.encode())
328 class REMOTE_MONITOR_PHONE(_SET_PHONE):
332 class SOS_PHONE(_SET_PHONE):
336 class DAD_PHONE(_SET_PHONE):
340 class MOM_PHONE(_SET_PHONE):
344 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
350 return cls.make_packet(b"")
353 class GPS_OFF_PERIOD(GPS303Pkt):
358 def response(cls, onoff=0, fm="0000", to="2359"):
359 return cls.make_packet(
360 pack("B", onoff) + bytes.fromhex(fm) + bytes.fromhex(to)
364 class DND_PERIOD(GPS303Pkt):
370 cls, onoff=0, week=3, fm1="0000", to1="2359", fm2="0000", to2="2359"
372 return cls.make_packet(
382 class RESTART_SHUTDOWN(GPS303Pkt):
387 def response(cls, flag=2):
390 return cls.make_packet(pack("B", flag))
393 class DEVICE(GPS303Pkt):
398 def response(cls, flag=0):
399 # 0 - Stop looking for equipment
400 # 1 - Start looking for equipment
401 return cls.make_packet(pack("B", flag))
404 class ALARM_CLOCK(GPS303Pkt):
409 def response(cls, alarms=((0, "0000"), (0, "0000"), (0, "0000"))):
411 cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
415 class STOP_ALARM(GPS303Pkt):
419 def from_packet(cls, length, payload):
420 self = super().from_packet(length, payload)
421 self.flag = payload[0]
425 class SETUP(GPS303Pkt):
432 uploadintervalseconds=0x0300,
433 binaryswitch=0b00110001,
440 phonenumbers=["", "", ""],
443 return pack("!I", x)[1:]
447 pack("!H", uploadintervalseconds),
448 pack("B", binaryswitch),
450 + [pack3b(el) for el in alarms]
452 pack("B", dndtimeswitch),
454 + [pack3b(el) for el in dndtimes]
456 pack("B", gpstimeswitch),
457 pack("!H", gpstimestart),
458 pack("!H", gpstimestop),
460 + [b";".join([el.encode() for el in phonenumbers])]
462 return cls.make_packet(payload)
465 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
469 class RESTORE_PASSWORD(GPS303Pkt):
473 class WIFI_POSITIONING(_WIFI_POSITIONING):
478 def response(cls, lat=None, lon=None):
479 if lat is None or lon is None:
482 payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
485 return cls.make_packet(payload)
488 class MANUAL_POSITIONING(GPS303Pkt):
493 def from_packet(cls, length, payload):
494 self = super().from_packet(length, payload)
495 self.flag = payload[0] if len(payload) > 0 else None
500 4: "LBS search > 3 times",
501 5: "Same LBS and WiFi data",
502 6: "LBS prohibited, WiFi absent",
503 7: "GPS spacing < 50 m",
504 }.get(self.flag, "Unknown")
509 return cls.make_packet(b"")
512 class BATTERY_CHARGE(GPS303Pkt):
516 class CHARGER_CONNECTED(GPS303Pkt):
520 class CHARGER_DISCONNECTED(GPS303Pkt):
524 class VIBRATION_RECEIVED(GPS303Pkt):
528 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
533 def from_packet(cls, length, payload):
534 self = super().from_packet(length, payload)
535 self.interval = unpack("!H", payload[:2])
539 def response(cls, interval=10):
540 return cls.make_packet(pack("!H", interval))
543 class SOS_ALARM(GPS303Pkt):
547 # Build dicts protocol number -> class and class name -> protocol number
550 if True: # just to indent the code, sorry!
553 for name, cls in globals().items()
555 and issubclass(cls, GPS303Pkt)
556 and not name.startswith("_")
558 if hasattr(cls, "PROTO"):
559 CLASSES[cls.PROTO] = cls
560 PROTOS[cls.__name__] = cls.PROTO
563 def class_by_prefix(prefix):
566 for name, proto in PROTOS.items()
567 if name.upper().startswith(prefix.upper())
572 return CLASSES[proto]
575 def proto_by_name(name):
576 return PROTOS.get(name, -1)
579 def proto_of_message(packet):
580 return unpack("B", packet[1:2])[0]
583 def inline_response(packet):
584 proto = proto_of_message(packet)
586 return CLASSES[proto].inline_response(packet)
591 def make_object(length, proto, payload):
593 return CLASSES[proto].from_packet(length, payload)
595 retobj = UNKNOWN.from_packet(length, payload)
596 retobj.PROTO = proto # Override class attr with object attr
600 def parse_message(packet):
601 length, proto = unpack("BB", packet[:2])
603 adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case
605 proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
607 and len(payload) + adjust != length
610 "With proto %d length is %d but payload length is %d+%d",
616 return make_object(length, proto, payload)