]> average.org Git - loctrkd.git/commitdiff
test: add fuzzer for beesure protocol
authorEugene Crosser <crosser@average.org>
Tue, 23 Aug 2022 14:08:48 +0000 (16:08 +0200)
committerEugene Crosser <crosser@average.org>
Tue, 23 Aug 2022 22:12:17 +0000 (00:12 +0200)
loctrkd/beesure.py
test/common.py
test/test_fuzz_bs.py [new file with mode: 0644]

index 9e9e22b65667c3494338efd242ce19d33ad58321..dcf16ba227fd7ca585baa9371595e19307917fe9 100755 (executable)
@@ -116,7 +116,7 @@ class Stream:
             else:
                 msgs.append(
                     f"Packet does not end with ']'"
-                    f" at {self.datalen+20}: {self.buffer=!r}"
+                    f" at {self.datalen+20}: {self.buffer[:64]=!r}"
                 )
             self.buffer = self.buffer[self.datalen + 21 :]
             self.datalen = 0
@@ -602,8 +602,15 @@ def proto_handled(proto: str) -> bool:
     return proto.startswith(PROTO_PREFIX)
 
 
+def _local_proto(packet: bytes) -> str:
+    try:
+        return packet[20:-1].split(b",")[0].decode()
+    except UnicodeDecodeError:
+        return "UNKNOWN"
+
+
 def proto_of_message(packet: bytes) -> str:
-    return PROTO_PREFIX + packet[20:-1].split(b",")[0].decode()
+    return PROTO_PREFIX + _local_proto(packet)
 
 
 def imei_from_packet(packet: bytes) -> Optional[str]:
@@ -618,7 +625,7 @@ def is_goodbye_packet(packet: bytes) -> bool:
 
 
 def inline_response(packet: bytes) -> Optional[bytes]:
-    proto = packet[20:-1].split(b",")[0].decode()
+    proto = _local_proto(packet)
     if proto in CLASSES:
         cls = CLASSES[proto]
         if cls.RESPOND is Respond.INL:
index 131169ccedb80b42d3864f5fe055c191c303261f..a31ff094d9d806ebfa5b1572d49587a2a20c7bd8 100644 (file)
@@ -46,7 +46,7 @@ class TestWithServers(TestCase):
         _, self.tmpfilebase = mkstemp()
         self.conf = ConfigParser()
         self.conf["common"] = {
-            "protocols": "zx303proto",
+            "protocols": "zx303proto,beesure",
         }
         self.conf["collector"] = {
             "port": str(freeports[0]),
diff --git a/test/test_fuzz_bs.py b/test/test_fuzz_bs.py
new file mode 100644 (file)
index 0000000..ca63c0f
--- /dev/null
@@ -0,0 +1,47 @@
+""" Send junk to the collector """
+
+import unittest
+from .common import send_and_drain, TestWithServers, Fuzz
+
+REPEAT: int = 1000000
+
+
+class FuzzMsgs(Fuzz):
+    def test_msgs(self) -> None:
+        imeis = list(
+            self.rnd.randint(1000000000, 9999999999) for _ in range(3)
+        )
+        for _ in range(REPEAT):
+            base_size = self.rnd.randint(0, 5000)
+            size = base_size + (
+                (self.rnd.randint(-5, 5) // 5) * self.rnd.randint(1, 20)
+            )
+            if size < 0:
+                size = 0
+            imei = f"{self.rnd.choice(imeis):d}".encode("ascii")
+            commapos = self.rnd.randint(0, 10)
+            if commapos > 0 and commapos < base_size:
+                payload = (
+                    self.rnd.randbytes(commapos - 1)
+                    + b","
+                    + self.rnd.randbytes(base_size - commapos)
+                )
+            else:
+                payload = self.rnd.randbytes(base_size)
+            # print(imei, base_size, size)
+            # "\[(\w\w)\*(\d{10})\*([0-9a-fA-F]{4})\*"
+            buf = (
+                b"[TS*"
+                + imei
+                + b"*"
+                + f"{size:04x}".encode("ascii")
+                + b"*"
+                + self.rnd.randbytes(base_size)
+                + b"]"
+            )
+            # print(buf[:64], "len:", len(buf))
+            send_and_drain(self.sock, buf)
+
+
+if __name__ == "__main__":
+    unittest.main()