From: Eugene Crosser Date: Mon, 18 Apr 2022 11:19:05 +0000 (+0200) Subject: rename protocol module to "gps303proto" X-Git-Tag: 0.01~58 X-Git-Url: http://average.org/gitweb/?a=commitdiff_plain;h=b613110bb16c95d9b641882c2ad6869e3ced1a0c;p=loctrkd.git rename protocol module to "gps303proto" --- diff --git a/gps303/GT06mod.py b/gps303/GT06mod.py deleted file mode 100755 index ded38f2..0000000 --- a/gps303/GT06mod.py +++ /dev/null @@ -1,410 +0,0 @@ -""" -Implementation of the protocol used by zx303 GPS+GPRS module -Description from https://github.com/tobadia/petGPS/tree/master/resources -""" - -from datetime import datetime, timezone -from inspect import isclass -from logging import getLogger -from struct import pack, unpack - -__all__ = ( - "handle_packet", - "make_object", - "make_response", - "set_config", - "UNKNOWN", - "LOGIN", - "SUPERVISION", - "HEARTBEAT", - "GPS_POSITIONING", - "GPS_OFFLINE_POSITIONING", - "STATUS", - "HIBERNATION", - "RESET", - "WHITELIST_TOTAL", - "WIFI_OFFLINE_POSITIONING", - "TIME", - "MOM_PHONE", - "STOP_ALARM", - "SETUP", - "SYNCHRONOUS_WHITELIST", - "RESTORE_PASSWORD", - "WIFI_POSITIONING", - "MANUAL_POSITIONING", - "BATTERY_CHARGE", - "CHARGER_CONNECTED", - "CHARGER_DISCONNECTED", - "VIBRATION_RECEIVED", - "POSITION_UPLOAD_INTERVAL", -) - -log = getLogger("gps303") - - -class _GT06pkt: - PROTO: int - CONFIG = None - - def __init__(self, *args, **kwargs): - assert len(args) == 0 - for k, v in kwargs.items(): - setattr(self, k, v) - - def __repr__(self): - return "{}({})".format( - self.__class__.__name__, - ", ".join( - "{}={}".format( - k, - 'bytes.fromhex("{}")'.format(v.hex()) - if isinstance(v, bytes) - else v.__repr__(), - ) - for k, v in self.__dict__.items() - if not k.startswith("_") - ), - ) - - @classmethod - def from_packet(cls, length, proto, payload): - return cls(proto=proto, payload=payload, length=length) - - def response(self, *args): - if len(args) == 0: - return None - assert len(args) == 1 and isinstance(args[0], bytes) - payload = args[0] - length = len(payload) + 1 - if length > 6: - length -= 6 - return b"xx" + pack("BB", length, self.proto) + payload + b"\r\n" - - -class UNKNOWN(_GT06pkt): - pass - - -class LOGIN(_GT06pkt): - PROTO = 0x01 - - @classmethod - def from_packet(cls, length, proto, payload): - self = super().from_packet(length, proto, payload) - self.imei = payload[:-1].hex() - self.ver = unpack("B", payload[-1:])[0] - return self - - def response(self): - return super().response(b"") - - -class SUPERVISION(_GT06pkt): # Server sends supervision number status - PROTO = 0x05 - - def response(self, supnum=0): - # 1: The device automatically answers Pickup effect - # 2: Automatically Answering Two-way Calls - # 3: Ring manually answer the two-way call - return super().response(b"") - - -class HEARTBEAT(_GT06pkt): - PROTO = 0x08 - - -class _GPS_POSITIONING(_GT06pkt): - @classmethod - def from_packet(cls, length, proto, payload): - self = super().from_packet(length, proto, payload) - self.dtime = payload[:6] - if self.dtime == b"\0\0\0\0\0\0": - self.devtime = None - else: - self.devtime = datetime( - *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc - ) - self.gps_data_length = payload[6] >> 4 - self.gps_nb_sat = payload[6] & 0x0F - lat, lon, speed, flags = unpack("!IIBH", payload[7:18]) - self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3 - flip_lon = bool(flags & 0b0000100000000000) # bit 4 - flip_lat = not bool(flags & 0b0000010000000000) # bit 5 - self.heading = flags & 0b0000001111111111 # bits 6 - last - self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1) - self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1) - self.speed = speed - self.flags = flags - return self - - def response(self): - return super().response(self.dtime) - - -class GPS_POSITIONING(_GPS_POSITIONING): - PROTO = 0x10 - - -class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING): - PROTO = 0x11 - - -class STATUS(_GT06pkt): - PROTO = 0x13 - - @classmethod - def from_packet(cls, length, proto, payload): - self = super().from_packet(length, proto, payload) - if len(payload) == 5: - ( - self.batt, - self.ver, - self.timezone, - self.intvl, - self.signal, - ) = unpack("BBBBB", payload) - elif len(payload) == 4: - self.batt, self.ver, self.timezone, self.intvl = unpack( - "BBBB", payload - ) - self.signal = None - return self - - def response(self, upload_interval=25): # Set interval in minutes - return super().response(pack("B", upload_interval)) - - -class HIBERNATION(_GT06pkt): - PROTO = 0x14 - - -class RESET(_GT06pkt): # Device sends when it got reset SMS - PROTO = 0x15 - - def response(self): # Server can send to initiate factory reset - return super().response(b"") - - -class WHITELIST_TOTAL(_GT06pkt): # Server sends to initiage sync (0x58) - PROTO = 0x16 - - def response(self, number=3): # Number of whitelist entries - return super().response(pack("B", number)) - - -class _WIFI_POSITIONING(_GT06pkt): - @classmethod - def from_packet(cls, length, proto, payload): - self = super().from_packet(length, proto, payload) - self.dtime = payload[:6] - if self.dtime == b"\0\0\0\0\0\0": - self.devtime = None - else: - self.devtime = datetime.strptime( - self.dtime.hex(), "%y%m%d%H%M%S" - ).astimezone(tz=timezone.utc) - self.wifi_aps = [] - for i in range(self.length): # length has special meaning here - slice = payload[6 + i * 7 : 13 + i * 7] - self.wifi_aps.append( - (":".join([format(b, "02X") for b in slice[:6]]), -slice[6]) - ) - gsm_slice = payload[6 + self.length * 7 :] - ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4]) - self.gsm_cells = [] - for i in range(ncells): - slice = gsm_slice[4 + i * 5 : 9 + i * 5] - locac, cellid, sigstr = unpack( - "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5] - ) - self.gsm_cells.append((locac, cellid, -sigstr)) - return self - - -class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING): - PROTO = 0x17 - - def response(self): - return super().response(self.dtime) - - -class TIME(_GT06pkt): - PROTO = 0x30 - - def response(self): - payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6]) - return super().response(payload) - - -class PROHIBIT_LBS(_GT06pkt): - PROTO = 0x33 - - def response(self, status=1): # Server sent, 0-off, 1-on - return super().response(pack("B", status)) - - -class MOM_PHONE(_GT06pkt): - PROTO = 0x43 - - -class STOP_UPLOAD(_GT06pkt): # Server response to LOGIN to thwart the device - PROTO = 0x44 - - def response(self): - return super().response(b"") - - -class STOP_ALARM(_GT06pkt): - PROTO = 0x56 - - -class SETUP(_GT06pkt): - PROTO = 0x57 - - def response( - self, - uploadIntervalSeconds=0x0300, - binarySwitch=0b00110001, - alarms=[0, 0, 0], - dndTimeSwitch=0, - dndTimes=[0, 0, 0], - gpsTimeSwitch=0, - gpsTimeStart=0, - gpsTimeStop=0, - phoneNumbers=["", "", ""], - ): - def pack3b(x): - return pack("!I", x)[1:] - - payload = b"".join( - [ - pack("!H", uploadIntervalSeconds), - pack("B", binarySwitch), - ] - + [pack3b(el) for el in alarms] - + [ - pack("B", dndTimeSwitch), - ] - + [pack3b(el) for el in dndTimes] - + [ - pack("B", gpsTimeSwitch), - pack("!H", gpsTimeStart), - pack("!H", gpsTimeStop), - ] - + [b";".join([el.encode() for el in phoneNumbers])] - ) - return super().response(payload) - - -class SYNCHRONOUS_WHITELIST(_GT06pkt): - PROTO = 0x58 - - -class RESTORE_PASSWORD(_GT06pkt): - PROTO = 0x67 - - -class WIFI_POSITIONING(_WIFI_POSITIONING): - PROTO = 0x69 - - def response(self, lat=None, lon=None): - if lat is None or lon is None: - payload = b"" - else: - payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode( - "ascii" - ) - return super().response(payload) - - -class MANUAL_POSITIONING(_GT06pkt): - PROTO = 0x80 - - -class BATTERY_CHARGE(_GT06pkt): - PROTO = 0x81 - - -class CHARGER_CONNECTED(_GT06pkt): - PROTO = 0x82 - - -class CHARGER_DISCONNECTED(_GT06pkt): - PROTO = 0x83 - - -class VIBRATION_RECEIVED(_GT06pkt): - PROTO = 0x94 - - -class POSITION_UPLOAD_INTERVAL(_GT06pkt): - PROTO = 0x98 - - @classmethod - def from_packet(cls, length, proto, payload): - self = super().from_packet(length, proto, payload) - self.interval = unpack("!H", payload[:2]) - return self - - def response(self): - return super().response(pack("!H", self.interval)) - - -class SOS_ALARM(_GT06pkt): - PROTO = 0x99 - - -# Build a dict protocol number -> class -CLASSES = {} -if True: # just to indent the code, sorry! - for cls in [ - cls - for name, cls in globals().items() - if isclass(cls) - and issubclass(cls, _GT06pkt) - and not name.startswith("_") - ]: - if hasattr(cls, "PROTO"): - CLASSES[cls.PROTO] = cls - - -def make_object(length, proto, payload): - if proto in CLASSES: - return CLASSES[proto].from_packet(length, proto, payload) - else: - return UNKNOWN.from_packet(length, proto, payload) - - -def handle_packet(packet, addr, when): - if len(packet) < 6: - return UNKNOWN.from_packet(0, 0, packet) - else: - xx, length, proto = unpack("!2sBB", packet[:4]) - crlf = packet[-2:] - payload = packet[4:-2] - adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case - if ( - proto - not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO) - and length > 1 - and len(payload) + adjust != length - ): - log.warning( - "With proto %d length is %d but payload length is %d+%d", - proto, - length, - len(payload), - adjust, - ) - if xx != b"xx" or crlf != b"\r\n": - return UNKNOWN.from_packet(length, proto, packet) # full packet - else: - return make_object(length, proto, payload) - - -def make_response(msg, **kwargs): - return msg.response(**kwargs) - - -def set_config(config): # Note that we are setting _class_ attribute - _GT06pkt.CONFIG = config diff --git a/gps303/__main__.py b/gps303/__main__.py index e3fd84a..d3d6d1e 100755 --- a/gps303/__main__.py +++ b/gps303/__main__.py @@ -7,7 +7,7 @@ import sys from time import time from .config import readconfig -from .GT06mod import handle_packet, make_response, LOGIN, set_config +from .gps303proto import handle_packet, make_response, LOGIN, set_config from .evstore import initdb, stow from .lookaside import prepare_response diff --git a/gps303/collector.py b/gps303/collector.py index d8ca86c..28c636d 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -1,3 +1,5 @@ +""" TCP server that communicates with terminals """ + from getopt import getopt from logging import getLogger, StreamHandler, DEBUG, INFO from logging.handlers import SysLogHandler @@ -7,7 +9,7 @@ import sys import zmq from .config import readconfig -from .GT06mod import handle_packet, make_response, LOGIN, set_config +from .gps303proto import handle_packet, make_response, LOGIN, set_config CONF = "/etc/gps303.conf" @@ -15,41 +17,46 @@ log = getLogger("gps303/collector") class Bcast: + """Zmq message to broadcast what was received from the terminal""" def __init__(self, imei, msg): self.as_bytes = imei.encode() + msg.encode() -class Zmsg: +class Resp: + """Zmq message received from a third party to send to the terminal""" def __init__(self, msg): self.imei = msg[:16].decode() self.payload = msg[16:] class Client: - def __init__(self, clntsock, clntaddr): - self.clntsock = clntsock - self.clntaddr = clntaddr + """Connected socket to the terminal plus buffer and metadata""" + def __init__(self, sock, addr): + self.sock = sock + self.addr = addr self.buffer = b"" self.imei = None def close(self): - self.clntsock.close() + self.sock.close() + self.buffer = b"" + self.imei = None def recv(self): - packet = self.clntsock.recv(4096) - if not packet: + segment = self.sock.recv(4096) + if not segment: return None when = time() - self.buffer += packet + self.buffer += segment # implement framing properly - msg = handle_packet(packet, self.clntaddr, when) + msg = handle_packet(packet, self.addr, when) self.buffer = self.buffer[len(packet):] if isinstance(msg, LOGIN): self.imei = msg.imei return msg def send(self, buffer): - self.clntsock.send(buffer) + self.sock.send(buffer) class Clients: @@ -107,7 +114,7 @@ def runserver(opts, conf): while True: try: msg = zsub.recv(zmq.NOBLOCK) - tosend.append(Zmsg(msg)) + tosend.append(Resp(msg)) except zmq.Again: break elif sk == tcpfd: diff --git a/gps303/gps303proto.py b/gps303/gps303proto.py new file mode 100755 index 0000000..4789b03 --- /dev/null +++ b/gps303/gps303proto.py @@ -0,0 +1,421 @@ +""" +Implementation of the protocol used by zx303 "ZhongXun Topin Locator" +GPS+GPRS module. Description lifted from this repository: +https://github.com/tobadia/petGPS/tree/master/resources + +Forewarnings: +1. There is no security whatsoever. If you know the module's IMEI, + you can feed fake data to the server, including fake location. +2. Ad-hoc choice of framing of messages (that are transferred over + the TCP stream) makes it vulnerable to coincidental appearance + of framing bytes in the middle of the message. Most of the time + the server will receive one message in one TCP segment (i.e. in + one `recv()` operation, but relying on that would break things + if the path has lower MTU than the size of a message. +""" + +from datetime import datetime, timezone +from inspect import isclass +from logging import getLogger +from struct import pack, unpack + +__all__ = ( + "handle_packet", + "make_object", + "make_response", + "set_config", + "UNKNOWN", + "LOGIN", + "SUPERVISION", + "HEARTBEAT", + "GPS_POSITIONING", + "GPS_OFFLINE_POSITIONING", + "STATUS", + "HIBERNATION", + "RESET", + "WHITELIST_TOTAL", + "WIFI_OFFLINE_POSITIONING", + "TIME", + "MOM_PHONE", + "STOP_ALARM", + "SETUP", + "SYNCHRONOUS_WHITELIST", + "RESTORE_PASSWORD", + "WIFI_POSITIONING", + "MANUAL_POSITIONING", + "BATTERY_CHARGE", + "CHARGER_CONNECTED", + "CHARGER_DISCONNECTED", + "VIBRATION_RECEIVED", + "POSITION_UPLOAD_INTERVAL", +) + +log = getLogger("gps303") + + +class _GT06pkt: + PROTO: int + CONFIG = None + + def __init__(self, *args, **kwargs): + assert len(args) == 0 + for k, v in kwargs.items(): + setattr(self, k, v) + + def __repr__(self): + return "{}({})".format( + self.__class__.__name__, + ", ".join( + "{}={}".format( + k, + 'bytes.fromhex("{}")'.format(v.hex()) + if isinstance(v, bytes) + else v.__repr__(), + ) + for k, v in self.__dict__.items() + if not k.startswith("_") + ), + ) + + @classmethod + def from_packet(cls, length, proto, payload): + return cls(proto=proto, payload=payload, length=length) + + def response(self, *args): + if len(args) == 0: + return None + assert len(args) == 1 and isinstance(args[0], bytes) + payload = args[0] + length = len(payload) + 1 + if length > 6: + length -= 6 + return b"xx" + pack("BB", length, self.proto) + payload + b"\r\n" + + +class UNKNOWN(_GT06pkt): + pass + + +class LOGIN(_GT06pkt): + PROTO = 0x01 + + @classmethod + def from_packet(cls, length, proto, payload): + self = super().from_packet(length, proto, payload) + self.imei = payload[:-1].hex() + self.ver = unpack("B", payload[-1:])[0] + return self + + def response(self): + return super().response(b"") + + +class SUPERVISION(_GT06pkt): # Server sends supervision number status + PROTO = 0x05 + + def response(self, supnum=0): + # 1: The device automatically answers Pickup effect + # 2: Automatically Answering Two-way Calls + # 3: Ring manually answer the two-way call + return super().response(b"") + + +class HEARTBEAT(_GT06pkt): + PROTO = 0x08 + + +class _GPS_POSITIONING(_GT06pkt): + @classmethod + def from_packet(cls, length, proto, payload): + self = super().from_packet(length, proto, payload) + self.dtime = payload[:6] + if self.dtime == b"\0\0\0\0\0\0": + self.devtime = None + else: + self.devtime = datetime( + *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc + ) + self.gps_data_length = payload[6] >> 4 + self.gps_nb_sat = payload[6] & 0x0F + lat, lon, speed, flags = unpack("!IIBH", payload[7:18]) + self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3 + flip_lon = bool(flags & 0b0000100000000000) # bit 4 + flip_lat = not bool(flags & 0b0000010000000000) # bit 5 + self.heading = flags & 0b0000001111111111 # bits 6 - last + self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1) + self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1) + self.speed = speed + self.flags = flags + return self + + def response(self): + return super().response(self.dtime) + + +class GPS_POSITIONING(_GPS_POSITIONING): + PROTO = 0x10 + + +class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING): + PROTO = 0x11 + + +class STATUS(_GT06pkt): + PROTO = 0x13 + + @classmethod + def from_packet(cls, length, proto, payload): + self = super().from_packet(length, proto, payload) + if len(payload) == 5: + ( + self.batt, + self.ver, + self.timezone, + self.intvl, + self.signal, + ) = unpack("BBBBB", payload) + elif len(payload) == 4: + self.batt, self.ver, self.timezone, self.intvl = unpack( + "BBBB", payload + ) + self.signal = None + return self + + def response(self, upload_interval=25): # Set interval in minutes + return super().response(pack("B", upload_interval)) + + +class HIBERNATION(_GT06pkt): + PROTO = 0x14 + + +class RESET(_GT06pkt): # Device sends when it got reset SMS + PROTO = 0x15 + + def response(self): # Server can send to initiate factory reset + return super().response(b"") + + +class WHITELIST_TOTAL(_GT06pkt): # Server sends to initiage sync (0x58) + PROTO = 0x16 + + def response(self, number=3): # Number of whitelist entries + return super().response(pack("B", number)) + + +class _WIFI_POSITIONING(_GT06pkt): + @classmethod + def from_packet(cls, length, proto, payload): + self = super().from_packet(length, proto, payload) + self.dtime = payload[:6] + if self.dtime == b"\0\0\0\0\0\0": + self.devtime = None + else: + self.devtime = datetime.strptime( + self.dtime.hex(), "%y%m%d%H%M%S" + ).astimezone(tz=timezone.utc) + self.wifi_aps = [] + for i in range(self.length): # length has special meaning here + slice = payload[6 + i * 7 : 13 + i * 7] + self.wifi_aps.append( + (":".join([format(b, "02X") for b in slice[:6]]), -slice[6]) + ) + gsm_slice = payload[6 + self.length * 7 :] + ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4]) + self.gsm_cells = [] + for i in range(ncells): + slice = gsm_slice[4 + i * 5 : 9 + i * 5] + locac, cellid, sigstr = unpack( + "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5] + ) + self.gsm_cells.append((locac, cellid, -sigstr)) + return self + + +class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING): + PROTO = 0x17 + + def response(self): + return super().response(self.dtime) + + +class TIME(_GT06pkt): + PROTO = 0x30 + + def response(self): + payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6]) + return super().response(payload) + + +class PROHIBIT_LBS(_GT06pkt): + PROTO = 0x33 + + def response(self, status=1): # Server sent, 0-off, 1-on + return super().response(pack("B", status)) + + +class MOM_PHONE(_GT06pkt): + PROTO = 0x43 + + +class STOP_UPLOAD(_GT06pkt): # Server response to LOGIN to thwart the device + PROTO = 0x44 + + def response(self): + return super().response(b"") + + +class STOP_ALARM(_GT06pkt): + PROTO = 0x56 + + +class SETUP(_GT06pkt): + PROTO = 0x57 + + def response( + self, + uploadIntervalSeconds=0x0300, + binarySwitch=0b00110001, + alarms=[0, 0, 0], + dndTimeSwitch=0, + dndTimes=[0, 0, 0], + gpsTimeSwitch=0, + gpsTimeStart=0, + gpsTimeStop=0, + phoneNumbers=["", "", ""], + ): + def pack3b(x): + return pack("!I", x)[1:] + + payload = b"".join( + [ + pack("!H", uploadIntervalSeconds), + pack("B", binarySwitch), + ] + + [pack3b(el) for el in alarms] + + [ + pack("B", dndTimeSwitch), + ] + + [pack3b(el) for el in dndTimes] + + [ + pack("B", gpsTimeSwitch), + pack("!H", gpsTimeStart), + pack("!H", gpsTimeStop), + ] + + [b";".join([el.encode() for el in phoneNumbers])] + ) + return super().response(payload) + + +class SYNCHRONOUS_WHITELIST(_GT06pkt): + PROTO = 0x58 + + +class RESTORE_PASSWORD(_GT06pkt): + PROTO = 0x67 + + +class WIFI_POSITIONING(_WIFI_POSITIONING): + PROTO = 0x69 + + def response(self, lat=None, lon=None): + if lat is None or lon is None: + payload = b"" + else: + payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode( + "ascii" + ) + return super().response(payload) + + +class MANUAL_POSITIONING(_GT06pkt): + PROTO = 0x80 + + +class BATTERY_CHARGE(_GT06pkt): + PROTO = 0x81 + + +class CHARGER_CONNECTED(_GT06pkt): + PROTO = 0x82 + + +class CHARGER_DISCONNECTED(_GT06pkt): + PROTO = 0x83 + + +class VIBRATION_RECEIVED(_GT06pkt): + PROTO = 0x94 + + +class POSITION_UPLOAD_INTERVAL(_GT06pkt): + PROTO = 0x98 + + @classmethod + def from_packet(cls, length, proto, payload): + self = super().from_packet(length, proto, payload) + self.interval = unpack("!H", payload[:2]) + return self + + def response(self): + return super().response(pack("!H", self.interval)) + + +class SOS_ALARM(_GT06pkt): + PROTO = 0x99 + + +# Build a dict protocol number -> class +CLASSES = {} +if True: # just to indent the code, sorry! + for cls in [ + cls + for name, cls in globals().items() + if isclass(cls) + and issubclass(cls, _GT06pkt) + and not name.startswith("_") + ]: + if hasattr(cls, "PROTO"): + CLASSES[cls.PROTO] = cls + + +def make_object(length, proto, payload): + if proto in CLASSES: + return CLASSES[proto].from_packet(length, proto, payload) + else: + return UNKNOWN.from_packet(length, proto, payload) + + +def handle_packet(packet, addr, when): + if len(packet) < 6: + return UNKNOWN.from_packet(0, 0, packet) + else: + xx, length, proto = unpack("!2sBB", packet[:4]) + crlf = packet[-2:] + payload = packet[4:-2] + adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case + if ( + proto + not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO) + and length > 1 + and len(payload) + adjust != length + ): + log.warning( + "With proto %d length is %d but payload length is %d+%d", + proto, + length, + len(payload), + adjust, + ) + if xx != b"xx" or crlf != b"\r\n": + return UNKNOWN.from_packet(length, proto, packet) # full packet + else: + return make_object(length, proto, payload) + + +def make_response(msg, **kwargs): + return msg.response(**kwargs) + + +def set_config(config): # Note that we are setting _class_ attribute + _GT06pkt.CONFIG = config diff --git a/gps303/lookaside.py b/gps303/lookaside.py index 3c3ee32..d9af989 100644 --- a/gps303/lookaside.py +++ b/gps303/lookaside.py @@ -2,7 +2,7 @@ For when responding to the terminal is not trivial """ -from .GT06mod import * +from .gps303proto import * from .opencellid import qry_cell def prepare_response(conf, msg): diff --git a/gps303/mkgpx.py b/gps303/mkgpx.py index e0084c3..0d1fe89 100644 --- a/gps303/mkgpx.py +++ b/gps303/mkgpx.py @@ -2,7 +2,7 @@ from datetime import datetime, timezone from sqlite3 import connect import sys -from .GT06mod import * +from .gps303proto import * from .opencellid import qry_cell db = connect(sys.argv[1]) diff --git a/gps303/opencellid.py b/gps303/opencellid.py index 192409a..843e595 100644 --- a/gps303/opencellid.py +++ b/gps303/opencellid.py @@ -58,7 +58,7 @@ def qry_cell(dbname, mcc, gsm_cells): if __name__.endswith("__main__"): from datetime import datetime, timezone import sys - from .GT06mod import * + from .gps303proto import * db = connect(sys.argv[1]) c = db.cursor()