2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
one `recv()` operation), but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
46 "GPS_OFFLINE_POSITIONING",
51 "WIFI_OFFLINE_POSITIONING",
54 "GPS_LBS_SWITCH_TIMES",
55 "REMOTE_MONITOR_PHONE",
67 "SYNCHRONOUS_WHITELIST",
73 "CHARGER_DISCONNECTED",
75 "POSITION_UPLOAD_INTERVAL",
81 class DecodeError(Exception):
82 def __init__(self, e: Exception, **kwargs: Any) -> None:
84 for k, v in kwargs.items():
88 def intx(x: Union[str, int]) -> int:
89 if isinstance(x, str):
def hhmm(x: str) -> str:
    """Check for the string that represents hours and minutes.

    The value must be exactly four decimal digits, "HHMM", with hours in
    00..23 and minutes in 00..59.  The validated string is returned
    unchanged so that it can be stored as a field value.

    Raises:
        ValueError: if `x` is not a four-character string, contains
            anything but decimal digits, or is out of range.
    """
    if not isinstance(x, str) or len(x) != 4:
        raise ValueError(str(x) + " is not a four-character string")
    # int() tolerates signs and surrounding whitespace ("+1", " 1"), but the
    # encoders feed this string to bytes.fromhex(), which does not.
    if not all(ch in "0123456789" for ch in x):
        raise ValueError(str(x) + " does not contain valid hours and minutes")
    hh, mm = int(x[:2]), int(x[2:])
    if hh > 23 or mm > 59:
        raise ValueError(str(x) + " does not contain valid hours and minutes")
    return x
def l3str(x: Union[str, List[str]]) -> List[str]:
    """Convert `x` to a list of exactly three strings.

    Args:
        x: either a comma-separated string ("a,b,c") or a list of strings.

    Returns:
        A new list of three strings.

    Raises:
        ValueError: if the result does not have exactly three elements
            or any element is not a string.
    """
    # List input is copied, so a list passed in as a default value is
    # never handed out by reference.
    lx = x.split(",") if isinstance(x, str) else list(x)
    if len(lx) != 3 or not all(isinstance(el, str) for el in lx):
        raise ValueError(str(lx) + " is not a list of three strings")
    return lx
def l3int(x: Union[str, List[int]]) -> List[int]:
    """Convert `x` to a list of exactly three integers.

    Args:
        x: either a comma-separated string ("1,2,3") or a list of ints.

    Returns:
        A new list of three integers.

    Raises:
        ValueError: if a string element is not a valid integer, the
            result does not have exactly three elements, or any element
            is not an integer.
    """
    if isinstance(x, str):
        lx = [int(el) for el in x.split(",")]
    else:
        # List input is copied, so a list passed in as a default value is
        # never handed out by reference.
        lx = list(x)
    if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
        raise ValueError(str(lx) + " is not a list of three integers")
    return lx
