2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation), but relying on that would break things
14 if the path has a lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
35 "GPS_OFFLINE_POSITIONING",
40 "WIFI_OFFLINE_POSITIONING",
43 "GPS_LBS_SWITCH_TIMES",
44 "REMOTE_MONITOR_PHONE",
56 "SYNCHRONOUS_WHITELIST",
62 "CHARGER_DISCONNECTED",
64 "POSITION_UPLOAD_INTERVAL",
class DecodeError(Exception):
    """Raised when the payload of a packet cannot be decoded.

    The first argument is the underlying error. Every extra keyword
    argument becomes an attribute of the exception instance; the packet
    constructor uses this to attach the half-built packet as ``obj``.
    """

    def __init__(self, e, **kwargs):
        super().__init__(e)
        # Expose each keyword argument as an attribute of the same name
        self.__dict__.update(kwargs)
77 if isinstance(x, str):
def hhmm(x):
    """Check for the string that represents hours and minutes.

    Accepts exactly four ASCII digits "HHMM" with HH in 00..23 and MM in
    00..59 and returns the string unchanged, so that it can be used as a
    converter for keyword arguments.

    Raises:
        ValueError: if `x` is not a four-character string, contains
            anything but digits, or is out of range.
    """
    if not isinstance(x, str) or len(x) != 4:
        raise ValueError(str(x) + " is not a four-character string")
    # int() tolerates signs and surrounding whitespace ("+1", " 1"), but
    # the value is later fed to bytes.fromhex(), which either rejects
    # such text or silently yields fewer bytes. Insist on plain digits.
    if not all(ch in "0123456789" for ch in x):
        raise ValueError(str(x) + " does not contain valid hours and minutes")
    hh = int(x[:2])
    mm = int(x[2:])
    if not (0 <= hh <= 23 and 0 <= mm <= 59):
        raise ValueError(str(x) + " does not contain valid hours and minutes")
    return x
