2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import pack, unpack
34 "GPS_OFFLINE_POSITIONING",
39 "WIFI_OFFLINE_POSITIONING",
42 "GPS_LBS_SWITCH_TIMES",
43 "REMOTE_MONITOR_PHONE",
55 "SYNCHRONOUS_WHITELIST",
61 "CHARGER_DISCONNECTED",
63 "POSITION_UPLOAD_INTERVAL",
70 if isinstance(x, str):
76 """Check for the string that represents hours and minutes"""
77 if not isinstance(x, str) or len(x) != 4:
78 raise ValueError(str(x) + " is not a four-character string")
81 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
82 raise ValueError(str(x) + " does not contain valid hours and minutes")
87 if isinstance(x, str):
89 if len(x) != 3 or not all(isinstance(el, str) for el in x):
90 raise ValueError(str(x) + " is not a list of three strings")
95 if isinstance(x, str):
97 x = [int(el) for el in x]
98 if len(x) != 3 or not all(isinstance(el, int) for el in x):
99 raise ValueError(str(x) + " is not a list of three integers")
105 For each class corresponding to a message, automatically create
106 two nested classes `In` and `Out` that also inherit from their
107 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
108 copied to the `In` nested class under the name `KWARGS`, and
109 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
110 to the nested class `Out`. In addition, method `encode` is
111 defined in both classes equal to `in_encode()` and `out_encode()`
115 def __new__(cls, name, bases, attrs):
116 newcls = super().__new__(cls, name, bases, attrs)
117 newcls.In = super().__new__(
122 "KWARGS": newcls.IN_KWARGS,
123 "decode": newcls.in_decode,
124 "encode": newcls.in_encode,
127 newcls.Out = super().__new__(
132 "KWARGS": newcls.OUT_KWARGS,
133 "decode": newcls.out_decode,
134 "encode": newcls.out_encode,
141 NON = 0 # Incoming, no response needed
142 INL = 1 # Birirectional, use `inline_response()`
143 EXT = 2 # Birirectional, use external responder
146 class GPS303Pkt(metaclass=MetaPkt):
147 RESPOND = Respond.NON # Do not send anything back by default
152 def __init__(self, *args, **kwargs):
153 assert len(args) == 0
154 for kw, typ, dfl in self.KWARGS:
155 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
157 print("KWARGS", self.KWARGS)
158 print("kwargs", kwargs)
160 self.__class__.__name__ + " stray kwargs " + str(kwargs)
164 return "{}({})".format(
165 self.__class__.__name__,
169 'bytes.fromhex("{}")'.format(v.hex())
170 if isinstance(v, bytes)
173 for k, v in self.__dict__.items()
174 if not k.startswith("_")
178 def in_decode(self, length, packet):
181 def out_decode(self, length, packet):
182 raise NotImplementedError(
183 self.__class__.__name__ + ".decode() not implemented"
187 raise NotImplementedError(
188 self.__class__.__name__ + ".encode() not implemented"
191 def out_encode(self):
196 payload = self.encode()
197 length = len(payload) + 1
198 return pack("BB", length, self.PROTO) + payload
201 def from_packet(cls, length, payload):
204 self.payload = payload
205 self.decode(length, payload)
209 class UNKNOWN(GPS303Pkt):
210 PROTO = 256 # > 255 is impossible in real packets
213 class LOGIN(GPS303Pkt):
215 RESPOND = Respond.INL
216 # Default response for ACK, can also respond with STOP_UPLOAD
218 def in_decode(self, length, payload):
219 self.imei = payload[:-1].hex()
220 self.ver = unpack("B", payload[-1:])[0]
224 class SUPERVISION(GPS303Pkt):
226 OUT_KWARGS = (("status", int, 1),)
228 def out_encode(self):
229 # 1: The device automatically answers Pickup effect
230 # 2: Automatically Answering Two-way Calls
231 # 3: Ring manually answer the two-way call
232 return pack("B", self.status)
235 class HEARTBEAT(GPS303Pkt):
237 RESPOND = Respond.INL
240 class _GPS_POSITIONING(GPS303Pkt):
241 RESPOND = Respond.INL
243 def in_decode(self, length, payload):
244 self.dtime = payload[:6]
245 if self.dtime == b"\0\0\0\0\0\0":
248 self.devtime = datetime(
249 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
251 self.gps_data_length = payload[6] >> 4
252 self.gps_nb_sat = payload[6] & 0x0F
253 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
254 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
255 flip_lon = bool(flags & 0b0000100000000000) # bit 4
256 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
257 self.heading = flags & 0b0000001111111111 # bits 6 - last
258 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
259 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
264 def out_encode(self):
265 tup = datetime.utcnow().timetuple()
266 ttup = (tup[0] % 100,) + tup[1:6]
267 return pack("BBBBBB", *ttup)
270 class GPS_POSITIONING(_GPS_POSITIONING):
274 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
278 class STATUS(GPS303Pkt):
280 RESPOND = Respond.EXT
281 OUT_KWARGS = (("upload_interval", int, 25),)
283 def in_decode(self, length, payload):
284 self.batt, self.ver, self.timezone, self.intvl = unpack(
288 self.signal = payload[4]
293 def out_encode(self): # Set interval in minutes
294 return pack("B", self.upload_interval)
297 class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
299 RESPOND = Respond.INL
302 class RESET(GPS303Pkt):
303 # Device sends when it got reset SMS
304 # Server can send to initiate factory reset
308 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
310 OUT_KWARGS = (("number", int, 3),)
312 def out_encode(self): # Number of whitelist entries
313 return pack("B", number)
316 class _WIFI_POSITIONING(GPS303Pkt):
317 def in_decode(self, length, payload):
318 self.dtime = payload[:6]
319 if self.dtime == b"\0\0\0\0\0\0":
322 self.devtime = datetime.strptime(
323 self.dtime.hex(), "%y%m%d%H%M%S"
324 ).astimezone(tz=timezone.utc)
326 for i in range(self.length): # length has special meaning here
327 slice = payload[6 + i * 7 : 13 + i * 7]
328 self.wifi_aps.append(
329 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
331 gsm_slice = payload[6 + self.length * 7 :]
332 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
334 for i in range(ncells):
335 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
336 locac, cellid, sigstr = unpack(
337 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
339 self.gsm_cells.append((locac, cellid, -sigstr))
343 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
345 RESPOND = Respond.INL
347 def out_encode(self):
348 return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
351 class TIME(GPS303Pkt):
353 RESPOND = Respond.INL
355 def out_encode(self):
356 return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
359 class PROHIBIT_LBS(GPS303Pkt):
361 OUT_KWARGS = (("status", int, 1),)
363 def out_encode(self): # Server sent, 0-off, 1-on
364 return pack("B", self.status)
367 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
370 # Data is in packed decimal
372 # 00/01 - Don't set / Set upload period
373 # HHMMHHMM - Upload period
375 # 00/01 - Don't set / Set time of boot
376 # HHMM - Time of boot
377 # 00/01 - Don't set / Set time of shutdown
378 # HHMM - Time of shutdown
379 def out_encode(self):
383 class _SET_PHONE(GPS303Pkt):
384 OUT_KWARGS = (("phone", str, ""),)
386 def out_encode(self):
387 return self.phone.encode()
390 class REMOTE_MONITOR_PHONE(_SET_PHONE):
394 class SOS_PHONE(_SET_PHONE):
398 class DAD_PHONE(_SET_PHONE):
402 class MOM_PHONE(_SET_PHONE):
406 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
410 class GPS_OFF_PERIOD(GPS303Pkt):
414 ("fm", hhmm, "0000"),
415 ("to", hhmm, "2359"),
418 def out_encode(self):
420 pack("B", self.onoff)
421 + bytes.fromhex(self.fm)
422 + bytes.fromhex(self.to)
426 class DND_PERIOD(GPS303Pkt):
431 ("fm1", hhmm, "0000"),
432 ("to1", hhmm, "2359"),
433 ("fm2", hhmm, "0000"),
434 ("to2", hhmm, "2359"),
437 def out_endode(self):
439 pack("B", self.onoff)
440 + pack("B", self.week)
441 + bytes.fromhex(self.fm1)
442 + bytes.fromhex(self.to1)
443 + bytes.fromhex(self.fm2)
444 + bytes.fromhex(self.to2)
448 class RESTART_SHUTDOWN(GPS303Pkt):
450 OUT_KWARGS = (("flag", int, 0),)
452 def out_encode(self):
455 return pack("B", self.flag)
458 class DEVICE(GPS303Pkt):
460 OUT_KWARGS = (("flag", int, 0),)
462 # 0 - Stop looking for equipment
463 # 1 - Start looking for equipment
464 def out_encode(self):
465 return pack("B", self.flag)
468 class ALARM_CLOCK(GPS303Pkt):
471 def out_encode(self):
472 # TODO implement parsing kwargs
473 alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
475 cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
479 class STOP_ALARM(GPS303Pkt):
482 def in_decode(self, length, payload):
483 self.flag = payload[0]
487 class SETUP(GPS303Pkt):
489 RESPOND = Respond.EXT
491 ("uploadintervalseconds", intx, 0x0300),
492 ("binaryswitch", intx, 0b00110001),
493 ("alarms", l3int, [0, 0, 0]),
494 ("dndtimeswitch", int, 0),
495 ("dndtimes", l3int, [0, 0, 0]),
496 ("gpstimeswitch", int, 0),
497 ("gpstimestart", int, 0),
498 ("gpstimestop", int, 0),
499 ("phonenumbers", l3str, ["", "", ""]),
502 def out_encode(self):
504 return pack("!I", x)[1:]
508 pack("!H", self.uploadintervalseconds),
509 pack("B", self.binaryswitch),
511 + [pack3b(el) for el in self.alarms]
513 pack("B", self.dndtimeswitch),
515 + [pack3b(el) for el in self.dndtimes]
517 pack("B", self.gpstimeswitch),
518 pack("!H", self.gpstimestart),
519 pack("!H", self.gpstimestop),
521 + [b";".join([el.encode() for el in self.phonenumbers])]
525 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
529 class RESTORE_PASSWORD(GPS303Pkt):
533 class WIFI_POSITIONING(_WIFI_POSITIONING):
535 RESPOND = Respond.EXT
536 OUT_KWARGS = (("lat", float, None), ("lon", float, None))
538 def out_encode(self):
539 if self.lat is None or self.lon is None:
541 return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
544 class MANUAL_POSITIONING(GPS303Pkt):
547 def in_decode(self, length, payload):
548 self.flag = payload[0] if len(payload) > 0 else None
553 4: "LBS search > 3 times",
554 5: "Same LBS and WiFi data",
555 6: "LBS prohibited, WiFi absent",
556 7: "GPS spacing < 50 m",
557 }.get(self.flag, "Unknown")
561 class BATTERY_CHARGE(GPS303Pkt):
565 class CHARGER_CONNECTED(GPS303Pkt):
569 class CHARGER_DISCONNECTED(GPS303Pkt):
573 class VIBRATION_RECEIVED(GPS303Pkt):
577 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
579 RESPOND = Respond.EXT
580 OUT_KWARGS = (("interval", int, 10),)
582 def in_decode(self, length, payload):
583 self.interval = unpack("!H", payload[:2])
586 def out_encode(self):
587 return pack("!H", interval)
590 class SOS_ALARM(GPS303Pkt):
594 class UNKNOWN_B3(GPS303Pkt):
596 IN_KWARGS = (("asciidata", str, ""),)
598 def in_decode(self, length, payload):
599 self.asciidata = payload.decode()
603 # Build dicts protocol number -> class and class name -> protocol number
606 if True: # just to indent the code, sorry!
609 for name, cls in globals().items()
611 and issubclass(cls, GPS303Pkt)
612 and not name.startswith("_")
614 if hasattr(cls, "PROTO"):
615 CLASSES[cls.PROTO] = cls
616 PROTOS[cls.__name__] = cls.PROTO
619 def class_by_prefix(prefix):
622 for name, proto in PROTOS.items()
623 if name.upper().startswith(prefix.upper())
628 return CLASSES[proto]
631 def proto_by_name(name):
632 return PROTOS.get(name, -1)
635 def proto_of_message(packet):
636 return unpack("B", packet[1:2])[0]
639 def inline_response(packet):
640 proto = proto_of_message(packet)
643 if cls.RESPOND is Respond.INL:
644 return cls.Out().packed
648 def parse_message(packet):
649 """From a packet (without framing bytes) derive the XXX.In object"""
650 length, proto = unpack("BB", packet[:2])
653 return CLASSES[proto].from_packet(length, payload)
655 retobj = UNKNOWN.from_packet(length, payload)
656 retobj.PROTO = proto # Override class attr with object attr