2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation), but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
50 "GPS_OFFLINE_POSITIONING",
55 "WIFI_OFFLINE_POSITIONING",
58 "GPS_LBS_SWITCH_TIMES",
59 "REMOTE_MONITOR_PHONE",
71 "SYNCHRONOUS_WHITELIST",
77 "CHARGER_DISCONNECTED",
79 "POSITION_UPLOAD_INTERVAL",
92 def __init__(self) -> None:
def enframe(buffer: bytes) -> bytes:
    """Wrap a raw packet in the wire framing: ``xx`` in front, CR LF behind."""
    start_marker, end_marker = b"xx", b"\r\n"
    return b"".join((start_marker, buffer, end_marker))
99 def recv(self, segment: bytes) -> List[Union[bytes, str]]:
101 Process next segment of the stream. Return successfully deframed
102 packets as `bytes` and error messages as `str`.
105 self.buffer += segment
106 if len(self.buffer) > MAXBUFFER:
107 # We are receiving junk. Let's drop it or we run out of memory.
109 return [f"More than {MAXBUFFER} unparseable data, dropping"]
110 msgs: List[Union[bytes, str]] = []
112 framestart = self.buffer.find(b"xx")
113 if framestart == -1: # No frames, return whatever we have
115 if framestart > 0: # Should not happen, report
117 f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
119 self.buffer = self.buffer[framestart:]
120 # At this point, buffer starts with a packet
121 if len(self.buffer) < 6: # no len and proto - cannot proceed
123 exp_end = self.buffer[2] + 3 # Expect '\r\n' here
125 # Length field can legitimately be much less than the
126 # length of the packet (e.g. WiFi positioning), but
127 # it _should not_ be greater. Still sometimes it is.
128 # Luckily, not by too much: by maybe two or three bytes?
129 # Do this embarrassing hack to avoid accidental match
130 # of some binary data in the packet against '\r\n'.
132 frameend = self.buffer.find(b"\r\n", frameend + 1)
133 if frameend == -1 or frameend >= (
135 ): # Found realistic match or none
137 if frameend == -1: # Incomplete frame, return what we have
139 packet = self.buffer[2:frameend]
140 self.buffer = self.buffer[frameend + 2 :]
141 if len(packet) < 2: # frameend comes too early
142 msgs.append(f"Packet too short: {packet.hex()}")
147 def close(self) -> bytes:
153 ### Parser/Constructor ###
156 class DecodeError(Exception):
157 def __init__(self, e: Exception, **kwargs: Any) -> None:
159 for k, v in kwargs.items():
def maybe(typ: type) -> Callable[[Any], Any]:
    """Build a converter that applies ``typ`` to a value but lets ``None`` through."""

    def convert(value: Any) -> Any:
        if value is None:
            return None
        return typ(value)

    return convert
167 def intx(x: Union[str, int]) -> int:
168 if isinstance(x, str):
173 def boolx(x: Union[str, bool]) -> bool:
174 if isinstance(x, str):
175 if x.upper() in ("ON", "TRUE", "1"):
177 if x.upper() in ("OFF", "FALSE", "0"):
179 raise ValueError(str(x) + " could not be parsed as a Boolean")
183 def hhmm(x: str) -> str:
184 """Check for the string that represents hours and minutes"""
185 if not isinstance(x, str) or len(x) != 4:
186 raise ValueError(str(x) + " is not a four-character string")
189 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
190 raise ValueError(str(x) + " does not contain valid hours and minutes")
def hhmmhhmm(x: str) -> str:
    """Validate an eight-character string made of two back-to-back HHMM values.

    Each four-character half is passed through `hhmm()` and the two results
    are returned concatenated.

    Raises:
        ValueError: if `x` is not a string of exactly eight characters.
    """
    is_eight_char_str = isinstance(x, str) and len(x) == 8
    if not is_eight_char_str:
        raise ValueError(str(x) + " is not an eight-character string")
    first_half, second_half = x[:4], x[4:]
    return hhmm(first_half) + hhmm(second_half)
