2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
52 "GPS_OFFLINE_POSITIONING",
57 "WIFI_OFFLINE_POSITIONING",
60 "GPS_LBS_SWITCH_TIMES",
61 "REMOTE_MONITOR_PHONE",
73 "SYNCHRONOUS_WHITELIST",
79 "CHARGER_DISCONNECTED",
81 "POSITION_UPLOAD_INTERVAL",
94 def __init__(self) -> None:
97 def recv(self, segment: bytes) -> List[Union[bytes, str]]:
99 Process next segment of the stream. Return successfully deframed
100 packets as `bytes` and error messages as `str`.
103 self.buffer += segment
104 if len(self.buffer) > MAXBUFFER:
105 # We are receiving junk. Let's drop it or we run out of memory.
107 return [f"More than {MAXBUFFER} unparseable data, dropping"]
108 msgs: List[Union[bytes, str]] = []
110 framestart = self.buffer.find(b"xx")
111 if framestart == -1: # No frames, return whatever we have
113 if framestart > 0: # Should not happen, report
115 f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
117 self.buffer = self.buffer[framestart:]
118 # At this point, buffer starts with a packet
119 if len(self.buffer) < 6: # no len and proto - cannot proceed
121 exp_end = self.buffer[2] + 3 # Expect '\r\n' here
123 # Length field can legitimeely be much less than the
124 # length of the packet (e.g. WiFi positioning), but
125 # it _should not_ be greater. Still sometimes it is.
126 # Luckily, not by too much: by maybe two or three bytes?
127 # Do this embarrassing hack to avoid accidental match
128 # of some binary data in the packet against '\r\n'.
130 frameend = self.buffer.find(b"\r\n", frameend + 1)
131 if frameend == -1 or frameend >= (
133 ): # Found realistic match or none
135 if frameend == -1: # Incomplete frame, return what we have
137 packet = self.buffer[2:frameend]
138 self.buffer = self.buffer[frameend + 2 :]
139 if len(packet) < 2: # frameend comes too early
140 msgs.append(f"Packet too short: {packet.hex()}")
145 def close(self) -> bytes:
151 def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
152 return b"xx" + buffer + b"\r\n"
155 ### Parser/Constructor ###
158 class DecodeError(Exception):
159 def __init__(self, e: Exception, **kwargs: Any) -> None:
161 for k, v in kwargs.items():
165 def maybe(typ: type) -> Callable[[Any], Any]:
166 return lambda x: None if x is None else typ(x)
169 def intx(x: Union[str, int]) -> int:
170 if isinstance(x, str):
175 def boolx(x: Union[str, bool]) -> bool:
176 if isinstance(x, str):
177 if x.upper() in ("ON", "TRUE", "1"):
179 if x.upper() in ("OFF", "FALSE", "0"):
181 raise ValueError(str(x) + " could not be parsed as a Boolean")
185 def hhmm(x: str) -> str:
186 """Check for the string that represents hours and minutes"""
187 if not isinstance(x, str) or len(x) != 4:
188 raise ValueError(str(x) + " is not a four-character string")
191 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
192 raise ValueError(str(x) + " does not contain valid hours and minutes")
196 def hhmmhhmm(x: str) -> str:
197 """Check for the string that represents hours and minutes twice"""
198 if not isinstance(x, str) or len(x) != 8:
199 raise ValueError(str(x) + " is not an eight-character string")
200 return hhmm(x[:4]) + hhmm(x[4:])
203 def l3str(x: Union[str, List[str]]) -> List[str]:
204 if isinstance(x, str):
208 if len(lx) != 3 or not all(isinstance(el, str) for el in x):
209 raise ValueError(str(lx) + " is not a list of three strings")
213 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
214 def alrmspec(sub: str) -> Tuple[int, str]:
216 raise ValueError(sub + " does not represent day and time")
230 if isinstance(x, str):
231 lx = [alrmspec(sub) for sub in x.split(",")]
234 lx.extend([(0, "0000") for _ in range(3 - len(lx))])
235 if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
236 raise ValueError(str(lx) + " is a wrong alarms specification")
237 return [(d, hhmm(tm)) for d, tm in lx]
240 def l3int(x: Union[str, List[int]]) -> List[int]:
241 if isinstance(x, str):
242 lx = [int(el) for el in x.split(",")]
245 if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
246 raise ValueError(str(lx) + " is not a list of three integers")
252 For each class corresponding to a message, automatically create
253 two nested classes `In` and `Out` that also inherit from their
254 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
255 copied to the `In` nested class under the name `KWARGS`, and
256 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
257 to the nested class `Out`. In addition, method `encode` is
258 defined in both classes equal to `in_encode()` and `out_encode()`
264 def __getattr__(self, name: str) -> Any:
267 def __setattr__(self, name: str, value: Any) -> None:
271 cls: Type["MetaPkt"],
273 bases: Tuple[type, ...],
274 attrs: Dict[str, Any],
276 newcls = super().__new__(cls, name, bases, attrs)
277 newcls.In = super().__new__(
282 "KWARGS": newcls.IN_KWARGS,
283 "decode": newcls.in_decode,
284 "encode": newcls.in_encode,
287 newcls.Out = super().__new__(
292 "KWARGS": newcls.OUT_KWARGS,
293 "decode": newcls.out_decode,
294 "encode": newcls.out_encode,
301 NON = 0 # Incoming, no response needed
302 INL = 1 # Birirectional, use `inline_response()`
303 EXT = 2 # Birirectional, use external responder
306 class GPS303Pkt(metaclass=MetaPkt):
307 RESPOND = Respond.NON # Do not send anything back by default
309 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
310 OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
311 KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
312 In: Type["GPS303Pkt"]
313 Out: Type["GPS303Pkt"]
317 def __getattr__(self, name: str) -> Any:
320 def __setattr__(self, name: str, value: Any) -> None:
323 def __init__(self, *args: Any, **kwargs: Any):
325 Construct the object _either_ from (length, payload),
326 _or_ from the values of individual fields
328 assert not args or (len(args) == 2 and not kwargs)
329 if args: # guaranteed to be two arguments at this point
330 self.length, self.payload = args
332 self.decode(self.length, self.payload)
334 raise DecodeError(e, obj=self)
336 for kw, typ, dfl in self.KWARGS:
337 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
340 self.__class__.__name__ + " stray kwargs " + str(kwargs)
343 def __repr__(self) -> str:
344 return "{}({})".format(
345 self.__class__.__name__,
349 'bytes.fromhex("{}")'.format(v.hex())
350 if isinstance(v, bytes)
353 for k, v in self.__dict__.items()
354 if not k.startswith("_")
358 decode: Callable[["GPS303Pkt", int, bytes], None]
360 def in_decode(self, length: int, packet: bytes) -> None:
361 # Overridden in subclasses, otherwise do not decode payload
364 def out_decode(self, length: int, packet: bytes) -> None:
365 # Overridden in subclasses, otherwise do not decode payload
368 encode: Callable[["GPS303Pkt"], bytes]
370 def in_encode(self) -> bytes:
371 # Necessary to emulate terminal, which is not implemented
372 raise NotImplementedError(
373 self.__class__.__name__ + ".encode() not implemented"
376 def out_encode(self) -> bytes:
377 # Overridden in subclasses, otherwise make empty payload
381 def packed(self) -> bytes:
382 payload = self.encode()
383 length = getattr(self, "length", len(payload) + 1)
384 return pack("BB", length, self.PROTO) + payload
387 class UNKNOWN(GPS303Pkt):
388 PROTO = 256 # > 255 is impossible in real packets
391 class LOGIN(GPS303Pkt):
393 RESPOND = Respond.INL
394 # Default response for ACK, can also respond with STOP_UPLOAD
395 IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
397 def in_decode(self, length: int, payload: bytes) -> None:
398 self.imei = payload[:8].ljust(8, b"\0").hex()
399 self.ver = payload[8]
401 def in_encode(self) -> bytes:
402 return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
407 class SUPERVISION(GPS303Pkt):
409 OUT_KWARGS = (("status", int, 1),)
411 def out_encode(self) -> bytes:
412 # 1: The device automatically answers Pickup effect
413 # 2: Automatically Answering Two-way Calls
414 # 3: Ring manually answer the two-way call
415 return pack("B", self.status)
418 class HEARTBEAT(GPS303Pkt):
420 RESPOND = Respond.INL
423 class _GPS_POSITIONING(GPS303Pkt):
424 RESPOND = Respond.INL
426 def in_decode(self, length: int, payload: bytes) -> None:
427 self.dtime = payload[:6]
428 if self.dtime == b"\0\0\0\0\0\0":
431 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
432 self.devtime = datetime(
433 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
435 self.gps_data_length = payload[6] >> 4
436 self.gps_nb_sat = payload[6] & 0x0F
437 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
438 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
439 flip_lon = bool(flags & 0b0000100000000000) # bit 4
440 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
441 self.heading = flags & 0b0000001111111111 # bits 6 - last
442 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
443 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
447 def out_encode(self) -> bytes:
448 tup = datetime.utcnow().timetuple()
449 ttup = (tup[0] % 100,) + tup[1:6]
450 return pack("BBBBBB", *ttup)
453 class GPS_POSITIONING(_GPS_POSITIONING):
457 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
461 class STATUS(GPS303Pkt):
463 RESPOND = Respond.EXT
467 ("timezone", int, 0),
469 ("signal", maybe(int), None),
471 OUT_KWARGS = (("upload_interval", int, 25),)
473 def in_decode(self, length: int, payload: bytes) -> None:
474 self.batt, self.ver, self.timezone, self.intvl = unpack(
478 self.signal: Optional[int] = payload[4]
482 def in_encode(self) -> bytes:
483 return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
484 b"" if self.signal is None else pack("B", self.signal)
487 def out_encode(self) -> bytes: # Set interval in minutes
488 return pack("B", self.upload_interval)
491 class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
494 def in_encode(self) -> bytes:
498 class RESET(GPS303Pkt):
499 # Device sends when it got reset SMS
500 # Server can send to initiate factory reset
504 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
506 OUT_KWARGS = (("number", int, 3),)
508 def out_encode(self) -> bytes: # Number of whitelist entries
509 return pack("B", self.number)
512 class _WIFI_POSITIONING(GPS303Pkt):
513 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
515 ("dtime", bytes, b"\0\0\0\0\0\0"),
516 ("wifi_aps", list, []),
519 ("gsm_cells", list, []),
522 def in_decode(self, length: int, payload: bytes) -> None:
523 self.dtime = payload[:6]
524 if self.dtime == b"\0\0\0\0\0\0":
527 self.devtime = datetime.strptime(
528 self.dtime.hex(), "%y%m%d%H%M%S"
529 ).astimezone(tz=timezone.utc)
531 for i in range(self.length): # length has special meaning here
532 slice = payload[6 + i * 7 : 13 + i * 7]
533 self.wifi_aps.append(
534 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
536 gsm_slice = payload[6 + self.length * 7 :]
537 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
539 for i in range(ncells):
540 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
541 locac, cellid, sigstr = unpack(
542 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
544 self.gsm_cells.append((locac, cellid, -sigstr))
546 def in_encode(self) -> bytes:
547 self.length = len(self.wifi_aps)
553 bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
555 for mac, sigstr in self.wifi_aps
558 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
561 pack("!HHB", locac, cellid, -sigstr)
562 for locac, cellid, sigstr in self.gsm_cells
569 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
571 RESPOND = Respond.INL
573 def out_encode(self) -> bytes:
574 return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
577 class TIME(GPS303Pkt):
579 RESPOND = Respond.INL
581 def out_encode(self) -> bytes:
582 return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
585 class PROHIBIT_LBS(GPS303Pkt):
587 OUT_KWARGS = (("status", int, 1),)
589 def out_encode(self) -> bytes: # Server sent, 0-off, 1-on
590 return pack("B", self.status)
593 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
597 ("gps_off", boolx, False), # Clarify the meaning of 0/1
598 ("gps_interval_set", boolx, False),
599 ("gps_interval", hhmmhhmm, "00000000"),
600 ("lbs_off", boolx, False), # Clarify the meaning of 0/1
601 ("boot_time_set", boolx, False),
602 ("boot_time", hhmm, "0000"),
603 ("shut_time_set", boolx, False),
604 ("shut_time", hhmm, "0000"),
607 def out_encode(self) -> bytes:
609 pack("B", self.gps_off)
610 + pack("B", self.gps_interval_set)
611 + bytes.fromhex(self.gps_interval)
612 + pack("B", self.lbs_off)
613 + pack("B", self.boot_time_set)
614 + bytes.fromhex(self.boot_time)
615 + pack("B", self.shut_time_set)
616 + bytes.fromhex(self.shut_time)
620 class _SET_PHONE(GPS303Pkt):
621 OUT_KWARGS = (("phone", str, ""),)
623 def out_encode(self) -> bytes:
625 return self.phone.encode("")
628 class REMOTE_MONITOR_PHONE(_SET_PHONE):
632 class SOS_PHONE(_SET_PHONE):
636 class DAD_PHONE(_SET_PHONE):
640 class MOM_PHONE(_SET_PHONE):
644 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
648 class GPS_OFF_PERIOD(GPS303Pkt):
652 ("fm", hhmm, "0000"),
653 ("to", hhmm, "2359"),
656 def out_encode(self) -> bytes:
658 pack("B", self.onoff)
659 + bytes.fromhex(self.fm)
660 + bytes.fromhex(self.to)
664 class DND_PERIOD(GPS303Pkt):
669 ("fm1", hhmm, "0000"),
670 ("to1", hhmm, "2359"),
671 ("fm2", hhmm, "0000"),
672 ("to2", hhmm, "2359"),
675 def out_encode(self) -> bytes:
677 pack("B", self.onoff)
678 + pack("B", self.week)
679 + bytes.fromhex(self.fm1)
680 + bytes.fromhex(self.to1)
681 + bytes.fromhex(self.fm2)
682 + bytes.fromhex(self.to2)
686 class RESTART_SHUTDOWN(GPS303Pkt):
688 OUT_KWARGS = (("flag", int, 0),)
690 def out_encode(self) -> bytes:
693 return pack("B", self.flag)
696 class DEVICE(GPS303Pkt):
698 OUT_KWARGS = (("flag", int, 0),)
700 # 0 - Stop looking for equipment
701 # 1 - Start looking for equipment
702 def out_encode(self) -> bytes:
703 return pack("B", self.flag)
706 class ALARM_CLOCK(GPS303Pkt):
709 Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
711 ("alarms", l3alarms, []),
714 def out_encode(self) -> bytes:
716 pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
720 class STOP_ALARM(GPS303Pkt):
723 def in_decode(self, length: int, payload: bytes) -> None:
724 self.flag = payload[0]
727 class SETUP(GPS303Pkt):
729 RESPOND = Respond.EXT
731 ("uploadintervalseconds", intx, 0x0300),
732 ("binaryswitch", intx, 0b00110001),
733 ("alarms", l3int, [0, 0, 0]),
734 ("dndtimeswitch", int, 0),
735 ("dndtimes", l3int, [0, 0, 0]),
736 ("gpstimeswitch", int, 0),
737 ("gpstimestart", int, 0),
738 ("gpstimestop", int, 0),
739 ("phonenumbers", l3str, ["", "", ""]),
742 def out_encode(self) -> bytes:
743 def pack3b(x: int) -> bytes:
744 return pack("!I", x)[1:]
748 pack("!H", self.uploadintervalseconds),
749 pack("B", self.binaryswitch),
751 + [pack3b(el) for el in self.alarms]
753 pack("B", self.dndtimeswitch),
755 + [pack3b(el) for el in self.dndtimes]
757 pack("B", self.gpstimeswitch),
758 pack("!H", self.gpstimestart),
759 pack("!H", self.gpstimestop),
761 + [b";".join([el.encode() for el in self.phonenumbers])]
764 def in_encode(self) -> bytes:
768 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
772 class RESTORE_PASSWORD(GPS303Pkt):
776 class WIFI_POSITIONING(_WIFI_POSITIONING):
778 RESPOND = Respond.EXT
779 OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
781 def out_encode(self) -> bytes:
782 if self.latitude is None or self.longitude is None:
784 return "{:+#010.8g},{:+#010.8g}".format(
785 self.latitude, self.longitude
788 def out_decode(self, length: int, payload: bytes) -> None:
789 lat, lon = payload.decode().split(",")
790 self.latitude = float(lat)
791 self.longitude = float(lon)
794 class MANUAL_POSITIONING(GPS303Pkt):
797 def in_decode(self, length: int, payload: bytes) -> None:
798 self.flag = payload[0] if len(payload) > 0 else -1
803 4: "LBS search > 3 times",
804 5: "Same LBS and WiFi data",
805 6: "LBS prohibited, WiFi absent",
806 7: "GPS spacing < 50 m",
807 }.get(self.flag, "Unknown")
810 class BATTERY_CHARGE(GPS303Pkt):
814 class CHARGER_CONNECTED(GPS303Pkt):
818 class CHARGER_DISCONNECTED(GPS303Pkt):
822 class VIBRATION_RECEIVED(GPS303Pkt):
826 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
828 RESPOND = Respond.EXT
829 OUT_KWARGS = (("interval", int, 10),)
831 def in_decode(self, length: int, payload: bytes) -> None:
832 self.interval = unpack("!H", payload[:2])
834 def out_encode(self) -> bytes:
835 return pack("!H", self.interval)
838 class SOS_ALARM(GPS303Pkt):
842 class UNKNOWN_B3(GPS303Pkt):
844 IN_KWARGS = (("asciidata", str, ""),)
846 def in_decode(self, length: int, payload: bytes) -> None:
847 self.asciidata = payload.decode()
850 # Build dicts protocol number -> class and class name -> protocol number
853 if True: # just to indent the code, sorry!
856 for name, cls in globals().items()
858 and issubclass(cls, GPS303Pkt)
859 and not name.startswith("_")
861 if hasattr(cls, "PROTO"):
862 CLASSES[cls.PROTO] = cls
863 PROTOS[cls.__name__] = cls.PROTO
868 ) -> Union[Type[GPS303Pkt], List[str]]:
869 if prefix.startswith(PROTO_PREFIX):
870 pname = prefix[len(PROTO_PREFIX) :]
872 raise KeyError(pname)
875 for name, proto in PROTOS.items()
876 if name.upper().startswith(prefix.upper())
879 return [name for name, _ in lst]
881 return CLASSES[proto]
884 def proto_handled(proto: str) -> bool:
885 return proto.startswith(PROTO_PREFIX)
888 def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
889 return PROTO_PREFIX + (
890 obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__
894 def proto_of_message(packet: bytes) -> str:
895 return proto_name(CLASSES.get(packet[1], UNKNOWN))
898 def imei_from_packet(packet: bytes) -> Optional[str]:
899 if packet[1] == LOGIN.PROTO:
900 msg = parse_message(packet)
901 if isinstance(msg, LOGIN):
906 def is_goodbye_packet(packet: bytes) -> bool:
907 return packet[1] == HIBERNATION.PROTO
910 def inline_response(packet: bytes) -> Optional[bytes]:
914 if cls.RESPOND is Respond.INL:
915 return cls.Out().packed
919 def probe_buffer(buffer: bytes) -> bool:
920 framestart = buffer.find(b"xx")
923 if len(buffer) - framestart < 6:
928 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
929 """From a packet (without framing bytes) derive the XXX.In object"""
930 length, proto = unpack("BB", packet[:2])
932 if proto not in CLASSES:
933 cause: Union[DecodeError, ValueError, IndexError] = ValueError(
934 f"Proto {proto} is unknown"
939 return CLASSES[proto].In(length, payload)
941 return CLASSES[proto].Out(length, payload)
942 except (DecodeError, ValueError, IndexError) as e:
945 retobj = UNKNOWN.In(length, payload)
947 retobj = UNKNOWN.Out(length, payload)
948 retobj.PROTO = proto # Override class attr with object attr
953 def exposed_protos() -> List[Tuple[str, bool]]:
955 (proto_name(GPS_POSITIONING), True),
956 (proto_name(WIFI_POSITIONING), False),
957 (proto_name(STATUS), True),