2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation), but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
48 "GPS_OFFLINE_POSITIONING",
53 "WIFI_OFFLINE_POSITIONING",
56 "GPS_LBS_SWITCH_TIMES",
57 "REMOTE_MONITOR_PHONE",
69 "SYNCHRONOUS_WHITELIST",
75 "CHARGER_DISCONNECTED",
77 "POSITION_UPLOAD_INTERVAL",
88 def __init__(self) -> None:
def enframe(buffer: bytes) -> bytes:
    """Wrap a raw packet in the stream framing.

    The packet is prefixed with the two-byte start marker ``xx`` and
    terminated with a CR LF pair.
    """
    start_marker = b"xx"
    end_marker = b"\r\n"
    return start_marker + buffer + end_marker
95 def recv(self, segment: bytes) -> List[Union[bytes, str]]:
97 Process next segment of the stream. Return successfully deframed
98 packets as `bytes` and error messages as `str`.
101 self.buffer += segment
102 if len(self.buffer) > MAXBUFFER:
103 # We are receiving junk. Let's drop it or we run out of memory.
105 return [f"More than {MAXBUFFER} unparseable data, dropping"]
106 msgs: List[Union[bytes, str]] = []
108 framestart = self.buffer.find(b"xx")
109 if framestart == -1: # No frames, return whatever we have
111 if framestart > 0: # Should not happen, report
113 f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
115 self.buffer = self.buffer[framestart:]
116 # At this point, buffer starts with a packet
117 if len(self.buffer) < 6: # no len and proto - cannot proceed
119 exp_end = self.buffer[2] + 3 # Expect '\r\n' here
121 # Length field can legitimately be much less than the
122 # length of the packet (e.g. WiFi positioning), but
123 # it _should not_ be greater. Still sometimes it is.
124 # Luckily, not by too much: by maybe two or three bytes?
125 # Do this embarrassing hack to avoid accidental match
126 # of some binary data in the packet against '\r\n'.
128 frameend = self.buffer.find(b"\r\n", frameend + 1)
129 if frameend == -1 or frameend >= (
131 ): # Found realistic match or none
133 if frameend == -1: # Incomplete frame, return what we have
135 packet = self.buffer[2:frameend]
136 self.buffer = self.buffer[frameend + 2 :]
137 if len(packet) < 2: # frameend comes too early
138 msgs.append(f"Packet too short: {packet.hex()}")
143 def close(self) -> bytes:
149 ### Parser/Constructor ###
152 class DecodeError(Exception):
153 def __init__(self, e: Exception, **kwargs: Any) -> None:
155 for k, v in kwargs.items():
def maybe(typ: type) -> Callable[[Any], Any]:
    """Build a converter that applies `typ` but lets `None` pass through.

    The returned callable yields `None` for a `None` argument and
    `typ(x)` for anything else.
    """

    def convert(x: Any) -> Any:
        if x is None:
            return None
        return typ(x)

    return convert
163 def intx(x: Union[str, int]) -> int:
164 if isinstance(x, str):
169 def boolx(x: Union[str, bool]) -> bool:
170 if isinstance(x, str):
171 if x.upper() in ("ON", "TRUE", "1"):
173 if x.upper() in ("OFF", "FALSE", "0"):
175 raise ValueError(str(x) + " could not be parsed as a Boolean")
179 def hhmm(x: str) -> str:
180 """Check for the string that represents hours and minutes"""
181 if not isinstance(x, str) or len(x) != 4:
182 raise ValueError(str(x) + " is not a four-character string")
185 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
186 raise ValueError(str(x) + " does not contain valid hours and minutes")
def hhmmhhmm(x: str) -> str:
    """Validate a string holding two consecutive hours-and-minutes values.

    The argument must be a string of exactly eight characters; each
    four-character half is validated by `hhmm()`. Returns the two
    validated halves joined back together.

    Raises ValueError if the argument is not an eight-character string
    (or if `hhmm()` rejects either half).
    """
    well_formed = isinstance(x, str) and len(x) == 8
    if not well_formed:
        raise ValueError(str(x) + " is not an eight-character string")
    first_half = hhmm(x[:4])
    second_half = hhmm(x[4:])
    return first_half + second_half
