2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from logging import getLogger
21 from struct import pack, unpack
36 "GPS_OFFLINE_POSITIONING",
41 "WIFI_OFFLINE_POSITIONING",
46 "SYNCHRONOUS_WHITELIST",
52 "CHARGER_DISCONNECTED",
54 "POSITION_UPLOAD_INTERVAL",
57 log = getLogger("gps303")
62 For each class corresponding to a message, automatically create
63 two nested classes `In` and `Out` that also inherit from their
64 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
65 copied to the `In` nested class under the name `KWARGS`, and
66 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
67 to the nested class `Out`. In addition, method `encode` is
68 defined in both classes equal to `in_encode()` and `out_encode()`
72 def __new__(cls, name, bases, attrs):
73 newcls = super().__new__(cls, name, bases, attrs)
74 nestattrs = {"encode": lambda self: self.in_encode()}
75 if "IN_KWARGS" in attrs:
76 nestattrs["KWARGS"] = attrs["IN_KWARGS"]
77 newcls.In = super().__new__(
83 nestattrs = {"encode": lambda self: self.out_encode()}
84 if "OUT_KWARGS" in attrs:
85 nestattrs["KWARGS"] = attrs["OUT_KWARGS"]
86 newcls.Out = super().__new__(
96 NON = 0 # Incoming, no response needed
97 INL = 1 # Birirectional, use `inline_response()`
98 EXT = 2 # Birirectional, use external responder
101 class GPS303Pkt(metaclass=MetaPkt):
102 RESPOND = Respond.NON # Do not send anything back by default
104 # Have these kwargs for now, TODO redo
105 IN_KWARGS = (("length", int, 0), ("payload", bytes, b""))
107 def __init__(self, *args, **kwargs):
108 assert len(args) == 0
109 for kw, typ, dfl in self.KWARGS:
110 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
112 print("KWARGS", self.KWARGS)
113 print("kwargs", kwargs)
115 self.__class__.__name__ + " stray kwargs " + str(kwargs)
119 return "{}({})".format(
120 self.__class__.__name__,
124 'bytes.fromhex("{}")'.format(v.hex())
125 if isinstance(v, bytes)
128 for k, v in self.__dict__.items()
129 if not k.startswith("_")
134 raise NotImplementedError(
135 self.__class__.__name__ + ".encode() not implemented"
138 def out_encode(self):
143 payload = self.encode()
144 length = len(payload) + 1
145 return pack("BB", length, self.PROTO) + payload
148 def from_packet(cls, length, payload):
149 return cls.In(payload=payload, length=length)
152 class UNKNOWN(GPS303Pkt):
153 PROTO = 256 # > 255 is impossible in real packets
156 class LOGIN(GPS303Pkt):
158 RESPOND = Respond.INL
159 # Default response for ACK, can also respond with STOP_UPLOAD
162 def from_packet(cls, length, payload):
163 self = super().from_packet(length, payload)
164 self.imei = payload[:-1].hex()
165 self.ver = unpack("B", payload[-1:])[0]
169 class SUPERVISION(GPS303Pkt):
171 OUT_KWARGS = (("status", int, 1),)
173 def out_encode(self):
174 # 1: The device automatically answers Pickup effect
175 # 2: Automatically Answering Two-way Calls
176 # 3: Ring manually answer the two-way call
177 return pack("B", self.status)
180 class HEARTBEAT(GPS303Pkt):
182 RESPOND = Respond.INL
185 class _GPS_POSITIONING(GPS303Pkt):
186 RESPOND = Respond.INL
189 def from_packet(cls, length, payload):
190 self = super().from_packet(length, payload)
191 self.dtime = payload[:6]
192 if self.dtime == b"\0\0\0\0\0\0":
195 self.devtime = datetime(
196 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
198 self.gps_data_length = payload[6] >> 4
199 self.gps_nb_sat = payload[6] & 0x0F
200 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
201 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
202 flip_lon = bool(flags & 0b0000100000000000) # bit 4
203 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
204 self.heading = flags & 0b0000001111111111 # bits 6 - last
205 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
206 self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
211 def out_encode(self):
212 tup = datetime.utcnow().timetuple()
213 ttup = (tup[0] % 100,) + tup[1:6]
214 return pack("BBBBBB", *ttup)
217 class GPS_POSITIONING(_GPS_POSITIONING):
221 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
225 class STATUS(GPS303Pkt):
227 RESPOND = Respond.EXT
228 OUT_KWARGS = (("upload_interval", int, 25),)
231 def from_packet(cls, length, payload):
232 self = super().from_packet(length, payload)
233 if len(payload) == 5:
240 ) = unpack("BBBBB", payload)
241 elif len(payload) == 4:
242 self.batt, self.ver, self.timezone, self.intvl = unpack(
248 def out_encode(self): # Set interval in minutes
249 return cls.make_packet(pack("B", self.upload_interval))
252 class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
254 RESPOND = Respond.INL
257 class RESET(GPS303Pkt):
258 # Device sends when it got reset SMS
259 # Server can send to initiate factory reset
263 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
265 OUT_KWARGS = (("number", int, 3),)
267 def out_encode(self): # Number of whitelist entries
268 return pack("B", number)
271 class _WIFI_POSITIONING(GPS303Pkt):
273 def from_packet(cls, length, payload):
274 self = super().from_packet(length, payload)
275 self.dtime = payload[:6]
276 if self.dtime == b"\0\0\0\0\0\0":
279 self.devtime = datetime.strptime(
280 self.dtime.hex(), "%y%m%d%H%M%S"
281 ).astimezone(tz=timezone.utc)
283 for i in range(self.length): # length has special meaning here
284 slice = payload[6 + i * 7 : 13 + i * 7]
285 self.wifi_aps.append(
286 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
288 gsm_slice = payload[6 + self.length * 7 :]
289 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
291 for i in range(ncells):
292 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
293 locac, cellid, sigstr = unpack(
294 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
296 self.gsm_cells.append((locac, cellid, -sigstr))
300 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
302 RESPOND = Respond.INL
304 def out_encode(self):
305 return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
308 class TIME(GPS303Pkt):
310 RESPOND = Respond.INL
312 def out_encode(self):
313 return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
316 class PROHIBIT_LBS(GPS303Pkt):
318 OUT_KWARGS = (("status", int, 1),)
320 def out_encode(self): # Server sent, 0-off, 1-on
321 return pack("B", self.status)
324 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
327 # Data is in packed decimal
329 # 00/01 - Don't set / Set upload period
330 # HHMMHHMM - Upload period
332 # 00/01 - Don't set / Set time of boot
333 # HHMM - Time of boot
334 # 00/01 - Don't set / Set time of shutdown
335 # HHMM - Time of shutdown
336 def out_encode(self):
340 class _SET_PHONE(GPS303Pkt):
341 OUT_KWARGS = (("phone", str, ""),)
343 def out_encode(self):
344 return self.phone.encode()
347 class REMOTE_MONITOR_PHONE(_SET_PHONE):
351 class SOS_PHONE(_SET_PHONE):
355 class DAD_PHONE(_SET_PHONE):
359 class MOM_PHONE(_SET_PHONE):
363 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
367 class GPS_OFF_PERIOD(GPS303Pkt):
369 OUT_KWARGS = (("onoff", int, 0), ("fm", str, "0000"), ("to", str, "2359"))
371 def out_encode(self):
373 pack("B", self.onoff)
374 + bytes.fromhex(self.fm)
375 + bytes.fromhex(self.to)
379 class DND_PERIOD(GPS303Pkt):
384 ("fm1", str, "0000"),
385 ("to1", str, "2359"),
386 ("fm2", str, "0000"),
387 ("to2", str, "2359"),
390 def out_endode(self):
392 pack("B", self.onoff)
393 + pack("B", self.week)
394 + bytes.fromhex(self.fm1)
395 + bytes.fromhex(self.to1)
396 + bytes.fromhex(self.fm2)
397 + bytes.fromhex(self.to2)
401 class RESTART_SHUTDOWN(GPS303Pkt):
403 OUT_KWARGS = (("flag", int, 2),)
405 def out_encode(self):
408 return pack("B", self.flag)
411 class DEVICE(GPS303Pkt):
413 OUT_KWARGS = (("flag", int, 0),)
415 # 0 - Stop looking for equipment
416 # 1 - Start looking for equipment
417 def out_encode(self):
418 return pack("B", self.flag)
421 class ALARM_CLOCK(GPS303Pkt):
424 def out_encode(self):
425 # TODO implement parsing kwargs
426 alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
428 cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
432 class STOP_ALARM(GPS303Pkt):
436 def from_packet(cls, length, payload):
437 self = super().from_packet(length, payload)
438 self.flag = payload[0]
442 class SETUP(GPS303Pkt):
444 RESPOND = Respond.EXT
445 OUT_KWARGS = ( # TODO handle properly
446 ("uploadintervalseconds", int, 0x0300),
447 ("binaryswitch", int, 0b00110001),
448 ("alarms", int, [0, 0, 0]),
449 ("dndtimeswitch", int, 0),
450 ("dndtimes", int, [0, 0, 0]),
451 ("gpstimeswitch", int, 0),
452 ("gpstimestart", int, 0),
453 ("gpstimestop", int, 0),
454 ("phonenumbers", int, ["", "", ""]),
457 def out_encode(self):
459 return pack("!I", x)[1:]
463 pack("!H", self.uploadintervalseconds),
464 pack("B", self.binaryswitch),
466 + [pack3b(el) for el in self.alarms]
468 pack("B", self.dndtimeswitch),
470 + [pack3b(el) for el in self.dndtimes]
472 pack("B", self.gpstimeswitch),
473 pack("!H", self.gpstimestart),
474 pack("!H", self.gpstimestop),
476 + [b";".join([el.encode() for el in self.phonenumbers])]
480 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
484 class RESTORE_PASSWORD(GPS303Pkt):
488 class WIFI_POSITIONING(_WIFI_POSITIONING):
490 RESPOND = Respond.EXT
491 OUT_KWARGS = (("lat", float, None), ("lon", float, None))
493 def out_encode(self):
494 if self.lat is None or self.lon is None:
496 return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
499 class MANUAL_POSITIONING(GPS303Pkt):
503 def from_packet(cls, length, payload):
504 self = super().from_packet(length, payload)
505 self.flag = payload[0] if len(payload) > 0 else None
510 4: "LBS search > 3 times",
511 5: "Same LBS and WiFi data",
512 6: "LBS prohibited, WiFi absent",
513 7: "GPS spacing < 50 m",
514 }.get(self.flag, "Unknown")
518 class BATTERY_CHARGE(GPS303Pkt):
522 class CHARGER_CONNECTED(GPS303Pkt):
526 class CHARGER_DISCONNECTED(GPS303Pkt):
530 class VIBRATION_RECEIVED(GPS303Pkt):
534 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
536 RESPOND = Respond.EXT
537 OUT_KWARGS = (("interval", int, 10),)
540 def from_packet(cls, length, payload):
541 self = super().from_packet(length, payload)
542 self.interval = unpack("!H", payload[:2])
545 def out_encode(self):
546 return pack("!H", interval)
549 class SOS_ALARM(GPS303Pkt):
553 # Build dicts protocol number -> class and class name -> protocol number
556 if True: # just to indent the code, sorry!
559 for name, cls in globals().items()
561 and issubclass(cls, GPS303Pkt)
562 and not name.startswith("_")
564 if hasattr(cls, "PROTO"):
565 CLASSES[cls.PROTO] = cls
566 PROTOS[cls.__name__] = cls.PROTO
569 def class_by_prefix(prefix):
572 for name, proto in PROTOS.items()
573 if name.upper().startswith(prefix.upper())
578 return CLASSES[proto]
581 def proto_by_name(name):
582 return PROTOS.get(name, -1)
585 def proto_of_message(packet):
586 return unpack("B", packet[1:2])[0]
589 def inline_response(packet):
590 proto = proto_of_message(packet)
593 if cls.RESPOND is Respond.INL:
594 return cls.Out().packed
598 def make_object(length, proto, payload):
600 return CLASSES[proto].from_packet(length, payload)
602 retobj = UNKNOWN.from_packet(length, payload)
603 retobj.PROTO = proto # Override class attr with object attr
607 def parse_message(packet):
608 length, proto = unpack("BB", packet[:2])
610 adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case
612 proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
614 and len(payload) + adjust != length
617 "With proto %d length is %d but payload length is %d+%d",
623 return make_object(length, proto, payload)