2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
49 "GPS_OFFLINE_POSITIONING",
54 "WIFI_OFFLINE_POSITIONING",
57 "GPS_LBS_SWITCH_TIMES",
58 "REMOTE_MONITOR_PHONE",
70 "SYNCHRONOUS_WHITELIST",
76 "CHARGER_DISCONNECTED",
78 "POSITION_UPLOAD_INTERVAL",
89 def __init__(self) -> None:
93 def enframe(buffer: bytes) -> bytes:
94 return b"xx" + buffer + b"\r\n"
96 def recv(self, segment: bytes) -> List[Union[bytes, str]]:
98 Process next segment of the stream. Return successfully deframed
99 packets as `bytes` and error messages as `str`.
102 self.buffer += segment
103 if len(self.buffer) > MAXBUFFER:
104 # We are receiving junk. Let's drop it or we run out of memory.
106 return [f"More than {MAXBUFFER} unparseable data, dropping"]
107 msgs: List[Union[bytes, str]] = []
109 framestart = self.buffer.find(b"xx")
110 if framestart == -1: # No frames, return whatever we have
112 if framestart > 0: # Should not happen, report
114 f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
116 self.buffer = self.buffer[framestart:]
117 # At this point, buffer starts with a packet
118 if len(self.buffer) < 6: # no len and proto - cannot proceed
120 exp_end = self.buffer[2] + 3 # Expect '\r\n' here
122 # Length field can legitimeely be much less than the
123 # length of the packet (e.g. WiFi positioning), but
124 # it _should not_ be greater. Still sometimes it is.
125 # Luckily, not by too much: by maybe two or three bytes?
126 # Do this embarrassing hack to avoid accidental match
127 # of some binary data in the packet against '\r\n'.
129 frameend = self.buffer.find(b"\r\n", frameend + 1)
130 if frameend == -1 or frameend >= (
132 ): # Found realistic match or none
134 if frameend == -1: # Incomplete frame, return what we have
136 packet = self.buffer[2:frameend]
137 self.buffer = self.buffer[frameend + 2 :]
138 if len(packet) < 2: # frameend comes too early
139 msgs.append(f"Packet too short: {packet.hex()}")
144 def close(self) -> bytes:
150 ### Parser/Constructor ###
153 class DecodeError(Exception):
154 def __init__(self, e: Exception, **kwargs: Any) -> None:
156 for k, v in kwargs.items():
160 def maybe(typ: type) -> Callable[[Any], Any]:
161 return lambda x: None if x is None else typ(x)
164 def intx(x: Union[str, int]) -> int:
165 if isinstance(x, str):
170 def boolx(x: Union[str, bool]) -> bool:
171 if isinstance(x, str):
172 if x.upper() in ("ON", "TRUE", "1"):
174 if x.upper() in ("OFF", "FALSE", "0"):
176 raise ValueError(str(x) + " could not be parsed as a Boolean")
180 def hhmm(x: str) -> str:
181 """Check for the string that represents hours and minutes"""
182 if not isinstance(x, str) or len(x) != 4:
183 raise ValueError(str(x) + " is not a four-character string")
186 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
187 raise ValueError(str(x) + " does not contain valid hours and minutes")
191 def hhmmhhmm(x: str) -> str:
192 """Check for the string that represents hours and minutes twice"""
193 if not isinstance(x, str) or len(x) != 8:
194 raise ValueError(str(x) + " is not an eight-character string")
195 return hhmm(x[:4]) + hhmm(x[4:])
198 def l3str(x: Union[str, List[str]]) -> List[str]:
199 if isinstance(x, str):
203 if len(lx) != 3 or not all(isinstance(el, str) for el in x):
204 raise ValueError(str(lx) + " is not a list of three strings")
208 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
209 def alrmspec(sub: str) -> Tuple[int, str]:
211 raise ValueError(sub + " does not represent day and time")
225 if isinstance(x, str):
226 lx = [alrmspec(sub) for sub in x.split(",")]
229 lx.extend([(0, "0000") for _ in range(3 - len(lx))])
230 if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
231 raise ValueError(str(lx) + " is a wrong alarms specification")
232 return [(d, hhmm(tm)) for d, tm in lx]
235 def l3int(x: Union[str, List[int]]) -> List[int]:
236 if isinstance(x, str):
237 lx = [int(el) for el in x.split(",")]
240 if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
241 raise ValueError(str(lx) + " is not a list of three integers")
247 For each class corresponding to a message, automatically create
248 two nested classes `In` and `Out` that also inherit from their
249 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
250 copied to the `In` nested class under the name `KWARGS`, and
251 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
252 to the nested class `Out`. In addition, method `encode` is
253 defined in both classes equal to `in_encode()` and `out_encode()`
259 def __getattr__(self, name: str) -> Any:
262 def __setattr__(self, name: str, value: Any) -> None:
266 cls: Type["MetaPkt"],
268 bases: Tuple[type, ...],
269 attrs: Dict[str, Any],
271 newcls = super().__new__(cls, name, bases, attrs)
272 newcls.In = super().__new__(
277 "KWARGS": newcls.IN_KWARGS,
278 "decode": newcls.in_decode,
279 "encode": newcls.in_encode,
282 newcls.Out = super().__new__(
287 "KWARGS": newcls.OUT_KWARGS,
288 "decode": newcls.out_decode,
289 "encode": newcls.out_encode,
296 NON = 0 # Incoming, no response needed
297 INL = 1 # Birirectional, use `inline_response()`
298 EXT = 2 # Birirectional, use external responder
301 class GPS303Pkt(metaclass=MetaPkt):
302 RESPOND = Respond.NON # Do not send anything back by default
304 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
305 OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
306 KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
307 In: Type["GPS303Pkt"]
308 Out: Type["GPS303Pkt"]
312 def __getattr__(self, name: str) -> Any:
315 def __setattr__(self, name: str, value: Any) -> None:
318 def __init__(self, *args: Any, **kwargs: Any):
320 Construct the object _either_ from (length, payload),
321 _or_ from the values of individual fields
323 assert not args or (len(args) == 2 and not kwargs)
324 if args: # guaranteed to be two arguments at this point
325 self.length, self.payload = args
327 self.decode(self.length, self.payload)
329 raise DecodeError(e, obj=self)
331 for kw, typ, dfl in self.KWARGS:
332 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
335 self.__class__.__name__ + " stray kwargs " + str(kwargs)
338 def __repr__(self) -> str:
339 return "{}({})".format(
340 self.__class__.__name__,
344 'bytes.fromhex("{}")'.format(v.hex())
345 if isinstance(v, bytes)
348 for k, v in self.__dict__.items()
349 if not k.startswith("_")
353 decode: Callable[["GPS303Pkt", int, bytes], None]
355 def in_decode(self, length: int, packet: bytes) -> None:
356 # Overridden in subclasses, otherwise do not decode payload
359 def out_decode(self, length: int, packet: bytes) -> None:
360 # Overridden in subclasses, otherwise do not decode payload
363 encode: Callable[["GPS303Pkt"], bytes]
365 def in_encode(self) -> bytes:
366 # Necessary to emulate terminal, which is not implemented
367 raise NotImplementedError(
368 self.__class__.__name__ + ".encode() not implemented"
371 def out_encode(self) -> bytes:
372 # Overridden in subclasses, otherwise make empty payload
376 def packed(self) -> bytes:
377 payload = self.encode()
378 length = getattr(self, "length", len(payload) + 1)
379 return pack("BB", length, self.PROTO) + payload
382 class UNKNOWN(GPS303Pkt):
383 PROTO = 256 # > 255 is impossible in real packets
386 class LOGIN(GPS303Pkt):
388 RESPOND = Respond.INL
389 # Default response for ACK, can also respond with STOP_UPLOAD
390 IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
392 def in_decode(self, length: int, payload: bytes) -> None:
393 self.imei = payload[:8].ljust(8, b"\0").hex()
394 self.ver = payload[8]
396 def in_encode(self) -> bytes:
397 return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
402 class SUPERVISION(GPS303Pkt):
404 OUT_KWARGS = (("status", int, 1),)
406 def out_encode(self) -> bytes:
407 # 1: The device automatically answers Pickup effect
408 # 2: Automatically Answering Two-way Calls
409 # 3: Ring manually answer the two-way call
410 return pack("B", self.status)
413 class HEARTBEAT(GPS303Pkt):
415 RESPOND = Respond.INL
418 class _GPS_POSITIONING(GPS303Pkt):
419 RESPOND = Respond.INL
421 def in_decode(self, length: int, payload: bytes) -> None:
422 self.dtime = payload[:6]
423 if self.dtime == b"\0\0\0\0\0\0":
426 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
427 self.devtime = datetime(
428 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
430 self.gps_data_length = payload[6] >> 4
431 self.gps_nb_sat = payload[6] & 0x0F
432 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
433 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
434 flip_lon = bool(flags & 0b0000100000000000) # bit 4
435 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
436 self.heading = flags & 0b0000001111111111 # bits 6 - last
437 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
438 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
442 def out_encode(self) -> bytes:
443 tup = datetime.utcnow().timetuple()
444 ttup = (tup[0] % 100,) + tup[1:6]
445 return pack("BBBBBB", *ttup)
448 class GPS_POSITIONING(_GPS_POSITIONING):
452 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
456 class STATUS(GPS303Pkt):
458 RESPOND = Respond.EXT
462 ("timezone", int, 0),
464 ("signal", maybe(int), None),
466 OUT_KWARGS = (("upload_interval", int, 25),)
468 def in_decode(self, length: int, payload: bytes) -> None:
469 self.batt, self.ver, self.timezone, self.intvl = unpack(
473 self.signal: Optional[int] = payload[4]
477 def in_encode(self) -> bytes:
478 return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
479 b"" if self.signal is None else pack("B", self.signal)
482 def out_encode(self) -> bytes: # Set interval in minutes
483 return pack("B", self.upload_interval)
486 class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
489 def in_encode(self) -> bytes:
493 class RESET(GPS303Pkt):
494 # Device sends when it got reset SMS
495 # Server can send to initiate factory reset
499 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
501 OUT_KWARGS = (("number", int, 3),)
503 def out_encode(self) -> bytes: # Number of whitelist entries
504 return pack("B", self.number)
507 class _WIFI_POSITIONING(GPS303Pkt):
508 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
510 ("dtime", bytes, b"\0\0\0\0\0\0"),
511 ("wifi_aps", list, []),
514 ("gsm_cells", list, []),
517 def in_decode(self, length: int, payload: bytes) -> None:
518 self.dtime = payload[:6]
519 if self.dtime == b"\0\0\0\0\0\0":
522 self.devtime = datetime.strptime(
523 self.dtime.hex(), "%y%m%d%H%M%S"
524 ).astimezone(tz=timezone.utc)
526 for i in range(self.length): # length has special meaning here
527 slice = payload[6 + i * 7 : 13 + i * 7]
528 self.wifi_aps.append(
529 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
531 gsm_slice = payload[6 + self.length * 7 :]
532 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
534 for i in range(ncells):
535 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
536 locac, cellid, sigstr = unpack(
537 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
539 self.gsm_cells.append((locac, cellid, -sigstr))
541 def in_encode(self) -> bytes:
542 self.length = len(self.wifi_aps)
548 bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
550 for mac, sigstr in self.wifi_aps
553 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
556 pack("!HHB", locac, cellid, -sigstr)
557 for locac, cellid, sigstr in self.gsm_cells
564 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
566 RESPOND = Respond.INL
568 def out_encode(self) -> bytes:
569 return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
572 class TIME(GPS303Pkt):
574 RESPOND = Respond.INL
576 def out_encode(self) -> bytes:
577 return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
580 class PROHIBIT_LBS(GPS303Pkt):
582 OUT_KWARGS = (("status", int, 1),)
584 def out_encode(self) -> bytes: # Server sent, 0-off, 1-on
585 return pack("B", self.status)
588 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
592 ("gps_off", boolx, False), # Clarify the meaning of 0/1
593 ("gps_interval_set", boolx, False),
594 ("gps_interval", hhmmhhmm, "00000000"),
595 ("lbs_off", boolx, False), # Clarify the meaning of 0/1
596 ("boot_time_set", boolx, False),
597 ("boot_time", hhmm, "0000"),
598 ("shut_time_set", boolx, False),
599 ("shut_time", hhmm, "0000"),
602 def out_encode(self) -> bytes:
604 pack("B", self.gps_off)
605 + pack("B", self.gps_interval_set)
606 + bytes.fromhex(self.gps_interval)
607 + pack("B", self.lbs_off)
608 + pack("B", self.boot_time_set)
609 + bytes.fromhex(self.boot_time)
610 + pack("B", self.shut_time_set)
611 + bytes.fromhex(self.shut_time)
615 class _SET_PHONE(GPS303Pkt):
616 OUT_KWARGS = (("phone", str, ""),)
618 def out_encode(self) -> bytes:
620 return self.phone.encode("")
623 class REMOTE_MONITOR_PHONE(_SET_PHONE):
627 class SOS_PHONE(_SET_PHONE):
631 class DAD_PHONE(_SET_PHONE):
635 class MOM_PHONE(_SET_PHONE):
639 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
643 class GPS_OFF_PERIOD(GPS303Pkt):
647 ("fm", hhmm, "0000"),
648 ("to", hhmm, "2359"),
651 def out_encode(self) -> bytes:
653 pack("B", self.onoff)
654 + bytes.fromhex(self.fm)
655 + bytes.fromhex(self.to)
659 class DND_PERIOD(GPS303Pkt):
664 ("fm1", hhmm, "0000"),
665 ("to1", hhmm, "2359"),
666 ("fm2", hhmm, "0000"),
667 ("to2", hhmm, "2359"),
670 def out_encode(self) -> bytes:
672 pack("B", self.onoff)
673 + pack("B", self.week)
674 + bytes.fromhex(self.fm1)
675 + bytes.fromhex(self.to1)
676 + bytes.fromhex(self.fm2)
677 + bytes.fromhex(self.to2)
681 class RESTART_SHUTDOWN(GPS303Pkt):
683 OUT_KWARGS = (("flag", int, 0),)
685 def out_encode(self) -> bytes:
688 return pack("B", self.flag)
691 class DEVICE(GPS303Pkt):
693 OUT_KWARGS = (("flag", int, 0),)
695 # 0 - Stop looking for equipment
696 # 1 - Start looking for equipment
697 def out_encode(self) -> bytes:
698 return pack("B", self.flag)
701 class ALARM_CLOCK(GPS303Pkt):
704 Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
706 ("alarms", l3alarms, []),
709 def out_encode(self) -> bytes:
711 pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
715 class STOP_ALARM(GPS303Pkt):
718 def in_decode(self, length: int, payload: bytes) -> None:
719 self.flag = payload[0]
722 class SETUP(GPS303Pkt):
724 RESPOND = Respond.EXT
726 ("uploadintervalseconds", intx, 0x0300),
727 ("binaryswitch", intx, 0b00110001),
728 ("alarms", l3int, [0, 0, 0]),
729 ("dndtimeswitch", int, 0),
730 ("dndtimes", l3int, [0, 0, 0]),
731 ("gpstimeswitch", int, 0),
732 ("gpstimestart", int, 0),
733 ("gpstimestop", int, 0),
734 ("phonenumbers", l3str, ["", "", ""]),
737 def out_encode(self) -> bytes:
738 def pack3b(x: int) -> bytes:
739 return pack("!I", x)[1:]
743 pack("!H", self.uploadintervalseconds),
744 pack("B", self.binaryswitch),
746 + [pack3b(el) for el in self.alarms]
748 pack("B", self.dndtimeswitch),
750 + [pack3b(el) for el in self.dndtimes]
752 pack("B", self.gpstimeswitch),
753 pack("!H", self.gpstimestart),
754 pack("!H", self.gpstimestop),
756 + [b";".join([el.encode() for el in self.phonenumbers])]
759 def in_encode(self) -> bytes:
763 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
767 class RESTORE_PASSWORD(GPS303Pkt):
771 class WIFI_POSITIONING(_WIFI_POSITIONING):
773 RESPOND = Respond.EXT
774 OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
776 def out_encode(self) -> bytes:
777 if self.latitude is None or self.longitude is None:
779 return "{:+#010.8g},{:+#010.8g}".format(
780 self.latitude, self.longitude
783 def out_decode(self, length: int, payload: bytes) -> None:
784 lat, lon = payload.decode().split(",")
785 self.latitude = float(lat)
786 self.longitude = float(lon)
789 class MANUAL_POSITIONING(GPS303Pkt):
792 def in_decode(self, length: int, payload: bytes) -> None:
793 self.flag = payload[0] if len(payload) > 0 else -1
798 4: "LBS search > 3 times",
799 5: "Same LBS and WiFi data",
800 6: "LBS prohibited, WiFi absent",
801 7: "GPS spacing < 50 m",
802 }.get(self.flag, "Unknown")
805 class BATTERY_CHARGE(GPS303Pkt):
809 class CHARGER_CONNECTED(GPS303Pkt):
813 class CHARGER_DISCONNECTED(GPS303Pkt):
817 class VIBRATION_RECEIVED(GPS303Pkt):
821 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
823 RESPOND = Respond.EXT
824 OUT_KWARGS = (("interval", int, 10),)
826 def in_decode(self, length: int, payload: bytes) -> None:
827 self.interval = unpack("!H", payload[:2])
829 def out_encode(self) -> bytes:
830 return pack("!H", self.interval)
833 class SOS_ALARM(GPS303Pkt):
837 class UNKNOWN_B3(GPS303Pkt):
839 IN_KWARGS = (("asciidata", str, ""),)
841 def in_decode(self, length: int, payload: bytes) -> None:
842 self.asciidata = payload.decode()
845 # Build dicts protocol number -> class and class name -> protocol number
848 if True: # just to indent the code, sorry!
851 for name, cls in globals().items()
853 and issubclass(cls, GPS303Pkt)
854 and not name.startswith("_")
856 if hasattr(cls, "PROTO"):
857 CLASSES[cls.PROTO] = cls
858 PROTOS[cls.__name__] = cls.PROTO
863 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
866 for name, proto in PROTOS.items()
867 if name.upper().startswith(prefix.upper())
872 return CLASSES[proto]
875 def proto_by_name(name: str) -> int:
876 return PROTOS.get(name, -1)
879 def proto_of_message(packet: bytes) -> int:
883 def imei_from_packet(packet: bytes) -> Optional[str]:
884 if proto_of_message(packet) == LOGIN.PROTO:
885 msg = parse_message(packet)
886 if isinstance(msg, LOGIN):
891 def is_goodbye_packet(packet: bytes) -> bool:
892 return proto_of_message(packet) == HIBERNATION.PROTO
895 def inline_response(packet: bytes) -> Optional[bytes]:
896 proto = proto_of_message(packet)
899 if cls.RESPOND is Respond.INL:
900 return cls.Out().packed
904 def probe_buffer(buffer: bytes) -> bool:
905 framestart = buffer.find(b"xx")
908 if len(buffer) - framestart < 6:
913 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
914 """From a packet (without framing bytes) derive the XXX.In object"""
915 length, proto = unpack("BB", packet[:2])
917 if proto not in CLASSES:
918 cause: Union[DecodeError, ValueError, IndexError] = ValueError(
919 f"Proto {proto} is unknown"
924 return CLASSES[proto].In(length, payload)
926 return CLASSES[proto].Out(length, payload)
927 except (DecodeError, ValueError, IndexError) as e:
930 retobj = UNKNOWN.In(length, payload)
932 retobj = UNKNOWN.Out(length, payload)
933 retobj.PROTO = proto # Override class attr with object attr