2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation), but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
36 "GPS_OFFLINE_POSITIONING",
41 "WIFI_OFFLINE_POSITIONING",
44 "GPS_LBS_SWITCH_TIMES",
45 "REMOTE_MONITOR_PHONE",
57 "SYNCHRONOUS_WHITELIST",
63 "CHARGER_DISCONNECTED",
65 "POSITION_UPLOAD_INTERVAL",
71 class DecodeError(Exception):
72 def __init__(self, e: Exception, **kwargs: Any) -> None:
74 for k, v in kwargs.items():
78 def intx(x: Union[str, int]) -> int:
79 if isinstance(x, str):
def hhmm(x: str) -> str:
    """Validate a time of day written as four decimal digits, "HHMM".

    Values accepted here are later passed to `bytes.fromhex()` by the
    encoders, so only plain ASCII digits are allowed; signs and spaces
    that `int()` alone would tolerate are rejected up front.

    Returns `x` unchanged.
    Raises ValueError if `x` is not a four-character string of digits,
    or if hours are outside 0..23 or minutes outside 0..59.
    """
    if not isinstance(x, str) or len(x) != 4:
        raise ValueError(str(x) + " is not a four-character string")
    # int() accepts "+1" or " 1"; such strings would break fromhex() later
    if not all(ch in "0123456789" for ch in x):
        raise ValueError(str(x) + " does not contain valid hours and minutes")
    hh = int(x[:2])
    mm = int(x[2:])
    if hh > 23 or mm > 59:
        raise ValueError(str(x) + " does not contain valid hours and minutes")
    return x
def l3str(x: Union[str, List[str]]) -> List[str]:
    """Coerce `x` to a list of exactly three strings.

    `x` is either a comma-separated string, which is split on commas,
    or an already-built list, which is validated and returned as is.

    Raises ValueError if the result is not a list of three strings.
    """
    # Bind `lx` on both paths: list input must not leave it undefined
    lx = x.split(",") if isinstance(x, str) else x
    if len(lx) != 3 or not all(isinstance(el, str) for el in lx):
        raise ValueError(str(lx) + " is not a list of three strings")
    return lx
def l3int(x: Union[str, List[int]]) -> List[int]:
    """Coerce `x` to a list of exactly three integers.

    `x` is either a comma-separated string, whose elements are converted
    with `int()`, or an already-built list, which is validated and
    returned as is.

    Raises ValueError if an element of the string is not an integer, or
    if the result is not a list of three integers.
    """
    if isinstance(x, str):
        lx = [int(el) for el in x.split(",")]
    else:
        # List input: without this branch `lx` would be undefined below
        lx = x
    if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
        raise ValueError(str(lx) + " is not a list of three integers")
    return lx
