2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation), but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
22 from types import SimpleNamespace
35 from .common import CoordReport, HintReport, StatusReport
36 from .protomodule import ProtoClass
51 PROTO_PREFIX: str = "ZX:"
59 def __init__(self) -> None:
62 def recv(self, segment: bytes) -> List[Union[bytes, str]]:
64 Process next segment of the stream. Return successfully deframed
65 packets as `bytes` and error messages as `str`.
68 self.buffer += segment
69 if len(self.buffer) > MAXBUFFER:
70 # We are receiving junk. Let's drop it or we run out of memory.
72 return [f"More than {MAXBUFFER} unparseable data, dropping"]
73 msgs: List[Union[bytes, str]] = []
75 framestart = self.buffer.find(b"xx")
76 if framestart == -1: # No frames, return whatever we have
78 if framestart > 0: # Should not happen, report
80 f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
82 self.buffer = self.buffer[framestart:]
83 # At this point, buffer starts with a packet
84 if len(self.buffer) < 6: # no len and proto - cannot proceed
86 exp_end = self.buffer[2] + 3 # Expect '\r\n' here
88 # Length field can legitimately be much less than the
89 # length of the packet (e.g. WiFi positioning), but
90 # it _should not_ be greater. Still sometimes it is.
91 # Luckily, not by too much: by maybe two or three bytes?
92 # Do this embarrassing hack to avoid accidental match
93 # of some binary data in the packet against '\r\n'.
95 frameend = self.buffer.find(b"\r\n", frameend + 1)
96 if frameend == -1 or frameend >= (
98 ): # Found realistic match or none
100 if frameend == -1: # Incomplete frame, return what we have
102 packet = self.buffer[2:frameend]
103 self.buffer = self.buffer[frameend + 2 :]
104 if len(packet) < 2: # frameend comes too early
105 msgs.append(f"Packet too short: {packet.hex()}")
110 def close(self) -> bytes:
def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
    """Wrap a serialized packet into its on-wire frame.

    The frame is the two-byte marker ``xx``, the packet itself, and a
    CR LF terminator. The `imei` argument is accepted but not used.
    """
    start_marker = b"xx"
    end_marker = b"\r\n"
    return b"".join((start_marker, buffer, end_marker))
120 ### Parser/Constructor ###
123 class DecodeError(Exception):
124 def __init__(self, e: Exception, **kwargs: Any) -> None:
126 for k, v in kwargs.items():
def maybe(typ: type) -> Callable[[Any], Any]:
    """Make a converter that lets ``None`` through and applies `typ` otherwise."""

    def convert(x: Any) -> Any:
        if x is None:
            return None
        return typ(x)

    return convert
134 def intx(x: Union[str, int]) -> int:
135 if isinstance(x, str):
140 def boolx(x: Union[str, bool]) -> bool:
141 if isinstance(x, str):
142 if x.upper() in ("ON", "TRUE", "1"):
144 if x.upper() in ("OFF", "FALSE", "0"):
146 raise ValueError(str(x) + " could not be parsed as a Boolean")
150 def hhmm(x: str) -> str:
151 """Check for the string that represents hours and minutes"""
152 if not isinstance(x, str) or len(x) != 4:
153 raise ValueError(str(x) + " is not a four-character string")
156 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
157 raise ValueError(str(x) + " does not contain valid hours and minutes")
def hhmmhhmm(x: str) -> str:
    """Validate a string made of two consecutive hours-and-minutes values.

    Each four-character half is passed through `hhmm()`; the two results
    are concatenated. Raises ValueError when `x` is not an
    eight-character string.
    """
    if isinstance(x, str) and len(x) == 8:
        first_half, second_half = x[:4], x[4:]
        return hhmm(first_half) + hhmm(second_half)
    raise ValueError(str(x) + " is not an eight-character string")
