2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
46 "GPS_OFFLINE_POSITIONING",
51 "WIFI_OFFLINE_POSITIONING",
54 "GPS_LBS_SWITCH_TIMES",
55 "REMOTE_MONITOR_PHONE",
67 "SYNCHRONOUS_WHITELIST",
73 "CHARGER_DISCONNECTED",
75 "POSITION_UPLOAD_INTERVAL",
81 class DecodeError(Exception):
82 def __init__(self, e: Exception, **kwargs: Any) -> None:
84 for k, v in kwargs.items():
88 def intx(x: Union[str, int]) -> int:
89 if isinstance(x, str):
94 def boolx(x: Union[str, bool]) -> bool:
95 if isinstance(x, str):
96 if x.upper() in ("ON", "TRUE", "1"):
98 if x.upper() in ("OFF", "FALSE", "0"):
100 raise ValueError(str(x) + " could not be parsed as a Boolean")
104 def hhmm(x: str) -> str:
105 """Check for the string that represents hours and minutes"""
106 if not isinstance(x, str) or len(x) != 4:
107 raise ValueError(str(x) + " is not a four-character string")
110 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
111 raise ValueError(str(x) + " does not contain valid hours and minutes")
115 def hhmmhhmm(x: str) -> str:
116 """Check for the string that represents hours and minutes twice"""
117 if not isinstance(x, str) or len(x) != 8:
118 raise ValueError(str(x) + " is not an eight-character string")
119 return hhmm(x[:4]) + hhmm(x[4:])
122 def l3str(x: Union[str, List[str]]) -> List[str]:
123 if isinstance(x, str):
127 if len(lx) != 3 or not all(isinstance(el, str) for el in x):
128 raise ValueError(str(lx) + " is not a list of three strings")
132 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
133 def alrmspec(sub: str) -> Tuple[int, str]:
135 raise ValueError(sub + " does not represent day and time")
149 if isinstance(x, str):
150 lx = [alrmspec(sub) for sub in x.split(",")]
153 lx.extend([(0, "0000") for _ in range(3 - len(lx))])
154 if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
155 raise ValueError(str(lx) + " is a wrong alarms specification")
156 return [(d, hhmm(tm)) for d, tm in lx]
159 def l3int(x: Union[str, List[int]]) -> List[int]:
160 if isinstance(x, str):
161 lx = [int(el) for el in x.split(",")]
164 if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
165 raise ValueError(str(lx) + " is not a list of three integers")
171 For each class corresponding to a message, automatically create
172 two nested classes `In` and `Out` that also inherit from their
173 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
174 copied to the `In` nested class under the name `KWARGS`, and
175 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
176 to the nested class `Out`. In addition, method `encode` is
177 defined in both classes equal to `in_encode()` and `out_encode()`
183 def __getattr__(self, name: str) -> Any:
186 def __setattr__(self, name: str, value: Any) -> None:
190 cls: Type["MetaPkt"],
192 bases: Tuple[type, ...],
193 attrs: Dict[str, Any],
195 newcls = super().__new__(cls, name, bases, attrs)
196 newcls.In = super().__new__(
201 "KWARGS": newcls.IN_KWARGS,
202 "decode": newcls.in_decode,
203 "encode": newcls.in_encode,
206 newcls.Out = super().__new__(
211 "KWARGS": newcls.OUT_KWARGS,
212 "decode": newcls.out_decode,
213 "encode": newcls.out_encode,
220 NON = 0 # Incoming, no response needed
221 INL = 1 # Birirectional, use `inline_response()`
222 EXT = 2 # Birirectional, use external responder
225 class GPS303Pkt(metaclass=MetaPkt):
226 RESPOND = Respond.NON # Do not send anything back by default
228 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
229 OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
230 KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
231 In: Type["GPS303Pkt"]
232 Out: Type["GPS303Pkt"]
236 def __getattr__(self, name: str) -> Any:
239 def __setattr__(self, name: str, value: Any) -> None:
242 def __init__(self, *args: Any, **kwargs: Any):
244 Construct the object _either_ from (length, payload),
245 _or_ from the values of individual fields
247 assert not args or (len(args) == 2 and not kwargs)
248 if args: # guaranteed to be two arguments at this point
249 self.length, self.payload = args
251 self.decode(self.length, self.payload)
253 raise DecodeError(e, obj=self)
255 for kw, typ, dfl in self.KWARGS:
256 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
259 self.__class__.__name__ + " stray kwargs " + str(kwargs)
262 def __repr__(self) -> str:
263 return "{}({})".format(
264 self.__class__.__name__,
268 'bytes.fromhex("{}")'.format(v.hex())
269 if isinstance(v, bytes)
272 for k, v in self.__dict__.items()
273 if not k.startswith("_")
277 decode: Callable[["GPS303Pkt", int, bytes], None]
279 def in_decode(self, length: int, packet: bytes) -> None:
280 # Overridden in subclasses, otherwise do not decode payload
283 def out_decode(self, length: int, packet: bytes) -> None:
284 # Overridden in subclasses, otherwise do not decode payload
287 encode: Callable[["GPS303Pkt"], bytes]
289 def in_encode(self) -> bytes:
290 # Necessary to emulate terminal, which is not implemented
291 raise NotImplementedError(
292 self.__class__.__name__ + ".encode() not implemented"
295 def out_encode(self) -> bytes:
296 # Overridden in subclasses, otherwise make empty payload
300 def packed(self) -> bytes:
301 payload = self.encode()
302 length = len(payload) + 1
303 return pack("BB", length, self.PROTO) + payload
306 class UNKNOWN(GPS303Pkt):
307 PROTO = 256 # > 255 is impossible in real packets
310 class LOGIN(GPS303Pkt):
312 RESPOND = Respond.INL
313 # Default response for ACK, can also respond with STOP_UPLOAD
315 def in_decode(self, length: int, payload: bytes) -> None:
316 self.imei = payload[:-1].hex()
317 self.ver = unpack("B", payload[-1:])[0]
320 class SUPERVISION(GPS303Pkt):
322 OUT_KWARGS = (("status", int, 1),)
324 def out_encode(self) -> bytes:
325 # 1: The device automatically answers Pickup effect
326 # 2: Automatically Answering Two-way Calls
327 # 3: Ring manually answer the two-way call
328 return pack("B", self.status)
331 class HEARTBEAT(GPS303Pkt):
333 RESPOND = Respond.INL
336 class _GPS_POSITIONING(GPS303Pkt):
337 RESPOND = Respond.INL
339 def in_decode(self, length: int, payload: bytes) -> None:
340 self.dtime = payload[:6]
341 if self.dtime == b"\0\0\0\0\0\0":
344 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
345 self.devtime = datetime(
346 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
348 self.gps_data_length = payload[6] >> 4
349 self.gps_nb_sat = payload[6] & 0x0F
350 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
351 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
352 flip_lon = bool(flags & 0b0000100000000000) # bit 4
353 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
354 self.heading = flags & 0b0000001111111111 # bits 6 - last
355 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
356 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
360 def out_encode(self) -> bytes:
361 tup = datetime.utcnow().timetuple()
362 ttup = (tup[0] % 100,) + tup[1:6]
363 return pack("BBBBBB", *ttup)
366 class GPS_POSITIONING(_GPS_POSITIONING):
370 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
374 class STATUS(GPS303Pkt):
376 RESPOND = Respond.EXT
377 OUT_KWARGS = (("upload_interval", int, 25),)
379 def in_decode(self, length: int, payload: bytes) -> None:
380 self.batt, self.ver, self.timezone, self.intvl = unpack(
384 self.signal: Optional[int] = payload[4]
388 def out_encode(self) -> bytes: # Set interval in minutes
389 return pack("B", self.upload_interval)
392 class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
396 class RESET(GPS303Pkt):
397 # Device sends when it got reset SMS
398 # Server can send to initiate factory reset
402 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
404 OUT_KWARGS = (("number", int, 3),)
406 def out_encode(self) -> bytes: # Number of whitelist entries
407 return pack("B", self.number)
410 class _WIFI_POSITIONING(GPS303Pkt):
411 def in_decode(self, length: int, payload: bytes) -> None:
412 self.dtime = payload[:6]
413 if self.dtime == b"\0\0\0\0\0\0":
416 self.devtime = datetime.strptime(
417 self.dtime.hex(), "%y%m%d%H%M%S"
418 ).astimezone(tz=timezone.utc)
420 for i in range(self.length): # length has special meaning here
421 slice = payload[6 + i * 7 : 13 + i * 7]
422 self.wifi_aps.append(
423 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
425 gsm_slice = payload[6 + self.length * 7 :]
426 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
428 for i in range(ncells):
429 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
430 locac, cellid, sigstr = unpack(
431 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
433 self.gsm_cells.append((locac, cellid, -sigstr))
436 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
438 RESPOND = Respond.INL
440 def out_encode(self) -> bytes:
441 return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
444 class TIME(GPS303Pkt):
446 RESPOND = Respond.INL
448 def out_encode(self) -> bytes:
449 return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
452 class PROHIBIT_LBS(GPS303Pkt):
454 OUT_KWARGS = (("status", int, 1),)
456 def out_encode(self) -> bytes: # Server sent, 0-off, 1-on
457 return pack("B", self.status)
460 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
464 ("gps_off", boolx, False), # Clarify the meaning of 0/1
465 ("gps_interval_set", boolx, False),
466 ("gps_interval", hhmmhhmm, "00000000"),
467 ("lbs_off", boolx, False), # Clarify the meaning of 0/1
468 ("boot_time_set", boolx, False),
469 ("boot_time", hhmm, "0000"),
470 ("shut_time_set", boolx, False),
471 ("shut_time", hhmm, "0000"),
474 def out_encode(self) -> bytes:
476 pack("B", self.gps_off)
477 + pack("B", self.gps_interval_set)
478 + bytes.fromhex(self.gps_interval)
479 + pack("B", self.lbs_off)
480 + pack("B", self.boot_time_set)
481 + bytes.fromhex(self.boot_time)
482 + pack("B", self.shut_time_set)
483 + bytes.fromhex(self.shut_time)
487 class _SET_PHONE(GPS303Pkt):
488 OUT_KWARGS = (("phone", str, ""),)
490 def out_encode(self) -> bytes:
492 return self.phone.encode("")
495 class REMOTE_MONITOR_PHONE(_SET_PHONE):
499 class SOS_PHONE(_SET_PHONE):
503 class DAD_PHONE(_SET_PHONE):
507 class MOM_PHONE(_SET_PHONE):
511 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
515 class GPS_OFF_PERIOD(GPS303Pkt):
519 ("fm", hhmm, "0000"),
520 ("to", hhmm, "2359"),
523 def out_encode(self) -> bytes:
525 pack("B", self.onoff)
526 + bytes.fromhex(self.fm)
527 + bytes.fromhex(self.to)
531 class DND_PERIOD(GPS303Pkt):
536 ("fm1", hhmm, "0000"),
537 ("to1", hhmm, "2359"),
538 ("fm2", hhmm, "0000"),
539 ("to2", hhmm, "2359"),
542 def out_encode(self) -> bytes:
544 pack("B", self.onoff)
545 + pack("B", self.week)
546 + bytes.fromhex(self.fm1)
547 + bytes.fromhex(self.to1)
548 + bytes.fromhex(self.fm2)
549 + bytes.fromhex(self.to2)
553 class RESTART_SHUTDOWN(GPS303Pkt):
555 OUT_KWARGS = (("flag", int, 0),)
557 def out_encode(self) -> bytes:
560 return pack("B", self.flag)
563 class DEVICE(GPS303Pkt):
565 OUT_KWARGS = (("flag", int, 0),)
567 # 0 - Stop looking for equipment
568 # 1 - Start looking for equipment
569 def out_encode(self) -> bytes:
570 return pack("B", self.flag)
573 class ALARM_CLOCK(GPS303Pkt):
576 Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
578 ("alarms", l3alarms, []),
581 def out_encode(self) -> bytes:
583 pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
587 class STOP_ALARM(GPS303Pkt):
590 def in_decode(self, length: int, payload: bytes) -> None:
591 self.flag = payload[0]
594 class SETUP(GPS303Pkt):
596 RESPOND = Respond.EXT
598 ("uploadintervalseconds", intx, 0x0300),
599 ("binaryswitch", intx, 0b00110001),
600 ("alarms", l3int, [0, 0, 0]),
601 ("dndtimeswitch", int, 0),
602 ("dndtimes", l3int, [0, 0, 0]),
603 ("gpstimeswitch", int, 0),
604 ("gpstimestart", int, 0),
605 ("gpstimestop", int, 0),
606 ("phonenumbers", l3str, ["", "", ""]),
609 def out_encode(self) -> bytes:
610 def pack3b(x: int) -> bytes:
611 return pack("!I", x)[1:]
615 pack("!H", self.uploadintervalseconds),
616 pack("B", self.binaryswitch),
618 + [pack3b(el) for el in self.alarms]
620 pack("B", self.dndtimeswitch),
622 + [pack3b(el) for el in self.dndtimes]
624 pack("B", self.gpstimeswitch),
625 pack("!H", self.gpstimestart),
626 pack("!H", self.gpstimestop),
628 + [b";".join([el.encode() for el in self.phonenumbers])]
632 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
636 class RESTORE_PASSWORD(GPS303Pkt):
640 class WIFI_POSITIONING(_WIFI_POSITIONING):
642 RESPOND = Respond.EXT
643 OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
645 def out_encode(self) -> bytes:
646 if self.latitude is None or self.longitude is None:
648 return "{:+#010.8g},{:+#010.8g}".format(
649 self.latitude, self.longitude
652 def out_decode(self, length: int, payload: bytes) -> None:
653 lat, lon = payload.decode().split(",")
654 self.latitude = float(lat)
655 self.longitude = float(lon)
658 class MANUAL_POSITIONING(GPS303Pkt):
661 def in_decode(self, length: int, payload: bytes) -> None:
662 self.flag = payload[0] if len(payload) > 0 else -1
667 4: "LBS search > 3 times",
668 5: "Same LBS and WiFi data",
669 6: "LBS prohibited, WiFi absent",
670 7: "GPS spacing < 50 m",
671 }.get(self.flag, "Unknown")
674 class BATTERY_CHARGE(GPS303Pkt):
678 class CHARGER_CONNECTED(GPS303Pkt):
682 class CHARGER_DISCONNECTED(GPS303Pkt):
686 class VIBRATION_RECEIVED(GPS303Pkt):
690 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
692 RESPOND = Respond.EXT
693 OUT_KWARGS = (("interval", int, 10),)
695 def in_decode(self, length: int, payload: bytes) -> None:
696 self.interval = unpack("!H", payload[:2])
698 def out_encode(self) -> bytes:
699 return pack("!H", self.interval)
702 class SOS_ALARM(GPS303Pkt):
706 class UNKNOWN_B3(GPS303Pkt):
708 IN_KWARGS = (("asciidata", str, ""),)
710 def in_decode(self, length: int, payload: bytes) -> None:
711 self.asciidata = payload.decode()
714 # Build dicts protocol number -> class and class name -> protocol number
717 if True: # just to indent the code, sorry!
720 for name, cls in globals().items()
722 and issubclass(cls, GPS303Pkt)
723 and not name.startswith("_")
725 if hasattr(cls, "PROTO"):
726 CLASSES[cls.PROTO] = cls
727 PROTOS[cls.__name__] = cls.PROTO
732 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
735 for name, proto in PROTOS.items()
736 if name.upper().startswith(prefix.upper())
741 return CLASSES[proto]
744 def proto_by_name(name: str) -> int:
745 return PROTOS.get(name, -1)
748 def proto_of_message(packet: bytes) -> int:
752 def inline_response(packet: bytes) -> Optional[bytes]:
753 proto = proto_of_message(packet)
756 if cls.RESPOND is Respond.INL:
757 return cls.Out().packed
761 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
762 """From a packet (without framing bytes) derive the XXX.In object"""
763 length, proto = unpack("BB", packet[:2])
765 if proto not in CLASSES:
766 cause: Union[DecodeError, ValueError] = ValueError(
767 f"Proto {proto} is unknown"
772 return CLASSES[proto].In(length, payload)
774 return CLASSES[proto].Out(length, payload)
775 except DecodeError as e:
778 retobj = UNKNOWN.In(length, payload)
780 retobj = UNKNOWN.Out(length, payload)
781 retobj.PROTO = proto # Override class attr with object attr