94 if isinstance(x, str):
96 if len(x) != 3 or not all(isinstance(el, str) for el in x):
97 raise ValueError(str(x) + " is not a list of three strings")
102 if isinstance(x, str):
104 x = [int(el) for el in x]
105 if len(x) != 3 or not all(isinstance(el, int) for el in x):
106 raise ValueError(str(x) + " is not a list of three integers")
112 For each class corresponding to a message, automatically create
113 two nested classes `In` and `Out` that also inherit from their
114 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
115 copied to the `In` nested class under the name `KWARGS`, and
116 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
117 to the nested class `Out`. In addition, method `encode` is
118 defined in both classes equal to `in_encode()` and `out_encode()`
122 def __new__(cls, name, bases, attrs):
123 newcls = super().__new__(cls, name, bases, attrs)
124 newcls.In = super().__new__(
129 "KWARGS": newcls.IN_KWARGS,
130 "decode": newcls.in_decode,
131 "encode": newcls.in_encode,
134 newcls.Out = super().__new__(
139 "KWARGS": newcls.OUT_KWARGS,
140 "decode": newcls.out_decode,
141 "encode": newcls.out_encode,
148 NON = 0 # Incoming, no response needed
149 INL = 1 # Birirectional, use `inline_response()`
150 EXT = 2 # Birirectional, use external responder
153 class GPS303Pkt(metaclass=MetaPkt):
154 RESPOND = Respond.NON # Do not send anything back by default
159 def __init__(self, *args, **kwargs):
161 Construct the object _either_ from (length, payload),
162 _or_ from the values of individual fields
164 assert not args or (len(args) == 2 and not kwargs)
165 if args: # guaranteed to be two arguments at this point
166 self.length, self.payload = args
168 self.decode(self.length, self.payload)
170 raise DecodeError(e, obj=self)
172 for kw, typ, dfl in self.KWARGS:
173 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
176 self.__class__.__name__ + " stray kwargs " + str(kwargs)
180 return "{}({})".format(
181 self.__class__.__name__,
185 'bytes.fromhex("{}")'.format(v.hex())
186 if isinstance(v, bytes)
189 for k, v in self.__dict__.items()
190 if not k.startswith("_")
194 def in_decode(self, length, packet):
195 # Overridden in subclasses, otherwise do not decode payload
198 def out_decode(self, length, packet):
199 # Overridden in subclasses, otherwise do not decode payload
203 # Necessary to emulate terminal, which is not implemented
204 raise NotImplementedError(
205 self.__class__.__name__ + ".encode() not implemented"
208 def out_encode(self):
209 # Overridden in subclasses, otherwise make empty payload
@property
def packed(self):
    """Wire form of the message, without framing bytes.

    One length byte, one protocol-number byte, then the payload produced
    by `encode()`. The length byte counts the protocol byte plus the
    payload.
    """
    body = self.encode()
    header = pack("BB", len(body) + 1, self.PROTO)
    return header + body
class UNKNOWN(GPS303Pkt):
    """Stand-in for packets that could not be mapped to a known class."""

    # Protocol numbers on the wire are a single byte, so 256 can never
    # collide with a real packet.
    PROTO = 256
223 class LOGIN(GPS303Pkt):
225 RESPOND = Respond.INL
226 # Default response for ACK, can also respond with STOP_UPLOAD
def in_decode(self, length, payload):
    """Split the login payload into IMEI and version.

    Every byte except the last is stored as a hex string in `imei`;
    the last byte is stored as an integer in `ver`.
    """
    imei_part, ver_part = payload[:-1], payload[-1:]
    self.imei = imei_part.hex()
    (self.ver,) = unpack("B", ver_part)
    return self
234 class SUPERVISION(GPS303Pkt):
236 OUT_KWARGS = (("status", int, 1),)
def out_encode(self):
    """Encode `status` as a single unsigned byte.

    Status values:
      1: The device automatically answers Pickup effect
      2: Automatically Answering Two-way Calls
      3: Ring manually answer the two-way call
    """
    status_byte = pack("B", self.status)
    return status_byte
245 class HEARTBEAT(GPS303Pkt):
247 RESPOND = Respond.INL
250 class _GPS_POSITIONING(GPS303Pkt):
251 RESPOND = Respond.INL
253 def in_decode(self, length, payload):
254 self.dtime = payload[:6]
255 if self.dtime == b"\0\0\0\0\0\0":
258 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
259 self.devtime = datetime(
260 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
262 self.gps_data_length = payload[6] >> 4
263 self.gps_nb_sat = payload[6] & 0x0F
264 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
265 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
266 flip_lon = bool(flags & 0b0000100000000000) # bit 4
267 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
268 self.heading = flags & 0b0000001111111111 # bits 6 - last
269 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
270 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
def out_encode(self):
    """Build the response payload: the current UTC time in six bytes.

    The bytes are year modulo 100, month, day, hour, minute and second,
    each as an unsigned binary value.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware "now"
    # in UTC yields exactly the same calendar fields.
    now = datetime.now(timezone.utc)
    return pack(
        "BBBBBB",
        now.year % 100,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
281 class GPS_POSITIONING(_GPS_POSITIONING):
285 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
289 class STATUS(GPS303Pkt):
291 RESPOND = Respond.EXT
292 OUT_KWARGS = (("upload_interval", int, 25),)
294 def in_decode(self, length, payload):
295 self.batt, self.ver, self.timezone, self.intvl = unpack(
299 self.signal = payload[4]
def out_encode(self):
    """Encode the upload interval (in minutes) as one unsigned byte."""
    minutes = self.upload_interval
    return pack("B", minutes)
308 class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
312 class RESET(GPS303Pkt):
313 # Device sends when it got reset SMS
314 # Server can send to initiate factory reset
318 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
320 OUT_KWARGS = (("number", int, 3),)
def out_encode(self):
    """Encode the number of whitelist entries as one unsigned byte."""
    # The value lives on the instance; the bare name `number` used here
    # before does not exist in this scope and raised NameError.
    return pack("B", self.number)
326 class _WIFI_POSITIONING(GPS303Pkt):
def in_decode(self, length, payload):
    """Decode a WiFi/cell positioning report.

    Payload layout, as consumed by this method:
      * 6 bytes of date and time in packed decimal (YYMMDDHHMMSS);
        six zero bytes mean "no time", and `devtime` is then None;
      * `self.length` records of 7 bytes each: a 6-byte MAC address
        followed by one signal-strength byte;
      * 1 byte with the number of cells, 2 bytes MCC, 1 byte MNC;
      * per cell, 5 bytes: 2-byte location area code, 2-byte cell id
        and one signal-strength byte.

    Sets `dtime`, `devtime`, `wifi_aps` (list of (MAC string, negated
    signal) tuples), `mcc`, `mnc` and `gsm_cells` (list of (location
    area code, cell id, negated signal) tuples).
    """
    self.dtime = payload[:6]
    if self.dtime == b"\0\0\0\0\0\0":
        self.devtime = None
    else:
        # strptime() returns a naive datetime, and astimezone() on a
        # naive value interprets it in the server's local zone, which
        # shifts the result unless the server itself runs in UTC.
        # Stamp the value as UTC instead, the same way the
        # _GPS_POSITIONING decoder treats the device clock.
        # NOTE(review): assumes the device reports UTC here as well.
        self.devtime = datetime.strptime(
            self.dtime.hex(), "%y%m%d%H%M%S"
        ).replace(tzinfo=timezone.utc)
    naps = self.length  # length has special meaning here
    self.wifi_aps = []
    for i in range(naps):
        ap = payload[6 + i * 7 : 13 + i * 7]
        mac = ":".join(format(b, "02X") for b in ap[:6])
        self.wifi_aps.append((mac, -ap[6]))
    gsm_slice = payload[6 + naps * 7 :]
    ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
    self.gsm_cells = []
    for i in range(ncells):
        locac, cellid, sigstr = unpack(
            "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
        )
        self.gsm_cells.append((locac, cellid, -sigstr))
    return self
353 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
355 RESPOND = Respond.INL
def out_encode(self):
    """Build the response payload: current UTC time in packed decimal.

    Six bytes, YYMMDDHHMMSS, two decimal digits per byte.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware "now"
    # in UTC formats to the same digits.
    now = datetime.now(timezone.utc)
    return bytes.fromhex(now.strftime("%y%m%d%H%M%S"))
361 class TIME(GPS303Pkt):
363 RESPOND = Respond.INL
def out_encode(self):
    """Build the response payload: the current UTC time in seven bytes.

    A big-endian two-byte year followed by month, day, hour, minute and
    second as one byte each.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware "now"
    # in UTC has the same first six timetuple fields.
    now = datetime.now(timezone.utc)
    return pack("!HBBBBB", *now.timetuple()[:6])
369 class PROHIBIT_LBS(GPS303Pkt):
371 OUT_KWARGS = (("status", int, 1),)
def out_encode(self):
    """Encode the server-sent status as one byte: 0 - off, 1 - on."""
    status_byte = pack("B", self.status)
    return status_byte
377 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
380 # Data is in packed decimal
382 # 00/01 - Don't set / Set upload period
383 # HHMMHHMM - Upload period
385 # 00/01 - Don't set / Set time of boot
386 # HHMM - Time of boot
387 # 00/01 - Don't set / Set time of shutdown
388 # HHMM - Time of shutdown
389 def out_encode(self):
class _SET_PHONE(GPS303Pkt):
    """Common base for outgoing messages whose payload is a phone number."""

    # The only outgoing field: the phone number as text, empty by default
    OUT_KWARGS = (("phone", str, ""),)

    def out_encode(self):
        """Return the phone number string encoded to bytes."""
        number = self.phone
        return number.encode()
400 class REMOTE_MONITOR_PHONE(_SET_PHONE):
404 class SOS_PHONE(_SET_PHONE):
408 class DAD_PHONE(_SET_PHONE):
412 class MOM_PHONE(_SET_PHONE):
416 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
420 class GPS_OFF_PERIOD(GPS303Pkt):
424 ("fm", hhmm, "0000"),
425 ("to", hhmm, "2359"),
def out_encode(self):
    """Encode the on/off flag followed by the start and end times.

    `fm` and `to` are "HHMM" strings; each is converted with
    bytes.fromhex(), i.e. two decimal digits per byte.
    """
    parts = (
        pack("B", self.onoff),
        bytes.fromhex(self.fm),
        bytes.fromhex(self.to),
    )
    return b"".join(parts)
436 class DND_PERIOD(GPS303Pkt):
441 ("fm1", hhmm, "0000"),
442 ("to1", hhmm, "2359"),
443 ("fm2", hhmm, "0000"),
444 ("to2", hhmm, "2359"),
def out_encode(self):
    """Encode the do-not-disturb settings.

    One byte for `onoff`, one byte for `week`, then the four "HHMM"
    strings `fm1`, `to1`, `fm2`, `to2`, each converted with
    bytes.fromhex() (two decimal digits per byte).

    This method used to be spelled `out_endode`. The metaclass wires
    `out_encode` into the `Out` class, so the misspelled method was never
    called and the inherited `out_encode` was used instead.
    """
    return (
        pack("B", self.onoff)
        + pack("B", self.week)
        + bytes.fromhex(self.fm1)
        + bytes.fromhex(self.to1)
        + bytes.fromhex(self.fm2)
        + bytes.fromhex(self.to2)
    )


# Keep the former (misspelled) name available for any existing caller
out_endode = out_encode
458 class RESTART_SHUTDOWN(GPS303Pkt):
460 OUT_KWARGS = (("flag", int, 0),)
462 def out_encode(self):
465 return pack("B", self.flag)
468 class DEVICE(GPS303Pkt):
470 OUT_KWARGS = (("flag", int, 0),)
def out_encode(self):
    """Encode `flag` as a single unsigned byte.

    Flag values:
      0 - Stop looking for equipment
      1 - Start looking for equipment
    """
    flag_byte = pack("B", self.flag)
    return flag_byte
478 class ALARM_CLOCK(GPS303Pkt):
def out_encode(self):
    """Encode three alarm entries.

    Each entry is one byte for the `day` value followed by the "HHMM"
    time converted with bytes.fromhex() (two decimal digits per byte).
    The entries are currently hard-coded to zeroes.
    """
    # TODO implement parsing kwargs
    alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
    # The day byte is produced by struct.pack(); the name `cls` that was
    # called here before is not defined in this method.
    return b"".join(
        pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
    )
489 class STOP_ALARM(GPS303Pkt):
492 def in_decode(self, length, payload):
493 self.flag = payload[0]
497 class SETUP(GPS303Pkt):
499 RESPOND = Respond.EXT
501 ("uploadintervalseconds", intx, 0x0300),
502 ("binaryswitch", intx, 0b00110001),
503 ("alarms", l3int, [0, 0, 0]),
504 ("dndtimeswitch", int, 0),
505 ("dndtimes", l3int, [0, 0, 0]),
506 ("gpstimeswitch", int, 0),
507 ("gpstimestart", int, 0),
508 ("gpstimestop", int, 0),
509 ("phonenumbers", l3str, ["", "", ""]),
def out_encode(self):
    """Serialize the setup fields into the outgoing payload.

    Order and sizes as produced here: upload interval (2 bytes,
    big-endian), binary switch (1 byte), each alarm (3 bytes), DND time
    switch (1 byte), each DND time (3 bytes), GPS time switch (1 byte),
    GPS start and stop times (2 bytes each, big-endian), and finally the
    phone numbers joined with ";".
    """

    def pack3b(x):
        # Low three bytes of the big-endian 32-bit representation
        return pack("!I", x)[1:]

    chunks = [
        pack("!H", self.uploadintervalseconds),
        pack("B", self.binaryswitch),
    ]
    chunks.extend(pack3b(el) for el in self.alarms)
    chunks.append(pack("B", self.dndtimeswitch))
    chunks.extend(pack3b(el) for el in self.dndtimes)
    chunks.append(pack("B", self.gpstimeswitch))
    chunks.append(pack("!H", self.gpstimestart))
    chunks.append(pack("!H", self.gpstimestop))
    chunks.append(b";".join(el.encode() for el in self.phonenumbers))
    return b"".join(chunks)
535 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
539 class RESTORE_PASSWORD(GPS303Pkt):
543 class WIFI_POSITIONING(_WIFI_POSITIONING):
545 RESPOND = Respond.EXT
def _opt_float(value):
    """Convert `value` to float, letting None pass through unchanged."""
    return None if value is None else float(value)


# The packet constructor applies the converter to the default as well.
# Plain `float` raised TypeError on the None default, so a response could
# not be built without coordinates even though out_encode() handles that
# case explicitly.
OUT_KWARGS = (("latitude", _opt_float, None), ("longitude", _opt_float, None))


def out_encode(self):
    """Encode the coordinates as ASCII text "latitude,longitude".

    Each value is formatted with an explicit sign, zero-padded to at
    least ten characters, with eight significant digits. When either
    coordinate is None the payload is empty.
    """
    if self.latitude is None or self.longitude is None:
        return b""
    return "{:+#010.8g},{:+#010.8g}".format(
        self.latitude, self.longitude
    ).encode()
def out_decode(self, length, payload):
    """Parse an ASCII "latitude,longitude" payload into two floats."""
    text = payload.decode()
    lat_text, lon_text = text.split(",")
    self.latitude = float(lat_text)
    self.longitude = float(lon_text)
561 class MANUAL_POSITIONING(GPS303Pkt):
564 def in_decode(self, length, payload):
565 self.flag = payload[0] if len(payload) > 0 else None
570 4: "LBS search > 3 times",
571 5: "Same LBS and WiFi data",
572 6: "LBS prohibited, WiFi absent",
573 7: "GPS spacing < 50 m",
574 }.get(self.flag, "Unknown")
578 class BATTERY_CHARGE(GPS303Pkt):
582 class CHARGER_CONNECTED(GPS303Pkt):
586 class CHARGER_DISCONNECTED(GPS303Pkt):
590 class VIBRATION_RECEIVED(GPS303Pkt):
594 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
596 RESPOND = Respond.EXT
597 OUT_KWARGS = (("interval", int, 10),)
def in_decode(self, length, payload):
    """Decode the interval from the first two bytes (big-endian)."""
    # unpack() always returns a tuple; unpack its single element so that
    # `interval` is an integer, as out_encode() and OUT_KWARGS expect.
    (self.interval,) = unpack("!H", payload[:2])
    return self
def out_encode(self):
    """Encode the interval as a big-endian two-byte unsigned integer."""
    # The value lives on the instance; the bare name `interval` used here
    # before does not exist in this scope and raised NameError.
    return pack("!H", self.interval)
607 class SOS_ALARM(GPS303Pkt):
611 class UNKNOWN_B3(GPS303Pkt):
613 IN_KWARGS = (("asciidata", str, ""),)
def in_decode(self, length, payload):
    """Store the whole payload, decoded to text, in `asciidata`."""
    text = payload.decode()
    self.asciidata = text
    return self
# Build dicts protocol number -> class and class name -> protocol number
CLASSES = {}
PROTOS = {}
# The candidate list is built in full before the loop starts, so binding
# the loop variable at module level cannot disturb the walk over globals().
for cls in [
    obj
    for key, obj in globals().items()
    if not key.startswith("_")
    and isclass(obj)
    and issubclass(obj, GPS303Pkt)
]:
    # Only classes that declare a protocol number are registered
    if hasattr(cls, "PROTO"):
        CLASSES[cls.PROTO] = cls
        PROTOS[cls.__name__] = cls.PROTO
636 def class_by_prefix(prefix):
639 for name, proto in PROTOS.items()
640 if name.upper().startswith(prefix.upper())
645 return CLASSES[proto]
def proto_by_name(name):
    """Return the protocol number registered under `name`, or -1."""
    try:
        return PROTOS[name]
    except KeyError:
        return -1
def proto_of_message(packet):
    """Return the protocol number, i.e. the second byte of the packet."""
    (proto,) = unpack("B", packet[1:2])
    return proto
656 def inline_response(packet):
657 proto = proto_of_message(packet)
660 if cls.RESPOND is Respond.INL:
661 return cls.Out().packed
665 def parse_message(packet, is_incoming=True):
666 """From a packet (without framing bytes) derive the XXX.In object"""
667 length, proto = unpack("BB", packet[:2])
669 if proto not in CLASSES:
670 cause = ValueError(f"Proto {proto} is unknown")
674 return CLASSES[proto].In(length, payload)
676 return CLASSES[proto].Out(length, payload)
677 except DecodeError as e:
680 retobj = UNKNOWN.In(length, payload)
682 retobj = UNKNOWN.Out(length, payload)
683 retobj.PROTO = proto # Override class attr with object attr