2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from typing import Any, Callable, Tuple
36 "GPS_OFFLINE_POSITIONING",
41 "WIFI_OFFLINE_POSITIONING",
44 "GPS_LBS_SWITCH_TIMES",
45 "REMOTE_MONITOR_PHONE",
57 "SYNCHRONOUS_WHITELIST",
63 "CHARGER_DISCONNECTED",
65 "POSITION_UPLOAD_INTERVAL",
71 class DecodeError(Exception):
72 def __init__(self, e, **kwargs):
74 for k, v in kwargs.items():
78 if isinstance(x, str):
84 """Check for the string that represents hours and minutes"""
85 if not isinstance(x, str) or len(x) != 4:
86 raise ValueError(str(x) + " is not a four-character string")
89 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
90 raise ValueError(str(x) + " does not contain valid hours and minutes")
95 if isinstance(x, str):
97 if len(x) != 3 or not all(isinstance(el, str) for el in x):
98 raise ValueError(str(x) + " is not a list of three strings")
103 if isinstance(x, str):
105 x = [int(el) for el in x]
106 if len(x) != 3 or not all(isinstance(el, int) for el in x):
107 raise ValueError(str(x) + " is not a list of three integers")
113 For each class corresponding to a message, automatically create
114 two nested classes `In` and `Out` that also inherit from their
115 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
116 copied to the `In` nested class under the name `KWARGS`, and
117 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
118 to the nested class `Out`. In addition, method `encode` is
119 defined in both classes equal to `in_encode()` and `out_encode()`
123 def __new__(cls, name, bases, attrs):
124 newcls = super().__new__(cls, name, bases, attrs)
125 newcls.In = super().__new__(
130 "KWARGS": newcls.IN_KWARGS,
131 "decode": newcls.in_decode,
132 "encode": newcls.in_encode,
135 newcls.Out = super().__new__(
140 "KWARGS": newcls.OUT_KWARGS,
141 "decode": newcls.out_decode,
142 "encode": newcls.out_encode,
149 NON = 0 # Incoming, no response needed
150 INL = 1 # Birirectional, use `inline_response()`
151 EXT = 2 # Birirectional, use external responder
154 class GPS303Pkt(metaclass=MetaPkt):
155 RESPOND = Respond.NON # Do not send anything back by default
157 IN_KWARGS: Tuple[Tuple[str, Callable, Any], ...] = ()
158 OUT_KWARGS: Tuple[Tuple[str, Callable, Any], ...] = ()
160 def __init__(self, *args, **kwargs):
162 Construct the object _either_ from (length, payload),
163 _or_ from the values of individual fields
165 assert not args or (len(args) == 2 and not kwargs)
166 if args: # guaranteed to be two arguments at this point
167 self.length, self.payload = args
169 self.decode(self.length, self.payload)
171 raise DecodeError(e, obj=self)
173 for kw, typ, dfl in self.KWARGS:
174 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
177 self.__class__.__name__ + " stray kwargs " + str(kwargs)
181 return "{}({})".format(
182 self.__class__.__name__,
186 'bytes.fromhex("{}")'.format(v.hex())
187 if isinstance(v, bytes)
190 for k, v in self.__dict__.items()
191 if not k.startswith("_")
195 def in_decode(self, length, packet):
196 # Overridden in subclasses, otherwise do not decode payload
199 def out_decode(self, length, packet):
200 # Overridden in subclasses, otherwise do not decode payload
204 # Necessary to emulate terminal, which is not implemented
205 raise NotImplementedError(
206 self.__class__.__name__ + ".encode() not implemented"
209 def out_encode(self):
210 # Overridden in subclasses, otherwise make empty payload
215 payload = self.encode()
216 length = len(payload) + 1
217 return pack("BB", length, self.PROTO) + payload
220 class UNKNOWN(GPS303Pkt):
221 PROTO = 256 # > 255 is impossible in real packets
224 class LOGIN(GPS303Pkt):
226 RESPOND = Respond.INL
227 # Default response for ACK, can also respond with STOP_UPLOAD
229 def in_decode(self, length, payload):
230 self.imei = payload[:-1].hex()
231 self.ver = unpack("B", payload[-1:])[0]
235 class SUPERVISION(GPS303Pkt):
237 OUT_KWARGS = (("status", int, 1),)
239 def out_encode(self):
240 # 1: The device automatically answers Pickup effect
241 # 2: Automatically Answering Two-way Calls
242 # 3: Ring manually answer the two-way call
243 return pack("B", self.status)
246 class HEARTBEAT(GPS303Pkt):
248 RESPOND = Respond.INL
251 class _GPS_POSITIONING(GPS303Pkt):
252 RESPOND = Respond.INL
254 def in_decode(self, length, payload):
255 self.dtime = payload[:6]
256 if self.dtime == b"\0\0\0\0\0\0":
259 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
260 self.devtime = datetime(
261 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
263 self.gps_data_length = payload[6] >> 4
264 self.gps_nb_sat = payload[6] & 0x0F
265 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
266 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
267 flip_lon = bool(flags & 0b0000100000000000) # bit 4
268 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
269 self.heading = flags & 0b0000001111111111 # bits 6 - last
270 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
271 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
276 def out_encode(self):
277 tup = datetime.utcnow().timetuple()
278 ttup = (tup[0] % 100,) + tup[1:6]
279 return pack("BBBBBB", *ttup)
282 class GPS_POSITIONING(_GPS_POSITIONING):
286 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
290 class STATUS(GPS303Pkt):
292 RESPOND = Respond.EXT
293 OUT_KWARGS = (("upload_interval", int, 25),)
295 def in_decode(self, length, payload):
296 self.batt, self.ver, self.timezone, self.intvl = unpack(
300 self.signal = payload[4]
305 def out_encode(self): # Set interval in minutes
306 return pack("B", self.upload_interval)
309 class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
313 class RESET(GPS303Pkt):
314 # Device sends when it got reset SMS
315 # Server can send to initiate factory reset
319 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
321 OUT_KWARGS = (("number", int, 3),)
323 def out_encode(self): # Number of whitelist entries
324 return pack("B", number)
327 class _WIFI_POSITIONING(GPS303Pkt):
328 def in_decode(self, length, payload):
329 self.dtime = payload[:6]
330 if self.dtime == b"\0\0\0\0\0\0":
333 self.devtime = datetime.strptime(
334 self.dtime.hex(), "%y%m%d%H%M%S"
335 ).astimezone(tz=timezone.utc)
337 for i in range(self.length): # length has special meaning here
338 slice = payload[6 + i * 7 : 13 + i * 7]
339 self.wifi_aps.append(
340 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
342 gsm_slice = payload[6 + self.length * 7 :]
343 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
345 for i in range(ncells):
346 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
347 locac, cellid, sigstr = unpack(
348 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
350 self.gsm_cells.append((locac, cellid, -sigstr))
354 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
356 RESPOND = Respond.INL
358 def out_encode(self):
359 return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
362 class TIME(GPS303Pkt):
364 RESPOND = Respond.INL
366 def out_encode(self):
367 return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
370 class PROHIBIT_LBS(GPS303Pkt):
372 OUT_KWARGS = (("status", int, 1),)
374 def out_encode(self): # Server sent, 0-off, 1-on
375 return pack("B", self.status)
378 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
381 # Data is in packed decimal
383 # 00/01 - Don't set / Set upload period
384 # HHMMHHMM - Upload period
386 # 00/01 - Don't set / Set time of boot
387 # HHMM - Time of boot
388 # 00/01 - Don't set / Set time of shutdown
389 # HHMM - Time of shutdown
390 def out_encode(self):
394 class _SET_PHONE(GPS303Pkt):
395 OUT_KWARGS = (("phone", str, ""),)
397 def out_encode(self):
398 return self.phone.encode()
401 class REMOTE_MONITOR_PHONE(_SET_PHONE):
405 class SOS_PHONE(_SET_PHONE):
409 class DAD_PHONE(_SET_PHONE):
413 class MOM_PHONE(_SET_PHONE):
417 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
421 class GPS_OFF_PERIOD(GPS303Pkt):
425 ("fm", hhmm, "0000"),
426 ("to", hhmm, "2359"),
429 def out_encode(self):
431 pack("B", self.onoff)
432 + bytes.fromhex(self.fm)
433 + bytes.fromhex(self.to)
437 class DND_PERIOD(GPS303Pkt):
442 ("fm1", hhmm, "0000"),
443 ("to1", hhmm, "2359"),
444 ("fm2", hhmm, "0000"),
445 ("to2", hhmm, "2359"),
448 def out_endode(self):
450 pack("B", self.onoff)
451 + pack("B", self.week)
452 + bytes.fromhex(self.fm1)
453 + bytes.fromhex(self.to1)
454 + bytes.fromhex(self.fm2)
455 + bytes.fromhex(self.to2)
459 class RESTART_SHUTDOWN(GPS303Pkt):
461 OUT_KWARGS = (("flag", int, 0),)
463 def out_encode(self):
466 return pack("B", self.flag)
469 class DEVICE(GPS303Pkt):
471 OUT_KWARGS = (("flag", int, 0),)
473 # 0 - Stop looking for equipment
474 # 1 - Start looking for equipment
475 def out_encode(self):
476 return pack("B", self.flag)
479 class ALARM_CLOCK(GPS303Pkt):
482 def out_encode(self):
483 # TODO implement parsing kwargs
484 alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
486 cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
490 class STOP_ALARM(GPS303Pkt):
493 def in_decode(self, length, payload):
494 self.flag = payload[0]
498 class SETUP(GPS303Pkt):
500 RESPOND = Respond.EXT
502 ("uploadintervalseconds", intx, 0x0300),
503 ("binaryswitch", intx, 0b00110001),
504 ("alarms", l3int, [0, 0, 0]),
505 ("dndtimeswitch", int, 0),
506 ("dndtimes", l3int, [0, 0, 0]),
507 ("gpstimeswitch", int, 0),
508 ("gpstimestart", int, 0),
509 ("gpstimestop", int, 0),
510 ("phonenumbers", l3str, ["", "", ""]),
513 def out_encode(self):
515 return pack("!I", x)[1:]
519 pack("!H", self.uploadintervalseconds),
520 pack("B", self.binaryswitch),
522 + [pack3b(el) for el in self.alarms]
524 pack("B", self.dndtimeswitch),
526 + [pack3b(el) for el in self.dndtimes]
528 pack("B", self.gpstimeswitch),
529 pack("!H", self.gpstimestart),
530 pack("!H", self.gpstimestop),
532 + [b";".join([el.encode() for el in self.phonenumbers])]
536 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
540 class RESTORE_PASSWORD(GPS303Pkt):
544 class WIFI_POSITIONING(_WIFI_POSITIONING):
546 RESPOND = Respond.EXT
547 OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
549 def out_encode(self):
550 if self.latitude is None or self.longitude is None:
552 return "{:+#010.8g},{:+#010.8g}".format(
553 self.latitude, self.longitude
556 def out_decode(self, length, payload):
557 lat, lon = payload.decode().split(",")
558 self.latitude = float(lat)
559 self.longitude = float(lon)
562 class MANUAL_POSITIONING(GPS303Pkt):
565 def in_decode(self, length, payload):
566 self.flag = payload[0] if len(payload) > 0 else None
571 4: "LBS search > 3 times",
572 5: "Same LBS and WiFi data",
573 6: "LBS prohibited, WiFi absent",
574 7: "GPS spacing < 50 m",
575 }.get(self.flag, "Unknown")
579 class BATTERY_CHARGE(GPS303Pkt):
583 class CHARGER_CONNECTED(GPS303Pkt):
587 class CHARGER_DISCONNECTED(GPS303Pkt):
591 class VIBRATION_RECEIVED(GPS303Pkt):
595 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
597 RESPOND = Respond.EXT
598 OUT_KWARGS = (("interval", int, 10),)
600 def in_decode(self, length, payload):
601 self.interval = unpack("!H", payload[:2])
604 def out_encode(self):
605 return pack("!H", interval)
608 class SOS_ALARM(GPS303Pkt):
612 class UNKNOWN_B3(GPS303Pkt):
614 IN_KWARGS = (("asciidata", str, ""),)
616 def in_decode(self, length, payload):
617 self.asciidata = payload.decode()
621 # Build dicts protocol number -> class and class name -> protocol number
624 if True: # just to indent the code, sorry!
627 for name, cls in globals().items()
629 and issubclass(cls, GPS303Pkt)
630 and not name.startswith("_")
632 if hasattr(cls, "PROTO"):
633 CLASSES[cls.PROTO] = cls
634 PROTOS[cls.__name__] = cls.PROTO
637 def class_by_prefix(prefix):
640 for name, proto in PROTOS.items()
641 if name.upper().startswith(prefix.upper())
646 return CLASSES[proto]
649 def proto_by_name(name):
650 return PROTOS.get(name, -1)
653 def proto_of_message(packet):
654 return unpack("B", packet[1:2])[0]
657 def inline_response(packet):
658 proto = proto_of_message(packet)
661 if cls.RESPOND is Respond.INL:
662 return cls.Out().packed
666 def parse_message(packet, is_incoming=True):
667 """From a packet (without framing bytes) derive the XXX.In object"""
668 length, proto = unpack("BB", packet[:2])
670 if proto not in CLASSES:
671 cause = ValueError(f"Proto {proto} is unknown")
675 return CLASSES[proto].In(length, payload)
677 return CLASSES[proto].Out(length, payload)
678 except DecodeError as e:
681 retobj = UNKNOWN.In(length, payload)
683 retobj = UNKNOWN.Out(length, payload)
684 retobj.PROTO = proto # Override class attr with object attr