2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from logging import getLogger
21 from struct import pack, unpack
36 "GPS_OFFLINE_POSITIONING",
41 "WIFI_OFFLINE_POSITIONING",
44 "GPS_LBS_SWITCH_TIMES",
45 "REMOTE_MONITOR_PHONE",
57 "SYNCHRONOUS_WHITELIST",
63 "CHARGER_DISCONNECTED",
65 "POSITION_UPLOAD_INTERVAL",
69 log = getLogger("gps303")
73 if isinstance(x, str):
79 """Check for the string that represents hours and minutes"""
80 if not isinstance(x, str) or len(x) != 4:
81 raise ValueError(str(x) + " is not a four-character string")
84 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
85 raise ValueError(str(x) + " does not contain valid hours and minutes")
90 if isinstance(x, str):
92 if len(x) != 3 or not all(isinstance(el, str) for el in x):
93 raise ValueError(str(x) + " is not a list of three strings")
98 if isinstance(x, str):
100 x = [int(el) for el in x]
101 if len(x) != 3 or not all(isinstance(el, int) for el in x):
102 raise ValueError(str(x) + " is not a list of three integers")
108 For each class corresponding to a message, automatically create
109 two nested classes `In` and `Out` that also inherit from their
110 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
111 copied to the `In` nested class under the name `KWARGS`, and
112 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
113 to the nested class `Out`. In addition, method `encode` is
114 defined in both classes equal to `in_encode()` and `out_encode()`
118 def __new__(cls, name, bases, attrs):
119 newcls = super().__new__(cls, name, bases, attrs)
120 newcls.In = super().__new__(
124 {"KWARGS": newcls.IN_KWARGS, "encode": newcls.in_encode},
126 newcls.Out = super().__new__(
130 {"KWARGS": newcls.OUT_KWARGS, "encode": newcls.out_encode},
136 NON = 0 # Incoming, no response needed
137 INL = 1 # Birirectional, use `inline_response()`
138 EXT = 2 # Birirectional, use external responder
141 class GPS303Pkt(metaclass=MetaPkt):
142 RESPOND = Respond.NON # Do not send anything back by default
144 # Have these kwargs for now, TODO redo
145 IN_KWARGS = (("length", int, 0), ("payload", bytes, b""))
148 def __init__(self, *args, **kwargs):
149 assert len(args) == 0
150 for kw, typ, dfl in self.KWARGS:
151 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
153 print("KWARGS", self.KWARGS)
154 print("kwargs", kwargs)
156 self.__class__.__name__ + " stray kwargs " + str(kwargs)
160 return "{}({})".format(
161 self.__class__.__name__,
165 'bytes.fromhex("{}")'.format(v.hex())
166 if isinstance(v, bytes)
169 for k, v in self.__dict__.items()
170 if not k.startswith("_")
175 raise NotImplementedError(
176 self.__class__.__name__ + ".encode() not implemented"
179 def out_encode(self):
184 payload = self.encode()
185 length = len(payload) + 1
186 return pack("BB", length, self.PROTO) + payload
189 def from_packet(cls, length, payload):
190 return cls.In(payload=payload, length=length)
193 class UNKNOWN(GPS303Pkt):
194 PROTO = 256 # > 255 is impossible in real packets
197 class LOGIN(GPS303Pkt):
199 RESPOND = Respond.INL
200 # Default response for ACK, can also respond with STOP_UPLOAD
203 def from_packet(cls, length, payload):
204 self = super().from_packet(length, payload)
205 self.imei = payload[:-1].hex()
206 self.ver = unpack("B", payload[-1:])[0]
210 class SUPERVISION(GPS303Pkt):
212 OUT_KWARGS = (("status", int, 1),)
214 def out_encode(self):
215 # 1: The device automatically answers Pickup effect
216 # 2: Automatically Answering Two-way Calls
217 # 3: Ring manually answer the two-way call
218 return pack("B", self.status)
221 class HEARTBEAT(GPS303Pkt):
223 RESPOND = Respond.INL
226 class _GPS_POSITIONING(GPS303Pkt):
227 RESPOND = Respond.INL
230 def from_packet(cls, length, payload):
231 self = super().from_packet(length, payload)
232 self.dtime = payload[:6]
233 if self.dtime == b"\0\0\0\0\0\0":
236 self.devtime = datetime(
237 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
239 self.gps_data_length = payload[6] >> 4
240 self.gps_nb_sat = payload[6] & 0x0F
241 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
242 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
243 flip_lon = bool(flags & 0b0000100000000000) # bit 4
244 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
245 self.heading = flags & 0b0000001111111111 # bits 6 - last
246 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
247 self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
252 def out_encode(self):
253 tup = datetime.utcnow().timetuple()
254 ttup = (tup[0] % 100,) + tup[1:6]
255 return pack("BBBBBB", *ttup)
258 class GPS_POSITIONING(_GPS_POSITIONING):
262 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
266 class STATUS(GPS303Pkt):
268 RESPOND = Respond.EXT
269 OUT_KWARGS = (("upload_interval", int, 25),)
272 def from_packet(cls, length, payload):
273 self = super().from_packet(length, payload)
274 if len(payload) == 5:
281 ) = unpack("BBBBB", payload)
282 elif len(payload) == 4:
283 self.batt, self.ver, self.timezone, self.intvl = unpack(
289 def out_encode(self): # Set interval in minutes
290 return pack("B", self.upload_interval)
293 class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
295 RESPOND = Respond.INL
298 class RESET(GPS303Pkt):
299 # Device sends when it got reset SMS
300 # Server can send to initiate factory reset
304 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
306 OUT_KWARGS = (("number", int, 3),)
308 def out_encode(self): # Number of whitelist entries
309 return pack("B", number)
312 class _WIFI_POSITIONING(GPS303Pkt):
314 def from_packet(cls, length, payload):
315 self = super().from_packet(length, payload)
316 self.dtime = payload[:6]
317 if self.dtime == b"\0\0\0\0\0\0":
320 self.devtime = datetime.strptime(
321 self.dtime.hex(), "%y%m%d%H%M%S"
322 ).astimezone(tz=timezone.utc)
324 for i in range(self.length): # length has special meaning here
325 slice = payload[6 + i * 7 : 13 + i * 7]
326 self.wifi_aps.append(
327 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
329 gsm_slice = payload[6 + self.length * 7 :]
330 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
332 for i in range(ncells):
333 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
334 locac, cellid, sigstr = unpack(
335 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
337 self.gsm_cells.append((locac, cellid, -sigstr))
341 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
343 RESPOND = Respond.INL
345 def out_encode(self):
346 return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
349 class TIME(GPS303Pkt):
351 RESPOND = Respond.INL
353 def out_encode(self):
354 return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
357 class PROHIBIT_LBS(GPS303Pkt):
359 OUT_KWARGS = (("status", int, 1),)
361 def out_encode(self): # Server sent, 0-off, 1-on
362 return pack("B", self.status)
365 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
368 # Data is in packed decimal
370 # 00/01 - Don't set / Set upload period
371 # HHMMHHMM - Upload period
373 # 00/01 - Don't set / Set time of boot
374 # HHMM - Time of boot
375 # 00/01 - Don't set / Set time of shutdown
376 # HHMM - Time of shutdown
377 def out_encode(self):
381 class _SET_PHONE(GPS303Pkt):
382 OUT_KWARGS = (("phone", str, ""),)
384 def out_encode(self):
385 return self.phone.encode()
388 class REMOTE_MONITOR_PHONE(_SET_PHONE):
392 class SOS_PHONE(_SET_PHONE):
396 class DAD_PHONE(_SET_PHONE):
400 class MOM_PHONE(_SET_PHONE):
404 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
408 class GPS_OFF_PERIOD(GPS303Pkt):
412 ("fm", hhmm, "0000"),
413 ("to", hhmm, "2359"),
416 def out_encode(self):
418 pack("B", self.onoff)
419 + bytes.fromhex(self.fm)
420 + bytes.fromhex(self.to)
424 class DND_PERIOD(GPS303Pkt):
429 ("fm1", hhmm, "0000"),
430 ("to1", hhmm, "2359"),
431 ("fm2", hhmm, "0000"),
432 ("to2", hhmm, "2359"),
435 def out_endode(self):
437 pack("B", self.onoff)
438 + pack("B", self.week)
439 + bytes.fromhex(self.fm1)
440 + bytes.fromhex(self.to1)
441 + bytes.fromhex(self.fm2)
442 + bytes.fromhex(self.to2)
446 class RESTART_SHUTDOWN(GPS303Pkt):
448 OUT_KWARGS = (("flag", int, 0),)
450 def out_encode(self):
453 return pack("B", self.flag)
456 class DEVICE(GPS303Pkt):
458 OUT_KWARGS = (("flag", int, 0),)
460 # 0 - Stop looking for equipment
461 # 1 - Start looking for equipment
462 def out_encode(self):
463 return pack("B", self.flag)
466 class ALARM_CLOCK(GPS303Pkt):
469 def out_encode(self):
470 # TODO implement parsing kwargs
471 alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
473 cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
477 class STOP_ALARM(GPS303Pkt):
481 def from_packet(cls, length, payload):
482 self = super().from_packet(length, payload)
483 self.flag = payload[0]
487 class SETUP(GPS303Pkt):
489 RESPOND = Respond.EXT
491 ("uploadintervalseconds", intx, 0x0300),
492 ("binaryswitch", intx, 0b00110001),
493 ("alarms", l3int, [0, 0, 0]),
494 ("dndtimeswitch", int, 0),
495 ("dndtimes", l3int, [0, 0, 0]),
496 ("gpstimeswitch", int, 0),
497 ("gpstimestart", int, 0),
498 ("gpstimestop", int, 0),
499 ("phonenumbers", l3str, ["", "", ""]),
502 def out_encode(self):
504 return pack("!I", x)[1:]
508 pack("!H", self.uploadintervalseconds),
509 pack("B", self.binaryswitch),
511 + [pack3b(el) for el in self.alarms]
513 pack("B", self.dndtimeswitch),
515 + [pack3b(el) for el in self.dndtimes]
517 pack("B", self.gpstimeswitch),
518 pack("!H", self.gpstimestart),
519 pack("!H", self.gpstimestop),
521 + [b";".join([el.encode() for el in self.phonenumbers])]
525 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
529 class RESTORE_PASSWORD(GPS303Pkt):
533 class WIFI_POSITIONING(_WIFI_POSITIONING):
535 RESPOND = Respond.EXT
536 OUT_KWARGS = (("lat", float, None), ("lon", float, None))
538 def out_encode(self):
539 if self.lat is None or self.lon is None:
541 return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
544 class MANUAL_POSITIONING(GPS303Pkt):
548 def from_packet(cls, length, payload):
549 self = super().from_packet(length, payload)
550 self.flag = payload[0] if len(payload) > 0 else None
555 4: "LBS search > 3 times",
556 5: "Same LBS and WiFi data",
557 6: "LBS prohibited, WiFi absent",
558 7: "GPS spacing < 50 m",
559 }.get(self.flag, "Unknown")
563 class BATTERY_CHARGE(GPS303Pkt):
567 class CHARGER_CONNECTED(GPS303Pkt):
571 class CHARGER_DISCONNECTED(GPS303Pkt):
575 class VIBRATION_RECEIVED(GPS303Pkt):
579 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
581 RESPOND = Respond.EXT
582 OUT_KWARGS = (("interval", int, 10),)
585 def from_packet(cls, length, payload):
586 self = super().from_packet(length, payload)
587 self.interval = unpack("!H", payload[:2])
590 def out_encode(self):
591 return pack("!H", interval)
594 class SOS_ALARM(GPS303Pkt):
598 # Build dicts protocol number -> class and class name -> protocol number
601 if True: # just to indent the code, sorry!
604 for name, cls in globals().items()
606 and issubclass(cls, GPS303Pkt)
607 and not name.startswith("_")
609 if hasattr(cls, "PROTO"):
610 CLASSES[cls.PROTO] = cls
611 PROTOS[cls.__name__] = cls.PROTO
614 def class_by_prefix(prefix):
617 for name, proto in PROTOS.items()
618 if name.upper().startswith(prefix.upper())
623 return CLASSES[proto]
626 def proto_by_name(name):
627 return PROTOS.get(name, -1)
630 def proto_of_message(packet):
631 return unpack("B", packet[1:2])[0]
634 def inline_response(packet):
635 proto = proto_of_message(packet)
638 if cls.RESPOND is Respond.INL:
639 return cls.Out().packed
643 def make_object(length, proto, payload):
645 return CLASSES[proto].from_packet(length, payload)
647 retobj = UNKNOWN.from_packet(length, payload)
648 retobj.PROTO = proto # Override class attr with object attr
652 def parse_message(packet):
653 length, proto = unpack("BB", packet[:2])
655 adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case
657 proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
659 and len(payload) + adjust != length
662 "With proto %d length is %d but payload length is %d+%d",
668 return make_object(length, proto, payload)