2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation), but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
46 "GPS_OFFLINE_POSITIONING",
51 "WIFI_OFFLINE_POSITIONING",
54 "GPS_LBS_SWITCH_TIMES",
55 "REMOTE_MONITOR_PHONE",
67 "SYNCHRONOUS_WHITELIST",
73 "CHARGER_DISCONNECTED",
75 "POSITION_UPLOAD_INTERVAL",
81 class DecodeError(Exception):
82 def __init__(self, e: Exception, **kwargs: Any) -> None:
84 for k, v in kwargs.items():
88 def intx(x: Union[str, int]) -> int:
89 if isinstance(x, str):
def hhmm(x: str) -> str:
    """Check for the string that represents hours and minutes.

    Args:
        x: Candidate value; must be exactly four decimal digits, "HHMM".

    Returns:
        The same string `x`, once validated.

    Raises:
        ValueError: if `x` is not a four-character string, contains
            anything other than decimal digits, or encodes hours outside
            00..23 or minutes outside 00..59.
    """
    if not isinstance(x, str) or len(x) != 4:
        raise ValueError(str(x) + " is not a four-character string")
    # `int()` on its own tolerates signs and surrounding whitespace
    # (e.g. " 1 2" or "+1+2"); such strings would pass the range check
    # but are not usable as packed-decimal input for `bytes.fromhex()`.
    if not all(ch in "0123456789" for ch in x):
        raise ValueError(str(x) + " does not contain valid hours and minutes")
    hh = int(x[:2])
    mm = int(x[2:])
    if hh > 23 or mm > 59:
        raise ValueError(str(x) + " does not contain valid hours and minutes")
    return x
def l3str(x: Union[str, List[str]]) -> List[str]:
    """Coerce `x` into a list of exactly three strings.

    Args:
        x: Either a comma-separated string ("a,b,c") or a ready list.

    Returns:
        The list of three strings.

    Raises:
        ValueError: if the result does not consist of exactly three
            strings.
    """
    lx = x.split(",") if isinstance(x, str) else x
    # Validate the elements of the resulting list `lx` (as `l3int` does),
    # not of `x`: iterating a `str` argument only inspects its characters.
    if len(lx) != 3 or not all(isinstance(el, str) for el in lx):
        raise ValueError(str(lx) + " is not a list of three strings")
    return lx
def l3int(x: Union[str, List[int]]) -> List[int]:
    """Coerce `x` into a list of exactly three integers.

    A string argument is split on commas and every piece is converted
    with `int()`; a list argument is used as is.

    Raises:
        ValueError: if the result is not three integers (also raised by
            `int()` when a piece of the string is not a number).
    """
    if isinstance(x, str):
        items = [int(piece) for piece in x.split(",")]
    else:
        items = x
    has_three = len(items) == 3
    if not (has_three and all(isinstance(item, int) for item in items)):
        raise ValueError(str(items) + " is not a list of three integers")
    return items