197 def l3str(x: Union[str, List[str]]) -> List[str]:
198 if isinstance(x, str):
202 if len(lx) != 3 or not all(isinstance(el, str) for el in x):
203 raise ValueError(str(lx) + " is not a list of three strings")
207 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
208 def alrmspec(sub: str) -> Tuple[int, str]:
210 raise ValueError(sub + " does not represent day and time")
224 if isinstance(x, str):
225 lx = [alrmspec(sub) for sub in x.split(",")]
228 lx.extend([(0, "0000") for _ in range(3 - len(lx))])
229 if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
230 raise ValueError(str(lx) + " is a wrong alarms specification")
231 return [(d, hhmm(tm)) for d, tm in lx]
234 def l3int(x: Union[str, List[int]]) -> List[int]:
235 if isinstance(x, str):
236 lx = [int(el) for el in x.split(",")]
239 if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
240 raise ValueError(str(lx) + " is not a list of three integers")
246 For each class corresponding to a message, automatically create
247 two nested classes `In` and `Out` that also inherit from their
248 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
249 copied to the `In` nested class under the name `KWARGS`, and
250 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
251 to the nested class `Out`. In addition, method `encode` is
252 defined in both classes equal to `in_encode()` and `out_encode()`
258 def __getattr__(self, name: str) -> Any:
261 def __setattr__(self, name: str, value: Any) -> None:
265 cls: Type["MetaPkt"],
267 bases: Tuple[type, ...],
268 attrs: Dict[str, Any],
270 newcls = super().__new__(cls, name, bases, attrs)
271 newcls.In = super().__new__(
276 "KWARGS": newcls.IN_KWARGS,
277 "decode": newcls.in_decode,
278 "encode": newcls.in_encode,
281 newcls.Out = super().__new__(
286 "KWARGS": newcls.OUT_KWARGS,
287 "decode": newcls.out_decode,
288 "encode": newcls.out_encode,
295 NON = 0 # Incoming, no response needed
296 INL = 1 # Bidirectional, use `inline_response()`
297 EXT = 2 # Bidirectional, use external responder
300 class GPS303Pkt(metaclass=MetaPkt):
301 RESPOND = Respond.NON # Do not send anything back by default
303 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
304 OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
305 KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
306 In: Type["GPS303Pkt"]
307 Out: Type["GPS303Pkt"]
311 def __getattr__(self, name: str) -> Any:
314 def __setattr__(self, name: str, value: Any) -> None:
317 def __init__(self, *args: Any, **kwargs: Any):
319 Construct the object _either_ from (length, payload),
320 _or_ from the values of individual fields
322 assert not args or (len(args) == 2 and not kwargs)
323 if args: # guaranteed to be two arguments at this point
324 self.length, self.payload = args
326 self.decode(self.length, self.payload)
328 raise DecodeError(e, obj=self)
330 for kw, typ, dfl in self.KWARGS:
331 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
334 self.__class__.__name__ + " stray kwargs " + str(kwargs)
337 def __repr__(self) -> str:
338 return "{}({})".format(
339 self.__class__.__name__,
343 'bytes.fromhex("{}")'.format(v.hex())
344 if isinstance(v, bytes)
347 for k, v in self.__dict__.items()
348 if not k.startswith("_")
352 decode: Callable[["GPS303Pkt", int, bytes], None]
354 def in_decode(self, length: int, packet: bytes) -> None:
355 # Overridden in subclasses, otherwise do not decode payload
358 def out_decode(self, length: int, packet: bytes) -> None:
359 # Overridden in subclasses, otherwise do not decode payload
362 encode: Callable[["GPS303Pkt"], bytes]
364 def in_encode(self) -> bytes:
365 # Necessary to emulate terminal, which is not implemented
366 raise NotImplementedError(
367 self.__class__.__name__ + ".encode() not implemented"
370 def out_encode(self) -> bytes:
371 # Overridden in subclasses, otherwise make empty payload
def packed(self) -> bytes:
    """Serialize the packet without framing bytes.

    The result is one length byte, one protocol byte (`self.PROTO`),
    and then the payload produced by `self.encode()`. If the object
    has a `length` attribute it is used as is; otherwise the length
    byte is the payload size plus one.
    """
    body = self.encode()
    fallback_length = len(body) + 1
    length_byte = getattr(self, "length", fallback_length)
    header = pack("BB", length_byte, self.PROTO)
    return header + body
381 class UNKNOWN(GPS303Pkt):
382 PROTO = 256 # > 255 is impossible in real packets
385 class LOGIN(GPS303Pkt):
387 RESPOND = Respond.INL
388 # Default response for ACK, can also respond with STOP_UPLOAD
389 IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
def in_decode(self, length: int, payload: bytes) -> None:
    """Extract IMEI and version from an incoming login payload.

    The first eight payload bytes (right-padded with zero bytes if
    fewer are present) are stored as a hex string in `self.imei`;
    the ninth byte is stored in `self.ver`. `length` is not used.
    """
    imei_raw = payload[:8]
    imei_padded = imei_raw.ljust(8, b"\0")
    self.imei = imei_padded.hex()
    self.ver = payload[8]
395 def in_encode(self) -> bytes:
396 return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
401 class SUPERVISION(GPS303Pkt):
403 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Encode `self.status` as a single-byte payload.

    Status values:
      1 - The device automatically answers Pickup effect
      2 - Automatically Answering Two-way Calls
      3 - Ring manually answer the two-way call
    """
    status_byte = pack("B", self.status)
    return status_byte
412 class HEARTBEAT(GPS303Pkt):
414 RESPOND = Respond.INL
417 class _GPS_POSITIONING(GPS303Pkt):
418 RESPOND = Respond.INL
420 def in_decode(self, length: int, payload: bytes) -> None:
421 self.dtime = payload[:6]
422 if self.dtime == b"\0\0\0\0\0\0":
425 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
426 self.devtime = datetime(
427 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
429 self.gps_data_length = payload[6] >> 4
430 self.gps_nb_sat = payload[6] & 0x0F
431 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
432 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
433 flip_lon = bool(flags & 0b0000100000000000) # bit 4
434 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
435 self.heading = flags & 0b0000001111111111 # bits 6 - last
436 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
437 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time as six bytes.

    The bytes are, in order: year modulo 100, month, day, hour,
    minute, second.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware
    # "now" in UTC carries the same calendar fields.
    now = datetime.now(timezone.utc)
    return pack(
        "BBBBBB",
        now.year % 100,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
447 class GPS_POSITIONING(_GPS_POSITIONING):
451 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
455 class STATUS(GPS303Pkt):
457 RESPOND = Respond.EXT
461 ("timezone", int, 0),
463 ("signal", maybe(int), None),
465 OUT_KWARGS = (("upload_interval", int, 25),)
467 def in_decode(self, length: int, payload: bytes) -> None:
468 self.batt, self.ver, self.timezone, self.intvl = unpack(
472 self.signal: Optional[int] = payload[4]
476 def in_encode(self) -> bytes:
477 return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
478 b"" if self.signal is None else pack("B", self.signal)
def out_encode(self) -> bytes:
    """Encode `self.upload_interval` as a single-byte payload.

    Sets the interval in minutes.
    """
    interval_byte = pack("B", self.upload_interval)
    return interval_byte
485 class HIBERNATION(GPS303Pkt): # Server can send to put the device to sleep
488 def in_encode(self) -> bytes:
492 class RESET(GPS303Pkt):
493 # Device sends when it got reset SMS
494 # Server can send to initiate factory reset
498 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiate sync (0x58)
500 OUT_KWARGS = (("number", int, 3),)
def out_encode(self) -> bytes:
    """Encode `self.number` (number of whitelist entries) as one byte."""
    count_byte = pack("B", self.number)
    return count_byte
506 class _WIFI_POSITIONING(GPS303Pkt):
507 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
509 ("dtime", bytes, b"\0\0\0\0\0\0"),
510 ("wifi_aps", list, []),
513 ("gsm_cells", list, []),
516 def in_decode(self, length: int, payload: bytes) -> None:
517 self.dtime = payload[:6]
518 if self.dtime == b"\0\0\0\0\0\0":
521 self.devtime = datetime.strptime(
522 self.dtime.hex(), "%y%m%d%H%M%S"
523 ).astimezone(tz=timezone.utc)
525 for i in range(self.length): # length has special meaning here
526 slice = payload[6 + i * 7 : 13 + i * 7]
527 self.wifi_aps.append(
528 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
530 gsm_slice = payload[6 + self.length * 7 :]
531 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
533 for i in range(ncells):
534 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
535 locac, cellid, sigstr = unpack(
536 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
538 self.gsm_cells.append((locac, cellid, -sigstr))
540 def in_encode(self) -> bytes:
541 self.length = len(self.wifi_aps)
547 bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
549 for mac, sigstr in self.wifi_aps
552 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
555 pack("!HHB", locac, cellid, -sigstr)
556 for locac, cellid, sigstr in self.gsm_cells
563 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
565 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time as six bytes.

    The time is formatted as the digit string YYMMDDhhmmss and each
    pair of digits is packed into one byte via `bytes.fromhex()`.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware
    # "now" in UTC formats to the same digit string.
    now = datetime.now(timezone.utc)
    return bytes.fromhex(now.strftime("%y%m%d%H%M%S"))
571 class TIME(GPS303Pkt):
573 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time as seven bytes.

    The layout is a big-endian 16-bit year followed by one byte each
    for month, day, hour, minute and second.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware
    # "now" in UTC has the same first six time-tuple fields.
    now = datetime.now(timezone.utc)
    return pack("!HBBBBB", *now.timetuple()[:6])
579 class PROHIBIT_LBS(GPS303Pkt):
581 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Encode `self.status` as one byte (server sent, 0-off, 1-on)."""
    status_byte = pack("B", self.status)
    return status_byte
587 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
591 ("gps_off", boolx, False), # Clarify the meaning of 0/1
592 ("gps_interval_set", boolx, False),
593 ("gps_interval", hhmmhhmm, "00000000"),
594 ("lbs_off", boolx, False), # Clarify the meaning of 0/1
595 ("boot_time_set", boolx, False),
596 ("boot_time", hhmm, "0000"),
597 ("shut_time_set", boolx, False),
598 ("shut_time", hhmm, "0000"),
601 def out_encode(self) -> bytes:
603 pack("B", self.gps_off)
604 + pack("B", self.gps_interval_set)
605 + bytes.fromhex(self.gps_interval)
606 + pack("B", self.lbs_off)
607 + pack("B", self.boot_time_set)
608 + bytes.fromhex(self.boot_time)
609 + pack("B", self.shut_time_set)
610 + bytes.fromhex(self.shut_time)
614 class _SET_PHONE(GPS303Pkt):
615 OUT_KWARGS = (("phone", str, ""),)
617 def out_encode(self) -> bytes:
619 return self.phone.encode("")
622 class REMOTE_MONITOR_PHONE(_SET_PHONE):
626 class SOS_PHONE(_SET_PHONE):
630 class DAD_PHONE(_SET_PHONE):
634 class MOM_PHONE(_SET_PHONE):
638 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
642 class GPS_OFF_PERIOD(GPS303Pkt):
646 ("fm", hhmm, "0000"),
647 ("to", hhmm, "2359"),
650 def out_encode(self) -> bytes:
652 pack("B", self.onoff)
653 + bytes.fromhex(self.fm)
654 + bytes.fromhex(self.to)
658 class DND_PERIOD(GPS303Pkt):
663 ("fm1", hhmm, "0000"),
664 ("to1", hhmm, "2359"),
665 ("fm2", hhmm, "0000"),
666 ("to2", hhmm, "2359"),
669 def out_encode(self) -> bytes:
671 pack("B", self.onoff)
672 + pack("B", self.week)
673 + bytes.fromhex(self.fm1)
674 + bytes.fromhex(self.to1)
675 + bytes.fromhex(self.fm2)
676 + bytes.fromhex(self.to2)
680 class RESTART_SHUTDOWN(GPS303Pkt):
682 OUT_KWARGS = (("flag", int, 0),)
684 def out_encode(self) -> bytes:
687 return pack("B", self.flag)
690 class DEVICE(GPS303Pkt):
692 OUT_KWARGS = (("flag", int, 0),)
694 # 0 - Stop looking for equipment
695 # 1 - Start looking for equipment
def out_encode(self) -> bytes:
    """Encode `self.flag` as a single-byte payload."""
    flag_byte = pack("B", self.flag)
    return flag_byte
700 class ALARM_CLOCK(GPS303Pkt):
703 Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
705 ("alarms", l3alarms, []),
708 def out_encode(self) -> bytes:
710 pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
714 class STOP_ALARM(GPS303Pkt):
def in_decode(self, length: int, payload: bytes) -> None:
    """Store the first payload byte in `self.flag`.

    `length` is not used. An empty payload raises IndexError.
    """
    first_byte = payload[0]
    self.flag = first_byte
721 class SETUP(GPS303Pkt):
723 RESPOND = Respond.EXT
725 ("uploadintervalseconds", intx, 0x0300),
726 ("binaryswitch", intx, 0b00110001),
727 ("alarms", l3int, [0, 0, 0]),
728 ("dndtimeswitch", int, 0),
729 ("dndtimes", l3int, [0, 0, 0]),
730 ("gpstimeswitch", int, 0),
731 ("gpstimestart", int, 0),
732 ("gpstimestop", int, 0),
733 ("phonenumbers", l3str, ["", "", ""]),
736 def out_encode(self) -> bytes:
737 def pack3b(x: int) -> bytes:
738 return pack("!I", x)[1:]
742 pack("!H", self.uploadintervalseconds),
743 pack("B", self.binaryswitch),
745 + [pack3b(el) for el in self.alarms]
747 pack("B", self.dndtimeswitch),
749 + [pack3b(el) for el in self.dndtimes]
751 pack("B", self.gpstimeswitch),
752 pack("!H", self.gpstimestart),
753 pack("!H", self.gpstimestop),
755 + [b";".join([el.encode() for el in self.phonenumbers])]
758 def in_encode(self) -> bytes:
762 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
766 class RESTORE_PASSWORD(GPS303Pkt):
770 class WIFI_POSITIONING(_WIFI_POSITIONING):
772 RESPOND = Respond.EXT
773 OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
775 def out_encode(self) -> bytes:
776 if self.latitude is None or self.longitude is None:
778 return "{:+#010.8g},{:+#010.8g}".format(
779 self.latitude, self.longitude
def out_decode(self, length: int, payload: bytes) -> None:
    """Parse an outgoing payload of the form "<latitude>,<longitude>".

    The payload is decoded as text and split on commas into exactly
    two fields, which are stored as floats in `self.latitude` and
    `self.longitude`. `length` is not used.
    """
    text = payload.decode()
    lat_text, lon_text = text.split(",")
    self.latitude = float(lat_text)
    self.longitude = float(lon_text)
788 class MANUAL_POSITIONING(GPS303Pkt):
791 def in_decode(self, length: int, payload: bytes) -> None:
792 self.flag = payload[0] if len(payload) > 0 else -1
797 4: "LBS search > 3 times",
798 5: "Same LBS and WiFi data",
799 6: "LBS prohibited, WiFi absent",
800 7: "GPS spacing < 50 m",
801 }.get(self.flag, "Unknown")
804 class BATTERY_CHARGE(GPS303Pkt):
808 class CHARGER_CONNECTED(GPS303Pkt):
812 class CHARGER_DISCONNECTED(GPS303Pkt):
816 class VIBRATION_RECEIVED(GPS303Pkt):
820 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
822 RESPOND = Respond.EXT
823 OUT_KWARGS = (("interval", int, 10),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Read the interval from the first two payload bytes.

    The value is a big-endian unsigned 16-bit integer and is stored
    in `self.interval` as an `int`. `length` is not used.

    Raises struct.error if the payload is shorter than two bytes.
    """
    # unpack() always returns a tuple; destructure it so that
    # `interval` is an int that can be passed back to pack("!H", ...).
    (self.interval,) = unpack("!H", payload[:2])
def out_encode(self) -> bytes:
    """Encode `self.interval` as a big-endian unsigned 16-bit payload."""
    interval_bytes = pack("!H", self.interval)
    return interval_bytes
832 class SOS_ALARM(GPS303Pkt):
836 class UNKNOWN_B3(GPS303Pkt):
838 IN_KWARGS = (("asciidata", str, ""),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Decode the whole payload as text into `self.asciidata`.

    The default codec of `bytes.decode()` (UTF-8, strict) is used.
    `length` is not used.
    """
    text = payload.decode()
    self.asciidata = text
844 # Build dicts protocol number -> class and class name -> protocol number
847 if True: # just to indent the code, sorry!
850 for name, cls in globals().items()
852 and issubclass(cls, GPS303Pkt)
853 and not name.startswith("_")
855 if hasattr(cls, "PROTO"):
856 CLASSES[cls.PROTO] = cls
857 PROTOS[cls.__name__] = cls.PROTO
862 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
865 for name, proto in PROTOS.items()
866 if name.upper().startswith(prefix.upper())
871 return CLASSES[proto]
def proto_by_name(name: str) -> int:
    """Look up the protocol number registered under a class name.

    Returns the entry of `PROTOS` for `name`, or -1 when the name is
    not present.
    """
    try:
        return PROTOS[name]
    except KeyError:
        return -1
878 def proto_of_message(packet: bytes) -> int:
882 def imei_from_packet(packet: bytes) -> Optional[str]:
883 if proto_of_message(packet) == LOGIN.PROTO:
884 msg = parse_message(packet)
885 if isinstance(msg, LOGIN):
def is_goodbye_packet(packet: bytes) -> bool:
    """Tell whether the packet's protocol number is that of HIBERNATION."""
    proto = proto_of_message(packet)
    goodbye_proto = HIBERNATION.PROTO
    return proto == goodbye_proto
894 def inline_response(packet: bytes) -> Optional[bytes]:
895 proto = proto_of_message(packet)
898 if cls.RESPOND is Respond.INL:
899 return cls.Out().packed
903 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
904 """From a packet (without framing bytes) derive the XXX.In object"""
905 length, proto = unpack("BB", packet[:2])
907 if proto not in CLASSES:
908 cause: Union[DecodeError, ValueError, IndexError] = ValueError(
909 f"Proto {proto} is unknown"
914 return CLASSES[proto].In(length, payload)
916 return CLASSES[proto].Out(length, payload)
917 except (DecodeError, ValueError, IndexError) as e:
920 retobj = UNKNOWN.In(length, payload)
922 retobj = UNKNOWN.Out(length, payload)
923 retobj.PROTO = proto # Override class attr with object attr