else:
msgs.append(
f"Packet does not end with ']'"
- f" at {self.datalen+20}: {self.buffer=!r}"
+ f" at {self.datalen+20}: {self.buffer[:64]=!r}"
)
self.buffer = self.buffer[self.datalen + 21 :]
self.datalen = 0
return proto.startswith(PROTO_PREFIX)
+def _local_proto(packet: bytes) -> str:
+ try:
+ return packet[20:-1].split(b",")[0].decode()
+ except UnicodeDecodeError:
+ return "UNKNOWN"
+
+
def proto_of_message(packet: bytes) -> str:
- return PROTO_PREFIX + packet[20:-1].split(b",")[0].decode()
+ return PROTO_PREFIX + _local_proto(packet)
def imei_from_packet(packet: bytes) -> Optional[str]:
def inline_response(packet: bytes) -> Optional[bytes]:
- proto = packet[20:-1].split(b",")[0].decode()
+ proto = _local_proto(packet)
if proto in CLASSES:
cls = CLASSES[proto]
if cls.RESPOND is Respond.INL:
_, self.tmpfilebase = mkstemp()
self.conf = ConfigParser()
self.conf["common"] = {
- "protocols": "zx303proto",
+ "protocols": "zx303proto,beesure",
}
self.conf["collector"] = {
"port": str(freeports[0]),
--- /dev/null
+""" Send junk to the collector """
+
+import unittest
+from .common import send_and_drain, TestWithServers, Fuzz
+
+REPEAT: int = 1000000
+
+
+class FuzzMsgs(Fuzz):
+ def test_msgs(self) -> None:
+ imeis = list(
+ self.rnd.randint(1000000000, 9999999999) for _ in range(3)
+ )
+ for _ in range(REPEAT):
+ base_size = self.rnd.randint(0, 5000)
+ size = base_size + (
+ (self.rnd.randint(-5, 5) // 5) * self.rnd.randint(1, 20)
+ )
+ if size < 0:
+ size = 0
+ imei = f"{self.rnd.choice(imeis):d}".encode("ascii")
+ commapos = self.rnd.randint(0, 10)
+ if commapos > 0 and commapos < base_size:
+ payload = (
+ self.rnd.randbytes(commapos - 1)
+ + b","
+ + self.rnd.randbytes(base_size - commapos)
+ )
+ else:
+ payload = self.rnd.randbytes(base_size)
+ # print(imei, base_size, size)
+ # "\[(\w\w)\*(\d{10})\*([0-9a-fA-F]{4})\*"
+ buf = (
+ b"[TS*"
+ + imei
+ + b"*"
+ + f"{size:04x}".encode("ascii")
+ + b"*"
+ + self.rnd.randbytes(base_size)
+ + b"]"
+ )
+ # print(buf[:64], "len:", len(buf))
+ send_and_drain(self.sock, buf)
+
+
+if __name__ == "__main__":
+ unittest.main()