127 For each class corresponding to a message, automatically create
128 two nested classes `In` and `Out` that also inherit from their
129 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
130 copied to the `In` nested class under the name `KWARGS`, and
131 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
132 to the nested class `Out`. In addition, method `encode` is
133 defined in both classes equal to `in_encode()` and `out_encode()`
139 def __getattr__(self, name: str) -> Any:
142 def __setattr__(self, name: str, value: Any) -> None:
146 cls: Type["MetaPkt"],
148 bases: Tuple[type, ...],
149 attrs: Dict[str, Any],
151 newcls = super().__new__(cls, name, bases, attrs)
152 newcls.In = super().__new__(
157 "KWARGS": newcls.IN_KWARGS,
158 "decode": newcls.in_decode,
159 "encode": newcls.in_encode,
162 newcls.Out = super().__new__(
167 "KWARGS": newcls.OUT_KWARGS,
168 "decode": newcls.out_decode,
169 "encode": newcls.out_encode,
176 NON = 0 # Incoming, no response needed
177 INL = 1 # Bidirectional, use `inline_response()`
178 EXT = 2 # Bidirectional, use external responder
181 class GPS303Pkt(metaclass=MetaPkt):
182 RESPOND = Respond.NON # Do not send anything back by default
184 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
185 OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
186 KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
187 In: Type["GPS303Pkt"]
188 Out: Type["GPS303Pkt"]
192 def __getattr__(self, name: str) -> Any:
195 def __setattr__(self, name: str, value: Any) -> None:
198 def __init__(self, *args: Any, **kwargs: Any):
200 Construct the object _either_ from (length, payload),
201 _or_ from the values of individual fields
203 assert not args or (len(args) == 2 and not kwargs)
204 if args: # guaranteed to be two arguments at this point
205 self.length, self.payload = args
207 self.decode(self.length, self.payload)
209 raise DecodeError(e, obj=self)
211 for kw, typ, dfl in self.KWARGS:
212 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
215 self.__class__.__name__ + " stray kwargs " + str(kwargs)
218 def __repr__(self) -> str:
219 return "{}({})".format(
220 self.__class__.__name__,
224 'bytes.fromhex("{}")'.format(v.hex())
225 if isinstance(v, bytes)
228 for k, v in self.__dict__.items()
229 if not k.startswith("_")
233 decode: Callable[["GPS303Pkt", int, bytes], None]
235 def in_decode(self, length: int, packet: bytes) -> None:
236 # Overridden in subclasses, otherwise do not decode payload
239 def out_decode(self, length: int, packet: bytes) -> None:
240 # Overridden in subclasses, otherwise do not decode payload
243 encode: Callable[["GPS303Pkt"], bytes]
def in_encode(self) -> bytes:
    """Refuse to encode: always raises `NotImplementedError`.

    Encoding of incoming packets would only be needed to emulate the
    terminal, which is not implemented.
    """
    message = self.__class__.__name__ + ".encode() not implemented"
    raise NotImplementedError(message)
251 def out_encode(self) -> bytes:
252 # Overridden in subclasses, otherwise make empty payload
def packed(self) -> bytes:
    """Return the packet as bytes: length byte, protocol byte, payload.

    The length byte is the payload size plus one, i.e. it also counts
    the protocol byte.
    """
    body = self.encode()
    header = pack("BB", len(body) + 1, self.PROTO)
    return header + body
262 class UNKNOWN(GPS303Pkt):
263 PROTO = 256 # > 255 is impossible in real packets
266 class LOGIN(GPS303Pkt):
268 RESPOND = Respond.INL
269 # Default response for ACK, can also respond with STOP_UPLOAD
def in_decode(self, length: int, payload: bytes) -> None:
    """Split the payload into `imei` and `ver`.

    All bytes but the last are stored as a hex string in `self.imei`;
    the last byte is stored as an integer in `self.ver`.
    """
    imei_part, ver_part = payload[:-1], payload[-1:]
    self.imei = imei_part.hex()
    (self.ver,) = unpack("B", ver_part)
276 class SUPERVISION(GPS303Pkt):
278 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Encode `self.status` as a single unsigned byte.

    Status values:
      1 - The device automatically answers (pickup effect)
      2 - Two-way calls are answered automatically
      3 - The device rings, the two-way call is answered manually
    """
    status_byte = pack("B", self.status)
    return status_byte
287 class HEARTBEAT(GPS303Pkt):
289 RESPOND = Respond.INL
292 class _GPS_POSITIONING(GPS303Pkt):
293 RESPOND = Respond.INL
295 def in_decode(self, length: int, payload: bytes) -> None:
296 self.dtime = payload[:6]
297 if self.dtime == b"\0\0\0\0\0\0":
300 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
301 self.devtime = datetime(
302 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
304 self.gps_data_length = payload[6] >> 4
305 self.gps_nb_sat = payload[6] & 0x0F
306 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
307 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
308 flip_lon = bool(flags & 0b0000100000000000) # bit 4
309 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
310 self.heading = flags & 0b0000001111111111 # bits 6 - last
311 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
312 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time in six bytes.

    Returns:
        Six bytes holding year modulo 100, month, day, hour, minute
        and second.
    """
    # `datetime.utcnow()` is deprecated since Python 3.12; an aware UTC
    # timestamp yields the same calendar fields.
    now = datetime.now(timezone.utc)
    return pack(
        "BBBBBB",
        now.year % 100,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
322 class GPS_POSITIONING(_GPS_POSITIONING):
326 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
330 class STATUS(GPS303Pkt):
332 RESPOND = Respond.EXT
333 OUT_KWARGS = (("upload_interval", int, 25),)
335 def in_decode(self, length: int, payload: bytes) -> None:
336 self.batt, self.ver, self.timezone, self.intvl = unpack(
340 self.signal: Optional[int] = payload[4]
def out_encode(self) -> bytes:
    """Encode `self.upload_interval` (interval in minutes) as one byte."""
    interval_byte = pack("B", self.upload_interval)
    return interval_byte
348 class HIBERNATION(GPS303Pkt): # Server can send this to put the device to sleep
352 class RESET(GPS303Pkt):
353 # Device sends when it got reset SMS
354 # Server can send to initiate factory reset
358 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiate sync (0x58)
360 OUT_KWARGS = (("number", int, 3),)
def out_encode(self) -> bytes:
    """Encode `self.number` (number of whitelist entries) as one byte."""
    count_byte = pack("B", self.number)
    return count_byte
366 class _WIFI_POSITIONING(GPS303Pkt):
def in_decode(self, length: int, payload: bytes) -> None:
    """Decode device time, WiFi access points and GSM cells.

    Sets the attributes `dtime` (raw six bytes), `devtime` (aware UTC
    `datetime`, or `None` when the raw time is all zeroes), `wifi_aps`
    (list of `(mac, -signal)` tuples), `mcc`, `mnc` and `gsm_cells`
    (list of `(locac, cellid, -signal)` tuples).

    Note that `self.length` is used as the number of access point
    records in the payload.

    Raises:
        ValueError: if the time field is not a valid packed decimal date.
        IndexError, struct.error: if the payload is shorter than the
            record counts imply.
    """
    self.dtime = payload[:6]
    if self.dtime == b"\0\0\0\0\0\0":
        self.devtime = None
    else:
        # The time is packed decimal without zone information. Attach
        # UTC explicitly: `astimezone()` on a naive value would first
        # interpret it in the local time zone of the server.
        self.devtime = datetime.strptime(
            self.dtime.hex(), "%y%m%d%H%M%S"
        ).replace(tzinfo=timezone.utc)
    self.wifi_aps = []
    for i in range(self.length):  # length has special meaning here
        # Seven bytes per access point: six of MAC, one of signal level
        ap = payload[6 + i * 7 : 13 + i * 7]
        self.wifi_aps.append(
            (":".join([format(b, "02X") for b in ap[:6]]), -ap[6])
        )
    gsm_slice = payload[6 + self.length * 7 :]
    ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
    self.gsm_cells = []
    for i in range(ncells):
        # Five bytes per cell: area code, cell id, signal level
        locac, cellid, sigstr = unpack(
            "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
        )
        self.gsm_cells.append((locac, cellid, -sigstr))
392 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
394 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Build the response payload: current UTC time in packed decimal.

    Returns:
        Six bytes, "YYMMDDhhmmss" with two decimal digits per byte.
    """
    # `datetime.utcnow()` is deprecated since Python 3.12; an aware UTC
    # timestamp formats to the same string.
    now = datetime.now(timezone.utc)
    return bytes.fromhex(now.strftime("%y%m%d%H%M%S"))
400 class TIME(GPS303Pkt):
402 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time in seven bytes.

    Returns:
        Big-endian two-byte year followed by one byte each for month,
        day, hour, minute and second.
    """
    # `datetime.utcnow()` is deprecated since Python 3.12; an aware UTC
    # timestamp yields the same time tuple fields.
    now = datetime.now(timezone.utc)
    return pack("!HBBBBB", *now.timetuple()[:6])
408 class PROHIBIT_LBS(GPS303Pkt):
410 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Encode `self.status` as one byte (server sent, 0 - off, 1 - on)."""
    status_byte = pack("B", self.status)
    return status_byte
416 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
419 # Data is in packed decimal
421 # 00/01 - Don't set / Set upload period
422 # HHMMHHMM - Upload period
424 # 00/01 - Don't set / Set time of boot
425 # HHMM - Time of boot
426 # 00/01 - Don't set / Set time of shutdown
427 # HHMM - Time of shutdown
428 def out_encode(self) -> bytes:
432 class _SET_PHONE(GPS303Pkt):
433 OUT_KWARGS = (("phone", str, ""),)
def out_encode(self) -> bytes:
    """Encode the phone number `self.phone` as the payload bytes.

    Returns:
        The phone number string encoded with the default codec (UTF-8).
    """
    # An empty codec name, `encode("")`, raises LookupError ("unknown
    # encoding"); use the default codec, as is done for the phone
    # numbers of the SETUP packet.
    return self.phone.encode()
440 class REMOTE_MONITOR_PHONE(_SET_PHONE):
444 class SOS_PHONE(_SET_PHONE):
448 class DAD_PHONE(_SET_PHONE):
452 class MOM_PHONE(_SET_PHONE):
456 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
460 class GPS_OFF_PERIOD(GPS303Pkt):
464 ("fm", hhmm, "0000"),
465 ("to", hhmm, "2359"),
468 def out_encode(self) -> bytes:
470 pack("B", self.onoff)
471 + bytes.fromhex(self.fm)
472 + bytes.fromhex(self.to)
476 class DND_PERIOD(GPS303Pkt):
481 ("fm1", hhmm, "0000"),
482 ("to1", hhmm, "2359"),
483 ("fm2", hhmm, "0000"),
484 ("to2", hhmm, "2359"),
487 def out_encode(self) -> bytes:
489 pack("B", self.onoff)
490 + pack("B", self.week)
491 + bytes.fromhex(self.fm1)
492 + bytes.fromhex(self.to1)
493 + bytes.fromhex(self.fm2)
494 + bytes.fromhex(self.to2)
498 class RESTART_SHUTDOWN(GPS303Pkt):
500 OUT_KWARGS = (("flag", int, 0),)
502 def out_encode(self) -> bytes:
505 return pack("B", self.flag)
508 class DEVICE(GPS303Pkt):
510 OUT_KWARGS = (("flag", int, 0),)
def out_encode(self) -> bytes:
    """Encode `self.flag` as a single unsigned byte.

    Flag values:
      0 - Stop looking for equipment
      1 - Start looking for equipment
    """
    flag_byte = pack("B", self.flag)
    return flag_byte
518 class ALARM_CLOCK(GPS303Pkt):
521 def out_encode(self) -> bytes:
522 # TODO implement parsing kwargs
523 alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
525 pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
529 class STOP_ALARM(GPS303Pkt):
def in_decode(self, length: int, payload: bytes) -> None:
    """Store the first payload byte, as an integer, in `self.flag`."""
    first_byte = payload[0]
    self.flag = first_byte
536 class SETUP(GPS303Pkt):
538 RESPOND = Respond.EXT
540 ("uploadintervalseconds", intx, 0x0300),
541 ("binaryswitch", intx, 0b00110001),
542 ("alarms", l3int, [0, 0, 0]),
543 ("dndtimeswitch", int, 0),
544 ("dndtimes", l3int, [0, 0, 0]),
545 ("gpstimeswitch", int, 0),
546 ("gpstimestart", int, 0),
547 ("gpstimestop", int, 0),
548 ("phonenumbers", l3str, ["", "", ""]),
551 def out_encode(self) -> bytes:
552 def pack3b(x: int) -> bytes:
553 return pack("!I", x)[1:]
557 pack("!H", self.uploadintervalseconds),
558 pack("B", self.binaryswitch),
560 + [pack3b(el) for el in self.alarms]
562 pack("B", self.dndtimeswitch),
564 + [pack3b(el) for el in self.dndtimes]
566 pack("B", self.gpstimeswitch),
567 pack("!H", self.gpstimestart),
568 pack("!H", self.gpstimestop),
570 + [b";".join([el.encode() for el in self.phonenumbers])]
574 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
578 class RESTORE_PASSWORD(GPS303Pkt):
582 class WIFI_POSITIONING(_WIFI_POSITIONING):
584 RESPOND = Respond.EXT
585 OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
587 def out_encode(self) -> bytes:
588 if self.latitude is None or self.longitude is None:
590 return "{:+#010.8g},{:+#010.8g}".format(
591 self.latitude, self.longitude
def out_decode(self, length: int, payload: bytes) -> None:
    """Parse a "<latitude>,<longitude>" text payload.

    The payload is decoded as text and split on the comma; the two
    parts are stored as floats in `self.latitude` and `self.longitude`.
    """
    fields = payload.decode().split(",")
    lat_text, lon_text = fields
    self.latitude = float(lat_text)
    self.longitude = float(lon_text)
600 class MANUAL_POSITIONING(GPS303Pkt):
603 def in_decode(self, length: int, payload: bytes) -> None:
604 self.flag = payload[0] if len(payload) > 0 else -1
609 4: "LBS search > 3 times",
610 5: "Same LBS and WiFi data",
611 6: "LBS prohibited, WiFi absent",
612 7: "GPS spacing < 50 m",
613 }.get(self.flag, "Unknown")
616 class BATTERY_CHARGE(GPS303Pkt):
620 class CHARGER_CONNECTED(GPS303Pkt):
624 class CHARGER_DISCONNECTED(GPS303Pkt):
628 class VIBRATION_RECEIVED(GPS303Pkt):
632 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
634 RESPOND = Respond.EXT
635 OUT_KWARGS = (("interval", int, 10),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Decode the interval from the first two bytes of the payload.

    Stores the big-endian unsigned 16-bit value in `self.interval`.

    Raises:
        struct.error: if the payload is shorter than two bytes.
    """
    # `unpack()` always returns a tuple; take its only element so that
    # `interval` is an integer rather than a one-element tuple.
    (self.interval,) = unpack("!H", payload[:2])
def out_encode(self) -> bytes:
    """Encode `self.interval` as a big-endian unsigned 16-bit value."""
    interval_bytes = pack("!H", self.interval)
    return interval_bytes
644 class SOS_ALARM(GPS303Pkt):
648 class UNKNOWN_B3(GPS303Pkt):
650 IN_KWARGS = (("asciidata", str, ""),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Decode the whole payload as text and store it in `self.asciidata`."""
    text = payload.decode()
    self.asciidata = text
656 # Build dicts protocol number -> class and class name -> protocol number
659 if True: # just to indent the code, sorry!
662 for name, cls in globals().items()
664 and issubclass(cls, GPS303Pkt)
665 and not name.startswith("_")
667 if hasattr(cls, "PROTO"):
668 CLASSES[cls.PROTO] = cls
669 PROTOS[cls.__name__] = cls.PROTO
674 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
677 for name, proto in PROTOS.items()
678 if name.upper().startswith(prefix.upper())
683 return CLASSES[proto]
def proto_by_name(name: str) -> int:
    """Return the protocol number registered in `PROTOS` under `name`.

    Returns -1 when there is no entry for that name.
    """
    if name in PROTOS:
        return PROTOS[name]
    return -1
690 def proto_of_message(packet: bytes) -> int:
694 def inline_response(packet: bytes) -> Optional[bytes]:
695 proto = proto_of_message(packet)
698 if cls.RESPOND is Respond.INL:
699 return cls.Out().packed
703 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
704 """From a packet (without framing bytes) derive the XXX.In object"""
705 length, proto = unpack("BB", packet[:2])
707 if proto not in CLASSES:
708 cause: Union[DecodeError, ValueError] = ValueError(
709 f"Proto {proto} is unknown"
714 return CLASSES[proto].In(length, payload)
716 return CLASSES[proto].Out(length, payload)
717 except DecodeError as e:
720 retobj = UNKNOWN.In(length, payload)
722 retobj = UNKNOWN.Out(length, payload)
723 retobj.PROTO = proto # Override class attr with object attr