168 def l3str(x: Union[str, List[str]]) -> List[str]:
169 if isinstance(x, str):
173 if len(lx) != 3 or not all(isinstance(el, str) for el in x):
174 raise ValueError(str(lx) + " is not a list of three strings")
178 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
179 def alrmspec(sub: str) -> Tuple[int, str]:
181 raise ValueError(sub + " does not represent day and time")
195 if isinstance(x, str):
196 lx = [alrmspec(sub) for sub in x.split(",")]
199 lx.extend([(0, "0000") for _ in range(3 - len(lx))])
200 if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
201 raise ValueError(str(lx) + " is a wrong alarms specification")
202 return [(d, hhmm(tm)) for d, tm in lx]
205 def l3int(x: Union[str, List[int]]) -> List[int]:
206 if isinstance(x, str):
207 lx = [int(el) for el in x.split(",")]
210 if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
211 raise ValueError(str(lx) + " is not a list of three integers")
216 NON = 0 # Incoming, no response needed
217 INL = 1 # Bidirectional, use `inline_response()`
218 EXT = 2 # Bidirectional, use external responder
221 class GPS303Pkt(ProtoClass):
222 RESPOND = Respond.NON # Do not send anything back by default
224 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
225 OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
226 KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
227 In: Type["GPS303Pkt"]
228 Out: Type["GPS303Pkt"]
232 def __getattr__(self, name: str) -> Any:
235 def __setattr__(self, name: str, value: Any) -> None:
238 def __init__(self, *args: Any, **kwargs: Any):
240 Construct the object _either_ from (length, payload),
241 _or_ from the values of individual fields
243 assert not args or (len(args) == 2 and not kwargs)
244 if args: # guaranteed to be two arguments at this point
245 self.length, self.payload = args
247 self.decode(self.length, self.payload)
249 raise DecodeError(e, obj=self)
251 for kw, typ, dfl in self.KWARGS:
252 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
255 self.__class__.__name__ + " stray kwargs " + str(kwargs)
258 def __repr__(self) -> str:
259 return "{}({})".format(
260 self.__class__.__name__,
264 'bytes.fromhex("{}")'.format(v.hex())
265 if isinstance(v, bytes)
268 for k, v in self.__dict__.items()
269 if not k.startswith("_")
273 decode: Callable[["GPS303Pkt", int, bytes], None]
275 def in_decode(self, length: int, packet: bytes) -> None:
276 # Overridden in subclasses, otherwise do not decode payload
279 def out_decode(self, length: int, packet: bytes) -> None:
280 # Overridden in subclasses, otherwise do not decode payload
283 encode: Callable[["GPS303Pkt"], bytes]
285 def in_encode(self) -> bytes:
286 # Necessary to emulate terminal, which is not implemented
287 raise NotImplementedError(
288 self.__class__.__name__ + ".encode() not implemented"
291 def out_encode(self) -> bytes:
292 # Overridden in subclasses, otherwise make empty payload
def proto_name(cls) -> str:
    """Return the externally used command name.

    This is the protocol prefix followed by the class name, cut to at
    most 16 characters.
    """
    full_name = PROTO_PREFIX + cls.__name__
    return full_name[:16]
def packed(self) -> bytes:
    """Serialize the packet as length byte, protocol byte, then payload.

    The length byte is the object's `length` attribute when it has one;
    otherwise it is the payload size plus one.
    """
    body = self.encode()
    fallback_length = len(body) + 1
    header = pack("BB", getattr(self, "length", fallback_length), self.PROTO)
    return header + body
class UNKNOWN(GPS303Pkt):
    """Placeholder packet class for protocol numbers with no class of their own."""

    # A protocol number above 255 cannot occur in a real packet
    PROTO = 256
311 class LOGIN(GPS303Pkt):
313 RESPOND = Respond.INL
314 # Default response for ACK, can also respond with STOP_UPLOAD
315 IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
def in_decode(self, length: int, payload: bytes) -> None:
    """Extract `imei` and `ver` from the payload.

    The first eight bytes (zero-padded on the right when fewer are
    present) are stored as a hex string in `imei`; the byte at index 8
    is stored in `ver`.
    """
    imei_raw = payload[:8]
    self.imei = imei_raw.ljust(8, b"\0").hex()
    self.ver = payload[8]
321 def in_encode(self) -> bytes:
322 return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
327 class SUPERVISION(GPS303Pkt):
329 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Encode `status` as a single unsigned byte.

    Status values:
    1: The device automatically answers Pickup effect
    2: Automatically Answering Two-way Calls
    3: Ring manually answer the two-way call
    """
    status_value = self.status
    return pack("B", status_value)
338 class HEARTBEAT(GPS303Pkt):
340 RESPOND = Respond.INL
343 class _GPS_POSITIONING(GPS303Pkt):
344 RESPOND = Respond.INL
346 def in_decode(self, length: int, payload: bytes) -> None:
347 self.dtime = payload[:6]
348 if self.dtime == b"\0\0\0\0\0\0":
351 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
352 self.devtime = datetime(
353 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
355 self.gps_data_length = payload[6] >> 4
356 self.gps_nb_sat = payload[6] & 0x0F
357 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
358 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
359 flip_lon = bool(flags & 0b0000100000000000) # bit 4
360 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
361 self.heading = flags & 0b0000001111111111 # bits 6 - last
362 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
363 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time as six bytes.

    The bytes are, in order: year modulo 100, month, day, hour, minute
    and second.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware "now"
    # in UTC carries the same calendar fields.
    now = datetime.now(timezone.utc)
    return pack(
        "BBBBBB",
        now.year % 100,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
372 def rectified(self) -> CoordReport: # JSON-able dict
374 devtime=str(self.devtime),
375 battery_percentage=-1,
379 direction=self.heading,
380 latitude=self.latitude,
381 longitude=self.longitude,
385 class GPS_POSITIONING(_GPS_POSITIONING):
389 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
393 class STATUS(GPS303Pkt):
395 RESPOND = Respond.EXT
399 ("timezone", int, 0),
401 ("signal", maybe(int), None),
403 OUT_KWARGS = (("upload_interval", int, 25),)
405 def in_decode(self, length: int, payload: bytes) -> None:
406 self.batt, self.ver, self.timezone, self.intvl = unpack(
410 self.signal: Optional[int] = payload[4]
414 def in_encode(self) -> bytes:
415 return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
416 b"" if self.signal is None else pack("B", self.signal)
def out_encode(self) -> bytes:
    """Encode `upload_interval` (interval in minutes) as one unsigned byte."""
    minutes = self.upload_interval
    return pack("B", minutes)
def rectified(self) -> StatusReport:
    """Build a StatusReport whose battery percentage is the `batt` attribute."""
    battery = self.batt
    return StatusReport(battery_percentage=battery)
426 class HIBERNATION(GPS303Pkt): # Server can send to send device to sleep
429 def in_encode(self) -> bytes:
433 class RESET(GPS303Pkt):
434 # Device sends when it got reset SMS
435 # Server can send to initiate factory reset
439 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiate sync (0x58)
441 OUT_KWARGS = (("number", int, 3),)
def out_encode(self) -> bytes:
    """Encode `number` (number of whitelist entries) as one unsigned byte."""
    entries = self.number
    return pack("B", entries)
447 class _WIFI_POSITIONING(GPS303Pkt):
448 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
450 ("dtime", bytes, b"\0\0\0\0\0\0"),
451 ("wifi_aps", list, []),
454 ("gsm_cells", list, []),
457 def in_decode(self, length: int, payload: bytes) -> None:
458 self.dtime = payload[:6]
459 if self.dtime == b"\0\0\0\0\0\0":
462 self.devtime = datetime.strptime(
463 self.dtime.hex(), "%y%m%d%H%M%S"
464 ).astimezone(tz=timezone.utc)
466 for i in range(self.length): # length has special meaning here
467 slice = payload[6 + i * 7 : 13 + i * 7]
468 self.wifi_aps.append(
469 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
471 gsm_slice = payload[6 + self.length * 7 :]
472 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
474 for i in range(ncells):
475 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
476 locac, cellid, sigstr = unpack(
477 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
479 self.gsm_cells.append((locac, cellid, -sigstr))
481 def in_encode(self) -> bytes:
482 self.length = len(self.wifi_aps)
488 bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
490 for mac, sigstr in self.wifi_aps
493 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
496 pack("!HHB", locac, cellid, -sigstr)
497 for locac, cellid, sigstr in self.gsm_cells
503 def rectified(self) -> HintReport:
505 devtime=str(self.devtime),
506 battery_percentage=-1,
509 gsm_cells=self.gsm_cells,
510 wifi_aps=[("<UNKNOWN>", mac, sig) for mac, sig in self.wifi_aps],
514 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
516 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time in six bytes.

    The time is formatted as the digits ``YYMMDDhhmmss`` and each pair
    of digits is stored as one byte whose hex representation is that
    pair.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware "now"
    # in UTC formats to the same digits.
    now = datetime.now(timezone.utc)
    return bytes.fromhex(now.strftime("%y%m%d%H%M%S"))
522 class TIME(GPS303Pkt):
524 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time in seven bytes.

    Layout is a big-endian 16-bit year followed by one byte each for
    month, day, hour, minute and second.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware "now"
    # in UTC carries the same calendar fields.
    now = datetime.now(timezone.utc)
    return pack(
        "!HBBBBB",
        now.year,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
530 class PROHIBIT_LBS(GPS303Pkt):
532 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Encode `status` as one unsigned byte (server sent, 0-off, 1-on)."""
    status_value = self.status
    return pack("B", status_value)
538 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
542 ("gps_off", boolx, False), # Clarify the meaning of 0/1
543 ("gps_interval_set", boolx, False),
544 ("gps_interval", hhmmhhmm, "00000000"),
545 ("lbs_off", boolx, False), # Clarify the meaning of 0/1
546 ("boot_time_set", boolx, False),
547 ("boot_time", hhmm, "0000"),
548 ("shut_time_set", boolx, False),
549 ("shut_time", hhmm, "0000"),
552 def out_encode(self) -> bytes:
554 pack("B", self.gps_off)
555 + pack("B", self.gps_interval_set)
556 + bytes.fromhex(self.gps_interval)
557 + pack("B", self.lbs_off)
558 + pack("B", self.boot_time_set)
559 + bytes.fromhex(self.boot_time)
560 + pack("B", self.shut_time_set)
561 + bytes.fromhex(self.shut_time)
565 class _SET_PHONE(GPS303Pkt):
566 OUT_KWARGS = (("phone", str, ""),)
568 def out_encode(self) -> bytes:
570 return self.phone.encode("")
573 class REMOTE_MONITOR_PHONE(_SET_PHONE):
577 class SOS_PHONE(_SET_PHONE):
581 class DAD_PHONE(_SET_PHONE):
585 class MOM_PHONE(_SET_PHONE):
589 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
593 class GPS_OFF_PERIOD(GPS303Pkt):
597 ("fm", hhmm, "0000"),
598 ("to", hhmm, "2359"),
601 def out_encode(self) -> bytes:
603 pack("B", self.onoff)
604 + bytes.fromhex(self.fm)
605 + bytes.fromhex(self.to)
609 class DND_PERIOD(GPS303Pkt):
614 ("fm1", hhmm, "0000"),
615 ("to1", hhmm, "2359"),
616 ("fm2", hhmm, "0000"),
617 ("to2", hhmm, "2359"),
620 def out_encode(self) -> bytes:
622 pack("B", self.onoff)
623 + pack("B", self.week)
624 + bytes.fromhex(self.fm1)
625 + bytes.fromhex(self.to1)
626 + bytes.fromhex(self.fm2)
627 + bytes.fromhex(self.to2)
631 class RESTART_SHUTDOWN(GPS303Pkt):
633 OUT_KWARGS = (("flag", int, 0),)
635 def out_encode(self) -> bytes:
638 return pack("B", self.flag)
641 class DEVICE(GPS303Pkt):
643 OUT_KWARGS = (("flag", int, 0),)
def out_encode(self) -> bytes:
    """Encode `flag` as one unsigned byte.

    0 - Stop looking for equipment
    1 - Start looking for equipment
    """
    flag_value = self.flag
    return pack("B", flag_value)
651 class ALARM_CLOCK(GPS303Pkt):
654 Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
656 ("alarms", l3alarms, []),
659 def out_encode(self) -> bytes:
661 pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
665 class STOP_ALARM(GPS303Pkt):
def in_decode(self, length: int, payload: bytes) -> None:
    """Store the first payload byte in `flag`.

    An empty payload raises IndexError.
    """
    first_byte = payload[0]
    self.flag = first_byte
672 class SETUP(GPS303Pkt):
674 RESPOND = Respond.EXT
676 ("uploadintervalseconds", intx, 0x0300),
677 ("binaryswitch", intx, 0b00110001),
678 ("alarms", l3int, [0, 0, 0]),
679 ("dndtimeswitch", int, 0),
680 ("dndtimes", l3int, [0, 0, 0]),
681 ("gpstimeswitch", int, 0),
682 ("gpstimestart", int, 0),
683 ("gpstimestop", int, 0),
684 ("phonenumbers", l3str, ["", "", ""]),
687 def out_encode(self) -> bytes:
688 def pack3b(x: int) -> bytes:
689 return pack("!I", x)[1:]
693 pack("!H", self.uploadintervalseconds),
694 pack("B", self.binaryswitch),
696 + [pack3b(el) for el in self.alarms]
698 pack("B", self.dndtimeswitch),
700 + [pack3b(el) for el in self.dndtimes]
702 pack("B", self.gpstimeswitch),
703 pack("!H", self.gpstimestart),
704 pack("!H", self.gpstimestop),
706 + [b";".join([el.encode() for el in self.phonenumbers])]
709 def in_encode(self) -> bytes:
713 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
717 class RESTORE_PASSWORD(GPS303Pkt):
721 class WIFI_POSITIONING(_WIFI_POSITIONING):
723 RESPOND = Respond.EXT
724 OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
726 def out_encode(self) -> bytes:
727 if self.latitude is None or self.longitude is None:
729 return "{:+#010.8g},{:+#010.8g}".format(
730 self.latitude, self.longitude
def out_decode(self, length: int, payload: bytes) -> None:
    """Parse a ``<latitude>,<longitude>`` text payload into float attributes.

    The payload is decoded as text and must split on commas into exactly
    two parts; otherwise ValueError is raised.
    """
    lat_text, lon_text = payload.decode().split(",")
    # Kept as two separate statements so latitude is stored even if the
    # longitude conversion fails.
    self.latitude = float(lat_text)
    self.longitude = float(lon_text)
739 class MANUAL_POSITIONING(GPS303Pkt):
742 def in_decode(self, length: int, payload: bytes) -> None:
743 self.flag = payload[0] if len(payload) > 0 else -1
748 4: "LBS search > 3 times",
749 5: "Same LBS and WiFi data",
750 6: "LBS prohibited, WiFi absent",
751 7: "GPS spacing < 50 m",
752 }.get(self.flag, "Unknown")
755 class BATTERY_CHARGE(GPS303Pkt):
759 class CHARGER_CONNECTED(GPS303Pkt):
763 class CHARGER_DISCONNECTED(GPS303Pkt):
767 class VIBRATION_RECEIVED(GPS303Pkt):
771 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
773 RESPOND = Respond.EXT
774 OUT_KWARGS = (("interval", int, 10),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Read `interval` from the first two payload bytes.

    The value is a big-endian unsigned 16-bit integer. A payload
    shorter than two bytes raises struct.error.
    """
    # unpack() always returns a tuple; take its single element so that
    # `interval` is an int, which is what pack("!H", ...) needs.
    (self.interval,) = unpack("!H", payload[:2])
def out_encode(self) -> bytes:
    """Encode `interval` as a big-endian unsigned 16-bit integer."""
    interval_value = self.interval
    return pack("!H", interval_value)
783 class SOS_ALARM(GPS303Pkt):
787 class UNKNOWN_B3(GPS303Pkt):
789 IN_KWARGS = (("asciidata", str, ""),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Decode the whole payload as text and store it in `asciidata`.

    Uses the default codec of `bytes.decode()`; undecodable bytes raise
    UnicodeDecodeError.
    """
    text = payload.decode()
    self.asciidata = text
795 # Build dicts protocol number -> class and class name -> protocol number
798 if True: # just to indent the code, sorry!
801 for name, cls in globals().items()
803 and issubclass(cls, GPS303Pkt)
804 and not name.startswith("_")
806 if hasattr(cls, "PROTO"):
807 CLASSES[cls.PROTO] = cls
808 PROTOS[cls.__name__] = cls.PROTO
813 ) -> Union[Type[GPS303Pkt], List[str]]:
814 if prefix.startswith(PROTO_PREFIX):
815 pname = prefix[len(PROTO_PREFIX) :]
817 raise KeyError(pname)
820 for name, proto in PROTOS.items()
821 if name.upper().startswith(prefix.upper())
824 return [name for name, _ in lst]
826 return CLASSES[proto]
def proto_handled(proto: str) -> bool:
    """Tell whether a protocol name begins with this module's prefix."""
    has_prefix = proto.startswith(PROTO_PREFIX)
    return has_prefix
def proto_of_message(packet: bytes) -> str:
    """Return the external protocol name for a deframed packet.

    The packet's second byte selects the class from the registry; when
    no class is registered for it, UNKNOWN is used.
    """
    proto_number = packet[1]
    packet_class = CLASSES.get(proto_number, UNKNOWN)
    return packet_class.proto_name()
837 def imei_from_packet(packet: bytes) -> Optional[str]:
838 if packet[1] == LOGIN.PROTO:
839 msg = parse_message(packet)
840 if isinstance(msg, LOGIN):
def is_goodbye_packet(packet: bytes) -> bool:
    """Tell whether the packet's second byte is the HIBERNATION protocol number."""
    proto_number = packet[1]
    return proto_number == HIBERNATION.PROTO
849 def inline_response(packet: bytes) -> Optional[bytes]:
853 if cls.RESPOND is Respond.INL:
854 return cls.Out().packed
858 def probe_buffer(buffer: bytes) -> bool:
859 framestart = buffer.find(b"xx")
862 if len(buffer) - framestart < 6:
867 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
868 """From a packet (without framing bytes) derive the XXX.In object"""
869 length, proto = unpack("BB", packet[:2])
871 if proto not in CLASSES:
872 cause: Union[DecodeError, ValueError, IndexError] = ValueError(
873 f"Proto {proto} is unknown"
878 return CLASSES[proto].In(length, payload)
880 return CLASSES[proto].Out(length, payload)
881 except (DecodeError, ValueError, IndexError) as e:
884 retobj = UNKNOWN.In(length, payload)
886 retobj = UNKNOWN.Out(length, payload)
887 retobj.PROTO = proto # Override class attr with object attr
892 def exposed_protos() -> List[Tuple[str, bool]]:
894 (cls.proto_name(), cls.RESPOND is Respond.EXT)
895 for cls in CLASSES.values()
896 if hasattr(cls, "rectified")