113 For each class corresponding to a message, automatically create
114 two nested classes `In` and `Out` that also inherit from their
115 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
116 copied to the `In` nested class under the name `KWARGS`, and
117 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
118 to the nested class `Out`. In addition, method `encode` is
119 defined in both classes equal to `in_encode()` and `out_encode()`
124 cls, name: str, bases: Tuple[type, ...], attrs: Dict[str, Any]
126 newcls = super().__new__(cls, name, bases, attrs)
135 "KWARGS": newcls.IN_KWARGS,
136 "decode": newcls.in_decode,
137 "encode": newcls.in_encode,
149 "KWARGS": newcls.OUT_KWARGS,
150 "decode": newcls.out_decode,
151 "encode": newcls.out_encode,
159 NON = 0 # Incoming, no response needed
160 INL = 1 # Bidirectional, use `inline_response()`
161 EXT = 2 # Bidirectional, use external responder
164 class GPS303Pkt(metaclass=MetaPkt):
165 RESPOND = Respond.NON # Do not send anything back by default
167 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
168 OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
169 KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
170 In: Type["GPS303Pkt"]
171 Out: Type["GPS303Pkt"]
173 def __init__(self, *args: Any, **kwargs: Any):
175 Construct the object _either_ from (length, payload),
176 _or_ from the values of individual fields
178 assert not args or (len(args) == 2 and not kwargs)
179 if args: # guaranteed to be two arguments at this point
180 self.length, self.payload = args
182 self.decode(self.length, self.payload)
184 raise DecodeError(e, obj=self)
186 for kw, typ, dfl in self.KWARGS:
187 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
190 self.__class__.__name__ + " stray kwargs " + str(kwargs)
193 def __repr__(self) -> str:
194 return "{}({})".format(
195 self.__class__.__name__,
199 'bytes.fromhex("{}")'.format(v.hex())
200 if isinstance(v, bytes)
203 for k, v in self.__dict__.items()
204 if not k.startswith("_")
208 decode: Callable[["GPS303Pkt", int, bytes], None]
210 def in_decode(self, length: int, packet: bytes) -> None:
211 # Overridden in subclasses, otherwise do not decode payload
214 def out_decode(self, length: int, packet: bytes) -> None:
215 # Overridden in subclasses, otherwise do not decode payload
218 encode: Callable[["GPS303Pkt"], bytes]
220 def in_encode(self) -> bytes:
221 # Necessary to emulate terminal, which is not implemented
222 raise NotImplementedError(
223 self.__class__.__name__ + ".encode() not implemented"
226 def out_encode(self) -> bytes:
227 # Overridden in subclasses, otherwise make empty payload
231 def packed(self) -> bytes:
232 payload = self.encode()
233 length = len(payload) + 1
234 return pack("BB", length, self.PROTO) + payload
237 class UNKNOWN(GPS303Pkt):
238 PROTO = 256 # > 255 is impossible in real packets
241 class LOGIN(GPS303Pkt):
243 RESPOND = Respond.INL
244 # Default response for ACK, can also respond with STOP_UPLOAD
def in_decode(self, length: int, payload: bytes) -> None:
    """Split the login payload into `imei` and `ver`.

    Every byte except the last is kept as a hex string in `imei`;
    the last byte is stored as the integer `ver`.
    """
    imei_part, ver_part = payload[:-1], payload[-1:]
    self.imei = imei_part.hex()
    (self.ver,) = unpack("B", ver_part)
251 class SUPERVISION(GPS303Pkt):
253 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Encode `status` as one unsigned byte.

    Meaning of the values:
      1: The device automatically answers Pickup effect
      2: Automatically Answering Two-way Calls
      3: Ring manually answer the two-way call
    """
    status = self.status
    return pack("B", status)
262 class HEARTBEAT(GPS303Pkt):
264 RESPOND = Respond.INL
267 class _GPS_POSITIONING(GPS303Pkt):
268 RESPOND = Respond.INL
270 def in_decode(self, length: int, payload: bytes) -> None:
271 self.dtime = payload[:6]
272 if self.dtime == b"\0\0\0\0\0\0":
275 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
276 self.devtime = datetime(
277 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
279 self.gps_data_length = payload[6] >> 4
280 self.gps_nb_sat = payload[6] & 0x0F
281 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
282 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
283 flip_lon = bool(flags & 0b0000100000000000) # bit 4
284 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
285 self.heading = flags & 0b0000001111111111 # bits 6 - last
286 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
287 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time as six bytes.

    The bytes are year modulo 100, month, day, hour, minute, second.
    """
    # datetime.utcnow() is deprecated; an aware "now" in UTC has the
    # same calendar fields.
    tup = datetime.now(timezone.utc).timetuple()
    ttup = (tup[0] % 100,) + tup[1:6]
    return pack("BBBBBB", *ttup)
297 class GPS_POSITIONING(_GPS_POSITIONING):
301 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
305 class STATUS(GPS303Pkt):
307 RESPOND = Respond.EXT
308 OUT_KWARGS = (("upload_interval", int, 25),)
310 def in_decode(self, length: int, payload: bytes) -> None:
311 self.batt, self.ver, self.timezone, self.intvl = unpack(
315 self.signal: Optional[int] = payload[4]
def out_encode(self) -> bytes:
    """Encode `upload_interval` (interval in minutes) as one unsigned byte."""
    minutes = self.upload_interval
    return pack("B", minutes)
323 class HIBERNATION(GPS303Pkt): # Server can send to put the device to sleep
327 class RESET(GPS303Pkt):
328 # Device sends when it got reset SMS
329 # Server can send to initiate factory reset
333 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiate sync (0x58)
335 OUT_KWARGS = (("number", int, 3),)
def out_encode(self) -> bytes:
    """Encode `number` (number of whitelist entries) as one unsigned byte."""
    entries = self.number
    return pack("B", entries)
341 class _WIFI_POSITIONING(GPS303Pkt):
def in_decode(self, length: int, payload: bytes) -> None:
    """Decode a WiFi/LBS report sent by the terminal.

    Payload layout, as consumed below:
      - 6 bytes of device time in packed decimal (YYMMDDHHMMSS);
        all zeroes leave `devtime` as None;
      - `self.length` records of 7 bytes each: a 6-byte MAC address
        and one byte of signal strength, stored negated;
      - a 4-byte GSM header: number of cells (8 bit), MCC (16 bit),
        MNC (8 bit), in network byte order;
      - that many records of 5 bytes each: location area code
        (16 bit), cell id (16 bit), signal strength (8 bit, stored
        negated).

    Sets `dtime`, `devtime`, `wifi_aps`, `mcc`, `mnc` and `gsm_cells`.
    """
    self.dtime = payload[:6]
    if self.dtime == b"\0\0\0\0\0\0":
        self.devtime = None
    else:
        # strptime() yields a naive datetime. Label it as UTC instead of
        # using astimezone(), which would treat it as server-local time.
        self.devtime = datetime.strptime(
            self.dtime.hex(), "%y%m%d%H%M%S"
        ).replace(tzinfo=timezone.utc)
    self.wifi_aps = []
    for i in range(self.length):  # length has special meaning here
        ap = payload[6 + i * 7 : 13 + i * 7]
        self.wifi_aps.append(
            (":".join([format(b, "02X") for b in ap[:6]]), -ap[6])
        )
    gsm_slice = payload[6 + self.length * 7 :]
    ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
    self.gsm_cells = []
    for i in range(ncells):
        locac, cellid, sigstr = unpack(
            "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
        )
        self.gsm_cells.append((locac, cellid, -sigstr))
367 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
369 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Build the response payload: current UTC time in packed decimal.

    The six bytes read as YYMMDDHHMMSS when printed in hex.
    """
    # datetime.utcnow() is deprecated; use an aware "now" in UTC instead
    now = datetime.now(timezone.utc)
    return bytes.fromhex(now.strftime("%y%m%d%H%M%S"))
375 class TIME(GPS303Pkt):
377 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Build the response payload: current UTC time as seven bytes.

    The year is a 16-bit big-endian integer, followed by one byte each
    for month, day, hour, minute and second.
    """
    # datetime.utcnow() is deprecated; use an aware "now" in UTC instead
    now = datetime.now(timezone.utc)
    return pack("!HBBBBB", *now.timetuple()[:6])
383 class PROHIBIT_LBS(GPS303Pkt):
385 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Encode `status` as one unsigned byte (server sent, 0-off, 1-on)."""
    status = self.status
    return pack("B", status)
391 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
394 # Data is in packed decimal
396 # 00/01 - Don't set / Set upload period
397 # HHMMHHMM - Upload period
399 # 00/01 - Don't set / Set time of boot
400 # HHMM - Time of boot
401 # 00/01 - Don't set / Set time of shutdown
402 # HHMM - Time of shutdown
403 def out_encode(self) -> bytes:
407 class _SET_PHONE(GPS303Pkt):
408 OUT_KWARGS = (("phone", str, ""),)
def out_encode(self) -> bytes:
    """Return the phone number string encoded to bytes.

    An empty string yields an empty payload.
    """
    self.phone: str
    # An empty codec name ("") raises LookupError; use the default codec,
    # the same way phone numbers are encoded in SETUP.out_encode().
    return self.phone.encode()
415 class REMOTE_MONITOR_PHONE(_SET_PHONE):
419 class SOS_PHONE(_SET_PHONE):
423 class DAD_PHONE(_SET_PHONE):
427 class MOM_PHONE(_SET_PHONE):
431 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
435 class GPS_OFF_PERIOD(GPS303Pkt):
439 ("fm", hhmm, "0000"),
440 ("to", hhmm, "2359"),
def out_encode(self) -> bytes:
    """Serialize the switch byte followed by the "from" and "to" times.

    `onoff` becomes one unsigned byte; `fm` and `to` are "HHMM" strings
    converted with `bytes.fromhex()`, two bytes each.
    """
    switch = pack("B", self.onoff)
    start = bytes.fromhex(self.fm)
    stop = bytes.fromhex(self.to)
    return switch + start + stop
451 class DND_PERIOD(GPS303Pkt):
456 ("fm1", hhmm, "0000"),
457 ("to1", hhmm, "2359"),
458 ("fm2", hhmm, "0000"),
459 ("to2", hhmm, "2359"),
def out_encode(self) -> bytes:
    """Serialize the switch and week bytes followed by two time periods.

    `onoff` and `week` become one unsigned byte each; `fm1`, `to1`,
    `fm2` and `to2` are "HHMM" strings converted with
    `bytes.fromhex()`, two bytes each, in that order.
    """
    header = pack("B", self.onoff) + pack("B", self.week)
    first_period = bytes.fromhex(self.fm1) + bytes.fromhex(self.to1)
    second_period = bytes.fromhex(self.fm2) + bytes.fromhex(self.to2)
    return header + first_period + second_period
473 class RESTART_SHUTDOWN(GPS303Pkt):
475 OUT_KWARGS = (("flag", int, 0),)
477 def out_encode(self) -> bytes:
480 return pack("B", self.flag)
483 class DEVICE(GPS303Pkt):
485 OUT_KWARGS = (("flag", int, 0),)
def out_encode(self) -> bytes:
    """Encode `flag` as one unsigned byte.

    Meaning of the values:
      0 - Stop looking for equipment
      1 - Start looking for equipment
    """
    flag = self.flag
    return pack("B", flag)
493 class ALARM_CLOCK(GPS303Pkt):
def out_encode(self) -> bytes:
    """Serialize three alarm records: a day byte plus an "HHMM" time each.

    The alarms are currently hard-coded to zeroes.
    """
    # TODO implement parsing kwargs
    alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
    records = []
    for day, tm in alarms:
        records.append(pack("B", day) + bytes.fromhex(tm))
    return b"".join(records)
504 class STOP_ALARM(GPS303Pkt):
def in_decode(self, length: int, payload: bytes) -> None:
    """Store the first byte of the payload, as an integer, in `flag`."""
    first_byte = payload[0]
    self.flag = first_byte
511 class SETUP(GPS303Pkt):
513 RESPOND = Respond.EXT
515 ("uploadintervalseconds", intx, 0x0300),
516 ("binaryswitch", intx, 0b00110001),
517 ("alarms", l3int, [0, 0, 0]),
518 ("dndtimeswitch", int, 0),
519 ("dndtimes", l3int, [0, 0, 0]),
520 ("gpstimeswitch", int, 0),
521 ("gpstimestart", int, 0),
522 ("gpstimestop", int, 0),
523 ("phonenumbers", l3str, ["", "", ""]),
526 def out_encode(self) -> bytes:
527 def pack3b(x: int) -> bytes:
528 return pack("!I", x)[1:]
532 pack("!H", self.uploadintervalseconds),
533 pack("B", self.binaryswitch),
535 + [pack3b(el) for el in self.alarms]
537 pack("B", self.dndtimeswitch),
539 + [pack3b(el) for el in self.dndtimes]
541 pack("B", self.gpstimeswitch),
542 pack("!H", self.gpstimestart),
543 pack("!H", self.gpstimestop),
545 + [b";".join([el.encode() for el in self.phonenumbers])]
549 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
553 class RESTORE_PASSWORD(GPS303Pkt):
557 class WIFI_POSITIONING(_WIFI_POSITIONING):
559 RESPOND = Respond.EXT
# The converter is applied to the default as well, and `float(None)`
# raises TypeError. Let `None` pass through so that an object built
# without coordinates is possible and `out_encode()` can test for `None`.
OUT_KWARGS = (
    ("latitude", lambda v: None if v is None else float(v), None),
    ("longitude", lambda v: None if v is None else float(v), None),
)
562 def out_encode(self) -> bytes:
563 if self.latitude is None or self.longitude is None:
565 return "{:+#010.8g},{:+#010.8g}".format(
566 self.latitude, self.longitude
def out_decode(self, length: int, payload: bytes) -> None:
    """Parse a "latitude,longitude" text payload into two floats.

    Sets `latitude` and `longitude`. Raises ValueError if the payload
    does not have exactly two comma-separated fields or if a field is
    not a number.
    """
    fields = payload.decode().split(",")
    lat_text, lon_text = fields
    self.latitude = float(lat_text)
    self.longitude = float(lon_text)
575 class MANUAL_POSITIONING(GPS303Pkt):
578 def in_decode(self, length: int, payload: bytes) -> None:
579 self.flag = payload[0] if len(payload) > 0 else -1
584 4: "LBS search > 3 times",
585 5: "Same LBS and WiFi data",
586 6: "LBS prohibited, WiFi absent",
587 7: "GPS spacing < 50 m",
588 }.get(self.flag, "Unknown")
591 class BATTERY_CHARGE(GPS303Pkt):
595 class CHARGER_CONNECTED(GPS303Pkt):
599 class CHARGER_DISCONNECTED(GPS303Pkt):
603 class VIBRATION_RECEIVED(GPS303Pkt):
607 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
609 RESPOND = Respond.EXT
610 OUT_KWARGS = (("interval", int, 10),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Decode `interval` from the first two bytes of the payload.

    The value is a 16-bit unsigned integer in network byte order.
    """
    # unpack() always returns a tuple; take its only element so that
    # `interval` is an int, the type `out_encode()` packs.
    (self.interval,) = unpack("!H", payload[:2])
def out_encode(self) -> bytes:
    """Encode `interval` as a 16-bit unsigned integer, network byte order."""
    interval = self.interval
    return pack("!H", interval)
619 class SOS_ALARM(GPS303Pkt):
623 class UNKNOWN_B3(GPS303Pkt):
625 IN_KWARGS = (("asciidata", str, ""),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Store the whole payload, decoded to text, in `asciidata`.

    Decoding uses the default codec of `bytes.decode()`.
    """
    text = payload.decode()
    self.asciidata = text
631 # Build dicts protocol number -> class and class name -> protocol number
634 if True: # just to indent the code, sorry!
637 for name, cls in globals().items()
639 and issubclass(cls, GPS303Pkt)
640 and not name.startswith("_")
642 if hasattr(cls, "PROTO"):
643 CLASSES[cls.PROTO] = cls
644 PROTOS[cls.__name__] = cls.PROTO
647 def class_by_prefix(prefix: str) -> Union[type, List[Tuple[str, int]]]:
650 for name, proto in PROTOS.items()
651 if name.upper().startswith(prefix.upper())
656 return CLASSES[proto]
def proto_by_name(name: str) -> int:
    """Return the protocol number registered under class name `name`.

    Returns -1 when the name is not present in `PROTOS`.
    """
    return PROTOS[name] if name in PROTOS else -1
663 def proto_of_message(packet: bytes) -> int:
667 def inline_response(packet: bytes) -> Optional[bytes]:
668 proto = proto_of_message(packet)
671 if cls.RESPOND is Respond.INL:
672 return cls.Out().packed
676 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
677 """From a packet (without framing bytes) derive the XXX.In object"""
678 length, proto = unpack("BB", packet[:2])
680 if proto not in CLASSES:
681 cause: Union[DecodeError, ValueError] = ValueError(
682 f"Proto {proto} is unknown"
687 return CLASSES[proto].In(length, payload)
689 return CLASSES[proto].Out(length, payload)
690 except DecodeError as e:
693 retobj = UNKNOWN.In(length, payload)
695 retobj = UNKNOWN.Out(length, payload)
696 retobj.PROTO = proto # Override class attr with object attr