123 For each class corresponding to a message, automatically create
124 two nested classes `In` and `Out` that also inherit from their
125 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
126 copied to the `In` nested class under the name `KWARGS`, and
127 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
128 to the nested class `Out`. In addition, method `encode` is
129 defined in both classes equal to `in_encode()` and `out_encode()`
135 def __getattr__(self, name: str) -> Any:
138 def __setattr__(self, name: str, value: Any) -> None:
142 cls: Type["MetaPkt"],
144 bases: Tuple[type, ...],
145 attrs: Dict[str, Any],
147 newcls = super().__new__(cls, name, bases, attrs)
148 newcls.In = super().__new__(
153 "KWARGS": newcls.IN_KWARGS,
154 "decode": newcls.in_decode,
155 "encode": newcls.in_encode,
158 newcls.Out = super().__new__(
163 "KWARGS": newcls.OUT_KWARGS,
164 "decode": newcls.out_decode,
165 "encode": newcls.out_encode,
NON = 0 # Incoming, no response needed
INL = 1 # Bidirectional, use `inline_response()`
EXT = 2 # Bidirectional, use external responder
177 class GPS303Pkt(metaclass=MetaPkt):
178 RESPOND = Respond.NON # Do not send anything back by default
180 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
181 OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
182 KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
183 In: Type["GPS303Pkt"]
184 Out: Type["GPS303Pkt"]
188 def __getattr__(self, name: str) -> Any:
191 def __setattr__(self, name: str, value: Any) -> None:
194 def __init__(self, *args: Any, **kwargs: Any):
196 Construct the object _either_ from (length, payload),
197 _or_ from the values of individual fields
199 assert not args or (len(args) == 2 and not kwargs)
200 if args: # guaranteed to be two arguments at this point
201 self.length, self.payload = args
203 self.decode(self.length, self.payload)
205 raise DecodeError(e, obj=self)
207 for kw, typ, dfl in self.KWARGS:
208 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
211 self.__class__.__name__ + " stray kwargs " + str(kwargs)
214 def __repr__(self) -> str:
215 return "{}({})".format(
216 self.__class__.__name__,
220 'bytes.fromhex("{}")'.format(v.hex())
221 if isinstance(v, bytes)
224 for k, v in self.__dict__.items()
225 if not k.startswith("_")
229 decode: Callable[["GPS303Pkt", int, bytes], None]
231 def in_decode(self, length: int, packet: bytes) -> None:
232 # Overridden in subclasses, otherwise do not decode payload
235 def out_decode(self, length: int, packet: bytes) -> None:
236 # Overridden in subclasses, otherwise do not decode payload
239 encode: Callable[["GPS303Pkt"], bytes]
241 def in_encode(self) -> bytes:
242 # Necessary to emulate terminal, which is not implemented
243 raise NotImplementedError(
244 self.__class__.__name__ + ".encode() not implemented"
247 def out_encode(self) -> bytes:
248 # Overridden in subclasses, otherwise make empty payload
def packed(self) -> bytes:
    """Serialize the packet: length byte, protocol byte, then the payload.

    The length byte counts the protocol byte together with the payload,
    which is why one is added to the payload size.
    """
    body = self.encode()
    header = pack("BB", len(body) + 1, self.PROTO)
    return header + body
258 class UNKNOWN(GPS303Pkt):
259 PROTO = 256 # > 255 is impossible in real packets
262 class LOGIN(GPS303Pkt):
264 RESPOND = Respond.INL
265 # Default response for ACK, can also respond with STOP_UPLOAD
def in_decode(self, length: int, payload: bytes) -> None:
    """Split the login payload into IMEI and version.

    Every byte except the last is stored, hex-encoded, in `self.imei`;
    the last byte is stored as an integer in `self.ver`.
    """
    imei_part, ver_part = payload[:-1], payload[-1:]
    self.imei = imei_part.hex()
    (self.ver,) = unpack("B", ver_part)
272 class SUPERVISION(GPS303Pkt):
274 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Encode `self.status` as a one-byte payload.

    Status values:
      1: The device automatically answers Pickup effect
      2: Automatically Answering Two-way Calls
      3: Ring manually answer the two-way call
    """
    status_byte = pack("B", self.status)
    return status_byte
283 class HEARTBEAT(GPS303Pkt):
285 RESPOND = Respond.INL
288 class _GPS_POSITIONING(GPS303Pkt):
289 RESPOND = Respond.INL
291 def in_decode(self, length: int, payload: bytes) -> None:
292 self.dtime = payload[:6]
293 if self.dtime == b"\0\0\0\0\0\0":
296 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
297 self.devtime = datetime(
298 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
300 self.gps_data_length = payload[6] >> 4
301 self.gps_nb_sat = payload[6] & 0x0F
302 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
303 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
304 flip_lon = bool(flags & 0b0000100000000000) # bit 4
305 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
306 self.heading = flags & 0b0000001111111111 # bits 6 - last
307 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
308 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time in six bytes.

    The bytes are, in order: two-digit year, month, day, hour, minute
    and second, each as a plain binary value.
    """
    # datetime.utcnow() is deprecated since Python 3.12; take an aware
    # UTC timestamp instead, the field values are the same.
    now = datetime.now(timezone.utc)
    return pack(
        "BBBBBB",
        now.year % 100,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
318 class GPS_POSITIONING(_GPS_POSITIONING):
322 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
326 class STATUS(GPS303Pkt):
328 RESPOND = Respond.EXT
329 OUT_KWARGS = (("upload_interval", int, 25),)
331 def in_decode(self, length: int, payload: bytes) -> None:
332 self.batt, self.ver, self.timezone, self.intvl = unpack(
336 self.signal: Optional[int] = payload[4]
def out_encode(self) -> bytes:
    """Encode `self.upload_interval` (interval in minutes) as one byte."""
    interval_byte = pack("B", self.upload_interval)
    return interval_byte
344 class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
348 class RESET(GPS303Pkt):
349 # Device sends when it got reset SMS
350 # Server can send to initiate factory reset
354 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
356 OUT_KWARGS = (("number", int, 3),)
def out_encode(self) -> bytes:
    """Encode `self.number` (number of whitelist entries) as one byte."""
    count_byte = pack("B", self.number)
    return count_byte
362 class _WIFI_POSITIONING(GPS303Pkt):
363 def in_decode(self, length: int, payload: bytes) -> None:
364 self.dtime = payload[:6]
365 if self.dtime == b"\0\0\0\0\0\0":
368 self.devtime = datetime.strptime(
369 self.dtime.hex(), "%y%m%d%H%M%S"
370 ).astimezone(tz=timezone.utc)
372 for i in range(self.length): # length has special meaning here
373 slice = payload[6 + i * 7 : 13 + i * 7]
374 self.wifi_aps.append(
375 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
377 gsm_slice = payload[6 + self.length * 7 :]
378 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
380 for i in range(ncells):
381 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
382 locac, cellid, sigstr = unpack(
383 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
385 self.gsm_cells.append((locac, cellid, -sigstr))
388 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
390 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Build the response payload: current UTC time as six packed-decimal bytes.

    The time is formatted as the digit string YYMMDDhhmmss and each pair
    of digits becomes one byte.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC
    # timestamp formats to the same digits.
    stamp = datetime.now(timezone.utc).strftime("%y%m%d%H%M%S")
    return bytes.fromhex(stamp)
396 class TIME(GPS303Pkt):
398 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time in seven bytes.

    Layout: year as a big-endian 16-bit value, then month, day, hour,
    minute and second as one byte each.
    """
    # datetime.utcnow() is deprecated since Python 3.12; use an aware
    # UTC timestamp, whose first six timetuple fields are the same.
    now = datetime.now(timezone.utc)
    return pack("!HBBBBB", *now.timetuple()[:6])
404 class PROHIBIT_LBS(GPS303Pkt):
406 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Encode `self.status` as one byte (server sent, 0-off, 1-on)."""
    switch_byte = pack("B", self.status)
    return switch_byte
412 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
415 # Data is in packed decimal
417 # 00/01 - Don't set / Set upload period
418 # HHMMHHMM - Upload period
420 # 00/01 - Don't set / Set time of boot
421 # HHMM - Time of boot
422 # 00/01 - Don't set / Set time of shutdown
423 # HHMM - Time of shutdown
424 def out_encode(self) -> bytes:
428 class _SET_PHONE(GPS303Pkt):
429 OUT_KWARGS = (("phone", str, ""),)
def out_encode(self) -> bytes:
    """Encode `self.phone` as the payload bytes.

    An empty phone string yields an empty payload.
    """
    # An empty codec name is not a valid encoding and makes str.encode()
    # raise LookupError; use the default encoding, as is done for the
    # phone numbers in SETUP.
    return self.phone.encode()
436 class REMOTE_MONITOR_PHONE(_SET_PHONE):
440 class SOS_PHONE(_SET_PHONE):
444 class DAD_PHONE(_SET_PHONE):
448 class MOM_PHONE(_SET_PHONE):
452 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
456 class GPS_OFF_PERIOD(GPS303Pkt):
460 ("fm", hhmm, "0000"),
461 ("to", hhmm, "2359"),
464 def out_encode(self) -> bytes:
466 pack("B", self.onoff)
467 + bytes.fromhex(self.fm)
468 + bytes.fromhex(self.to)
472 class DND_PERIOD(GPS303Pkt):
477 ("fm1", hhmm, "0000"),
478 ("to1", hhmm, "2359"),
479 ("fm2", hhmm, "0000"),
480 ("to2", hhmm, "2359"),
483 def out_encode(self) -> bytes:
485 pack("B", self.onoff)
486 + pack("B", self.week)
487 + bytes.fromhex(self.fm1)
488 + bytes.fromhex(self.to1)
489 + bytes.fromhex(self.fm2)
490 + bytes.fromhex(self.to2)
494 class RESTART_SHUTDOWN(GPS303Pkt):
496 OUT_KWARGS = (("flag", int, 0),)
498 def out_encode(self) -> bytes:
501 return pack("B", self.flag)
504 class DEVICE(GPS303Pkt):
506 OUT_KWARGS = (("flag", int, 0),)
def out_encode(self) -> bytes:
    """Encode `self.flag` as a one-byte payload.

    Flag values:
      0 - Stop looking for equipment
      1 - Start looking for equipment
    """
    flag_byte = pack("B", self.flag)
    return flag_byte
514 class ALARM_CLOCK(GPS303Pkt):
517 def out_encode(self) -> bytes:
518 # TODO implement parsing kwargs
519 alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
521 pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
525 class STOP_ALARM(GPS303Pkt):
def in_decode(self, length: int, payload: bytes) -> None:
    """Store the first payload byte, as an integer, in `self.flag`."""
    first_byte = payload[0]
    self.flag = first_byte
532 class SETUP(GPS303Pkt):
534 RESPOND = Respond.EXT
536 ("uploadintervalseconds", intx, 0x0300),
537 ("binaryswitch", intx, 0b00110001),
538 ("alarms", l3int, [0, 0, 0]),
539 ("dndtimeswitch", int, 0),
540 ("dndtimes", l3int, [0, 0, 0]),
541 ("gpstimeswitch", int, 0),
542 ("gpstimestart", int, 0),
543 ("gpstimestop", int, 0),
544 ("phonenumbers", l3str, ["", "", ""]),
547 def out_encode(self) -> bytes:
548 def pack3b(x: int) -> bytes:
549 return pack("!I", x)[1:]
553 pack("!H", self.uploadintervalseconds),
554 pack("B", self.binaryswitch),
556 + [pack3b(el) for el in self.alarms]
558 pack("B", self.dndtimeswitch),
560 + [pack3b(el) for el in self.dndtimes]
562 pack("B", self.gpstimeswitch),
563 pack("!H", self.gpstimestart),
564 pack("!H", self.gpstimestop),
566 + [b";".join([el.encode() for el in self.phonenumbers])]
570 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
574 class RESTORE_PASSWORD(GPS303Pkt):
578 class WIFI_POSITIONING(_WIFI_POSITIONING):
580 RESPOND = Respond.EXT
581 OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
583 def out_encode(self) -> bytes:
584 if self.latitude is None or self.longitude is None:
586 return "{:+#010.8g},{:+#010.8g}".format(
587 self.latitude, self.longitude
def out_decode(self, length: int, payload: bytes) -> None:
    """Parse a "latitude,longitude" text payload into two float attributes.

    The payload is decoded as text and must contain exactly two
    comma-separated fields, each parseable as a float.
    """
    fields = payload.decode().split(",")
    lat_text, lon_text = fields
    self.latitude = float(lat_text)
    self.longitude = float(lon_text)
596 class MANUAL_POSITIONING(GPS303Pkt):
599 def in_decode(self, length: int, payload: bytes) -> None:
600 self.flag = payload[0] if len(payload) > 0 else -1
605 4: "LBS search > 3 times",
606 5: "Same LBS and WiFi data",
607 6: "LBS prohibited, WiFi absent",
608 7: "GPS spacing < 50 m",
609 }.get(self.flag, "Unknown")
612 class BATTERY_CHARGE(GPS303Pkt):
616 class CHARGER_CONNECTED(GPS303Pkt):
620 class CHARGER_DISCONNECTED(GPS303Pkt):
624 class VIBRATION_RECEIVED(GPS303Pkt):
628 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
630 RESPOND = Respond.EXT
631 OUT_KWARGS = (("interval", int, 10),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Read the interval from the first two payload bytes.

    The value is a big-endian unsigned 16-bit integer and is stored in
    `self.interval`.
    """
    # unpack() always returns a tuple; unwrap it so that `interval` is an
    # int, the same type that out_encode() packs.
    (self.interval,) = unpack("!H", payload[:2])
def out_encode(self) -> bytes:
    """Encode `self.interval` as a big-endian unsigned 16-bit payload."""
    interval_bytes = pack("!H", self.interval)
    return interval_bytes
640 class SOS_ALARM(GPS303Pkt):
644 class UNKNOWN_B3(GPS303Pkt):
646 IN_KWARGS = (("asciidata", str, ""),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Decode the whole payload as text and store it in `self.asciidata`."""
    text = payload.decode()
    self.asciidata = text
652 # Build dicts protocol number -> class and class name -> protocol number
655 if True: # just to indent the code, sorry!
658 for name, cls in globals().items()
660 and issubclass(cls, GPS303Pkt)
661 and not name.startswith("_")
663 if hasattr(cls, "PROTO"):
664 CLASSES[cls.PROTO] = cls
665 PROTOS[cls.__name__] = cls.PROTO
670 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
673 for name, proto in PROTOS.items()
674 if name.upper().startswith(prefix.upper())
679 return CLASSES[proto]
def proto_by_name(name: str) -> int:
    """Return the protocol number recorded in PROTOS for the class `name`.

    Returns -1 when `name` has no entry in PROTOS.
    """
    if name in PROTOS:
        return PROTOS[name]
    return -1
686 def proto_of_message(packet: bytes) -> int:
690 def inline_response(packet: bytes) -> Optional[bytes]:
691 proto = proto_of_message(packet)
694 if cls.RESPOND is Respond.INL:
695 return cls.Out().packed
699 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
700 """From a packet (without framing bytes) derive the XXX.In object"""
701 length, proto = unpack("BB", packet[:2])
703 if proto not in CLASSES:
704 cause: Union[DecodeError, ValueError] = ValueError(
705 f"Proto {proto} is unknown"
710 return CLASSES[proto].In(length, payload)
712 return CLASSES[proto].Out(length, payload)
713 except DecodeError as e:
716 retobj = UNKNOWN.In(length, payload)
718 retobj = UNKNOWN.Out(length, payload)
719 retobj.PROTO = proto # Override class attr with object attr