Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
GPS+GPRS module. Description lifted from this repository:
https://github.com/tobadia/petGPS/tree/master/resources

1. There is no security whatsoever. If you know the module's IMEI,
   you can feed fake data to the server, including fake location.
2. Ad-hoc choice of framing of messages (that are transferred over
   the TCP stream) makes it vulnerable to coincidental appearance
   of framing bytes in the middle of the message. Most of the time
   the server will receive one message in one TCP segment (i.e. in
   one `recv()` operation), but relying on that would break things
   if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import pack, unpack
34 "GPS_OFFLINE_POSITIONING",
39 "WIFI_OFFLINE_POSITIONING",
42 "GPS_LBS_SWITCH_TIMES",
43 "REMOTE_MONITOR_PHONE",
55 "SYNCHRONOUS_WHITELIST",
61 "CHARGER_DISCONNECTED",
63 "POSITION_UPLOAD_INTERVAL",
70 if isinstance(x, str):
76 """Check for the string that represents hours and minutes"""
77 if not isinstance(x, str) or len(x) != 4:
78 raise ValueError(str(x) + " is not a four-character string")
81 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
82 raise ValueError(str(x) + " does not contain valid hours and minutes")
87 if isinstance(x, str):
89 if len(x) != 3 or not all(isinstance(el, str) for el in x):
90 raise ValueError(str(x) + " is not a list of three strings")
95 if isinstance(x, str):
97 x = [int(el) for el in x]
98 if len(x) != 3 or not all(isinstance(el, int) for el in x):
99 raise ValueError(str(x) + " is not a list of three integers")
105 For each class corresponding to a message, automatically create
106 two nested classes `In` and `Out` that also inherit from their
107 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
108 copied to the `In` nested class under the name `KWARGS`, and
109 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
110 to the nested class `Out`. In addition, method `encode` is
111 defined in both classes equal to `in_encode()` and `out_encode()`
115 def __new__(cls, name, bases, attrs):
116 newcls = super().__new__(cls, name, bases, attrs)
117 newcls.In = super().__new__(
122 "KWARGS": newcls.IN_KWARGS,
123 "decode": newcls.in_decode,
124 "encode": newcls.in_encode,
127 newcls.Out = super().__new__(
132 "KWARGS": newcls.OUT_KWARGS,
133 "decode": newcls.out_decode,
134 "encode": newcls.out_encode,
NON = 0  # Incoming, no response needed
INL = 1  # Bidirectional, use `inline_response()`
EXT = 2  # Bidirectional, use external responder
146 class GPS303Pkt(metaclass=MetaPkt):
147 RESPOND = Respond.NON # Do not send anything back by default
152 def __init__(self, *args, **kwargs):
153 assert len(args) == 0
154 for kw, typ, dfl in self.KWARGS:
155 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
157 print("KWARGS", self.KWARGS)
158 print("kwargs", kwargs)
160 self.__class__.__name__ + " stray kwargs " + str(kwargs)
164 return "{}({})".format(
165 self.__class__.__name__,
169 'bytes.fromhex("{}")'.format(v.hex())
170 if isinstance(v, bytes)
173 for k, v in self.__dict__.items()
174 if not k.startswith("_")
178 def in_decode(self, length, packet):
def out_decode(self, length, packet):
    """Refuse to decode: this direction has no decoder here.

    Raises:
        NotImplementedError: always; the message names the class of
            the object the method was called on.
    """
    owner = self.__class__.__name__
    message = owner + ".decode() not implemented"
    raise NotImplementedError(message)
187 raise NotImplementedError(
188 self.__class__.__name__ + ".encode() not implemented"
191 def out_encode(self):
196 payload = self.encode()
197 length = len(payload) + 1
198 return pack("BB", length, self.PROTO) + payload
201 def from_packet(cls, length, payload):
204 self.payload = payload
205 self.decode(length, payload)
class UNKNOWN(GPS303Pkt):
    """Packet class carrying a protocol number no real packet can have."""

    # The protocol number is packed into a single byte on the wire, so a
    # value above 255 (0x100 == 256) is impossible in real packets.
    PROTO = 0x100
213 class LOGIN(GPS303Pkt):
215 RESPOND = Respond.INL
216 # Default response for ACK, can also respond with STOP_UPLOAD
def in_decode(self, length, payload):
    """Split the payload into the IMEI and the trailing version byte.

    Every byte except the last is stored hex-encoded in ``self.imei``;
    the last byte is stored as an unsigned integer in ``self.ver``.
    """
    imei_bytes, ver_byte = payload[:-1], payload[-1:]
    self.imei = imei_bytes.hex()
    (self.ver,) = unpack("B", ver_byte)
224 class SUPERVISION(GPS303Pkt):
226 OUT_KWARGS = (("status", int, 1),)
def out_encode(self):
    """Encode ``self.status`` as one unsigned byte.

    Status values:
      1: The device automatically answers Pickup effect
      2: Automatically Answering Two-way Calls
      3: Ring manually answer the two-way call
    """
    status_byte = pack("B", self.status)
    return status_byte
235 class HEARTBEAT(GPS303Pkt):
237 RESPOND = Respond.INL
240 class _GPS_POSITIONING(GPS303Pkt):
241 RESPOND = Respond.INL
243 def in_decode(self, length, payload):
244 self.dtime = payload[:6]
245 if self.dtime == b"\0\0\0\0\0\0":
248 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
249 self.devtime = datetime(
250 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
252 self.gps_data_length = payload[6] >> 4
253 self.gps_nb_sat = payload[6] & 0x0F
254 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
255 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
256 flip_lon = bool(flags & 0b0000100000000000) # bit 4
257 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
258 self.heading = flags & 0b0000001111111111 # bits 6 - last
259 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
260 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
def out_encode(self):
    """Build the reply payload: the current UTC time as six bytes.

    Returns:
        bytes: year modulo 100, month, day, hour, minute and second,
        each packed as one unsigned byte.
    """
    # `datetime.utcnow()` is deprecated since Python 3.12; an aware
    # "now" in UTC yields the same calendar fields.
    now = datetime.now(timezone.utc)
    return pack(
        "BBBBBB",
        now.year % 100,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
271 class GPS_POSITIONING(_GPS_POSITIONING):
275 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
279 class STATUS(GPS303Pkt):
281 RESPOND = Respond.EXT
282 OUT_KWARGS = (("upload_interval", int, 25),)
284 def in_decode(self, length, payload):
285 self.batt, self.ver, self.timezone, self.intvl = unpack(
289 self.signal = payload[4]
def out_encode(self):
    """Encode the upload interval (set in minutes) as one unsigned byte."""
    minutes = self.upload_interval
    return pack("B", minutes)
class HIBERNATION(GPS303Pkt):  # Server can send to send device to sleep
300 RESPOND = Respond.INL
303 class RESET(GPS303Pkt):
304 # Device sends when it got reset SMS
305 # Server can send to initiate factory reset
class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiate sync (0x58)
311 OUT_KWARGS = (("number", int, 3),)
def out_encode(self):
    """Encode the number of whitelist entries as one unsigned byte."""
    # The value is an instance attribute (declared in OUT_KWARGS); the
    # bare name `number` used previously is not bound in this scope.
    return pack("B", self.number)
317 class _WIFI_POSITIONING(GPS303Pkt):
318 def in_decode(self, length, payload):
319 self.dtime = payload[:6]
320 if self.dtime == b"\0\0\0\0\0\0":
323 self.devtime = datetime.strptime(
324 self.dtime.hex(), "%y%m%d%H%M%S"
325 ).astimezone(tz=timezone.utc)
327 for i in range(self.length): # length has special meaning here
328 slice = payload[6 + i * 7 : 13 + i * 7]
329 self.wifi_aps.append(
330 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
332 gsm_slice = payload[6 + self.length * 7 :]
333 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
335 for i in range(ncells):
336 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
337 locac, cellid, sigstr = unpack(
338 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
340 self.gsm_cells.append((locac, cellid, -sigstr))
344 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
346 RESPOND = Respond.INL
def out_encode(self):
    """Build the reply payload: the current UTC time in packed decimal.

    Returns:
        bytes: six bytes whose hex digits spell ``YYMMDDhhmmss``.
    """
    # `datetime.utcnow()` is deprecated since Python 3.12; an aware
    # "now" in UTC formats to the same digits.
    now = datetime.now(timezone.utc)
    return bytes.fromhex(now.strftime("%y%m%d%H%M%S"))
352 class TIME(GPS303Pkt):
354 RESPOND = Respond.INL
def out_encode(self):
    """Build the reply payload: the current UTC time as seven bytes.

    Returns:
        bytes: the full year as a big-endian unsigned 16-bit integer,
        then month, day, hour, minute and second as one byte each.
    """
    # `datetime.utcnow()` is deprecated since Python 3.12; an aware
    # "now" in UTC yields the same calendar fields.
    now = datetime.now(timezone.utc)
    return pack("!HBBBBB", *now.timetuple()[:6])
360 class PROHIBIT_LBS(GPS303Pkt):
362 OUT_KWARGS = (("status", int, 1),)
def out_encode(self):
    """Encode the server-sent switch (0 - off, 1 - on) as one unsigned byte."""
    switch = self.status
    return pack("B", switch)
368 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
371 # Data is in packed decimal
373 # 00/01 - Don't set / Set upload period
374 # HHMMHHMM - Upload period
376 # 00/01 - Don't set / Set time of boot
377 # HHMM - Time of boot
378 # 00/01 - Don't set / Set time of shutdown
379 # HHMM - Time of shutdown
380 def out_encode(self):
384 class _SET_PHONE(GPS303Pkt):
385 OUT_KWARGS = (("phone", str, ""),)
def out_encode(self):
    """Return ``self.phone`` encoded to bytes with the default codec."""
    number = self.phone
    return number.encode()
391 class REMOTE_MONITOR_PHONE(_SET_PHONE):
395 class SOS_PHONE(_SET_PHONE):
399 class DAD_PHONE(_SET_PHONE):
403 class MOM_PHONE(_SET_PHONE):
407 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
411 class GPS_OFF_PERIOD(GPS303Pkt):
415 ("fm", hhmm, "0000"),
416 ("to", hhmm, "2359"),
def out_encode(self):
    """Encode the on/off byte followed by the two period boundaries.

    ``self.fm`` and ``self.to`` are hex-digit strings; each is turned
    into raw bytes with ``bytes.fromhex``.
    """
    switch = pack("B", self.onoff)
    start = bytes.fromhex(self.fm)
    stop = bytes.fromhex(self.to)
    return switch + start + stop
427 class DND_PERIOD(GPS303Pkt):
432 ("fm1", hhmm, "0000"),
433 ("to1", hhmm, "2359"),
434 ("fm2", hhmm, "0000"),
435 ("to2", hhmm, "2359"),
def out_encode(self):
    """Encode the do-not-disturb settings.

    Layout: ``onoff`` byte, ``week`` byte, then the four boundaries
    ``fm1``, ``to1``, ``fm2``, ``to2``, each a hex-digit string turned
    into raw bytes with ``bytes.fromhex``.
    """
    # The method used to be spelled `out_endode`. The metaclass wires
    # up `out_encode`, so under the misspelled name it was never used.
    return b"".join(
        (
            pack("B", self.onoff),
            pack("B", self.week),
            bytes.fromhex(self.fm1),
            bytes.fromhex(self.to1),
            bytes.fromhex(self.fm2),
            bytes.fromhex(self.to2),
        )
    )


# Backward-compatible alias for the original, misspelled name.
out_endode = out_encode
449 class RESTART_SHUTDOWN(GPS303Pkt):
451 OUT_KWARGS = (("flag", int, 0),)
453 def out_encode(self):
456 return pack("B", self.flag)
459 class DEVICE(GPS303Pkt):
461 OUT_KWARGS = (("flag", int, 0),)
# Flag values:
#   0 - Stop looking for equipment
#   1 - Start looking for equipment
def out_encode(self):
    """Encode ``self.flag`` as one unsigned byte."""
    flag_byte = pack("B", self.flag)
    return flag_byte
469 class ALARM_CLOCK(GPS303Pkt):
def out_encode(self):
    """Encode the alarm slots, three bytes per slot.

    Each slot is a ``day`` byte followed by a four-hex-digit time string
    turned into two raw bytes. The slots are currently hard-coded to
    three all-zero entries.

    Returns:
        bytes: the concatenation of all encoded slots.
    """
    # TODO implement parsing kwargs
    alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
    # The day byte must be produced by `struct.pack`; the original
    # called `cls(...)` here, which is not a packer.
    return b"".join(
        pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
    )
480 class STOP_ALARM(GPS303Pkt):
def in_decode(self, length, payload):
    """Store the first payload byte in ``self.flag``."""
    first_byte = payload[0]
    self.flag = first_byte
488 class SETUP(GPS303Pkt):
490 RESPOND = Respond.EXT
492 ("uploadintervalseconds", intx, 0x0300),
493 ("binaryswitch", intx, 0b00110001),
494 ("alarms", l3int, [0, 0, 0]),
495 ("dndtimeswitch", int, 0),
496 ("dndtimes", l3int, [0, 0, 0]),
497 ("gpstimeswitch", int, 0),
498 ("gpstimestart", int, 0),
499 ("gpstimestop", int, 0),
500 ("phonenumbers", l3str, ["", "", ""]),
503 def out_encode(self):
505 return pack("!I", x)[1:]
509 pack("!H", self.uploadintervalseconds),
510 pack("B", self.binaryswitch),
512 + [pack3b(el) for el in self.alarms]
514 pack("B", self.dndtimeswitch),
516 + [pack3b(el) for el in self.dndtimes]
518 pack("B", self.gpstimeswitch),
519 pack("!H", self.gpstimestart),
520 pack("!H", self.gpstimestop),
522 + [b";".join([el.encode() for el in self.phonenumbers])]
526 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
530 class RESTORE_PASSWORD(GPS303Pkt):
534 class WIFI_POSITIONING(_WIFI_POSITIONING):
536 RESPOND = Respond.EXT
537 OUT_KWARGS = (("lat", float, None), ("lon", float, None))
539 def out_encode(self):
540 if self.lat is None or self.lon is None:
542 return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
545 class MANUAL_POSITIONING(GPS303Pkt):
548 def in_decode(self, length, payload):
549 self.flag = payload[0] if len(payload) > 0 else None
554 4: "LBS search > 3 times",
555 5: "Same LBS and WiFi data",
556 6: "LBS prohibited, WiFi absent",
557 7: "GPS spacing < 50 m",
558 }.get(self.flag, "Unknown")
562 class BATTERY_CHARGE(GPS303Pkt):
566 class CHARGER_CONNECTED(GPS303Pkt):
570 class CHARGER_DISCONNECTED(GPS303Pkt):
574 class VIBRATION_RECEIVED(GPS303Pkt):
578 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
580 RESPOND = Respond.EXT
581 OUT_KWARGS = (("interval", int, 10),)
def in_decode(self, length, payload):
    """Read the interval from the first two payload bytes.

    The value is a big-endian unsigned 16-bit integer and is stored as
    an ``int`` in ``self.interval``.
    """
    # unpack() always returns a tuple; unwrap its single element so the
    # attribute is an int (as OUT_KWARGS declares), not a 1-tuple.
    (self.interval,) = unpack("!H", payload[:2])
def out_encode(self):
    """Encode ``self.interval`` as a big-endian unsigned 16-bit integer."""
    # The value is an instance attribute (declared in OUT_KWARGS); the
    # bare name `interval` used previously is not bound in this scope.
    return pack("!H", self.interval)
591 class SOS_ALARM(GPS303Pkt):
595 class UNKNOWN_B3(GPS303Pkt):
597 IN_KWARGS = (("asciidata", str, ""),)
def in_decode(self, length, payload):
    """Decode the payload bytes to text and keep it in ``self.asciidata``."""
    text = payload.decode()
    self.asciidata = text
604 # Build dicts protocol number -> class and class name -> protocol number
607 if True: # just to indent the code, sorry!
610 for name, cls in globals().items()
612 and issubclass(cls, GPS303Pkt)
613 and not name.startswith("_")
615 if hasattr(cls, "PROTO"):
616 CLASSES[cls.PROTO] = cls
617 PROTOS[cls.__name__] = cls.PROTO
620 def class_by_prefix(prefix):
623 for name, proto in PROTOS.items()
624 if name.upper().startswith(prefix.upper())
629 return CLASSES[proto]
def proto_by_name(name):
    """Look up the protocol number stored in ``PROTOS`` under *name*.

    Returns:
        The stored protocol number, or -1 when *name* is not a key.
    """
    not_found = -1
    return PROTOS.get(name, not_found)
def proto_of_message(packet):
    """Return the second byte of *packet* as an unsigned integer.

    Raises:
        struct.error: if *packet* is shorter than two bytes.
    """
    (proto,) = unpack("B", packet[1:2])
    return proto
640 def inline_response(packet):
641 proto = proto_of_message(packet)
644 if cls.RESPOND is Respond.INL:
645 return cls.Out().packed
649 def parse_message(packet):
650 """From a packet (without framing bytes) derive the XXX.In object"""
651 length, proto = unpack("BB", packet[:2])
654 return CLASSES[proto].from_packet(length, payload)
656 retobj = UNKNOWN.from_packet(length, payload)
657 retobj.PROTO = proto # Override class attr with object attr