2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation), but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import pack, unpack
34 "GPS_OFFLINE_POSITIONING",
39 "WIFI_OFFLINE_POSITIONING",
42 "GPS_LBS_SWITCH_TIMES",
43 "REMOTE_MONITOR_PHONE",
55 "SYNCHRONOUS_WHITELIST",
61 "CHARGER_DISCONNECTED",
63 "POSITION_UPLOAD_INTERVAL",
70 if isinstance(x, str):
76 """Check for the string that represents hours and minutes"""
77 if not isinstance(x, str) or len(x) != 4:
78 raise ValueError(str(x) + " is not a four-character string")
81 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
82 raise ValueError(str(x) + " does not contain valid hours and minutes")
87 if isinstance(x, str):
89 if len(x) != 3 or not all(isinstance(el, str) for el in x):
90 raise ValueError(str(x) + " is not a list of three strings")
95 if isinstance(x, str):
97 x = [int(el) for el in x]
98 if len(x) != 3 or not all(isinstance(el, int) for el in x):
99 raise ValueError(str(x) + " is not a list of three integers")
105 For each class corresponding to a message, automatically create
106 two nested classes `In` and `Out` that also inherit from their
107 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
108 copied to the `In` nested class under the name `KWARGS`, and
109 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
110 to the nested class `Out`. In addition, method `encode` is
111 defined in both classes equal to `in_encode()` and `out_encode()`
115 def __new__(cls, name, bases, attrs):
116 newcls = super().__new__(cls, name, bases, attrs)
117 newcls.In = super().__new__(
122 "KWARGS": newcls.IN_KWARGS,
123 "decode": newcls.in_decode,
124 "encode": newcls.in_encode,
127 newcls.Out = super().__new__(
132 "KWARGS": newcls.OUT_KWARGS,
133 "decode": newcls.out_decode,
134 "encode": newcls.out_encode,
141 NON = 0 # Incoming, no response needed
142 INL = 1 # Bidirectional, use `inline_response()`
143 EXT = 2 # Bidirectional, use external responder
146 class GPS303Pkt(metaclass=MetaPkt):
147 RESPOND = Respond.NON # Do not send anything back by default
152 def __init__(self, *args, **kwargs):
154 Construct the object _either_ from (length, payload),
155 _or_ from the values of individual fields
157 assert not args or (len(args) == 2 and not kwargs)
158 if args: # guaranteed to be two arguments at this point
159 self.length, self.payload = args
160 self.decode(self.length, self.payload)
162 for kw, typ, dfl in self.KWARGS:
163 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
166 self.__class__.__name__ + " stray kwargs " + str(kwargs)
170 return "{}({})".format(
171 self.__class__.__name__,
175 'bytes.fromhex("{}")'.format(v.hex())
176 if isinstance(v, bytes)
179 for k, v in self.__dict__.items()
180 if not k.startswith("_")
184 def in_decode(self, length, packet):
185 # Overridden in subclasses, otherwise do not decode payload
188 def out_decode(self, length, packet):
189 # Overridden in subclasses, otherwise do not decode payload
193 # Necessary to emulate terminal, which is not implemented
194 raise NotImplementedError(
195 self.__class__.__name__ + ".encode() not implemented"
198 def out_encode(self):
199 # Overridden in subclasses, otherwise make empty payload
204 payload = self.encode()
205 length = len(payload) + 1
206 return pack("BB", length, self.PROTO) + payload
class UNKNOWN(GPS303Pkt):
    """Packet class carrying a protocol number no real packet can have."""

    # One byte holds the protocol number on the wire, so 256 never occurs
    PROTO = 0x100
213 class LOGIN(GPS303Pkt):
215 RESPOND = Respond.INL
216 # Default response for ACK, can also respond with STOP_UPLOAD
def in_decode(self, length, payload):
    """
    Decode the login payload: every byte except the last is stored
    as a hex string in `self.imei`, the last byte as an integer in
    `self.ver`.
    """
    imei_part, ver_part = payload[:-1], payload[-1:]
    self.imei = imei_part.hex()
    (self.ver,) = unpack("B", ver_part)
224 class SUPERVISION(GPS303Pkt):
226 OUT_KWARGS = (("status", int, 1),)
def out_encode(self):
    """
    Encode `self.status` as a single unsigned byte. Meaning of the value:
    1 - the device automatically answers (pickup effect),
    2 - automatically answering two-way calls,
    3 - ring, manually answer the two-way call.
    """
    mode = self.status
    return pack("B", mode)
235 class HEARTBEAT(GPS303Pkt):
237 RESPOND = Respond.INL
240 class _GPS_POSITIONING(GPS303Pkt):
241 RESPOND = Respond.INL
243 def in_decode(self, length, payload):
244 self.dtime = payload[:6]
245 if self.dtime == b"\0\0\0\0\0\0":
248 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
249 self.devtime = datetime(
250 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
252 self.gps_data_length = payload[6] >> 4
253 self.gps_nb_sat = payload[6] & 0x0F
254 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
255 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
256 flip_lon = bool(flags & 0b0000100000000000) # bit 4
257 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
258 self.heading = flags & 0b0000001111111111 # bits 6 - last
259 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
260 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
def out_encode(self):
    """
    Build the response payload: the current UTC time as six unsigned
    bytes (two-digit year, month, day, hour, minute, second).
    """
    # `datetime.utcnow()` is deprecated since Python 3.12; a timezone-aware
    # "now" in UTC carries exactly the same field values.
    now = datetime.now(timezone.utc)
    return pack(
        "BBBBBB",
        now.year % 100,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
271 class GPS_POSITIONING(_GPS_POSITIONING):
275 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
279 class STATUS(GPS303Pkt):
281 RESPOND = Respond.EXT
282 OUT_KWARGS = (("upload_interval", int, 25),)
284 def in_decode(self, length, payload):
285 self.batt, self.ver, self.timezone, self.intvl = unpack(
289 self.signal = payload[4]
def out_encode(self):
    """Encode the upload interval (in minutes) as one unsigned byte."""
    minutes = self.upload_interval
    return pack("B", minutes)
298 class HIBERNATION(GPS303Pkt): # Server can send this to put the device to sleep
302 class RESET(GPS303Pkt):
303 # Device sends when it got reset SMS
304 # Server can send to initiate factory reset
308 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiate sync (0x58)
310 OUT_KWARGS = (("number", int, 3),)
def out_encode(self):
    """Encode the number of whitelist entries as one unsigned byte."""
    # The value is an instance attribute; a bare `number` is an
    # undefined name and raised NameError on every call.
    return pack("B", self.number)
316 class _WIFI_POSITIONING(GPS303Pkt):
317 def in_decode(self, length, payload):
318 self.dtime = payload[:6]
319 if self.dtime == b"\0\0\0\0\0\0":
322 self.devtime = datetime.strptime(
323 self.dtime.hex(), "%y%m%d%H%M%S"
324 ).astimezone(tz=timezone.utc)
326 for i in range(self.length): # length has special meaning here
327 slice = payload[6 + i * 7 : 13 + i * 7]
328 self.wifi_aps.append(
329 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
331 gsm_slice = payload[6 + self.length * 7 :]
332 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
334 for i in range(ncells):
335 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
336 locac, cellid, sigstr = unpack(
337 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
339 self.gsm_cells.append((locac, cellid, -sigstr))
343 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
345 RESPOND = Respond.INL
def out_encode(self):
    """
    Build the response payload: the current UTC time as six bytes of
    packed decimal digits, in YYMMDDHHMMSS order.
    """
    # Timezone-aware "now" replaces `datetime.utcnow()`, which is
    # deprecated since Python 3.12; the formatted digits are the same.
    stamp = datetime.now(timezone.utc).strftime("%y%m%d%H%M%S")
    return bytes.fromhex(stamp)
351 class TIME(GPS303Pkt):
353 RESPOND = Respond.INL
def out_encode(self):
    """
    Build the response payload: the current UTC time as a big-endian
    16-bit year followed by month, day, hour, minute and second bytes.
    """
    # Timezone-aware "now" replaces `datetime.utcnow()`, which is
    # deprecated since Python 3.12; the time tuple fields are the same.
    now = datetime.now(timezone.utc)
    return pack("!HBBBBB", *now.timetuple()[:6])
359 class PROHIBIT_LBS(GPS303Pkt):
361 OUT_KWARGS = (("status", int, 1),)
def out_encode(self):
    """Encode the server-sent switch `self.status` (0 - off, 1 - on) as one byte."""
    switch = self.status
    return pack("B", switch)
367 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
370 # Data is in packed decimal
372 # 00/01 - Don't set / Set upload period
373 # HHMMHHMM - Upload period
375 # 00/01 - Don't set / Set time of boot
376 # HHMM - Time of boot
377 # 00/01 - Don't set / Set time of shutdown
378 # HHMM - Time of shutdown
379 def out_encode(self):
383 class _SET_PHONE(GPS303Pkt):
384 OUT_KWARGS = (("phone", str, ""),)
def out_encode(self):
    """Return the phone number string `self.phone` encoded to bytes."""
    number = self.phone
    return number.encode()
390 class REMOTE_MONITOR_PHONE(_SET_PHONE):
394 class SOS_PHONE(_SET_PHONE):
398 class DAD_PHONE(_SET_PHONE):
402 class MOM_PHONE(_SET_PHONE):
406 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
410 class GPS_OFF_PERIOD(GPS303Pkt):
414 ("fm", hhmm, "0000"),
415 ("to", hhmm, "2359"),
418 def out_encode(self):
420 pack("B", self.onoff)
421 + bytes.fromhex(self.fm)
422 + bytes.fromhex(self.to)
426 class DND_PERIOD(GPS303Pkt):
431 ("fm1", hhmm, "0000"),
432 ("to1", hhmm, "2359"),
433 ("fm2", hhmm, "0000"),
434 ("to2", hhmm, "2359"),
def out_encode(self):
    """
    Encode the do-not-disturb settings: one byte for `self.onoff`, one
    byte for `self.week`, then the four "HHMM" strings `fm1`, `to1`,
    `fm2`, `to2`, each converted from hex digits to two bytes.
    """
    return (
        pack("B", self.onoff)
        + pack("B", self.week)
        + bytes.fromhex(self.fm1)
        + bytes.fromhex(self.to1)
        + bytes.fromhex(self.fm2)
        + bytes.fromhex(self.to2)
    )


# The method used to be misspelled, so it never took effect as the
# encoder; the old name is kept as an alias for any existing caller.
out_endode = out_encode
448 class RESTART_SHUTDOWN(GPS303Pkt):
450 OUT_KWARGS = (("flag", int, 0),)
def out_encode(self):
    """Encode `self.flag` as a single unsigned byte."""
    value = self.flag
    return pack("B", value)
458 class DEVICE(GPS303Pkt):
460 OUT_KWARGS = (("flag", int, 0),)
462 # 0 - Stop looking for equipment
463 # 1 - Start looking for equipment
def out_encode(self):
    """
    Encode `self.flag` as a single unsigned byte
    (0 - stop looking for equipment, 1 - start looking for equipment).
    """
    value = self.flag
    return pack("B", value)
468 class ALARM_CLOCK(GPS303Pkt):
def out_encode(self):
    """
    Encode three alarm entries, each as one day byte followed by the
    "HHMM" time converted from hex digits to two bytes.
    """
    # TODO implement parsing kwargs
    alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
    # `pack` builds the day byte; the former `cls(...)` call referred
    # to a name that does not exist inside this method.
    return b"".join(
        pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
    )
479 class STOP_ALARM(GPS303Pkt):
def in_decode(self, length, payload):
    """Store the first payload byte (as an integer) in `self.flag`."""
    first_byte = payload[0]
    self.flag = first_byte
487 class SETUP(GPS303Pkt):
489 RESPOND = Respond.EXT
491 ("uploadintervalseconds", intx, 0x0300),
492 ("binaryswitch", intx, 0b00110001),
493 ("alarms", l3int, [0, 0, 0]),
494 ("dndtimeswitch", int, 0),
495 ("dndtimes", l3int, [0, 0, 0]),
496 ("gpstimeswitch", int, 0),
497 ("gpstimestart", int, 0),
498 ("gpstimestop", int, 0),
499 ("phonenumbers", l3str, ["", "", ""]),
502 def out_encode(self):
504 return pack("!I", x)[1:]
508 pack("!H", self.uploadintervalseconds),
509 pack("B", self.binaryswitch),
511 + [pack3b(el) for el in self.alarms]
513 pack("B", self.dndtimeswitch),
515 + [pack3b(el) for el in self.dndtimes]
517 pack("B", self.gpstimeswitch),
518 pack("!H", self.gpstimestart),
519 pack("!H", self.gpstimestop),
521 + [b";".join([el.encode() for el in self.phonenumbers])]
525 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
529 class RESTORE_PASSWORD(GPS303Pkt):
533 class WIFI_POSITIONING(_WIFI_POSITIONING):
535 RESPOND = Respond.EXT
536 OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
538 def out_encode(self):
539 if self.latitude is None or self.longitude is None:
541 return "{:+#010.8g},{:+#010.8g}".format(
542 self.latitude, self.longitude
def out_decode(self, length, payload):
    """
    Parse a "latitude,longitude" text payload and store both values
    as floats in `self.latitude` and `self.longitude`.
    """
    text = payload.decode()
    lat_text, lon_text = text.split(",")
    self.latitude = float(lat_text)
    self.longitude = float(lon_text)
551 class MANUAL_POSITIONING(GPS303Pkt):
554 def in_decode(self, length, payload):
555 self.flag = payload[0] if len(payload) > 0 else None
560 4: "LBS search > 3 times",
561 5: "Same LBS and WiFi data",
562 6: "LBS prohibited, WiFi absent",
563 7: "GPS spacing < 50 m",
564 }.get(self.flag, "Unknown")
568 class BATTERY_CHARGE(GPS303Pkt):
572 class CHARGER_CONNECTED(GPS303Pkt):
576 class CHARGER_DISCONNECTED(GPS303Pkt):
580 class VIBRATION_RECEIVED(GPS303Pkt):
584 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
586 RESPOND = Respond.EXT
587 OUT_KWARGS = (("interval", int, 10),)
def in_decode(self, length, payload):
    """
    Decode the first two payload bytes as a big-endian unsigned
    integer and store it in `self.interval`.
    """
    # `unpack` always returns a tuple; take its single element so that
    # `interval` is an int, as the encoder of this class expects.
    self.interval = unpack("!H", payload[:2])[0]
def out_encode(self):
    """Encode `self.interval` as a big-endian unsigned 16-bit integer."""
    # The value is an instance attribute; a bare `interval` is an
    # undefined name and raised NameError on every call.
    return pack("!H", self.interval)
597 class SOS_ALARM(GPS303Pkt):
601 class UNKNOWN_B3(GPS303Pkt):
603 IN_KWARGS = (("asciidata", str, ""),)
def in_decode(self, length, payload):
    """Decode the whole payload to text and store it in `self.asciidata`."""
    text = payload.decode()
    self.asciidata = text
610 # Build dicts protocol number -> class and class name -> protocol number
613 if True: # just to indent the code, sorry!
616 for name, cls in globals().items()
618 and issubclass(cls, GPS303Pkt)
619 and not name.startswith("_")
621 if hasattr(cls, "PROTO"):
622 CLASSES[cls.PROTO] = cls
623 PROTOS[cls.__name__] = cls.PROTO
626 def class_by_prefix(prefix):
629 for name, proto in PROTOS.items()
630 if name.upper().startswith(prefix.upper())
635 return CLASSES[proto]
def proto_by_name(name):
    """Look up `name` in PROTOS; return its protocol number, or -1 if absent."""
    not_found = -1
    return PROTOS.get(name, not_found)
def proto_of_message(packet):
    """Return the second byte of `packet`, the protocol number, as an integer."""
    proto_byte = packet[1:2]
    (proto,) = unpack("B", proto_byte)
    return proto
646 def inline_response(packet):
647 proto = proto_of_message(packet)
650 if cls.RESPOND is Respond.INL:
651 return cls.Out().packed
655 def parse_message(packet, is_incoming=True):
656 """From a packet (without framing bytes) derive the XXX.In object"""
657 length, proto = unpack("BB", packet[:2])
661 return CLASSES[proto].In(length, payload)
663 return CLASSES[proto].Out(length, payload)
665 retobj = UNKNOWN.In(length, payload)
666 retobj.PROTO = proto # Override class attr with object attr