2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation), but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from logging import getLogger
21 from struct import pack, unpack
36 "GPS_OFFLINE_POSITIONING",
41 "WIFI_OFFLINE_POSITIONING",
46 "SYNCHRONOUS_WHITELIST",
52 "CHARGER_DISCONNECTED",
54 "POSITION_UPLOAD_INTERVAL",
57 log = getLogger("gps303")
61 if isinstance(x, str):
67 """Check for the string that represents hours and minutes"""
68 if not isinstance(x, str) or len(x) != 4:
69 raise ValueError(str(x) + " is not a four-character string")
72 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
73 raise ValueError(str(x) + " does not contain valid hours and minutes")
78 if isinstance(x, str):
80 if len(x) != 3 or not all(isinstance(el, str) for el in x):
81 raise ValueError(str(x) + " is not a list of three strings")
86 if isinstance(x, str):
88 x = [int(el) for el in x]
89 if len(x) != 3 or not all(isinstance(el, int) for el in x):
90 raise ValueError(str(x) + " is not a list of three integers")
96 For each class corresponding to a message, automatically create
97 two nested classes `In` and `Out` that also inherit from their
98 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
99 copied to the `In` nested class under the name `KWARGS`, and
100 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
101 to the nested class `Out`. In addition, method `encode` is
102 defined in both classes equal to `in_encode()` and `out_encode()`
106 def __new__(cls, name, bases, attrs):
107 newcls = super().__new__(cls, name, bases, attrs)
108 newcls.In = super().__new__(
112 {"KWARGS": newcls.IN_KWARGS, "encode": newcls.in_encode},
114 newcls.Out = super().__new__(
118 {"KWARGS": newcls.OUT_KWARGS, "encode": newcls.out_encode},
124 NON = 0 # Incoming, no response needed
125 INL = 1 # Birirectional, use `inline_response()`
126 EXT = 2 # Birirectional, use external responder
129 class GPS303Pkt(metaclass=MetaPkt):
130 RESPOND = Respond.NON # Do not send anything back by default
132 # Have these kwargs for now, TODO redo
133 IN_KWARGS = (("length", int, 0), ("payload", bytes, b""))
136 def __init__(self, *args, **kwargs):
137 assert len(args) == 0
138 for kw, typ, dfl in self.KWARGS:
139 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
141 print("KWARGS", self.KWARGS)
142 print("kwargs", kwargs)
144 self.__class__.__name__ + " stray kwargs " + str(kwargs)
148 return "{}({})".format(
149 self.__class__.__name__,
153 'bytes.fromhex("{}")'.format(v.hex())
154 if isinstance(v, bytes)
157 for k, v in self.__dict__.items()
158 if not k.startswith("_")
163 raise NotImplementedError(
164 self.__class__.__name__ + ".encode() not implemented"
167 def out_encode(self):
172 payload = self.encode()
173 length = len(payload) + 1
174 return pack("BB", length, self.PROTO) + payload
def from_packet(cls, length, payload):
    """Build the incoming-side (`In`) object for a received packet.

    `length` is the length byte taken from the packet header and
    `payload` is the raw bytes that follow the protocol number.
    """
    incoming = cls.In(length=length, payload=payload)
    return incoming
181 class UNKNOWN(GPS303Pkt):
182 PROTO = 256 # > 255 is impossible in real packets
185 class LOGIN(GPS303Pkt):
187 RESPOND = Respond.INL
188 # Default response for ACK, can also respond with STOP_UPLOAD
191 def from_packet(cls, length, payload):
192 self = super().from_packet(length, payload)
193 self.imei = payload[:-1].hex()
194 self.ver = unpack("B", payload[-1:])[0]
198 class SUPERVISION(GPS303Pkt):
200 OUT_KWARGS = (("status", int, 1),)
def out_encode(self):
    """Encode the answering mode as a single unsigned byte.

    Status values:
      1: The device automatically answers Pickup effect
      2: Automatically Answering Two-way Calls
      3: Ring manually answer the two-way call
    """
    status_byte = pack("B", self.status)
    return status_byte
209 class HEARTBEAT(GPS303Pkt):
211 RESPOND = Respond.INL
214 class _GPS_POSITIONING(GPS303Pkt):
215 RESPOND = Respond.INL
def from_packet(cls, length, payload):
    """Parse a GPS positioning payload into an incoming message object.

    Bytes 0-5 hold the device time (YY MM DD hh mm ss, one binary byte
    each; all zeroes means "not set" and yields `devtime = None`).
    Byte 6 packs the GPS data length (high nibble) and the number of
    satellites (low nibble).  Bytes 7-17 hold big-endian latitude,
    longitude, a speed byte and a 16-bit word combining validity and
    hemisphere flags with the heading.

    Raises `struct.error` if the payload is too short and `ValueError`
    if the time bytes do not form a valid date.
    """
    self = super().from_packet(length, payload)
    self.dtime = payload[:6]
    if self.dtime == b"\0\0\0\0\0\0":
        self.devtime = None
    else:
        # Only the last two digits of the year travel on the wire (the
        # matching `out_encode` sends `year % 100`), so restore the
        # century instead of producing a date in the first century AD.
        yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
        self.devtime = datetime(
            2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
        )
    self.gps_data_length = payload[6] >> 4
    self.gps_nb_sat = payload[6] & 0x0F
    lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
    self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
    flip_lon = bool(flags & 0b0000100000000000)  # bit 4
    flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
    self.heading = flags & 0b0000001111111111  # bits 6 - last
    self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
    # A hemisphere flip only changes the sign; it must never rescale
    # the coordinate (the factor used to be -2, doubling the value).
    self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
    return self
def out_encode(self):
    """Encode the current UTC time as six bytes: YY MM DD hh mm ss.

    The year is sent modulo 100.  An aware UTC timestamp is used
    because `datetime.utcnow()` is deprecated since Python 3.12.
    """
    now = datetime.now(timezone.utc)
    return pack(
        "BBBBBB",
        now.year % 100,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
246 class GPS_POSITIONING(_GPS_POSITIONING):
250 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
254 class STATUS(GPS303Pkt):
256 RESPOND = Respond.EXT
257 OUT_KWARGS = (("upload_interval", int, 25),)
260 def from_packet(cls, length, payload):
261 self = super().from_packet(length, payload)
262 if len(payload) == 5:
269 ) = unpack("BBBBB", payload)
270 elif len(payload) == 4:
271 self.batt, self.ver, self.timezone, self.intvl = unpack(
def out_encode(self):  # Set interval in minutes
    """Return the one-byte payload carrying `upload_interval`.

    Only the payload is produced here, like the other `out_encode`
    implementations in this module.  The previous body referenced an
    undefined `cls` and raised NameError whenever it was called.
    """
    return pack("B", self.upload_interval)
281 class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
283 RESPOND = Respond.INL
286 class RESET(GPS303Pkt):
287 # Device sends when it got reset SMS
288 # Server can send to initiate factory reset
292 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
294 OUT_KWARGS = (("number", int, 3),)
def out_encode(self):  # Number of whitelist entries
    """Return the one-byte payload holding the whitelist entry count.

    Reads the value from `self.number`; the bare name `number` used
    previously was undefined and raised NameError.
    """
    return pack("B", self.number)
300 class _WIFI_POSITIONING(GPS303Pkt):
def from_packet(cls, length, payload):
    """Parse a WiFi/LBS positioning payload into an incoming object.

    Bytes 0-5 are the device time in packed decimal (YYMMDDhhmmss);
    all zeroes yields `devtime = None`.  They are followed by
    `self.length` seven-byte WiFi records (six MAC bytes plus one
    signal-strength byte), then a GSM section: cell count, MCC, MNC
    and five-byte records of location area code, cell id and signal
    strength.  Signal strengths are stored negated.

    Raises `struct.error` or `IndexError` on a truncated payload and
    `ValueError` if the time bytes are not a valid date.
    """
    self = super().from_packet(length, payload)
    self.dtime = payload[:6]
    if self.dtime == b"\0\0\0\0\0\0":
        self.devtime = None
    else:
        # strptime() yields a naive value.  Tag it as UTC rather than
        # calling astimezone(), which would first interpret it in the
        # host's local zone and shift it on any non-UTC server.
        self.devtime = datetime.strptime(
            self.dtime.hex(), "%y%m%d%H%M%S"
        ).replace(tzinfo=timezone.utc)
    self.wifi_aps = []
    for i in range(self.length):  # length has special meaning here
        ap = payload[6 + i * 7 : 13 + i * 7]
        self.wifi_aps.append(
            (":".join([format(b, "02X") for b in ap[:6]]), -ap[6])
        )
    gsm_slice = payload[6 + self.length * 7 :]
    ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
    self.gsm_cells = []
    for i in range(ncells):
        locac, cellid, sigstr = unpack(
            "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
        )
        self.gsm_cells.append((locac, cellid, -sigstr))
    return self
329 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
331 RESPOND = Respond.INL
def out_encode(self):
    """Return the current UTC time as six packed-decimal bytes.

    The layout is YYMMDDhhmmss, two decimal digits per byte.  An aware
    UTC timestamp replaces `datetime.utcnow()`, which is deprecated
    since Python 3.12; the encoded digits are unchanged.
    """
    now = datetime.now(timezone.utc)
    return bytes.fromhex(now.strftime("%y%m%d%H%M%S"))
337 class TIME(GPS303Pkt):
339 RESPOND = Respond.INL
def out_encode(self):
    """Return the current UTC time as seven bytes.

    The year is a big-endian 16-bit value, followed by one byte each
    for month, day, hour, minute and second.  An aware UTC timestamp
    replaces `datetime.utcnow()`, which is deprecated since Python
    3.12.
    """
    now = datetime.now(timezone.utc)
    return pack("!HBBBBB", *now.timetuple()[:6])
345 class PROHIBIT_LBS(GPS303Pkt):
347 OUT_KWARGS = (("status", int, 1),)
def out_encode(self):
    """Encode the server-sent switch as one byte: 0 - off, 1 - on."""
    switch_byte = pack("B", self.status)
    return switch_byte
353 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
356 # Data is in packed decimal
358 # 00/01 - Don't set / Set upload period
359 # HHMMHHMM - Upload period
361 # 00/01 - Don't set / Set time of boot
362 # HHMM - Time of boot
363 # 00/01 - Don't set / Set time of shutdown
364 # HHMM - Time of shutdown
365 def out_encode(self):
369 class _SET_PHONE(GPS303Pkt):
370 OUT_KWARGS = (("phone", str, ""),)
def out_encode(self):
    """Return the phone number string encoded to bytes (UTF-8)."""
    encoded_number = self.phone.encode()
    return encoded_number
376 class REMOTE_MONITOR_PHONE(_SET_PHONE):
380 class SOS_PHONE(_SET_PHONE):
384 class DAD_PHONE(_SET_PHONE):
388 class MOM_PHONE(_SET_PHONE):
392 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
396 class GPS_OFF_PERIOD(GPS303Pkt):
400 ("fm", hhmm, "0000"),
401 ("to", hhmm, "2359"),
def out_encode(self):
    """Encode the on/off byte followed by the `fm` and `to` times.

    `fm` and `to` are four-digit "HHMM" strings and are sent as packed
    decimal, two bytes each.
    """
    pieces = [
        pack("B", self.onoff),
        bytes.fromhex(self.fm),
        bytes.fromhex(self.to),
    ]
    return b"".join(pieces)
412 class DND_PERIOD(GPS303Pkt):
417 ("fm1", hhmm, "0000"),
418 ("to1", hhmm, "2359"),
419 ("fm2", hhmm, "0000"),
420 ("to2", hhmm, "2359"),
def out_encode(self):
    """Encode the do-not-disturb settings.

    Layout: on/off byte, week byte, then the four "HHMM" strings
    `fm1`, `to1`, `fm2`, `to2` as packed decimal (two bytes each).

    This method used to be spelled `out_endode`.  Nothing looked it up
    under that name, so the class silently fell back to the inherited
    `out_encode`.
    """
    return (
        pack("B", self.onoff)
        + pack("B", self.week)
        + bytes.fromhex(self.fm1)
        + bytes.fromhex(self.to1)
        + bytes.fromhex(self.fm2)
        + bytes.fromhex(self.to2)
    )


# Keep the old misspelled name working for any external caller.
out_endode = out_encode
434 class RESTART_SHUTDOWN(GPS303Pkt):
436 OUT_KWARGS = (("flag", int, 2),)
def out_encode(self):
    """Encode `flag` as a single unsigned byte."""
    flag_byte = pack("B", self.flag)
    return flag_byte
444 class DEVICE(GPS303Pkt):
446 OUT_KWARGS = (("flag", int, 0),)
448 # 0 - Stop looking for equipment
449 # 1 - Start looking for equipment
def out_encode(self):
    """Encode `flag` as one byte (0 - stop, 1 - start looking)."""
    flag_byte = pack("B", self.flag)
    return flag_byte
454 class ALARM_CLOCK(GPS303Pkt):
def out_encode(self):
    """Encode three alarm slots as a day byte plus packed "HHMM" each.

    The slots are currently hard-coded to zero.  The day byte is built
    with `struct.pack`; the previous code called an undefined `cls`
    and raised NameError.
    """
    # TODO implement parsing kwargs
    alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
    return b"".join(
        pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
    )
465 class STOP_ALARM(GPS303Pkt):
469 def from_packet(cls, length, payload):
470 self = super().from_packet(length, payload)
471 self.flag = payload[0]
475 class SETUP(GPS303Pkt):
477 RESPOND = Respond.EXT
479 ("uploadintervalseconds", intx, 0x0300),
480 ("binaryswitch", intx, 0b00110001),
481 ("alarms", l3int, [0, 0, 0]),
482 ("dndtimeswitch", int, 0),
483 ("dndtimes", l3int, [0, 0, 0]),
484 ("gpstimeswitch", int, 0),
485 ("gpstimestart", int, 0),
486 ("gpstimestop", int, 0),
487 ("phonenumbers", l3str, ["", "", ""]),
def out_encode(self):
    """Serialise the setup parameters into the response payload.

    Field order: upload interval (16-bit big-endian), binary switch
    byte, three 3-byte alarms, DND switch byte, three 3-byte DND
    times, GPS time switch byte, GPS start and stop (16-bit big-endian
    each), then the phone numbers joined with ";".
    """

    def three_bytes(value):
        # Low three bytes of the 32-bit big-endian representation.
        return pack("!I", value)[1:]

    chunks = [
        pack("!H", self.uploadintervalseconds),
        pack("B", self.binaryswitch),
    ]
    chunks.extend(three_bytes(alarm) for alarm in self.alarms)
    chunks.append(pack("B", self.dndtimeswitch))
    chunks.extend(three_bytes(dnd) for dnd in self.dndtimes)
    chunks.append(pack("B", self.gpstimeswitch))
    chunks.append(pack("!H", self.gpstimestart))
    chunks.append(pack("!H", self.gpstimestop))
    chunks.append(b";".join(num.encode() for num in self.phonenumbers))
    return b"".join(chunks)
513 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
517 class RESTORE_PASSWORD(GPS303Pkt):
521 class WIFI_POSITIONING(_WIFI_POSITIONING):
523 RESPOND = Respond.EXT
524 OUT_KWARGS = (("lat", float, None), ("lon", float, None))
526 def out_encode(self):
527 if self.lat is None or self.lon is None:
529 return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
532 class MANUAL_POSITIONING(GPS303Pkt):
536 def from_packet(cls, length, payload):
537 self = super().from_packet(length, payload)
538 self.flag = payload[0] if len(payload) > 0 else None
543 4: "LBS search > 3 times",
544 5: "Same LBS and WiFi data",
545 6: "LBS prohibited, WiFi absent",
546 7: "GPS spacing < 50 m",
547 }.get(self.flag, "Unknown")
551 class BATTERY_CHARGE(GPS303Pkt):
555 class CHARGER_CONNECTED(GPS303Pkt):
559 class CHARGER_DISCONNECTED(GPS303Pkt):
563 class VIBRATION_RECEIVED(GPS303Pkt):
567 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
569 RESPOND = Respond.EXT
570 OUT_KWARGS = (("interval", int, 10),)
def from_packet(cls, length, payload):
    """Parse the upload interval reported by the device.

    The first two payload bytes are a big-endian unsigned 16-bit
    value.  `unpack` always returns a tuple, so the single element is
    extracted; `interval` was previously stored as a one-element tuple.

    Raises `struct.error` if the payload is shorter than two bytes.
    """
    self = super().from_packet(length, payload)
    self.interval = unpack("!H", payload[:2])[0]
    return self
def out_encode(self):
    """Return `interval` as a big-endian unsigned 16-bit payload.

    Reads the value from `self.interval`; the bare name `interval`
    used previously was undefined and raised NameError.
    """
    return pack("!H", self.interval)
582 class SOS_ALARM(GPS303Pkt):
586 # Build dicts protocol number -> class and class name -> protocol number
589 if True: # just to indent the code, sorry!
592 for name, cls in globals().items()
594 and issubclass(cls, GPS303Pkt)
595 and not name.startswith("_")
597 if hasattr(cls, "PROTO"):
598 CLASSES[cls.PROTO] = cls
599 PROTOS[cls.__name__] = cls.PROTO
602 def class_by_prefix(prefix):
605 for name, proto in PROTOS.items()
606 if name.upper().startswith(prefix.upper())
611 return CLASSES[proto]
def proto_by_name(name):
    """Return the protocol number registered for `name`, or -1."""
    try:
        return PROTOS[name]
    except KeyError:
        return -1
def proto_of_message(packet):
    """Return the protocol number: the second byte of `packet`.

    Raises `struct.error` if the packet is shorter than two bytes.
    """
    (proto,) = unpack("B", packet[1:2])
    return proto
622 def inline_response(packet):
623 proto = proto_of_message(packet)
626 if cls.RESPOND is Respond.INL:
627 return cls.Out().packed
631 def make_object(length, proto, payload):
633 return CLASSES[proto].from_packet(length, payload)
635 retobj = UNKNOWN.from_packet(length, payload)
636 retobj.PROTO = proto # Override class attr with object attr
640 def parse_message(packet):
641 length, proto = unpack("BB", packet[:2])
643 adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case
645 proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
647 and len(payload) + adjust != length
650 "With proto %d length is %d but payload length is %d+%d",
656 return make_object(length, proto, payload)