2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation), but relying on that would break things
14 if the path has a lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
46 "GPS_OFFLINE_POSITIONING",
51 "WIFI_OFFLINE_POSITIONING",
54 "GPS_LBS_SWITCH_TIMES",
55 "REMOTE_MONITOR_PHONE",
67 "SYNCHRONOUS_WHITELIST",
73 "CHARGER_DISCONNECTED",
75 "POSITION_UPLOAD_INTERVAL",
81 class DecodeError(Exception):
82 def __init__(self, e: Exception, **kwargs: Any) -> None:
84 for k, v in kwargs.items():
def maybe_int(x: Optional[int]) -> Optional[int]:
    """Convert *x* with ``int()``, passing ``None`` through unchanged."""
    if x is None:
        return None
    return int(x)
92 def intx(x: Union[str, int]) -> int:
93 if isinstance(x, str):
98 def boolx(x: Union[str, bool]) -> bool:
99 if isinstance(x, str):
100 if x.upper() in ("ON", "TRUE", "1"):
102 if x.upper() in ("OFF", "FALSE", "0"):
104 raise ValueError(str(x) + " could not be parsed as a Boolean")
108 def hhmm(x: str) -> str:
109 """Check for the string that represents hours and minutes"""
110 if not isinstance(x, str) or len(x) != 4:
111 raise ValueError(str(x) + " is not a four-character string")
114 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
115 raise ValueError(str(x) + " does not contain valid hours and minutes")
def hhmmhhmm(x: str) -> str:
    """Validate a string made of two consecutive hours-and-minutes values.

    Each four-character half is checked with `hhmm()`; the result is the
    two checked halves joined back together.

    Raises ValueError when *x* is not a string of exactly eight characters
    (or when `hhmm()` rejects either half).
    """
    if isinstance(x, str) and len(x) == 8:
        first_half, second_half = x[:4], x[4:]
        return hhmm(first_half) + hhmm(second_half)
    raise ValueError(str(x) + " is not an eight-character string")
