2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation), but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
22 from types import SimpleNamespace
35 from .common import CoordReport, HintReport, StatusReport
36 from .protomodule import ProtoClass
51 PMODNAME = __name__.split(".")[-1]
52 PROTO_PREFIX: str = "ZX:"
60 def __init__(self) -> None:
63 def recv(self, segment: bytes) -> List[Union[bytes, str]]:
65 Process next segment of the stream. Return successfully deframed
66 packets as `bytes` and error messages as `str`.
69 self.buffer += segment
70 if len(self.buffer) > MAXBUFFER:
71 # We are receiving junk. Let's drop it or we run out of memory.
73 return [f"More than {MAXBUFFER} unparseable data, dropping"]
74 msgs: List[Union[bytes, str]] = []
76 framestart = self.buffer.find(b"xx")
77 if framestart == -1: # No frames, return whatever we have
79 if framestart > 0: # Should not happen, report
81 f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
83 self.buffer = self.buffer[framestart:]
84 # At this point, buffer starts with a packet
85 if len(self.buffer) < 6: # no len and proto - cannot proceed
87 exp_end = self.buffer[2] + 3 # Expect '\r\n' here
89 # Length field can legitimately be much less than the
90 # length of the packet (e.g. WiFi positioning), but
91 # it _should not_ be greater. Still sometimes it is.
92 # Luckily, not by too much: by maybe two or three bytes?
93 # Do this embarrassing hack to avoid accidental match
94 # of some binary data in the packet against '\r\n'.
96 frameend = self.buffer.find(b"\r\n", frameend + 1)
97 if frameend == -1 or frameend >= (
99 ): # Found realistic match or none
101 if frameend == -1: # Incomplete frame, return what we have
103 packet = self.buffer[2:frameend]
104 self.buffer = self.buffer[frameend + 2 :]
105 if len(packet) < 2: # frameend comes too early
106 msgs.append(f"Packet too short: {packet.hex()}")
111 def close(self) -> bytes:
def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
    """Wrap a packet into its on-wire frame.

    The result is the two-byte start marker ``xx``, then `buffer`, then
    a trailing CR LF. `imei` is accepted but not used here.
    """
    start_marker, end_marker = b"xx", b"\r\n"
    return b"".join((start_marker, buffer, end_marker))
121 ### Parser/Constructor ###
124 class DecodeError(Exception):
125 def __init__(self, e: Exception, **kwargs: Any) -> None:
127 for k, v in kwargs.items():
def maybe(typ: type) -> Callable[[Any], Any]:
    """Return a converter that applies `typ` but passes ``None`` through."""

    def convert(x: Any) -> Any:
        if x is None:
            return None
        return typ(x)

    return convert
135 def intx(x: Union[str, int]) -> int:
136 if isinstance(x, str):
141 def boolx(x: Union[str, bool]) -> bool:
142 if isinstance(x, str):
143 if x.upper() in ("ON", "TRUE", "1"):
145 if x.upper() in ("OFF", "FALSE", "0"):
147 raise ValueError(str(x) + " could not be parsed as a Boolean")
151 def hhmm(x: str) -> str:
152 """Check for the string that represents hours and minutes"""
153 if not isinstance(x, str) or len(x) != 4:
154 raise ValueError(str(x) + " is not a four-character string")
157 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
158 raise ValueError(str(x) + " does not contain valid hours and minutes")
def hhmmhhmm(x: str) -> str:
    """Validate a string that holds two consecutive HHMM values.

    Raises ValueError when `x` is not a string of exactly eight
    characters. Otherwise each four-character half is passed through
    `hhmm()` and the two results are concatenated.
    """
    if isinstance(x, str) and len(x) == 8:
        first_half, second_half = x[:4], x[4:]
        return hhmm(first_half) + hhmm(second_half)
    raise ValueError(str(x) + " is not an eight-character string")