201 def l3str(x: Union[str, List[str]]) -> List[str]:
202 if isinstance(x, str):
206 if len(lx) != 3 or not all(isinstance(el, str) for el in x):
207 raise ValueError(str(lx) + " is not a list of three strings")
211 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
212 def alrmspec(sub: str) -> Tuple[int, str]:
214 raise ValueError(sub + " does not represent day and time")
228 if isinstance(x, str):
229 lx = [alrmspec(sub) for sub in x.split(",")]
232 lx.extend([(0, "0000") for _ in range(3 - len(lx))])
233 if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
234 raise ValueError(str(lx) + " is a wrong alarms specification")
235 return [(d, hhmm(tm)) for d, tm in lx]
238 def l3int(x: Union[str, List[int]]) -> List[int]:
239 if isinstance(x, str):
240 lx = [int(el) for el in x.split(",")]
243 if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
244 raise ValueError(str(lx) + " is not a list of three integers")
250 For each class corresponding to a message, automatically create
251 two nested classes `In` and `Out` that also inherit from their
252 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
253 copied to the `In` nested class under the name `KWARGS`, and
254 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
255 to the nested class `Out`. In addition, method `encode` is
256 defined in both classes equal to `in_encode()` and `out_encode()`
262 def __getattr__(self, name: str) -> Any:
265 def __setattr__(self, name: str, value: Any) -> None:
269 cls: Type["MetaPkt"],
271 bases: Tuple[type, ...],
272 attrs: Dict[str, Any],
274 newcls = super().__new__(cls, name, bases, attrs)
275 newcls.In = super().__new__(
280 "KWARGS": newcls.IN_KWARGS,
281 "decode": newcls.in_decode,
282 "encode": newcls.in_encode,
285 newcls.Out = super().__new__(
290 "KWARGS": newcls.OUT_KWARGS,
291 "decode": newcls.out_decode,
292 "encode": newcls.out_encode,
299 NON = 0 # Incoming, no response needed
300 INL = 1 # Bidirectional, use `inline_response()`
301 EXT = 2 # Bidirectional, use external responder
304 class GPS303Pkt(metaclass=MetaPkt):
305 RESPOND = Respond.NON # Do not send anything back by default
307 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
308 OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
309 KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
310 In: Type["GPS303Pkt"]
311 Out: Type["GPS303Pkt"]
315 def __getattr__(self, name: str) -> Any:
318 def __setattr__(self, name: str, value: Any) -> None:
321 def __init__(self, *args: Any, **kwargs: Any):
323 Construct the object _either_ from (length, payload),
324 _or_ from the values of individual fields
326 assert not args or (len(args) == 2 and not kwargs)
327 if args: # guaranteed to be two arguments at this point
328 self.length, self.payload = args
330 self.decode(self.length, self.payload)
332 raise DecodeError(e, obj=self)
334 for kw, typ, dfl in self.KWARGS:
335 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
338 self.__class__.__name__ + " stray kwargs " + str(kwargs)
341 def __repr__(self) -> str:
342 return "{}({})".format(
343 self.__class__.__name__,
347 'bytes.fromhex("{}")'.format(v.hex())
348 if isinstance(v, bytes)
351 for k, v in self.__dict__.items()
352 if not k.startswith("_")
356 decode: Callable[["GPS303Pkt", int, bytes], None]
358 def in_decode(self, length: int, packet: bytes) -> None:
359 # Overridden in subclasses, otherwise do not decode payload
362 def out_decode(self, length: int, packet: bytes) -> None:
363 # Overridden in subclasses, otherwise do not decode payload
366 encode: Callable[["GPS303Pkt"], bytes]
368 def in_encode(self) -> bytes:
369 # Necessary to emulate terminal, which is not implemented
370 raise NotImplementedError(
371 self.__class__.__name__ + ".encode() not implemented"
374 def out_encode(self) -> bytes:
375 # Overridden in subclasses, otherwise make empty payload
def packed(self) -> bytes:
    """Serialise the packet as length byte, protocol byte, then payload.

    An explicit `length` attribute wins when the object has one; otherwise
    the length byte is the encoded payload size plus one.
    """
    body = self.encode()
    default_length = len(body) + 1
    header = pack("BB", getattr(self, "length", default_length), self.PROTO)
    return header + body
class UNKNOWN(GPS303Pkt):
    # Placeholder protocol number: 0x100 does not fit into a single byte,
    # so it can never appear in a real packet.
    PROTO = 0x100
389 class LOGIN(GPS303Pkt):
391 RESPOND = Respond.INL
392 # Default response for ACK, can also respond with STOP_UPLOAD
393 IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
def in_decode(self, length: int, payload: bytes) -> None:
    """Extract the IMEI (first eight bytes, as hex digits) and the version byte.

    A payload shorter than nine bytes raises `IndexError` when the version
    byte is read.
    """
    imei_bytes = payload[:8].ljust(8, b"\0")  # zero-pad a short field
    self.imei = imei_bytes.hex()
    self.ver = payload[8]
399 def in_encode(self) -> bytes:
400 return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
405 class SUPERVISION(GPS303Pkt):
407 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Encode `status` as the single payload byte.

    Status values:
      1 - The device automatically answers Pickup effect
      2 - Automatically Answering Two-way Calls
      3 - Ring manually answer the two-way call
    """
    status_byte = pack("B", self.status)
    return status_byte
416 class HEARTBEAT(GPS303Pkt):
418 RESPOND = Respond.INL
421 class _GPS_POSITIONING(GPS303Pkt):
422 RESPOND = Respond.INL
424 def in_decode(self, length: int, payload: bytes) -> None:
425 self.dtime = payload[:6]
426 if self.dtime == b"\0\0\0\0\0\0":
429 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
430 self.devtime = datetime(
431 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
433 self.gps_data_length = payload[6] >> 4
434 self.gps_nb_sat = payload[6] & 0x0F
435 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
436 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
437 flip_lon = bool(flags & 0b0000100000000000) # bit 4
438 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
439 self.heading = flags & 0b0000001111111111 # bits 6 - last
440 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
441 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time as six bytes.

    The bytes are, in order: two-digit year, month, day, hour, minute,
    second.
    """
    # datetime.utcnow() is deprecated since Python 3.12; take a
    # timezone-aware UTC timestamp instead. The encoded fields are the same.
    now = datetime.now(timezone.utc)
    return pack(
        "BBBBBB",
        now.year % 100,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
451 class GPS_POSITIONING(_GPS_POSITIONING):
455 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
459 class STATUS(GPS303Pkt):
461 RESPOND = Respond.EXT
465 ("timezone", int, 0),
467 ("signal", maybe(int), None),
469 OUT_KWARGS = (("upload_interval", int, 25),)
471 def in_decode(self, length: int, payload: bytes) -> None:
472 self.batt, self.ver, self.timezone, self.intvl = unpack(
476 self.signal: Optional[int] = payload[4]
480 def in_encode(self) -> bytes:
481 return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
482 b"" if self.signal is None else pack("B", self.signal)
def out_encode(self) -> bytes:
    """Encode the upload interval (set in minutes) as the single payload byte."""
    interval_minutes = self.upload_interval
    return pack("B", interval_minutes)
489 class HIBERNATION(GPS303Pkt): # Server can send to send device to sleep
492 def in_encode(self) -> bytes:
496 class RESET(GPS303Pkt):
497 # Device sends when it got reset SMS
498 # Server can send to initiate factory reset
502 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiate sync (0x58)
504 OUT_KWARGS = (("number", int, 3),)
def out_encode(self) -> bytes:
    """Encode the number of whitelist entries as the single payload byte."""
    entry_count = self.number
    return pack("B", entry_count)
510 class _WIFI_POSITIONING(GPS303Pkt):
511 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
513 ("dtime", bytes, b"\0\0\0\0\0\0"),
514 ("wifi_aps", list, []),
517 ("gsm_cells", list, []),
520 def in_decode(self, length: int, payload: bytes) -> None:
521 self.dtime = payload[:6]
522 if self.dtime == b"\0\0\0\0\0\0":
525 self.devtime = datetime.strptime(
526 self.dtime.hex(), "%y%m%d%H%M%S"
527 ).astimezone(tz=timezone.utc)
529 for i in range(self.length): # length has special meaning here
530 slice = payload[6 + i * 7 : 13 + i * 7]
531 self.wifi_aps.append(
532 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
534 gsm_slice = payload[6 + self.length * 7 :]
535 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
537 for i in range(ncells):
538 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
539 locac, cellid, sigstr = unpack(
540 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
542 self.gsm_cells.append((locac, cellid, -sigstr))
544 def in_encode(self) -> bytes:
545 self.length = len(self.wifi_aps)
551 bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
553 for mac, sigstr in self.wifi_aps
556 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
559 pack("!HHB", locac, cellid, -sigstr)
560 for locac, cellid, sigstr in self.gsm_cells
567 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
569 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time as six bytes.

    The timestamp is formatted as the twelve decimal digits YYMMDDhhmmss
    and those digits are then read as hex, so each byte holds two digits.
    """
    # datetime.utcnow() is deprecated since Python 3.12; use an aware
    # UTC timestamp, which formats to the same digits.
    now = datetime.now(timezone.utc)
    return bytes.fromhex(now.strftime("%y%m%d%H%M%S"))
575 class TIME(GPS303Pkt):
577 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time as seven bytes.

    Layout: year as an unsigned 16-bit big-endian integer, followed by
    month, day, hour, minute and second as one byte each.
    """
    # datetime.utcnow() is deprecated since Python 3.12; use an aware
    # UTC timestamp, which yields the same field values.
    now = datetime.now(timezone.utc)
    return pack(
        "!HBBBBB",
        now.year,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
583 class PROHIBIT_LBS(GPS303Pkt):
585 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Encode `status` as the single payload byte (server sent, 0-off, 1-on)."""
    status_byte = pack("B", self.status)
    return status_byte
591 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
595 ("gps_off", boolx, False), # Clarify the meaning of 0/1
596 ("gps_interval_set", boolx, False),
597 ("gps_interval", hhmmhhmm, "00000000"),
598 ("lbs_off", boolx, False), # Clarify the meaning of 0/1
599 ("boot_time_set", boolx, False),
600 ("boot_time", hhmm, "0000"),
601 ("shut_time_set", boolx, False),
602 ("shut_time", hhmm, "0000"),
605 def out_encode(self) -> bytes:
607 pack("B", self.gps_off)
608 + pack("B", self.gps_interval_set)
609 + bytes.fromhex(self.gps_interval)
610 + pack("B", self.lbs_off)
611 + pack("B", self.boot_time_set)
612 + bytes.fromhex(self.boot_time)
613 + pack("B", self.shut_time_set)
614 + bytes.fromhex(self.shut_time)
618 class _SET_PHONE(GPS303Pkt):
619 OUT_KWARGS = (("phone", str, ""),)
def out_encode(self) -> bytes:
    """Encode the `phone` string as the packet payload.

    Returns:
        The phone number as bytes in the default (UTF-8) encoding.
    """
    # An empty encoding name makes str.encode() raise LookupError
    # ("unknown encoding"), so the default codec is used instead.
    return self.phone.encode()
626 class REMOTE_MONITOR_PHONE(_SET_PHONE):
630 class SOS_PHONE(_SET_PHONE):
634 class DAD_PHONE(_SET_PHONE):
638 class MOM_PHONE(_SET_PHONE):
642 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
646 class GPS_OFF_PERIOD(GPS303Pkt):
650 ("fm", hhmm, "0000"),
651 ("to", hhmm, "2359"),
654 def out_encode(self) -> bytes:
656 pack("B", self.onoff)
657 + bytes.fromhex(self.fm)
658 + bytes.fromhex(self.to)
662 class DND_PERIOD(GPS303Pkt):
667 ("fm1", hhmm, "0000"),
668 ("to1", hhmm, "2359"),
669 ("fm2", hhmm, "0000"),
670 ("to2", hhmm, "2359"),
673 def out_encode(self) -> bytes:
675 pack("B", self.onoff)
676 + pack("B", self.week)
677 + bytes.fromhex(self.fm1)
678 + bytes.fromhex(self.to1)
679 + bytes.fromhex(self.fm2)
680 + bytes.fromhex(self.to2)
684 class RESTART_SHUTDOWN(GPS303Pkt):
686 OUT_KWARGS = (("flag", int, 0),)
688 def out_encode(self) -> bytes:
691 return pack("B", self.flag)
694 class DEVICE(GPS303Pkt):
696 OUT_KWARGS = (("flag", int, 0),)
698 # 0 - Stop looking for equipment
699 # 1 - Start looking for equipment
def out_encode(self) -> bytes:
    """Encode `flag` as the single payload byte."""
    flag_byte = pack("B", self.flag)
    return flag_byte
704 class ALARM_CLOCK(GPS303Pkt):
707 Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
709 ("alarms", l3alarms, []),
712 def out_encode(self) -> bytes:
714 pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
718 class STOP_ALARM(GPS303Pkt):
def in_decode(self, length: int, payload: bytes) -> None:
    """Store the first payload byte as `flag`."""
    first_byte = payload[0]  # IndexError on an empty payload
    self.flag = first_byte
725 class SETUP(GPS303Pkt):
727 RESPOND = Respond.EXT
729 ("uploadintervalseconds", intx, 0x0300),
730 ("binaryswitch", intx, 0b00110001),
731 ("alarms", l3int, [0, 0, 0]),
732 ("dndtimeswitch", int, 0),
733 ("dndtimes", l3int, [0, 0, 0]),
734 ("gpstimeswitch", int, 0),
735 ("gpstimestart", int, 0),
736 ("gpstimestop", int, 0),
737 ("phonenumbers", l3str, ["", "", ""]),
740 def out_encode(self) -> bytes:
741 def pack3b(x: int) -> bytes:
742 return pack("!I", x)[1:]
746 pack("!H", self.uploadintervalseconds),
747 pack("B", self.binaryswitch),
749 + [pack3b(el) for el in self.alarms]
751 pack("B", self.dndtimeswitch),
753 + [pack3b(el) for el in self.dndtimes]
755 pack("B", self.gpstimeswitch),
756 pack("!H", self.gpstimestart),
757 pack("!H", self.gpstimestop),
759 + [b";".join([el.encode() for el in self.phonenumbers])]
762 def in_encode(self) -> bytes:
766 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
770 class RESTORE_PASSWORD(GPS303Pkt):
774 class WIFI_POSITIONING(_WIFI_POSITIONING):
776 RESPOND = Respond.EXT
777 OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
779 def out_encode(self) -> bytes:
780 if self.latitude is None or self.longitude is None:
782 return "{:+#010.8g},{:+#010.8g}".format(
783 self.latitude, self.longitude
def out_decode(self, length: int, payload: bytes) -> None:
    """Parse a ``<latitude>,<longitude>`` text payload into float attributes.

    Raises:
        ValueError: if the payload does not split into exactly two
            comma-separated fields, or a field is not a valid float.
    """
    parts = payload.decode().split(",")
    lat_text, lon_text = parts
    self.latitude = float(lat_text)
    self.longitude = float(lon_text)
792 class MANUAL_POSITIONING(GPS303Pkt):
795 def in_decode(self, length: int, payload: bytes) -> None:
796 self.flag = payload[0] if len(payload) > 0 else -1
801 4: "LBS search > 3 times",
802 5: "Same LBS and WiFi data",
803 6: "LBS prohibited, WiFi absent",
804 7: "GPS spacing < 50 m",
805 }.get(self.flag, "Unknown")
808 class BATTERY_CHARGE(GPS303Pkt):
812 class CHARGER_CONNECTED(GPS303Pkt):
816 class CHARGER_DISCONNECTED(GPS303Pkt):
820 class VIBRATION_RECEIVED(GPS303Pkt):
824 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
826 RESPOND = Respond.EXT
827 OUT_KWARGS = (("interval", int, 10),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Decode `interval` from the first two payload bytes.

    The value is read as an unsigned 16-bit big-endian integer.

    Raises:
        struct.error: if the payload is shorter than two bytes.
    """
    # unpack() always returns a tuple; destructure the single element so
    # that `interval` is a plain int rather than a one-element tuple.
    (self.interval,) = unpack("!H", payload[:2])
def out_encode(self) -> bytes:
    """Encode `interval` as an unsigned 16-bit big-endian payload."""
    interval = self.interval
    return pack("!H", interval)
836 class SOS_ALARM(GPS303Pkt):
840 class UNKNOWN_B3(GPS303Pkt):
842 IN_KWARGS = (("asciidata", str, ""),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Keep the whole payload, decoded as text, in `asciidata`."""
    text = payload.decode("utf-8")  # same codec as the bytes.decode() default
    self.asciidata = text
848 # Build dicts protocol number -> class and class name -> protocol number
851 if True: # just to indent the code, sorry!
854 for name, cls in globals().items()
856 and issubclass(cls, GPS303Pkt)
857 and not name.startswith("_")
859 if hasattr(cls, "PROTO"):
860 CLASSES[cls.PROTO] = cls
861 PROTOS[cls.__name__] = cls.PROTO
866 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
869 for name, proto in PROTOS.items()
870 if name.upper().startswith(prefix.upper())
875 return CLASSES[proto]
878 def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
883 obj.__class__.__name__
884 if isinstance(obj, GPS303Pkt)
887 ).ljust(16, "\0")[:16]
def proto_by_name(name: str) -> int:
    """Look up the protocol number registered for a class name; -1 if none."""
    if name in PROTOS:
        return PROTOS[name]
    return -1
def proto_of_message(packet: bytes) -> str:
    """Name the protocol of a packet, judged by its second byte.

    Protocol numbers missing from `CLASSES` are reported under the name
    of the `UNKNOWN` class.
    """
    proto_number = packet[1]
    packet_class = CLASSES.get(proto_number, UNKNOWN)
    return proto_name(packet_class)
898 def imei_from_packet(packet: bytes) -> Optional[str]:
899 if packet[1] == LOGIN.PROTO:
900 msg = parse_message(packet)
901 if isinstance(msg, LOGIN):
def is_goodbye_packet(packet: bytes) -> bool:
    """Tell whether the packet's protocol byte is that of HIBERNATION."""
    proto_number = packet[1]
    return HIBERNATION.PROTO == proto_number
910 def inline_response(packet: bytes) -> Optional[bytes]:
914 if cls.RESPOND is Respond.INL:
915 return cls.Out().packed
919 def probe_buffer(buffer: bytes) -> bool:
920 framestart = buffer.find(b"xx")
923 if len(buffer) - framestart < 6:
928 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
929 """From a packet (without framing bytes) derive the XXX.In object"""
930 length, proto = unpack("BB", packet[:2])
932 if proto not in CLASSES:
933 cause: Union[DecodeError, ValueError, IndexError] = ValueError(
934 f"Proto {proto} is unknown"
939 return CLASSES[proto].In(length, payload)
941 return CLASSES[proto].Out(length, payload)
942 except (DecodeError, ValueError, IndexError) as e:
945 retobj = UNKNOWN.In(length, payload)
947 retobj = UNKNOWN.Out(length, payload)
948 retobj.PROTO = proto # Override class attr with object attr