# Check that the argument yields exactly three strings. A `str` argument
# takes a separate branch first (its handling is not visible in this view).
126 def l3str(x: Union[str, List[str]]) -> List[str]:
127 if isinstance(x, str):
# NOTE(review): the element-type check iterates `x`, while the length
# check and the error message use `lx`. When `x` is a `str` this tests
# its characters rather than the list items -- probably `lx` was meant.
131 if len(lx) != 3 or not all(isinstance(el, str) for el in x):
132 raise ValueError(str(lx) + " is not a list of three strings")
136 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
137 def alrmspec(sub: str) -> Tuple[int, str]:
139 raise ValueError(sub + " does not represent day and time")
153 if isinstance(x, str):
154 lx = [alrmspec(sub) for sub in x.split(",")]
157 lx.extend([(0, "0000") for _ in range(3 - len(lx))])
158 if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
159 raise ValueError(str(lx) + " is a wrong alarms specification")
160 return [(d, hhmm(tm)) for d, tm in lx]
163 def l3int(x: Union[str, List[int]]) -> List[int]:
164 if isinstance(x, str):
165 lx = [int(el) for el in x.split(",")]
168 if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
169 raise ValueError(str(lx) + " is not a list of three integers")
175 For each class corresponding to a message, automatically create
176 two nested classes `In` and `Out` that also inherit from their
177 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
178 copied to the `In` nested class under the name `KWARGS`, and
179 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
180 to the nested class `Out`. In addition, method `encode` is
181 defined in both classes equal to `in_encode()` and `out_encode()`
187 def __getattr__(self, name: str) -> Any:
190 def __setattr__(self, name: str, value: Any) -> None:
194 cls: Type["MetaPkt"],
196 bases: Tuple[type, ...],
197 attrs: Dict[str, Any],
199 newcls = super().__new__(cls, name, bases, attrs)
200 newcls.In = super().__new__(
205 "KWARGS": newcls.IN_KWARGS,
206 "decode": newcls.in_decode,
207 "encode": newcls.in_encode,
210 newcls.Out = super().__new__(
215 "KWARGS": newcls.OUT_KWARGS,
216 "decode": newcls.out_decode,
217 "encode": newcls.out_encode,
224 NON = 0 # Incoming, no response needed
225 INL = 1 # Bidirectional, use `inline_response()`
226 EXT = 2 # Bidirectional, use external responder
229 class GPS303Pkt(metaclass=MetaPkt):
230 RESPOND = Respond.NON # Do not send anything back by default
232 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
233 OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
234 KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
235 In: Type["GPS303Pkt"]
236 Out: Type["GPS303Pkt"]
240 def __getattr__(self, name: str) -> Any:
243 def __setattr__(self, name: str, value: Any) -> None:
246 def __init__(self, *args: Any, **kwargs: Any):
248 Construct the object _either_ from (length, payload),
249 _or_ from the values of individual fields
251 assert not args or (len(args) == 2 and not kwargs)
252 if args: # guaranteed to be two arguments at this point
253 self.length, self.payload = args
255 self.decode(self.length, self.payload)
257 raise DecodeError(e, obj=self)
259 for kw, typ, dfl in self.KWARGS:
260 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
263 self.__class__.__name__ + " stray kwargs " + str(kwargs)
266 def __repr__(self) -> str:
267 return "{}({})".format(
268 self.__class__.__name__,
272 'bytes.fromhex("{}")'.format(v.hex())
273 if isinstance(v, bytes)
276 for k, v in self.__dict__.items()
277 if not k.startswith("_")
281 decode: Callable[["GPS303Pkt", int, bytes], None]
283 def in_decode(self, length: int, packet: bytes) -> None:
284 # Overridden in subclasses, otherwise do not decode payload
287 def out_decode(self, length: int, packet: bytes) -> None:
288 # Overridden in subclasses, otherwise do not decode payload
291 encode: Callable[["GPS303Pkt"], bytes]
293 def in_encode(self) -> bytes:
294 # Necessary to emulate terminal, which is not implemented
295 raise NotImplementedError(
296 self.__class__.__name__ + ".encode() not implemented"
299 def out_encode(self) -> bytes:
300 # Overridden in subclasses, otherwise make empty payload
def packed(self) -> bytes:
    """Return the message as bytes: length, protocol number, payload.

    The payload comes from `self.encode()`. The leading length byte counts
    the payload plus one (the protocol byte).
    """
    body = self.encode()
    header = pack("BB", 1 + len(body), self.PROTO)
    return header + body
310 class UNKNOWN(GPS303Pkt):
311 PROTO = 256 # > 255 is impossible in real packets
314 class LOGIN(GPS303Pkt):
316 RESPOND = Respond.INL
317 # Default response for ACK, can also respond with STOP_UPLOAD
318 IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
def in_decode(self, length: int, payload: bytes) -> None:
    """Extract `imei` (hex text of the first 8 bytes) and `ver` (byte 8).

    Raises IndexError when the payload has no byte at index 8.
    """
    imei_raw = payload[:8]
    # A payload shorter than 8 bytes is zero-padded on the right
    imei_raw = imei_raw.ljust(8, b"\0")
    self.imei = imei_raw.hex()
    self.ver = payload[8]
324 def in_encode(self) -> bytes:
325 return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
330 class SUPERVISION(GPS303Pkt):
332 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Encode the `status` field as a one-byte payload.

    Status values:
      1 - the device automatically answers (pickup effect)
      2 - automatically answer two-way calls
      3 - ring, the two-way call is answered manually
    """
    status_byte = pack("B", self.status)
    return status_byte
341 class HEARTBEAT(GPS303Pkt):
343 RESPOND = Respond.INL
346 class _GPS_POSITIONING(GPS303Pkt):
347 RESPOND = Respond.INL
349 def in_decode(self, length: int, payload: bytes) -> None:
350 self.dtime = payload[:6]
351 if self.dtime == b"\0\0\0\0\0\0":
354 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
355 self.devtime = datetime(
356 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
358 self.gps_data_length = payload[6] >> 4
359 self.gps_nb_sat = payload[6] & 0x0F
360 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
361 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
362 flip_lon = bool(flags & 0b0000100000000000) # bit 4
363 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
364 self.heading = flags & 0b0000001111111111 # bits 6 - last
365 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
366 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
def out_encode(self) -> bytes:
    """Build the reply payload: the current UTC time as six bytes.

    The bytes are (year % 100, month, day, hour, minute, second), one
    unsigned byte each.
    """
    # `datetime.utcnow()` is deprecated since Python 3.12; an aware "now"
    # in UTC carries exactly the same calendar fields.
    now = datetime.now(timezone.utc)
    return pack(
        "BBBBBB",
        now.year % 100,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
376 class GPS_POSITIONING(_GPS_POSITIONING):
380 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
384 class STATUS(GPS303Pkt):
386 RESPOND = Respond.EXT
390 ("timezone", int, 0),
392 ("signal", maybe_int, None),
394 OUT_KWARGS = (("upload_interval", int, 25),)
396 def in_decode(self, length: int, payload: bytes) -> None:
397 self.batt, self.ver, self.timezone, self.intvl = unpack(
401 self.signal: Optional[int] = payload[4]
def in_encode(self) -> bytes:
    """Encode the status fields the way the terminal sends them.

    Four unsigned bytes (`batt`, `ver`, `timezone`, `intvl`) are always
    present; one more byte holding `signal` is appended unless `signal`
    is None.
    """
    fixed = pack("BBBB", self.batt, self.ver, self.timezone, self.intvl)
    # The optional tail is computed separately on purpose: written inline
    # as `fixed + b"" if cond else tail`, the expression parses as
    # `(fixed + b"") if cond else tail` and loses the four fixed bytes
    # whenever `signal` is set.
    tail = b"" if self.signal is None else pack("B", self.signal)
    return fixed + tail
def out_encode(self) -> bytes:
    """Encode `upload_interval` (the interval in minutes) as one byte."""
    interval_byte = pack("B", self.upload_interval)
    return interval_byte
416 class HIBERNATION(GPS303Pkt): # Server can send to put the device to sleep
419 def in_encode(self) -> bytes:
423 class RESET(GPS303Pkt):
424 # Device sends when it got reset SMS
425 # Server can send to initiate factory reset
429 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiate sync (0x58)
# The only outgoing field is `number`, converted with `int`, default 3
431 OUT_KWARGS = (("number", int, 3),)
433 def out_encode(self) -> bytes: # Number of whitelist entries
434 return pack("B", self.number)
437 class _WIFI_POSITIONING(GPS303Pkt):
# Decode a positioning report: a 6-byte timestamp, then 7-byte access point
# records, then a GSM section (cell count, MCC, MNC and 5-byte cell records).
438 def in_decode(self, length: int, payload: bytes) -> None:
439 self.dtime = payload[:6]
# An all-zero timestamp is handled separately (that branch is not visible
# in this view).
440 if self.dtime == b"\0\0\0\0\0\0":
# The hex text of the six bytes is parsed as "YYMMDDhhmmss".
# NOTE(review): `strptime()` returns a naive datetime and `astimezone()`
# treats a naive value as system local time. If the device reports UTC,
# `.replace(tzinfo=timezone.utc)` may be what is wanted -- confirm.
443 self.devtime = datetime.strptime(
444 self.dtime.hex(), "%y%m%d%H%M%S"
445 ).astimezone(tz=timezone.utc)
# Here `self.length` is used as the number of access point records.
447 for i in range(self.length): # length has special meaning here
# NOTE(review): the local name `slice` shadows the builtin `slice`.
448 slice = payload[6 + i * 7 : 13 + i * 7]
# Six bytes are rendered as a colon-separated upper-case hex address; the
# seventh byte is stored negated (presumably signal strength -- confirm).
449 self.wifi_aps.append(
450 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
# Everything after the access point records is the GSM section
452 gsm_slice = payload[6 + self.length * 7 :]
453 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
455 for i in range(ncells):
# NOTE(review): this `slice` is assigned, but the `unpack()` call below
# slices `gsm_slice` again with the same bounds instead of using it.
456 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
457 locac, cellid, sigstr = unpack(
458 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
460 self.gsm_cells.append((locac, cellid, -sigstr))
463 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
465 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Build the reply payload: the current UTC time as six bytes.

    The time is rendered as the digits "YYMMDDhhmmss" and every pair of
    digits becomes one byte (via `bytes.fromhex()`).
    """
    # `datetime.utcnow()` is deprecated since Python 3.12; an aware "now"
    # in UTC formats to the same digits.
    now = datetime.now(timezone.utc)
    return bytes.fromhex(now.strftime("%y%m%d%H%M%S"))
471 class TIME(GPS303Pkt):
473 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Build the reply payload: the current UTC time in seven bytes.

    The year is a big-endian 16-bit unsigned integer; month, day, hour,
    minute and second follow as one unsigned byte each.
    """
    # `datetime.utcnow()` is deprecated since Python 3.12; an aware "now"
    # in UTC carries exactly the same calendar fields.
    now = datetime.now(timezone.utc)
    return pack(
        "!HBBBBB",
        now.year,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
479 class PROHIBIT_LBS(GPS303Pkt):
481 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Encode `status` as a one-byte payload (server sent: 0 - off, 1 - on)."""
    status_byte = pack("B", self.status)
    return status_byte
487 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
491 ("gps_off", boolx, False), # Clarify the meaning of 0/1
492 ("gps_interval_set", boolx, False),
493 ("gps_interval", hhmmhhmm, "00000000"),
494 ("lbs_off", boolx, False), # Clarify the meaning of 0/1
495 ("boot_time_set", boolx, False),
496 ("boot_time", hhmm, "0000"),
497 ("shut_time_set", boolx, False),
498 ("shut_time", hhmm, "0000"),
501 def out_encode(self) -> bytes:
503 pack("B", self.gps_off)
504 + pack("B", self.gps_interval_set)
505 + bytes.fromhex(self.gps_interval)
506 + pack("B", self.lbs_off)
507 + pack("B", self.boot_time_set)
508 + bytes.fromhex(self.boot_time)
509 + pack("B", self.shut_time_set)
510 + bytes.fromhex(self.shut_time)
514 class _SET_PHONE(GPS303Pkt):
515 OUT_KWARGS = (("phone", str, ""),)
def out_encode(self) -> bytes:
    """Return the `phone` string encoded as the message payload."""
    # An empty codec name, `encode("")`, raises LookupError on every call.
    # Use the default (UTF-8) codec instead; for a phone number made of
    # ASCII characters it yields those same characters as bytes.
    return self.phone.encode()
522 class REMOTE_MONITOR_PHONE(_SET_PHONE):
526 class SOS_PHONE(_SET_PHONE):
530 class DAD_PHONE(_SET_PHONE):
534 class MOM_PHONE(_SET_PHONE):
538 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
542 class GPS_OFF_PERIOD(GPS303Pkt):
546 ("fm", hhmm, "0000"),
547 ("to", hhmm, "2359"),
550 def out_encode(self) -> bytes:
552 pack("B", self.onoff)
553 + bytes.fromhex(self.fm)
554 + bytes.fromhex(self.to)
558 class DND_PERIOD(GPS303Pkt):
563 ("fm1", hhmm, "0000"),
564 ("to1", hhmm, "2359"),
565 ("fm2", hhmm, "0000"),
566 ("to2", hhmm, "2359"),
569 def out_encode(self) -> bytes:
571 pack("B", self.onoff)
572 + pack("B", self.week)
573 + bytes.fromhex(self.fm1)
574 + bytes.fromhex(self.to1)
575 + bytes.fromhex(self.fm2)
576 + bytes.fromhex(self.to2)
580 class RESTART_SHUTDOWN(GPS303Pkt):
582 OUT_KWARGS = (("flag", int, 0),)
584 def out_encode(self) -> bytes:
587 return pack("B", self.flag)
590 class DEVICE(GPS303Pkt):
592 OUT_KWARGS = (("flag", int, 0),)
def out_encode(self) -> bytes:
    """Encode `flag` as a one-byte payload.

    Flag values:
      0 - Stop looking for equipment
      1 - Start looking for equipment
    """
    flag_byte = pack("B", self.flag)
    return flag_byte
600 class ALARM_CLOCK(GPS303Pkt):
603 Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
605 ("alarms", l3alarms, []),
608 def out_encode(self) -> bytes:
610 pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
614 class STOP_ALARM(GPS303Pkt):
def in_decode(self, length: int, payload: bytes) -> None:
    """Store the first payload byte in `flag`.

    Raises IndexError when the payload is empty.
    """
    first_byte = payload[0]
    self.flag = first_byte
621 class SETUP(GPS303Pkt):
623 RESPOND = Respond.EXT
625 ("uploadintervalseconds", intx, 0x0300),
626 ("binaryswitch", intx, 0b00110001),
627 ("alarms", l3int, [0, 0, 0]),
628 ("dndtimeswitch", int, 0),
629 ("dndtimes", l3int, [0, 0, 0]),
630 ("gpstimeswitch", int, 0),
631 ("gpstimestart", int, 0),
632 ("gpstimestop", int, 0),
633 ("phonenumbers", l3str, ["", "", ""]),
636 def out_encode(self) -> bytes:
637 def pack3b(x: int) -> bytes:
638 return pack("!I", x)[1:]
642 pack("!H", self.uploadintervalseconds),
643 pack("B", self.binaryswitch),
645 + [pack3b(el) for el in self.alarms]
647 pack("B", self.dndtimeswitch),
649 + [pack3b(el) for el in self.dndtimes]
651 pack("B", self.gpstimeswitch),
652 pack("!H", self.gpstimestart),
653 pack("!H", self.gpstimestop),
655 + [b";".join([el.encode() for el in self.phonenumbers])]
659 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
663 class RESTORE_PASSWORD(GPS303Pkt):
667 class WIFI_POSITIONING(_WIFI_POSITIONING):
669 RESPOND = Respond.EXT
670 OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
672 def out_encode(self) -> bytes:
673 if self.latitude is None or self.longitude is None:
675 return "{:+#010.8g},{:+#010.8g}".format(
676 self.latitude, self.longitude
def out_decode(self, length: int, payload: bytes) -> None:
    """Parse a "latitude,longitude" text payload into two floats.

    Raises ValueError when the text does not split into exactly two
    comma-separated fields or a field is not a valid float.
    """
    text = payload.decode()
    fields = text.split(",")
    lat_text, lon_text = fields
    self.latitude = float(lat_text)
    self.longitude = float(lon_text)
685 class MANUAL_POSITIONING(GPS303Pkt):
688 def in_decode(self, length: int, payload: bytes) -> None:
689 self.flag = payload[0] if len(payload) > 0 else -1
694 4: "LBS search > 3 times",
695 5: "Same LBS and WiFi data",
696 6: "LBS prohibited, WiFi absent",
697 7: "GPS spacing < 50 m",
698 }.get(self.flag, "Unknown")
701 class BATTERY_CHARGE(GPS303Pkt):
705 class CHARGER_CONNECTED(GPS303Pkt):
709 class CHARGER_DISCONNECTED(GPS303Pkt):
713 class VIBRATION_RECEIVED(GPS303Pkt):
717 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
719 RESPOND = Respond.EXT
720 OUT_KWARGS = (("interval", int, 10),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Read `interval` from the first two payload bytes.

    The value is a big-endian 16-bit unsigned integer.

    Raises struct.error when the payload is shorter than two bytes.
    """
    # `unpack()` always returns a tuple, even for a single field. Unpack
    # its only element so that `interval` is an int, which is what
    # `pack("!H", self.interval)` in `out_encode()` requires.
    (self.interval,) = unpack("!H", payload[:2])
def out_encode(self) -> bytes:
    """Encode `interval` as a big-endian 16-bit unsigned integer."""
    interval_bytes = pack("!H", self.interval)
    return interval_bytes
729 class SOS_ALARM(GPS303Pkt):
733 class UNKNOWN_B3(GPS303Pkt):
735 IN_KWARGS = (("asciidata", str, ""),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Decode the whole payload as UTF-8 text into `asciidata`.

    Raises UnicodeDecodeError when the payload is not valid UTF-8.
    """
    text = payload.decode("utf-8")
    self.asciidata = text
741 # Build dicts protocol number -> class and class name -> protocol number
744 if True: # just to indent the code, sorry!
747 for name, cls in globals().items()
749 and issubclass(cls, GPS303Pkt)
750 and not name.startswith("_")
752 if hasattr(cls, "PROTO"):
753 CLASSES[cls.PROTO] = cls
754 PROTOS[cls.__name__] = cls.PROTO
759 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
762 for name, proto in PROTOS.items()
763 if name.upper().startswith(prefix.upper())
768 return CLASSES[proto]
def proto_by_name(name: str) -> int:
    """Look up the protocol number registered in `PROTOS` under *name*.

    Returns -1 when *name* is not a key of `PROTOS`.
    """
    if name in PROTOS:
        return PROTOS[name]
    return -1
775 def proto_of_message(packet: bytes) -> int:
779 def inline_response(packet: bytes) -> Optional[bytes]:
780 proto = proto_of_message(packet)
783 if cls.RESPOND is Respond.INL:
784 return cls.Out().packed
788 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
789 """From a packet (without framing bytes) derive the XXX.In object"""
790 length, proto = unpack("BB", packet[:2])
792 if proto not in CLASSES:
793 cause: Union[DecodeError, ValueError, IndexError] = ValueError(
794 f"Proto {proto} is unknown"
799 return CLASSES[proto].In(length, payload)
801 return CLASSES[proto].Out(length, payload)
802 except (DecodeError, ValueError, IndexError) as e:
805 retobj = UNKNOWN.In(length, payload)
807 retobj = UNKNOWN.Out(length, payload)
808 retobj.PROTO = proto # Override class attr with object attr