2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
46 "GPS_OFFLINE_POSITIONING",
51 "WIFI_OFFLINE_POSITIONING",
54 "GPS_LBS_SWITCH_TIMES",
55 "REMOTE_MONITOR_PHONE",
67 "SYNCHRONOUS_WHITELIST",
73 "CHARGER_DISCONNECTED",
75 "POSITION_UPLOAD_INTERVAL",
81 class DecodeError(Exception):
82 def __init__(self, e: Exception, **kwargs: Any) -> None:
84 for k, v in kwargs.items():
88 def maybe(typ: type) -> Callable[[Any], Any]:
89 return lambda x: None if x is None else typ(x)
92 def intx(x: Union[str, int]) -> int:
93 if isinstance(x, str):
98 def boolx(x: Union[str, bool]) -> bool:
99 if isinstance(x, str):
100 if x.upper() in ("ON", "TRUE", "1"):
102 if x.upper() in ("OFF", "FALSE", "0"):
104 raise ValueError(str(x) + " could not be parsed as a Boolean")
108 def hhmm(x: str) -> str:
109 """Check for the string that represents hours and minutes"""
110 if not isinstance(x, str) or len(x) != 4:
111 raise ValueError(str(x) + " is not a four-character string")
114 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
115 raise ValueError(str(x) + " does not contain valid hours and minutes")
119 def hhmmhhmm(x: str) -> str:
120 """Check for the string that represents hours and minutes twice"""
121 if not isinstance(x, str) or len(x) != 8:
122 raise ValueError(str(x) + " is not an eight-character string")
123 return hhmm(x[:4]) + hhmm(x[4:])
126 def l3str(x: Union[str, List[str]]) -> List[str]:
127 if isinstance(x, str):
131 if len(lx) != 3 or not all(isinstance(el, str) for el in x):
132 raise ValueError(str(lx) + " is not a list of three strings")
136 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
137 def alrmspec(sub: str) -> Tuple[int, str]:
139 raise ValueError(sub + " does not represent day and time")
153 if isinstance(x, str):
154 lx = [alrmspec(sub) for sub in x.split(",")]
157 lx.extend([(0, "0000") for _ in range(3 - len(lx))])
158 if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
159 raise ValueError(str(lx) + " is a wrong alarms specification")
160 return [(d, hhmm(tm)) for d, tm in lx]
163 def l3int(x: Union[str, List[int]]) -> List[int]:
164 if isinstance(x, str):
165 lx = [int(el) for el in x.split(",")]
168 if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
169 raise ValueError(str(lx) + " is not a list of three integers")
175 For each class corresponding to a message, automatically create
176 two nested classes `In` and `Out` that also inherit from their
177 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
178 copied to the `In` nested class under the name `KWARGS`, and
179 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
180 to the nested class `Out`. In addition, method `encode` is
181 defined in both classes equal to `in_encode()` and `out_encode()`
187 def __getattr__(self, name: str) -> Any:
190 def __setattr__(self, name: str, value: Any) -> None:
194 cls: Type["MetaPkt"],
196 bases: Tuple[type, ...],
197 attrs: Dict[str, Any],
199 newcls = super().__new__(cls, name, bases, attrs)
200 newcls.In = super().__new__(
205 "KWARGS": newcls.IN_KWARGS,
206 "decode": newcls.in_decode,
207 "encode": newcls.in_encode,
210 newcls.Out = super().__new__(
215 "KWARGS": newcls.OUT_KWARGS,
216 "decode": newcls.out_decode,
217 "encode": newcls.out_encode,
224 NON = 0 # Incoming, no response needed
225 INL = 1 # Birirectional, use `inline_response()`
226 EXT = 2 # Birirectional, use external responder
229 class GPS303Pkt(metaclass=MetaPkt):
230 RESPOND = Respond.NON # Do not send anything back by default
232 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
233 OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
234 KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
235 In: Type["GPS303Pkt"]
236 Out: Type["GPS303Pkt"]
240 def __getattr__(self, name: str) -> Any:
243 def __setattr__(self, name: str, value: Any) -> None:
246 def __init__(self, *args: Any, **kwargs: Any):
248 Construct the object _either_ from (length, payload),
249 _or_ from the values of individual fields
251 assert not args or (len(args) == 2 and not kwargs)
252 if args: # guaranteed to be two arguments at this point
253 self.length, self.payload = args
255 self.decode(self.length, self.payload)
257 raise DecodeError(e, obj=self)
259 for kw, typ, dfl in self.KWARGS:
260 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
263 self.__class__.__name__ + " stray kwargs " + str(kwargs)
266 def __repr__(self) -> str:
267 return "{}({})".format(
268 self.__class__.__name__,
272 'bytes.fromhex("{}")'.format(v.hex())
273 if isinstance(v, bytes)
276 for k, v in self.__dict__.items()
277 if not k.startswith("_")
281 decode: Callable[["GPS303Pkt", int, bytes], None]
283 def in_decode(self, length: int, packet: bytes) -> None:
284 # Overridden in subclasses, otherwise do not decode payload
287 def out_decode(self, length: int, packet: bytes) -> None:
288 # Overridden in subclasses, otherwise do not decode payload
291 encode: Callable[["GPS303Pkt"], bytes]
293 def in_encode(self) -> bytes:
294 # Necessary to emulate terminal, which is not implemented
295 raise NotImplementedError(
296 self.__class__.__name__ + ".encode() not implemented"
299 def out_encode(self) -> bytes:
300 # Overridden in subclasses, otherwise make empty payload
304 def packed(self) -> bytes:
305 payload = self.encode()
306 length = getattr(self, "length", len(payload) + 1)
307 return pack("BB", length, self.PROTO) + payload
310 class UNKNOWN(GPS303Pkt):
311 PROTO = 256 # > 255 is impossible in real packets
314 class LOGIN(GPS303Pkt):
316 RESPOND = Respond.INL
317 # Default response for ACK, can also respond with STOP_UPLOAD
318 IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
320 def in_decode(self, length: int, payload: bytes) -> None:
321 self.imei = payload[:8].ljust(8, b"\0").hex()
322 self.ver = payload[8]
324 def in_encode(self) -> bytes:
325 return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
330 class SUPERVISION(GPS303Pkt):
332 OUT_KWARGS = (("status", int, 1),)
334 def out_encode(self) -> bytes:
335 # 1: The device automatically answers Pickup effect
336 # 2: Automatically Answering Two-way Calls
337 # 3: Ring manually answer the two-way call
338 return pack("B", self.status)
341 class HEARTBEAT(GPS303Pkt):
343 RESPOND = Respond.INL
346 class _GPS_POSITIONING(GPS303Pkt):
347 RESPOND = Respond.INL
349 def in_decode(self, length: int, payload: bytes) -> None:
350 self.dtime = payload[:6]
351 if self.dtime == b"\0\0\0\0\0\0":
354 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
355 self.devtime = datetime(
356 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
358 self.gps_data_length = payload[6] >> 4
359 self.gps_nb_sat = payload[6] & 0x0F
360 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
361 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
362 flip_lon = bool(flags & 0b0000100000000000) # bit 4
363 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
364 self.heading = flags & 0b0000001111111111 # bits 6 - last
365 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
366 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
370 def out_encode(self) -> bytes:
371 tup = datetime.utcnow().timetuple()
372 ttup = (tup[0] % 100,) + tup[1:6]
373 return pack("BBBBBB", *ttup)
376 class GPS_POSITIONING(_GPS_POSITIONING):
380 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
384 class STATUS(GPS303Pkt):
386 RESPOND = Respond.EXT
390 ("timezone", int, 0),
392 ("signal", maybe(int), None),
394 OUT_KWARGS = (("upload_interval", int, 25),)
396 def in_decode(self, length: int, payload: bytes) -> None:
397 self.batt, self.ver, self.timezone, self.intvl = unpack(
401 self.signal: Optional[int] = payload[4]
405 def in_encode(self) -> bytes:
406 return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
407 b"" if self.signal is None else pack("B", self.signal)
410 def out_encode(self) -> bytes: # Set interval in minutes
411 return pack("B", self.upload_interval)
414 class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
417 def in_encode(self) -> bytes:
421 class RESET(GPS303Pkt):
422 # Device sends when it got reset SMS
423 # Server can send to initiate factory reset
427 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
429 OUT_KWARGS = (("number", int, 3),)
431 def out_encode(self) -> bytes: # Number of whitelist entries
432 return pack("B", self.number)
435 class _WIFI_POSITIONING(GPS303Pkt):
436 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
438 ("dtime", bytes, b"\0\0\0\0\0\0"),
439 ("wifi_aps", list, []),
442 ("gsm_cells", list, []),
445 def in_decode(self, length: int, payload: bytes) -> None:
446 self.dtime = payload[:6]
447 if self.dtime == b"\0\0\0\0\0\0":
450 self.devtime = datetime.strptime(
451 self.dtime.hex(), "%y%m%d%H%M%S"
452 ).astimezone(tz=timezone.utc)
454 for i in range(self.length): # length has special meaning here
455 slice = payload[6 + i * 7 : 13 + i * 7]
456 self.wifi_aps.append(
457 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
459 gsm_slice = payload[6 + self.length * 7 :]
460 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
462 for i in range(ncells):
463 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
464 locac, cellid, sigstr = unpack(
465 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
467 self.gsm_cells.append((locac, cellid, -sigstr))
469 def in_encode(self) -> bytes:
470 self.length = len(self.wifi_aps)
476 bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
478 for mac, sigstr in self.wifi_aps
481 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
484 pack("!HHB", locac, cellid, -sigstr)
485 for locac, cellid, sigstr in self.gsm_cells
492 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
494 RESPOND = Respond.INL
496 def out_encode(self) -> bytes:
497 return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
500 class TIME(GPS303Pkt):
502 RESPOND = Respond.INL
504 def out_encode(self) -> bytes:
505 return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
508 class PROHIBIT_LBS(GPS303Pkt):
510 OUT_KWARGS = (("status", int, 1),)
512 def out_encode(self) -> bytes: # Server sent, 0-off, 1-on
513 return pack("B", self.status)
516 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
520 ("gps_off", boolx, False), # Clarify the meaning of 0/1
521 ("gps_interval_set", boolx, False),
522 ("gps_interval", hhmmhhmm, "00000000"),
523 ("lbs_off", boolx, False), # Clarify the meaning of 0/1
524 ("boot_time_set", boolx, False),
525 ("boot_time", hhmm, "0000"),
526 ("shut_time_set", boolx, False),
527 ("shut_time", hhmm, "0000"),
530 def out_encode(self) -> bytes:
532 pack("B", self.gps_off)
533 + pack("B", self.gps_interval_set)
534 + bytes.fromhex(self.gps_interval)
535 + pack("B", self.lbs_off)
536 + pack("B", self.boot_time_set)
537 + bytes.fromhex(self.boot_time)
538 + pack("B", self.shut_time_set)
539 + bytes.fromhex(self.shut_time)
543 class _SET_PHONE(GPS303Pkt):
544 OUT_KWARGS = (("phone", str, ""),)
546 def out_encode(self) -> bytes:
548 return self.phone.encode("")
551 class REMOTE_MONITOR_PHONE(_SET_PHONE):
555 class SOS_PHONE(_SET_PHONE):
559 class DAD_PHONE(_SET_PHONE):
563 class MOM_PHONE(_SET_PHONE):
567 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
571 class GPS_OFF_PERIOD(GPS303Pkt):
575 ("fm", hhmm, "0000"),
576 ("to", hhmm, "2359"),
579 def out_encode(self) -> bytes:
581 pack("B", self.onoff)
582 + bytes.fromhex(self.fm)
583 + bytes.fromhex(self.to)
587 class DND_PERIOD(GPS303Pkt):
592 ("fm1", hhmm, "0000"),
593 ("to1", hhmm, "2359"),
594 ("fm2", hhmm, "0000"),
595 ("to2", hhmm, "2359"),
598 def out_encode(self) -> bytes:
600 pack("B", self.onoff)
601 + pack("B", self.week)
602 + bytes.fromhex(self.fm1)
603 + bytes.fromhex(self.to1)
604 + bytes.fromhex(self.fm2)
605 + bytes.fromhex(self.to2)
609 class RESTART_SHUTDOWN(GPS303Pkt):
611 OUT_KWARGS = (("flag", int, 0),)
613 def out_encode(self) -> bytes:
616 return pack("B", self.flag)
619 class DEVICE(GPS303Pkt):
621 OUT_KWARGS = (("flag", int, 0),)
623 # 0 - Stop looking for equipment
624 # 1 - Start looking for equipment
625 def out_encode(self) -> bytes:
626 return pack("B", self.flag)
629 class ALARM_CLOCK(GPS303Pkt):
632 Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
634 ("alarms", l3alarms, []),
637 def out_encode(self) -> bytes:
639 pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
643 class STOP_ALARM(GPS303Pkt):
646 def in_decode(self, length: int, payload: bytes) -> None:
647 self.flag = payload[0]
650 class SETUP(GPS303Pkt):
652 RESPOND = Respond.EXT
654 ("uploadintervalseconds", intx, 0x0300),
655 ("binaryswitch", intx, 0b00110001),
656 ("alarms", l3int, [0, 0, 0]),
657 ("dndtimeswitch", int, 0),
658 ("dndtimes", l3int, [0, 0, 0]),
659 ("gpstimeswitch", int, 0),
660 ("gpstimestart", int, 0),
661 ("gpstimestop", int, 0),
662 ("phonenumbers", l3str, ["", "", ""]),
665 def out_encode(self) -> bytes:
666 def pack3b(x: int) -> bytes:
667 return pack("!I", x)[1:]
671 pack("!H", self.uploadintervalseconds),
672 pack("B", self.binaryswitch),
674 + [pack3b(el) for el in self.alarms]
676 pack("B", self.dndtimeswitch),
678 + [pack3b(el) for el in self.dndtimes]
680 pack("B", self.gpstimeswitch),
681 pack("!H", self.gpstimestart),
682 pack("!H", self.gpstimestop),
684 + [b";".join([el.encode() for el in self.phonenumbers])]
687 def in_encode(self) -> bytes:
691 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
695 class RESTORE_PASSWORD(GPS303Pkt):
699 class WIFI_POSITIONING(_WIFI_POSITIONING):
701 RESPOND = Respond.EXT
702 OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
704 def out_encode(self) -> bytes:
705 if self.latitude is None or self.longitude is None:
707 return "{:+#010.8g},{:+#010.8g}".format(
708 self.latitude, self.longitude
711 def out_decode(self, length: int, payload: bytes) -> None:
712 lat, lon = payload.decode().split(",")
713 self.latitude = float(lat)
714 self.longitude = float(lon)
717 class MANUAL_POSITIONING(GPS303Pkt):
720 def in_decode(self, length: int, payload: bytes) -> None:
721 self.flag = payload[0] if len(payload) > 0 else -1
726 4: "LBS search > 3 times",
727 5: "Same LBS and WiFi data",
728 6: "LBS prohibited, WiFi absent",
729 7: "GPS spacing < 50 m",
730 }.get(self.flag, "Unknown")
733 class BATTERY_CHARGE(GPS303Pkt):
737 class CHARGER_CONNECTED(GPS303Pkt):
741 class CHARGER_DISCONNECTED(GPS303Pkt):
745 class VIBRATION_RECEIVED(GPS303Pkt):
749 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
751 RESPOND = Respond.EXT
752 OUT_KWARGS = (("interval", int, 10),)
754 def in_decode(self, length: int, payload: bytes) -> None:
755 self.interval = unpack("!H", payload[:2])
757 def out_encode(self) -> bytes:
758 return pack("!H", self.interval)
761 class SOS_ALARM(GPS303Pkt):
765 class UNKNOWN_B3(GPS303Pkt):
767 IN_KWARGS = (("asciidata", str, ""),)
769 def in_decode(self, length: int, payload: bytes) -> None:
770 self.asciidata = payload.decode()
773 # Build dicts protocol number -> class and class name -> protocol number
776 if True: # just to indent the code, sorry!
779 for name, cls in globals().items()
781 and issubclass(cls, GPS303Pkt)
782 and not name.startswith("_")
784 if hasattr(cls, "PROTO"):
785 CLASSES[cls.PROTO] = cls
786 PROTOS[cls.__name__] = cls.PROTO
791 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
794 for name, proto in PROTOS.items()
795 if name.upper().startswith(prefix.upper())
800 return CLASSES[proto]
803 def proto_by_name(name: str) -> int:
804 return PROTOS.get(name, -1)
807 def proto_of_message(packet: bytes) -> int:
811 def inline_response(packet: bytes) -> Optional[bytes]:
812 proto = proto_of_message(packet)
815 if cls.RESPOND is Respond.INL:
816 return cls.Out().packed
820 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
821 """From a packet (without framing bytes) derive the XXX.In object"""
822 length, proto = unpack("BB", packet[:2])
824 if proto not in CLASSES:
825 cause: Union[DecodeError, ValueError, IndexError] = ValueError(
826 f"Proto {proto} is unknown"
831 return CLASSES[proto].In(length, payload)
833 return CLASSES[proto].Out(length, payload)
834 except (DecodeError, ValueError, IndexError) as e:
837 retobj = UNKNOWN.In(length, payload)
839 retobj = UNKNOWN.Out(length, payload)
840 retobj.PROTO = proto # Override class attr with object attr