2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
22 from types import SimpleNamespace
35 from .common import CoordReport, HintReport, StatusReport
36 from .protomodule import ProtoClass
60 def __init__(self) -> None:
63 def recv(self, segment: bytes) -> List[Union[bytes, str]]:
65 Process next segment of the stream. Return successfully deframed
66 packets as `bytes` and error messages as `str`.
69 self.buffer += segment
70 if len(self.buffer) > MAXBUFFER:
71 # We are receiving junk. Let's drop it or we run out of memory.
73 return [f"More than {MAXBUFFER} unparseable data, dropping"]
74 msgs: List[Union[bytes, str]] = []
76 framestart = self.buffer.find(b"xx")
77 if framestart == -1: # No frames, return whatever we have
79 if framestart > 0: # Should not happen, report
81 f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
83 self.buffer = self.buffer[framestart:]
84 # At this point, buffer starts with a packet
85 if len(self.buffer) < 6: # no len and proto - cannot proceed
87 exp_end = self.buffer[2] + 3 # Expect '\r\n' here
89 # Length field can legitimeely be much less than the
90 # length of the packet (e.g. WiFi positioning), but
91 # it _should not_ be greater. Still sometimes it is.
92 # Luckily, not by too much: by maybe two or three bytes?
93 # Do this embarrassing hack to avoid accidental match
94 # of some binary data in the packet against '\r\n'.
96 frameend = self.buffer.find(b"\r\n", frameend + 1)
97 if frameend == -1 or frameend >= (
99 ): # Found realistic match or none
101 if frameend == -1: # Incomplete frame, return what we have
103 packet = self.buffer[2:frameend]
104 self.buffer = self.buffer[frameend + 2 :]
105 if len(packet) < 2: # frameend comes too early
106 msgs.append(f"Packet too short: {packet.hex()}")
111 def close(self) -> bytes:
117 def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
118 return b"xx" + buffer + b"\r\n"
121 ### Parser/Constructor ###
124 class DecodeError(Exception):
125 def __init__(self, e: Exception, **kwargs: Any) -> None:
127 for k, v in kwargs.items():
131 def maybe(typ: type) -> Callable[[Any], Any]:
132 return lambda x: None if x is None else typ(x)
135 def intx(x: Union[str, int]) -> int:
136 if isinstance(x, str):
141 def boolx(x: Union[str, bool]) -> bool:
142 if isinstance(x, str):
143 if x.upper() in ("ON", "TRUE", "1"):
145 if x.upper() in ("OFF", "FALSE", "0"):
147 raise ValueError(str(x) + " could not be parsed as a Boolean")
151 def hhmm(x: str) -> str:
152 """Check for the string that represents hours and minutes"""
153 if not isinstance(x, str) or len(x) != 4:
154 raise ValueError(str(x) + " is not a four-character string")
157 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
158 raise ValueError(str(x) + " does not contain valid hours and minutes")
162 def hhmmhhmm(x: str) -> str:
163 """Check for the string that represents hours and minutes twice"""
164 if not isinstance(x, str) or len(x) != 8:
165 raise ValueError(str(x) + " is not an eight-character string")
166 return hhmm(x[:4]) + hhmm(x[4:])
169 def l3str(x: Union[str, List[str]]) -> List[str]:
170 if isinstance(x, str):
174 if len(lx) != 3 or not all(isinstance(el, str) for el in x):
175 raise ValueError(str(lx) + " is not a list of three strings")
179 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
180 def alrmspec(sub: str) -> Tuple[int, str]:
182 raise ValueError(sub + " does not represent day and time")
196 if isinstance(x, str):
197 lx = [alrmspec(sub) for sub in x.split(",")]
200 lx.extend([(0, "0000") for _ in range(3 - len(lx))])
201 if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
202 raise ValueError(str(lx) + " is a wrong alarms specification")
203 return [(d, hhmm(tm)) for d, tm in lx]
206 def l3int(x: Union[str, List[int]]) -> List[int]:
207 if isinstance(x, str):
208 lx = [int(el) for el in x.split(",")]
211 if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
212 raise ValueError(str(lx) + " is not a list of three integers")
217 NON = 0 # Incoming, no response needed
218 INL = 1 # Birirectional, use `inline_response()`
219 EXT = 2 # Birirectional, use external responder
222 class GPS303Pkt(ProtoClass):
223 RESPOND = Respond.NON # Do not send anything back by default
225 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
226 OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
227 KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
228 In: Type["GPS303Pkt"]
229 Out: Type["GPS303Pkt"]
233 def __getattr__(self, name: str) -> Any:
236 def __setattr__(self, name: str, value: Any) -> None:
239 def __init__(self, *args: Any, **kwargs: Any):
241 Construct the object _either_ from (length, payload),
242 _or_ from the values of individual fields
244 assert not args or (len(args) == 2 and not kwargs)
245 if args: # guaranteed to be two arguments at this point
246 self.length, self.payload = args
248 self.decode(self.length, self.payload)
250 raise DecodeError(e, obj=self)
252 for kw, typ, dfl in self.KWARGS:
253 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
256 self.__class__.__name__ + " stray kwargs " + str(kwargs)
259 def __repr__(self) -> str:
260 return "{}({})".format(
261 self.__class__.__name__,
265 'bytes.fromhex("{}")'.format(v.hex())
266 if isinstance(v, bytes)
269 for k, v in self.__dict__.items()
270 if not k.startswith("_")
274 decode: Callable[["GPS303Pkt", int, bytes], None]
276 def in_decode(self, length: int, packet: bytes) -> None:
277 # Overridden in subclasses, otherwise do not decode payload
280 def out_decode(self, length: int, packet: bytes) -> None:
281 # Overridden in subclasses, otherwise do not decode payload
284 encode: Callable[["GPS303Pkt"], bytes]
286 def in_encode(self) -> bytes:
287 # Necessary to emulate terminal, which is not implemented
288 raise NotImplementedError(
289 self.__class__.__name__ + ".encode() not implemented"
292 def out_encode(self) -> bytes:
293 # Overridden in subclasses, otherwise make empty payload
297 def packed(self) -> bytes:
298 payload = self.encode()
299 length = getattr(self, "length", len(payload) + 1)
300 return pack("BB", length, self.PROTO) + payload
303 class UNKNOWN(GPS303Pkt):
304 PROTO = 256 # > 255 is impossible in real packets
307 class LOGIN(GPS303Pkt):
309 RESPOND = Respond.INL
310 # Default response for ACK, can also respond with STOP_UPLOAD
311 IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
313 def in_decode(self, length: int, payload: bytes) -> None:
314 self.imei = payload[:8].ljust(8, b"\0").hex()
315 self.ver = payload[8]
317 def in_encode(self) -> bytes:
318 return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
323 class SUPERVISION(GPS303Pkt):
325 OUT_KWARGS = (("status", int, 1),)
327 def out_encode(self) -> bytes:
328 # 1: The device automatically answers Pickup effect
329 # 2: Automatically Answering Two-way Calls
330 # 3: Ring manually answer the two-way call
331 return pack("B", self.status)
334 class HEARTBEAT(GPS303Pkt):
336 RESPOND = Respond.INL
339 class _GPS_POSITIONING(GPS303Pkt):
340 RESPOND = Respond.INL
342 def in_decode(self, length: int, payload: bytes) -> None:
343 self.dtime = payload[:6]
344 if self.dtime == b"\0\0\0\0\0\0":
347 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
348 self.devtime = datetime(
349 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
351 self.gps_data_length = payload[6] >> 4
352 self.gps_nb_sat = payload[6] & 0x0F
353 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
354 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
355 flip_lon = bool(flags & 0b0000100000000000) # bit 4
356 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
357 self.heading = flags & 0b0000001111111111 # bits 6 - last
358 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
359 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
363 def out_encode(self) -> bytes:
364 tup = datetime.utcnow().timetuple()
365 ttup = (tup[0] % 100,) + tup[1:6]
366 return pack("BBBBBB", *ttup)
368 def rectified(self) -> CoordReport: # JSON-able dict
370 devtime=str(self.devtime),
371 battery_percentage=-1,
375 direction=self.heading,
376 latitude=self.latitude,
377 longitude=self.longitude,
381 class GPS_POSITIONING(_GPS_POSITIONING):
385 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
389 class STATUS(GPS303Pkt):
391 RESPOND = Respond.EXT
395 ("timezone", int, 0),
397 ("signal", maybe(int), None),
399 OUT_KWARGS = (("upload_interval", int, 25),)
401 def in_decode(self, length: int, payload: bytes) -> None:
402 self.batt, self.ver, self.timezone, self.intvl = unpack(
406 self.signal: Optional[int] = payload[4]
410 def in_encode(self) -> bytes:
411 return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
412 b"" if self.signal is None else pack("B", self.signal)
415 def out_encode(self) -> bytes: # Set interval in minutes
416 return pack("B", self.upload_interval)
418 def rectified(self) -> StatusReport:
419 return StatusReport(battery_percentage=self.batt)
422 class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
425 def in_encode(self) -> bytes:
429 class RESET(GPS303Pkt):
430 # Device sends when it got reset SMS
431 # Server can send to initiate factory reset
435 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
437 OUT_KWARGS = (("number", int, 3),)
439 def out_encode(self) -> bytes: # Number of whitelist entries
440 return pack("B", self.number)
443 class _WIFI_POSITIONING(GPS303Pkt):
444 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
446 ("dtime", bytes, b"\0\0\0\0\0\0"),
447 ("wifi_aps", list, []),
450 ("gsm_cells", list, []),
453 def in_decode(self, length: int, payload: bytes) -> None:
454 self.dtime = payload[:6]
455 if self.dtime == b"\0\0\0\0\0\0":
458 self.devtime = datetime.strptime(
459 self.dtime.hex(), "%y%m%d%H%M%S"
460 ).astimezone(tz=timezone.utc)
462 for i in range(self.length): # length has special meaning here
463 slice = payload[6 + i * 7 : 13 + i * 7]
464 self.wifi_aps.append(
465 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
467 gsm_slice = payload[6 + self.length * 7 :]
468 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
470 for i in range(ncells):
471 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
472 locac, cellid, sigstr = unpack(
473 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
475 self.gsm_cells.append((locac, cellid, -sigstr))
477 def in_encode(self) -> bytes:
478 self.length = len(self.wifi_aps)
484 bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
486 for mac, sigstr in self.wifi_aps
489 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
492 pack("!HHB", locac, cellid, -sigstr)
493 for locac, cellid, sigstr in self.gsm_cells
499 def rectified(self) -> HintReport:
501 devtime=str(self.devtime),
502 battery_percentage=-1,
505 gsm_cells=self.gsm_cells,
506 wifi_aps=[("<UNKNOWN>", mac, sig) for mac, sig in self.wifi_aps],
510 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
512 RESPOND = Respond.INL
514 def out_encode(self) -> bytes:
515 return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
518 class TIME(GPS303Pkt):
520 RESPOND = Respond.INL
522 def out_encode(self) -> bytes:
523 return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
526 class PROHIBIT_LBS(GPS303Pkt):
528 OUT_KWARGS = (("status", int, 1),)
530 def out_encode(self) -> bytes: # Server sent, 0-off, 1-on
531 return pack("B", self.status)
534 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
538 ("gps_off", boolx, False), # Clarify the meaning of 0/1
539 ("gps_interval_set", boolx, False),
540 ("gps_interval", hhmmhhmm, "00000000"),
541 ("lbs_off", boolx, False), # Clarify the meaning of 0/1
542 ("boot_time_set", boolx, False),
543 ("boot_time", hhmm, "0000"),
544 ("shut_time_set", boolx, False),
545 ("shut_time", hhmm, "0000"),
548 def out_encode(self) -> bytes:
550 pack("B", self.gps_off)
551 + pack("B", self.gps_interval_set)
552 + bytes.fromhex(self.gps_interval)
553 + pack("B", self.lbs_off)
554 + pack("B", self.boot_time_set)
555 + bytes.fromhex(self.boot_time)
556 + pack("B", self.shut_time_set)
557 + bytes.fromhex(self.shut_time)
561 class _SET_PHONE(GPS303Pkt):
562 OUT_KWARGS = (("phone", str, ""),)
564 def out_encode(self) -> bytes:
566 return self.phone.encode("")
569 class REMOTE_MONITOR_PHONE(_SET_PHONE):
573 class SOS_PHONE(_SET_PHONE):
577 class DAD_PHONE(_SET_PHONE):
581 class MOM_PHONE(_SET_PHONE):
585 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
589 class GPS_OFF_PERIOD(GPS303Pkt):
593 ("fm", hhmm, "0000"),
594 ("to", hhmm, "2359"),
597 def out_encode(self) -> bytes:
599 pack("B", self.onoff)
600 + bytes.fromhex(self.fm)
601 + bytes.fromhex(self.to)
605 class DND_PERIOD(GPS303Pkt):
610 ("fm1", hhmm, "0000"),
611 ("to1", hhmm, "2359"),
612 ("fm2", hhmm, "0000"),
613 ("to2", hhmm, "2359"),
616 def out_encode(self) -> bytes:
618 pack("B", self.onoff)
619 + pack("B", self.week)
620 + bytes.fromhex(self.fm1)
621 + bytes.fromhex(self.to1)
622 + bytes.fromhex(self.fm2)
623 + bytes.fromhex(self.to2)
627 class RESTART_SHUTDOWN(GPS303Pkt):
629 OUT_KWARGS = (("flag", int, 0),)
631 def out_encode(self) -> bytes:
634 return pack("B", self.flag)
637 class DEVICE(GPS303Pkt):
639 OUT_KWARGS = (("flag", int, 0),)
641 # 0 - Stop looking for equipment
642 # 1 - Start looking for equipment
643 def out_encode(self) -> bytes:
644 return pack("B", self.flag)
647 class ALARM_CLOCK(GPS303Pkt):
650 Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
652 ("alarms", l3alarms, []),
655 def out_encode(self) -> bytes:
657 pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
661 class STOP_ALARM(GPS303Pkt):
664 def in_decode(self, length: int, payload: bytes) -> None:
665 self.flag = payload[0]
668 class SETUP(GPS303Pkt):
670 RESPOND = Respond.EXT
672 ("uploadintervalseconds", intx, 0x0300),
673 ("binaryswitch", intx, 0b00110001),
674 ("alarms", l3int, [0, 0, 0]),
675 ("dndtimeswitch", int, 0),
676 ("dndtimes", l3int, [0, 0, 0]),
677 ("gpstimeswitch", int, 0),
678 ("gpstimestart", int, 0),
679 ("gpstimestop", int, 0),
680 ("phonenumbers", l3str, ["", "", ""]),
683 def out_encode(self) -> bytes:
684 def pack3b(x: int) -> bytes:
685 return pack("!I", x)[1:]
689 pack("!H", self.uploadintervalseconds),
690 pack("B", self.binaryswitch),
692 + [pack3b(el) for el in self.alarms]
694 pack("B", self.dndtimeswitch),
696 + [pack3b(el) for el in self.dndtimes]
698 pack("B", self.gpstimeswitch),
699 pack("!H", self.gpstimestart),
700 pack("!H", self.gpstimestop),
702 + [b";".join([el.encode() for el in self.phonenumbers])]
705 def in_encode(self) -> bytes:
709 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
713 class RESTORE_PASSWORD(GPS303Pkt):
717 class WIFI_POSITIONING(_WIFI_POSITIONING):
719 RESPOND = Respond.EXT
720 OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
722 def out_encode(self) -> bytes:
723 if self.latitude is None or self.longitude is None:
725 return "{:+#010.8g},{:+#010.8g}".format(
726 self.latitude, self.longitude
729 def out_decode(self, length: int, payload: bytes) -> None:
730 lat, lon = payload.decode().split(",")
731 self.latitude = float(lat)
732 self.longitude = float(lon)
735 class MANUAL_POSITIONING(GPS303Pkt):
738 def in_decode(self, length: int, payload: bytes) -> None:
739 self.flag = payload[0] if len(payload) > 0 else -1
744 4: "LBS search > 3 times",
745 5: "Same LBS and WiFi data",
746 6: "LBS prohibited, WiFi absent",
747 7: "GPS spacing < 50 m",
748 }.get(self.flag, "Unknown")
751 class BATTERY_CHARGE(GPS303Pkt):
755 class CHARGER_CONNECTED(GPS303Pkt):
759 class CHARGER_DISCONNECTED(GPS303Pkt):
763 class VIBRATION_RECEIVED(GPS303Pkt):
767 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
769 RESPOND = Respond.EXT
770 OUT_KWARGS = (("interval", int, 10),)
772 def in_decode(self, length: int, payload: bytes) -> None:
773 self.interval = unpack("!H", payload[:2])
775 def out_encode(self) -> bytes:
776 return pack("!H", self.interval)
779 class SOS_ALARM(GPS303Pkt):
783 class UNKNOWN_B3(GPS303Pkt):
785 IN_KWARGS = (("asciidata", str, ""),)
787 def in_decode(self, length: int, payload: bytes) -> None:
788 self.asciidata = payload.decode()
791 # Build dicts protocol number -> class and class name -> protocol number
794 if True: # just to indent the code, sorry!
797 for name, cls in globals().items()
799 and issubclass(cls, GPS303Pkt)
800 and not name.startswith("_")
802 if hasattr(cls, "PROTO"):
803 CLASSES[cls.PROTO] = cls
804 PROTOS[cls.__name__] = cls.PROTO
809 ) -> Union[Type[GPS303Pkt], List[str]]:
810 if prefix.startswith(PROTO_PREFIX):
811 pname = prefix[len(PROTO_PREFIX) :]
813 raise KeyError(pname)
816 for name, proto in PROTOS.items()
817 if name.upper().startswith(prefix.upper())
820 return [name for name, _ in lst]
822 return CLASSES[proto]
825 def proto_handled(proto: str) -> bool:
826 return proto.startswith(PROTO_PREFIX)
829 def proto_name(obj: Union[Type[GPS303Pkt], GPS303Pkt]) -> str:
830 return PROTO_PREFIX + (
831 obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__
835 def proto_of_message(packet: bytes) -> str:
836 return proto_name(CLASSES.get(packet[1], UNKNOWN))
839 def imei_from_packet(packet: bytes) -> Optional[str]:
840 if packet[1] == LOGIN.PROTO:
841 msg = parse_message(packet)
842 if isinstance(msg, LOGIN):
847 def is_goodbye_packet(packet: bytes) -> bool:
848 return packet[1] == HIBERNATION.PROTO
851 def inline_response(packet: bytes) -> Optional[bytes]:
855 if cls.RESPOND is Respond.INL:
856 return cls.Out().packed
860 def probe_buffer(buffer: bytes) -> bool:
861 framestart = buffer.find(b"xx")
864 if len(buffer) - framestart < 6:
869 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
870 """From a packet (without framing bytes) derive the XXX.In object"""
871 length, proto = unpack("BB", packet[:2])
873 if proto not in CLASSES:
874 cause: Union[DecodeError, ValueError, IndexError] = ValueError(
875 f"Proto {proto} is unknown"
880 return CLASSES[proto].In(length, payload)
882 return CLASSES[proto].Out(length, payload)
883 except (DecodeError, ValueError, IndexError) as e:
886 retobj = UNKNOWN.In(length, payload)
888 retobj = UNKNOWN.Out(length, payload)
889 retobj.PROTO = proto # Override class attr with object attr
894 def exposed_protos() -> List[Tuple[str, bool]]:
896 (proto_name(cls)[:16], cls.RESPOND is Respond.EXT)
897 for cls in CLASSES.values()
898 if hasattr(cls, "rectified")