2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import pack, unpack
34 "GPS_OFFLINE_POSITIONING",
39 "WIFI_OFFLINE_POSITIONING",
42 "GPS_LBS_SWITCH_TIMES",
43 "REMOTE_MONITOR_PHONE",
55 "SYNCHRONOUS_WHITELIST",
61 "CHARGER_DISCONNECTED",
63 "POSITION_UPLOAD_INTERVAL",
70 if isinstance(x, str):
76 """Check for the string that represents hours and minutes"""
77 if not isinstance(x, str) or len(x) != 4:
78 raise ValueError(str(x) + " is not a four-character string")
81 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
82 raise ValueError(str(x) + " does not contain valid hours and minutes")
87 if isinstance(x, str):
89 if len(x) != 3 or not all(isinstance(el, str) for el in x):
90 raise ValueError(str(x) + " is not a list of three strings")
95 if isinstance(x, str):
97 x = [int(el) for el in x]
98 if len(x) != 3 or not all(isinstance(el, int) for el in x):
99 raise ValueError(str(x) + " is not a list of three integers")
105 For each class corresponding to a message, automatically create
106 two nested classes `In` and `Out` that also inherit from their
107 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
108 copied to the `In` nested class under the name `KWARGS`, and
109 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
110 to the nested class `Out`. In addition, method `encode` is
111 defined in both classes equal to `in_encode()` and `out_encode()`
115 def __new__(cls, name, bases, attrs):
116 newcls = super().__new__(cls, name, bases, attrs)
117 newcls.In = super().__new__(
122 "KWARGS": newcls.IN_KWARGS,
123 "decode": newcls.in_decode,
124 "encode": newcls.in_encode,
127 newcls.Out = super().__new__(
132 "KWARGS": newcls.OUT_KWARGS,
133 "decode": newcls.out_decode,
134 "encode": newcls.out_encode,
141 NON = 0 # Incoming, no response needed
142 INL = 1 # Birirectional, use `inline_response()`
143 EXT = 2 # Birirectional, use external responder
146 class GPS303Pkt(metaclass=MetaPkt):
147 RESPOND = Respond.NON # Do not send anything back by default
152 def __init__(self, *args, **kwargs):
154 Construct the object _either_ from (length, payload),
155 _or_ from the values of individual fields
157 assert not args or (len(args) == 2 and not kwargs)
158 if args: # guaranteed to be two arguments at this point
159 self.length, self.payload = args
160 self.decode(self.length, self.payload)
162 for kw, typ, dfl in self.KWARGS:
163 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
166 self.__class__.__name__ + " stray kwargs " + str(kwargs)
170 return "{}({})".format(
171 self.__class__.__name__,
175 'bytes.fromhex("{}")'.format(v.hex())
176 if isinstance(v, bytes)
179 for k, v in self.__dict__.items()
180 if not k.startswith("_")
184 def in_decode(self, length, packet):
185 # Overridden in subclasses, otherwise do not decode payload
188 def out_decode(self, length, packet):
189 # Necessary to emulate terminal, which is not implemented
190 raise NotImplementedError(
191 self.__class__.__name__ + ".decode() not implemented"
195 # Necessary to emulate terminal, which is not implemented
196 raise NotImplementedError(
197 self.__class__.__name__ + ".encode() not implemented"
200 def out_encode(self):
201 # Overridden in subclasses, otherwise make empty payload
206 payload = self.encode()
207 length = len(payload) + 1
208 return pack("BB", length, self.PROTO) + payload
211 class UNKNOWN(GPS303Pkt):
212 PROTO = 256 # > 255 is impossible in real packets
215 class LOGIN(GPS303Pkt):
217 RESPOND = Respond.INL
218 # Default response for ACK, can also respond with STOP_UPLOAD
220 def in_decode(self, length, payload):
221 self.imei = payload[:-1].hex()
222 self.ver = unpack("B", payload[-1:])[0]
226 class SUPERVISION(GPS303Pkt):
228 OUT_KWARGS = (("status", int, 1),)
230 def out_encode(self):
231 # 1: The device automatically answers Pickup effect
232 # 2: Automatically Answering Two-way Calls
233 # 3: Ring manually answer the two-way call
234 return pack("B", self.status)
237 class HEARTBEAT(GPS303Pkt):
239 RESPOND = Respond.INL
242 class _GPS_POSITIONING(GPS303Pkt):
243 RESPOND = Respond.INL
245 def in_decode(self, length, payload):
246 self.dtime = payload[:6]
247 if self.dtime == b"\0\0\0\0\0\0":
250 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
251 self.devtime = datetime(
252 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
254 self.gps_data_length = payload[6] >> 4
255 self.gps_nb_sat = payload[6] & 0x0F
256 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
257 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
258 flip_lon = bool(flags & 0b0000100000000000) # bit 4
259 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
260 self.heading = flags & 0b0000001111111111 # bits 6 - last
261 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
262 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
267 def out_encode(self):
268 tup = datetime.utcnow().timetuple()
269 ttup = (tup[0] % 100,) + tup[1:6]
270 return pack("BBBBBB", *ttup)
273 class GPS_POSITIONING(_GPS_POSITIONING):
277 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
281 class STATUS(GPS303Pkt):
283 RESPOND = Respond.EXT
284 OUT_KWARGS = (("upload_interval", int, 25),)
286 def in_decode(self, length, payload):
287 self.batt, self.ver, self.timezone, self.intvl = unpack(
291 self.signal = payload[4]
296 def out_encode(self): # Set interval in minutes
297 return pack("B", self.upload_interval)
300 class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
302 RESPOND = Respond.INL
305 class RESET(GPS303Pkt):
306 # Device sends when it got reset SMS
307 # Server can send to initiate factory reset
311 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
313 OUT_KWARGS = (("number", int, 3),)
315 def out_encode(self): # Number of whitelist entries
316 return pack("B", number)
319 class _WIFI_POSITIONING(GPS303Pkt):
320 def in_decode(self, length, payload):
321 self.dtime = payload[:6]
322 if self.dtime == b"\0\0\0\0\0\0":
325 self.devtime = datetime.strptime(
326 self.dtime.hex(), "%y%m%d%H%M%S"
327 ).astimezone(tz=timezone.utc)
329 for i in range(self.length): # length has special meaning here
330 slice = payload[6 + i * 7 : 13 + i * 7]
331 self.wifi_aps.append(
332 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
334 gsm_slice = payload[6 + self.length * 7 :]
335 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
337 for i in range(ncells):
338 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
339 locac, cellid, sigstr = unpack(
340 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
342 self.gsm_cells.append((locac, cellid, -sigstr))
346 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
348 RESPOND = Respond.INL
350 def out_encode(self):
351 return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
354 class TIME(GPS303Pkt):
356 RESPOND = Respond.INL
358 def out_encode(self):
359 return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
362 class PROHIBIT_LBS(GPS303Pkt):
364 OUT_KWARGS = (("status", int, 1),)
366 def out_encode(self): # Server sent, 0-off, 1-on
367 return pack("B", self.status)
370 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
373 # Data is in packed decimal
375 # 00/01 - Don't set / Set upload period
376 # HHMMHHMM - Upload period
378 # 00/01 - Don't set / Set time of boot
379 # HHMM - Time of boot
380 # 00/01 - Don't set / Set time of shutdown
381 # HHMM - Time of shutdown
382 def out_encode(self):
386 class _SET_PHONE(GPS303Pkt):
387 OUT_KWARGS = (("phone", str, ""),)
389 def out_encode(self):
390 return self.phone.encode()
393 class REMOTE_MONITOR_PHONE(_SET_PHONE):
397 class SOS_PHONE(_SET_PHONE):
401 class DAD_PHONE(_SET_PHONE):
405 class MOM_PHONE(_SET_PHONE):
409 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
413 class GPS_OFF_PERIOD(GPS303Pkt):
417 ("fm", hhmm, "0000"),
418 ("to", hhmm, "2359"),
421 def out_encode(self):
423 pack("B", self.onoff)
424 + bytes.fromhex(self.fm)
425 + bytes.fromhex(self.to)
429 class DND_PERIOD(GPS303Pkt):
434 ("fm1", hhmm, "0000"),
435 ("to1", hhmm, "2359"),
436 ("fm2", hhmm, "0000"),
437 ("to2", hhmm, "2359"),
440 def out_endode(self):
442 pack("B", self.onoff)
443 + pack("B", self.week)
444 + bytes.fromhex(self.fm1)
445 + bytes.fromhex(self.to1)
446 + bytes.fromhex(self.fm2)
447 + bytes.fromhex(self.to2)
451 class RESTART_SHUTDOWN(GPS303Pkt):
453 OUT_KWARGS = (("flag", int, 0),)
455 def out_encode(self):
458 return pack("B", self.flag)
461 class DEVICE(GPS303Pkt):
463 OUT_KWARGS = (("flag", int, 0),)
465 # 0 - Stop looking for equipment
466 # 1 - Start looking for equipment
467 def out_encode(self):
468 return pack("B", self.flag)
471 class ALARM_CLOCK(GPS303Pkt):
474 def out_encode(self):
475 # TODO implement parsing kwargs
476 alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
478 cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
482 class STOP_ALARM(GPS303Pkt):
485 def in_decode(self, length, payload):
486 self.flag = payload[0]
490 class SETUP(GPS303Pkt):
492 RESPOND = Respond.EXT
494 ("uploadintervalseconds", intx, 0x0300),
495 ("binaryswitch", intx, 0b00110001),
496 ("alarms", l3int, [0, 0, 0]),
497 ("dndtimeswitch", int, 0),
498 ("dndtimes", l3int, [0, 0, 0]),
499 ("gpstimeswitch", int, 0),
500 ("gpstimestart", int, 0),
501 ("gpstimestop", int, 0),
502 ("phonenumbers", l3str, ["", "", ""]),
505 def out_encode(self):
507 return pack("!I", x)[1:]
511 pack("!H", self.uploadintervalseconds),
512 pack("B", self.binaryswitch),
514 + [pack3b(el) for el in self.alarms]
516 pack("B", self.dndtimeswitch),
518 + [pack3b(el) for el in self.dndtimes]
520 pack("B", self.gpstimeswitch),
521 pack("!H", self.gpstimestart),
522 pack("!H", self.gpstimestop),
524 + [b";".join([el.encode() for el in self.phonenumbers])]
528 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
532 class RESTORE_PASSWORD(GPS303Pkt):
536 class WIFI_POSITIONING(_WIFI_POSITIONING):
538 RESPOND = Respond.EXT
539 OUT_KWARGS = (("lat", float, None), ("lon", float, None))
541 def out_encode(self):
542 if self.lat is None or self.lon is None:
544 return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
547 class MANUAL_POSITIONING(GPS303Pkt):
550 def in_decode(self, length, payload):
551 self.flag = payload[0] if len(payload) > 0 else None
556 4: "LBS search > 3 times",
557 5: "Same LBS and WiFi data",
558 6: "LBS prohibited, WiFi absent",
559 7: "GPS spacing < 50 m",
560 }.get(self.flag, "Unknown")
564 class BATTERY_CHARGE(GPS303Pkt):
568 class CHARGER_CONNECTED(GPS303Pkt):
572 class CHARGER_DISCONNECTED(GPS303Pkt):
576 class VIBRATION_RECEIVED(GPS303Pkt):
580 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
582 RESPOND = Respond.EXT
583 OUT_KWARGS = (("interval", int, 10),)
585 def in_decode(self, length, payload):
586 self.interval = unpack("!H", payload[:2])
589 def out_encode(self):
590 return pack("!H", interval)
593 class SOS_ALARM(GPS303Pkt):
597 class UNKNOWN_B3(GPS303Pkt):
599 IN_KWARGS = (("asciidata", str, ""),)
601 def in_decode(self, length, payload):
602 self.asciidata = payload.decode()
606 # Build dicts protocol number -> class and class name -> protocol number
609 if True: # just to indent the code, sorry!
612 for name, cls in globals().items()
614 and issubclass(cls, GPS303Pkt)
615 and not name.startswith("_")
617 if hasattr(cls, "PROTO"):
618 CLASSES[cls.PROTO] = cls
619 PROTOS[cls.__name__] = cls.PROTO
622 def class_by_prefix(prefix):
625 for name, proto in PROTOS.items()
626 if name.upper().startswith(prefix.upper())
631 return CLASSES[proto]
634 def proto_by_name(name):
635 return PROTOS.get(name, -1)
638 def proto_of_message(packet):
639 return unpack("B", packet[1:2])[0]
642 def inline_response(packet):
643 proto = proto_of_message(packet)
646 if cls.RESPOND is Respond.INL:
647 return cls.Out().packed
651 def parse_message(packet):
652 """From a packet (without framing bytes) derive the XXX.In object"""
653 length, proto = unpack("BB", packet[:2])
656 return CLASSES[proto].In(length, payload)
658 retobj = UNKNOWN.In(length, payload)
659 retobj.PROTO = proto # Override class attr with object attr