169 def l3str(x: Union[str, List[str]]) -> List[str]:
170 if isinstance(x, str):
174 if len(lx) != 3 or not all(isinstance(el, str) for el in x):
175 raise ValueError(str(lx) + " is not a list of three strings")
179 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
180 def alrmspec(sub: str) -> Tuple[int, str]:
182 raise ValueError(sub + " does not represent day and time")
196 if isinstance(x, str):
197 lx = [alrmspec(sub) for sub in x.split(",")]
200 lx.extend([(0, "0000") for _ in range(3 - len(lx))])
201 if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
202 raise ValueError(str(lx) + " is a wrong alarms specification")
203 return [(d, hhmm(tm)) for d, tm in lx]
206 def l3int(x: Union[str, List[int]]) -> List[int]:
207 if isinstance(x, str):
208 lx = [int(el) for el in x.split(",")]
211 if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
212 raise ValueError(str(lx) + " is not a list of three integers")
217 NON = 0 # Incoming, no response needed
218 INL = 1 # Birirectional, use `inline_response()`
219 EXT = 2 # Birirectional, use external responder
222 class GPS303Pkt(ProtoClass):
223 RESPOND = Respond.NON # Do not send anything back by default
225 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
226 OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
227 KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
228 In: Type["GPS303Pkt"]
229 Out: Type["GPS303Pkt"]
233 def __getattr__(self, name: str) -> Any:
236 def __setattr__(self, name: str, value: Any) -> None:
239 def __init__(self, *args: Any, **kwargs: Any):
241 Construct the object _either_ from (length, payload),
242 _or_ from the values of individual fields
244 assert not args or (len(args) == 2 and not kwargs)
245 if args: # guaranteed to be two arguments at this point
246 self.length, self.payload = args
248 self.decode(self.length, self.payload)
250 raise DecodeError(e, obj=self)
252 for kw, typ, dfl in self.KWARGS:
253 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
256 self.__class__.__name__ + " stray kwargs " + str(kwargs)
259 def __repr__(self) -> str:
260 return "{}({})".format(
261 self.__class__.__name__,
265 'bytes.fromhex("{}")'.format(v.hex())
266 if isinstance(v, bytes)
269 for k, v in self.__dict__.items()
270 if not k.startswith("_")
274 decode: Callable[["GPS303Pkt", int, bytes], None]
276 def in_decode(self, length: int, packet: bytes) -> None:
277 # Overridden in subclasses, otherwise do not decode payload
280 def out_decode(self, length: int, packet: bytes) -> None:
281 # Overridden in subclasses, otherwise do not decode payload
284 encode: Callable[["GPS303Pkt"], bytes]
286 def in_encode(self) -> bytes:
287 # Necessary to emulate terminal, which is not implemented
288 raise NotImplementedError(
289 self.__class__.__name__ + ".encode() not implemented"
292 def out_encode(self) -> bytes:
293 # Overridden in subclasses, otherwise make empty payload
def proto_name(cls) -> str:
    """Externally used command name: prefix plus class name, cut to 16 characters"""
    full_name = PROTO_PREFIX + cls.__name__
    return full_name[:16]
def packed(self) -> bytes:
    """Serialize the packet as length byte, protocol byte, then payload.

    The length byte is the object's `length` attribute when it has one,
    otherwise the size of the encoded payload plus one.
    """
    body = self.encode()
    fallback_length = len(body) + 1
    header = pack("BB", getattr(self, "length", fallback_length), self.PROTO)
    return header + body
class UNKNOWN(GPS303Pkt):
    # Fallback packet class: `proto_of_message()` uses it when the protocol
    # byte of a packet is not found in CLASSES.
    PROTO = 256  # > 255 is impossible in real packets
312 class LOGIN(GPS303Pkt):
314 RESPOND = Respond.INL
315 # Default response for ACK, can also respond with STOP_UPLOAD
316 IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
def in_decode(self, length: int, payload: bytes) -> None:
    """Extract IMEI and version from a login payload.

    The first eight bytes (right-padded with zero bytes when fewer are
    present) are stored as a hex string in `imei`; the ninth byte is
    stored in `ver`.
    """
    imei_field = payload[0:8]
    padded = imei_field.ljust(8, b"\0")
    self.imei = padded.hex()
    self.ver = payload[8]
322 def in_encode(self) -> bytes:
323 return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
328 class SUPERVISION(GPS303Pkt):
330 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Encode `status` as one unsigned byte.

    Status values:
    1: The device automatically answers Pickup effect
    2: Automatically Answering Two-way Calls
    3: Ring manually answer the two-way call
    """
    status_byte = pack("B", self.status)
    return status_byte
339 class HEARTBEAT(GPS303Pkt):
341 RESPOND = Respond.INL
344 class _GPS_POSITIONING(GPS303Pkt):
345 RESPOND = Respond.INL
347 def in_decode(self, length: int, payload: bytes) -> None:
348 self.dtime = payload[:6]
349 if self.dtime == b"\0\0\0\0\0\0":
352 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
353 self.devtime = datetime(
354 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
356 self.gps_data_length = payload[6] >> 4
357 self.gps_nb_sat = payload[6] & 0x0F
358 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
359 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
360 flip_lon = bool(flags & 0b0000100000000000) # bit 4
361 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
362 self.heading = flags & 0b0000001111111111 # bits 6 - last
363 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
364 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time as six bytes.

    The bytes are year modulo 100, month, day, hour, minute and second,
    each packed as one unsigned byte.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware "now"
    # in UTC yields the same calendar fields.
    now = datetime.now(timezone.utc)
    return pack(
        "BBBBBB",
        now.year % 100,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
373 def rectified(self) -> CoordReport: # JSON-able dict
375 devtime=str(self.devtime),
376 battery_percentage=None,
380 direction=self.heading,
381 latitude=self.latitude,
382 longitude=self.longitude,
386 class GPS_POSITIONING(_GPS_POSITIONING):
390 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
394 class STATUS(GPS303Pkt):
396 RESPOND = Respond.EXT
400 ("timezone", int, 0),
402 ("signal", maybe(int), None),
404 OUT_KWARGS = (("upload_interval", int, 25),)
406 def in_decode(self, length: int, payload: bytes) -> None:
407 self.batt, self.ver, self.timezone, self.intvl = unpack(
411 self.signal: Optional[int] = payload[4]
415 def in_encode(self) -> bytes:
416 return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
417 b"" if self.signal is None else pack("B", self.signal)
def out_encode(self) -> bytes:
    """Encode `upload_interval` (interval in minutes) as one unsigned byte."""
    interval_byte = pack("B", self.upload_interval)
    return interval_byte
def rectified(self) -> StatusReport:
    """Produce a `StatusReport` whose battery percentage is this packet's `batt`."""
    report = StatusReport(battery_percentage=self.batt)
    return report
427 class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
430 def in_encode(self) -> bytes:
434 class RESET(GPS303Pkt):
435 # Device sends when it got reset SMS
436 # Server can send to initiate factory reset
440 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
442 OUT_KWARGS = (("number", int, 3),)
def out_encode(self) -> bytes:
    """Encode `number` (the number of whitelist entries) as one unsigned byte."""
    count_byte = pack("B", self.number)
    return count_byte
448 class _WIFI_POSITIONING(GPS303Pkt):
449 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
451 ("dtime", bytes, b"\0\0\0\0\0\0"),
452 ("wifi_aps", list, []),
455 ("gsm_cells", list, []),
458 def in_decode(self, length: int, payload: bytes) -> None:
459 self.dtime = payload[:6]
460 if self.dtime == b"\0\0\0\0\0\0":
463 self.devtime = datetime.strptime(
464 self.dtime.hex(), "%y%m%d%H%M%S"
465 ).astimezone(tz=timezone.utc)
467 for i in range(self.length): # length has special meaning here
468 slice = payload[6 + i * 7 : 13 + i * 7]
469 self.wifi_aps.append(
470 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
472 gsm_slice = payload[6 + self.length * 7 :]
473 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
475 for i in range(ncells):
476 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
477 locac, cellid, sigstr = unpack(
478 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
480 self.gsm_cells.append((locac, cellid, -sigstr))
482 def in_encode(self) -> bytes:
483 self.length = len(self.wifi_aps)
489 bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
491 for mac, sigstr in self.wifi_aps
494 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
497 pack("!HHB", locac, cellid, -sigstr)
498 for locac, cellid, sigstr in self.gsm_cells
504 def rectified(self) -> HintReport:
506 devtime=str(self.devtime),
507 battery_percentage=None,
510 gsm_cells=self.gsm_cells,
511 wifi_aps=[("<UNKNOWN>", mac, sig) for mac, sig in self.wifi_aps],
515 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
517 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time as six BCD bytes.

    The time is formatted as the twelve digits YYMMDDHHMMSS and those
    digits are read as hexadecimal, giving one byte per two-digit field.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware "now"
    # in UTC formats to the same digits.
    stamp = datetime.now(timezone.utc).strftime("%y%m%d%H%M%S")
    return bytes.fromhex(stamp)
523 class TIME(GPS303Pkt):
525 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time as seven bytes.

    The year is packed as a big-endian unsigned 16-bit value, followed
    by month, day, hour, minute and second as one unsigned byte each.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware "now"
    # in UTC yields the same time tuple fields.
    now = datetime.now(timezone.utc)
    return pack("!HBBBBB", *now.timetuple()[:6])
531 class PROHIBIT_LBS(GPS303Pkt):
533 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Encode `status` as one unsigned byte (server sent, 0-off, 1-on)."""
    status_byte = pack("B", self.status)
    return status_byte
539 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
543 ("gps_off", boolx, False), # Clarify the meaning of 0/1
544 ("gps_interval_set", boolx, False),
545 ("gps_interval", hhmmhhmm, "00000000"),
546 ("lbs_off", boolx, False), # Clarify the meaning of 0/1
547 ("boot_time_set", boolx, False),
548 ("boot_time", hhmm, "0000"),
549 ("shut_time_set", boolx, False),
550 ("shut_time", hhmm, "0000"),
553 def out_encode(self) -> bytes:
555 pack("B", self.gps_off)
556 + pack("B", self.gps_interval_set)
557 + bytes.fromhex(self.gps_interval)
558 + pack("B", self.lbs_off)
559 + pack("B", self.boot_time_set)
560 + bytes.fromhex(self.boot_time)
561 + pack("B", self.shut_time_set)
562 + bytes.fromhex(self.shut_time)
566 class _SET_PHONE(GPS303Pkt):
567 OUT_KWARGS = (("phone", str, ""),)
def out_encode(self) -> bytes:
    """Encode the `phone` string as the payload bytes.

    Raises UnicodeEncodeError only if `phone` cannot be encoded, which
    does not happen for ordinary digit strings.
    """
    self.phone: str  # annotation only, no assignment; helps type checkers
    # An empty codec name ("") is not a valid encoding and makes
    # str.encode() raise LookupError on every call; use the default
    # encoding, as SETUP.out_encode() does for its phone numbers.
    return self.phone.encode()
574 class REMOTE_MONITOR_PHONE(_SET_PHONE):
578 class SOS_PHONE(_SET_PHONE):
582 class DAD_PHONE(_SET_PHONE):
586 class MOM_PHONE(_SET_PHONE):
590 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
594 class GPS_OFF_PERIOD(GPS303Pkt):
598 ("fm", hhmm, "0000"),
599 ("to", hhmm, "2359"),
602 def out_encode(self) -> bytes:
604 pack("B", self.onoff)
605 + bytes.fromhex(self.fm)
606 + bytes.fromhex(self.to)
610 class DND_PERIOD(GPS303Pkt):
615 ("fm1", hhmm, "0000"),
616 ("to1", hhmm, "2359"),
617 ("fm2", hhmm, "0000"),
618 ("to2", hhmm, "2359"),
621 def out_encode(self) -> bytes:
623 pack("B", self.onoff)
624 + pack("B", self.week)
625 + bytes.fromhex(self.fm1)
626 + bytes.fromhex(self.to1)
627 + bytes.fromhex(self.fm2)
628 + bytes.fromhex(self.to2)
632 class RESTART_SHUTDOWN(GPS303Pkt):
634 OUT_KWARGS = (("flag", int, 0),)
636 def out_encode(self) -> bytes:
639 return pack("B", self.flag)
642 class DEVICE(GPS303Pkt):
644 OUT_KWARGS = (("flag", int, 0),)
646 # 0 - Stop looking for equipment
647 # 1 - Start looking for equipment
def out_encode(self) -> bytes:
    """Encode `flag` as one unsigned byte.

    0 - Stop looking for equipment
    1 - Start looking for equipment
    """
    flag_byte = pack("B", self.flag)
    return flag_byte
652 class ALARM_CLOCK(GPS303Pkt):
655 Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
657 ("alarms", l3alarms, []),
660 def out_encode(self) -> bytes:
662 pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
666 class STOP_ALARM(GPS303Pkt):
def in_decode(self, length: int, payload: bytes) -> None:
    """Store the first payload byte in `flag`.

    Raises IndexError when the payload is empty.
    """
    first_byte = payload[0]
    self.flag = first_byte
673 class SETUP(GPS303Pkt):
675 RESPOND = Respond.EXT
677 ("uploadintervalseconds", intx, 0x0300),
678 ("binaryswitch", intx, 0b00110001),
679 ("alarms", l3int, [0, 0, 0]),
680 ("dndtimeswitch", int, 0),
681 ("dndtimes", l3int, [0, 0, 0]),
682 ("gpstimeswitch", int, 0),
683 ("gpstimestart", int, 0),
684 ("gpstimestop", int, 0),
685 ("phonenumbers", l3str, ["", "", ""]),
688 def out_encode(self) -> bytes:
689 def pack3b(x: int) -> bytes:
690 return pack("!I", x)[1:]
694 pack("!H", self.uploadintervalseconds),
695 pack("B", self.binaryswitch),
697 + [pack3b(el) for el in self.alarms]
699 pack("B", self.dndtimeswitch),
701 + [pack3b(el) for el in self.dndtimes]
703 pack("B", self.gpstimeswitch),
704 pack("!H", self.gpstimestart),
705 pack("!H", self.gpstimestop),
707 + [b";".join([el.encode() for el in self.phonenumbers])]
710 def in_encode(self) -> bytes:
714 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
718 class RESTORE_PASSWORD(GPS303Pkt):
722 class WIFI_POSITIONING(_WIFI_POSITIONING):
724 RESPOND = Respond.EXT
725 OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
727 def out_encode(self) -> bytes:
728 if self.latitude is None or self.longitude is None:
730 return "{:+#010.8g},{:+#010.8g}".format(
731 self.latitude, self.longitude
def out_decode(self, length: int, payload: bytes) -> None:
    """Parse a "latitude,longitude" text payload into two floats.

    The payload is decoded to text and split on the comma; exactly two
    fields are expected, otherwise ValueError is raised.
    """
    fields = payload.decode().split(",")
    lat_text, lon_text = fields
    self.latitude = float(lat_text)
    self.longitude = float(lon_text)
740 class MANUAL_POSITIONING(GPS303Pkt):
743 def in_decode(self, length: int, payload: bytes) -> None:
744 self.flag = payload[0] if len(payload) > 0 else -1
749 4: "LBS search > 3 times",
750 5: "Same LBS and WiFi data",
751 6: "LBS prohibited, WiFi absent",
752 7: "GPS spacing < 50 m",
753 }.get(self.flag, "Unknown")
756 class BATTERY_CHARGE(GPS303Pkt):
760 class CHARGER_CONNECTED(GPS303Pkt):
764 class CHARGER_DISCONNECTED(GPS303Pkt):
768 class VIBRATION_RECEIVED(GPS303Pkt):
772 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
774 RESPOND = Respond.EXT
775 OUT_KWARGS = (("interval", int, 10),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Read the upload interval from the first two payload bytes.

    The value is a big-endian unsigned 16-bit integer. Raises
    struct.error when fewer than two bytes are available.
    """
    # unpack() always returns a tuple; destructure it so that `interval`
    # is a plain int that out_encode() can pack again.
    (self.interval,) = unpack("!H", payload[:2])
def out_encode(self) -> bytes:
    """Encode `interval` as a big-endian unsigned 16-bit integer."""
    interval_bytes = pack("!H", self.interval)
    return interval_bytes
784 class SOS_ALARM(GPS303Pkt):
788 class UNKNOWN_B3(GPS303Pkt):
790 IN_KWARGS = (("asciidata", str, ""),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Decode the whole payload to text and store it in `asciidata`.

    Raises UnicodeDecodeError when the payload is not valid in the
    default encoding of `bytes.decode()`.
    """
    text = payload.decode()
    self.asciidata = text
796 # Build dicts protocol number -> class and class name -> protocol number
799 if True: # just to indent the code, sorry!
802 for name, cls in globals().items()
804 and issubclass(cls, GPS303Pkt)
805 and not name.startswith("_")
807 if hasattr(cls, "PROTO"):
808 CLASSES[cls.PROTO] = cls
809 PROTOS[cls.__name__] = cls.PROTO
814 ) -> Union[Type[GPS303Pkt], List[str]]:
815 if prefix.startswith(PROTO_PREFIX):
816 pname = prefix[len(PROTO_PREFIX) :]
818 raise KeyError(pname)
821 for name, proto in PROTOS.items()
822 if name.upper().startswith(prefix.upper())
825 return [name for name, _ in lst]
827 return CLASSES[proto]
def proto_handled(proto: str) -> bool:
    """Tell whether `proto` begins with this module's `PROTO_PREFIX`."""
    has_prefix = proto.startswith(PROTO_PREFIX)
    return has_prefix
def proto_of_message(packet: bytes) -> str:
    """Return the external protocol name for a packet.

    The second byte of `packet` is looked up in CLASSES; when it is not
    found there, the name of the UNKNOWN class is returned instead.
    """
    proto_number = packet[1]
    pkt_class = CLASSES.get(proto_number, UNKNOWN)
    return pkt_class.proto_name()
838 def imei_from_packet(packet: bytes) -> Optional[str]:
839 if packet[1] == LOGIN.PROTO:
840 msg = parse_message(packet)
841 if isinstance(msg, LOGIN):
def is_goodbye_packet(packet: bytes) -> bool:
    """Tell whether the packet's second byte is the HIBERNATION protocol number."""
    proto_number = packet[1]
    return proto_number == HIBERNATION.PROTO
850 def inline_response(packet: bytes) -> Optional[bytes]:
854 if cls.RESPOND is Respond.INL:
855 return cls.Out().packed
859 def probe_buffer(buffer: bytes) -> bool:
860 framestart = buffer.find(b"xx")
863 if len(buffer) - framestart < 6:
868 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
869 """From a packet (without framing bytes) derive the XXX.In object"""
870 length, proto = unpack("BB", packet[:2])
872 if proto not in CLASSES:
873 cause: Union[DecodeError, ValueError, IndexError] = ValueError(
874 f"Proto {proto} is unknown"
879 return CLASSES[proto].In(length, payload)
881 return CLASSES[proto].Out(length, payload)
882 except (DecodeError, ValueError, IndexError) as e:
885 retobj = UNKNOWN.In(length, payload)
887 retobj = UNKNOWN.Out(length, payload)
888 retobj.PROTO = proto # Override class attr with object attr
893 def exposed_protos() -> List[Tuple[str, bool]]:
895 (cls.proto_name(), cls.RESPOND is Respond.EXT)
896 for cls in CLASSES.values()
897 if hasattr(cls, "rectified")
901 def make_response(cmd: str, imei: str, **kwargs: Any) -> Optional[GPS303Pkt]:
902 if cmd == "poweroff":
903 return HIBERNATION.Out()