2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import pack, unpack
34 "GPS_OFFLINE_POSITIONING",
39 "WIFI_OFFLINE_POSITIONING",
42 "GPS_LBS_SWITCH_TIMES",
43 "REMOTE_MONITOR_PHONE",
55 "SYNCHRONOUS_WHITELIST",
61 "CHARGER_DISCONNECTED",
63 "POSITION_UPLOAD_INTERVAL",
70 if isinstance(x, str):
76 """Check for the string that represents hours and minutes"""
77 if not isinstance(x, str) or len(x) != 4:
78 raise ValueError(str(x) + " is not a four-character string")
81 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
82 raise ValueError(str(x) + " does not contain valid hours and minutes")
87 if isinstance(x, str):
89 if len(x) != 3 or not all(isinstance(el, str) for el in x):
90 raise ValueError(str(x) + " is not a list of three strings")
95 if isinstance(x, str):
97 x = [int(el) for el in x]
98 if len(x) != 3 or not all(isinstance(el, int) for el in x):
99 raise ValueError(str(x) + " is not a list of three integers")
105 For each class corresponding to a message, automatically create
106 two nested classes `In` and `Out` that also inherit from their
107 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
108 copied to the `In` nested class under the name `KWARGS`, and
109 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
110 to the nested class `Out`. In addition, method `encode` is
111 defined in both classes equal to `in_encode()` and `out_encode()`
115 def __new__(cls, name, bases, attrs):
116 newcls = super().__new__(cls, name, bases, attrs)
117 newcls.In = super().__new__(
122 "KWARGS": newcls.IN_KWARGS,
123 "decode": newcls.in_decode,
124 "encode": newcls.in_encode,
127 newcls.Out = super().__new__(
132 "KWARGS": newcls.OUT_KWARGS,
133 "decode": newcls.out_decode,
134 "encode": newcls.out_encode,
141 NON = 0 # Incoming, no response needed
142 INL = 1 # Birirectional, use `inline_response()`
143 EXT = 2 # Birirectional, use external responder
146 class GPS303Pkt(metaclass=MetaPkt):
147 RESPOND = Respond.NON # Do not send anything back by default
152 def __init__(self, *args, **kwargs):
154 Construct the object _either_ from (length, payload),
155 _or_ from the values of individual fields
157 assert not args or (len(args) == 2 and not kwargs)
158 if args: # guaranteed to be two arguments at this point
159 self.length, self.payload = args
160 self.decode(self.length, self.payload)
162 for kw, typ, dfl in self.KWARGS:
163 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
166 self.__class__.__name__ + " stray kwargs " + str(kwargs)
170 return "{}({})".format(
171 self.__class__.__name__,
175 'bytes.fromhex("{}")'.format(v.hex())
176 if isinstance(v, bytes)
179 for k, v in self.__dict__.items()
180 if not k.startswith("_")
184 def in_decode(self, length, packet):
185 # Overridden in subclasses, otherwise do not decode payload
188 def out_decode(self, length, packet):
189 # Overridden in subclasses, otherwise do not decode payload
193 # Necessary to emulate terminal, which is not implemented
194 raise NotImplementedError(
195 self.__class__.__name__ + ".encode() not implemented"
198 def out_encode(self):
199 # Overridden in subclasses, otherwise make empty payload
204 payload = self.encode()
205 length = len(payload) + 1
206 return pack("BB", length, self.PROTO) + payload
209 class UNKNOWN(GPS303Pkt):
210 PROTO = 256 # > 255 is impossible in real packets
213 class LOGIN(GPS303Pkt):
215 RESPOND = Respond.INL
216 # Default response for ACK, can also respond with STOP_UPLOAD
218 def in_decode(self, length, payload):
219 self.imei = payload[:-1].hex()
220 self.ver = unpack("B", payload[-1:])[0]
224 class SUPERVISION(GPS303Pkt):
226 OUT_KWARGS = (("status", int, 1),)
228 def out_encode(self):
229 # 1: The device automatically answers Pickup effect
230 # 2: Automatically Answering Two-way Calls
231 # 3: Ring manually answer the two-way call
232 return pack("B", self.status)
235 class HEARTBEAT(GPS303Pkt):
237 RESPOND = Respond.INL
240 class _GPS_POSITIONING(GPS303Pkt):
241 RESPOND = Respond.INL
243 def in_decode(self, length, payload):
244 self.dtime = payload[:6]
245 if self.dtime == b"\0\0\0\0\0\0":
248 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
249 self.devtime = datetime(
250 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
252 self.gps_data_length = payload[6] >> 4
253 self.gps_nb_sat = payload[6] & 0x0F
254 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
255 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
256 flip_lon = bool(flags & 0b0000100000000000) # bit 4
257 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
258 self.heading = flags & 0b0000001111111111 # bits 6 - last
259 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
260 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
265 def out_encode(self):
266 tup = datetime.utcnow().timetuple()
267 ttup = (tup[0] % 100,) + tup[1:6]
268 return pack("BBBBBB", *ttup)
271 class GPS_POSITIONING(_GPS_POSITIONING):
275 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
279 class STATUS(GPS303Pkt):
281 RESPOND = Respond.EXT
282 OUT_KWARGS = (("upload_interval", int, 25),)
284 def in_decode(self, length, payload):
285 self.batt, self.ver, self.timezone, self.intvl = unpack(
289 self.signal = payload[4]
294 def out_encode(self): # Set interval in minutes
295 return pack("B", self.upload_interval)
298 class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
300 RESPOND = Respond.INL
303 class RESET(GPS303Pkt):
304 # Device sends when it got reset SMS
305 # Server can send to initiate factory reset
309 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
311 OUT_KWARGS = (("number", int, 3),)
313 def out_encode(self): # Number of whitelist entries
314 return pack("B", number)
317 class _WIFI_POSITIONING(GPS303Pkt):
318 def in_decode(self, length, payload):
319 self.dtime = payload[:6]
320 if self.dtime == b"\0\0\0\0\0\0":
323 self.devtime = datetime.strptime(
324 self.dtime.hex(), "%y%m%d%H%M%S"
325 ).astimezone(tz=timezone.utc)
327 for i in range(self.length): # length has special meaning here
328 slice = payload[6 + i * 7 : 13 + i * 7]
329 self.wifi_aps.append(
330 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
332 gsm_slice = payload[6 + self.length * 7 :]
333 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
335 for i in range(ncells):
336 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
337 locac, cellid, sigstr = unpack(
338 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
340 self.gsm_cells.append((locac, cellid, -sigstr))
344 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
346 RESPOND = Respond.INL
348 def out_encode(self):
349 return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
352 class TIME(GPS303Pkt):
354 RESPOND = Respond.INL
356 def out_encode(self):
357 return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
360 class PROHIBIT_LBS(GPS303Pkt):
362 OUT_KWARGS = (("status", int, 1),)
364 def out_encode(self): # Server sent, 0-off, 1-on
365 return pack("B", self.status)
368 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
371 # Data is in packed decimal
373 # 00/01 - Don't set / Set upload period
374 # HHMMHHMM - Upload period
376 # 00/01 - Don't set / Set time of boot
377 # HHMM - Time of boot
378 # 00/01 - Don't set / Set time of shutdown
379 # HHMM - Time of shutdown
380 def out_encode(self):
384 class _SET_PHONE(GPS303Pkt):
385 OUT_KWARGS = (("phone", str, ""),)
387 def out_encode(self):
388 return self.phone.encode()
391 class REMOTE_MONITOR_PHONE(_SET_PHONE):
395 class SOS_PHONE(_SET_PHONE):
399 class DAD_PHONE(_SET_PHONE):
403 class MOM_PHONE(_SET_PHONE):
407 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
411 class GPS_OFF_PERIOD(GPS303Pkt):
415 ("fm", hhmm, "0000"),
416 ("to", hhmm, "2359"),
419 def out_encode(self):
421 pack("B", self.onoff)
422 + bytes.fromhex(self.fm)
423 + bytes.fromhex(self.to)
427 class DND_PERIOD(GPS303Pkt):
432 ("fm1", hhmm, "0000"),
433 ("to1", hhmm, "2359"),
434 ("fm2", hhmm, "0000"),
435 ("to2", hhmm, "2359"),
438 def out_endode(self):
440 pack("B", self.onoff)
441 + pack("B", self.week)
442 + bytes.fromhex(self.fm1)
443 + bytes.fromhex(self.to1)
444 + bytes.fromhex(self.fm2)
445 + bytes.fromhex(self.to2)
449 class RESTART_SHUTDOWN(GPS303Pkt):
451 OUT_KWARGS = (("flag", int, 0),)
453 def out_encode(self):
456 return pack("B", self.flag)
459 class DEVICE(GPS303Pkt):
461 OUT_KWARGS = (("flag", int, 0),)
463 # 0 - Stop looking for equipment
464 # 1 - Start looking for equipment
465 def out_encode(self):
466 return pack("B", self.flag)
469 class ALARM_CLOCK(GPS303Pkt):
472 def out_encode(self):
473 # TODO implement parsing kwargs
474 alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
476 cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
480 class STOP_ALARM(GPS303Pkt):
483 def in_decode(self, length, payload):
484 self.flag = payload[0]
488 class SETUP(GPS303Pkt):
490 RESPOND = Respond.EXT
492 ("uploadintervalseconds", intx, 0x0300),
493 ("binaryswitch", intx, 0b00110001),
494 ("alarms", l3int, [0, 0, 0]),
495 ("dndtimeswitch", int, 0),
496 ("dndtimes", l3int, [0, 0, 0]),
497 ("gpstimeswitch", int, 0),
498 ("gpstimestart", int, 0),
499 ("gpstimestop", int, 0),
500 ("phonenumbers", l3str, ["", "", ""]),
503 def out_encode(self):
505 return pack("!I", x)[1:]
509 pack("!H", self.uploadintervalseconds),
510 pack("B", self.binaryswitch),
512 + [pack3b(el) for el in self.alarms]
514 pack("B", self.dndtimeswitch),
516 + [pack3b(el) for el in self.dndtimes]
518 pack("B", self.gpstimeswitch),
519 pack("!H", self.gpstimestart),
520 pack("!H", self.gpstimestop),
522 + [b";".join([el.encode() for el in self.phonenumbers])]
526 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
530 class RESTORE_PASSWORD(GPS303Pkt):
534 class WIFI_POSITIONING(_WIFI_POSITIONING):
536 RESPOND = Respond.EXT
537 OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
539 def out_encode(self):
540 if self.latitude is None or self.longitude is None:
542 return "{:+#010.8g},{:+#010.8g}".format(
543 self.latitude, self.longitude
546 def out_decode(self, length, payload):
547 lat, lon = payload.decode().split(",")
548 self.latitude = float(lat)
549 self.longitude = float(lon)
552 class MANUAL_POSITIONING(GPS303Pkt):
555 def in_decode(self, length, payload):
556 self.flag = payload[0] if len(payload) > 0 else None
561 4: "LBS search > 3 times",
562 5: "Same LBS and WiFi data",
563 6: "LBS prohibited, WiFi absent",
564 7: "GPS spacing < 50 m",
565 }.get(self.flag, "Unknown")
569 class BATTERY_CHARGE(GPS303Pkt):
573 class CHARGER_CONNECTED(GPS303Pkt):
577 class CHARGER_DISCONNECTED(GPS303Pkt):
581 class VIBRATION_RECEIVED(GPS303Pkt):
585 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
587 RESPOND = Respond.EXT
588 OUT_KWARGS = (("interval", int, 10),)
590 def in_decode(self, length, payload):
591 self.interval = unpack("!H", payload[:2])
594 def out_encode(self):
595 return pack("!H", interval)
598 class SOS_ALARM(GPS303Pkt):
602 class UNKNOWN_B3(GPS303Pkt):
604 IN_KWARGS = (("asciidata", str, ""),)
606 def in_decode(self, length, payload):
607 self.asciidata = payload.decode()
611 # Build dicts protocol number -> class and class name -> protocol number
614 if True: # just to indent the code, sorry!
617 for name, cls in globals().items()
619 and issubclass(cls, GPS303Pkt)
620 and not name.startswith("_")
622 if hasattr(cls, "PROTO"):
623 CLASSES[cls.PROTO] = cls
624 PROTOS[cls.__name__] = cls.PROTO
627 def class_by_prefix(prefix):
630 for name, proto in PROTOS.items()
631 if name.upper().startswith(prefix.upper())
636 return CLASSES[proto]
639 def proto_by_name(name):
640 return PROTOS.get(name, -1)
643 def proto_of_message(packet):
644 return unpack("B", packet[1:2])[0]
647 def inline_response(packet):
648 proto = proto_of_message(packet)
651 if cls.RESPOND is Respond.INL:
652 return cls.Out().packed
656 def parse_message(packet, is_incoming=True):
657 """From a packet (without framing bytes) derive the XXX.In object"""
658 length, proto = unpack("BB", packet[:2])
662 return CLASSES[proto].In(length, payload)
664 return CLASSES[proto].Out(length, payload)
666 retobj = UNKNOWN.In(length, payload)
667 retobj.PROTO = proto # Override class attr with object attr