1 """ Send junk to the collector """
3 from random import Random
4 from socket import getaddrinfo, socket, AF_INET6, MSG_DONTWAIT, SOCK_STREAM
6 from .common import TestWithServers
# Test case that starts the "collector" fixture through TestWithServers,
# opens a stream socket to it, and works with randomly sized random byte
# buffers in a loop.
# NOTE(review): the line numbers embedded in this excerpt skip values, so some
# statements of this class are not visible here; the comments below describe
# only the visible lines.
11 class Fuzz(TestWithServers):
# Prepare the fixture: call the parent setUp with the fixed name "collector"
# (the visible lines do not forward *args), then connect self.sock to the
# address returned by getaddrinfo().
12 def setUp(self, *args: str) -> None:
13 super().setUp("collector")
# Resolve the target address; only the first result is used.
15 for fam, typ, pro, cnm, skadr in getaddrinfo(
# Port number is read as an integer from option "port" of the "collector"
# section of self.conf. The other getaddrinfo() arguments are not visible
# in this excerpt.
17 self.conf.getint("collector", "port"),
21 break # Just take the first element
# The socket is created with hard-coded AF_INET6 / SOCK_STREAM rather than the
# fam / typ values unpacked in the loop above.
22 self.sock = socket(AF_INET6, SOCK_STREAM)
# NOTE(review): skadr is bound only by the loop above; if the loop body never
# ran, this line would fail with an unbound name -- confirm that cannot happen.
23 self.sock.connect(skadr)
# Clean up after the test. Only the progress message is visible in this
# excerpt; presumably the socket and servers are released on lines not shown
# here -- TODO confirm.
25 def tearDown(self) -> None:
27 print("finished fuzzing")
# Run the fuzzing loop REPEAT times (REPEAT is defined outside the visible
# lines).
30 def test_fuzz(self) -> None:
31 for _ in range(REPEAT):
# Pick a buffer length from 1 to 5000 inclusive and fill it with random bytes.
# self.rnd is not assigned in the visible lines; presumably it is a
# random.Random instance created in setUp -- TODO confirm.
32 size = self.rnd.randint(1, 5000)
33 buf = self.rnd.randbytes(size)
# Non-blocking read of up to 4096 bytes; the returned data is discarded.
# With MSG_DONTWAIT the call raises BlockingIOError when nothing is pending,
# which is caught below (the handler body is not visible in this excerpt).
36 self.sock.recv(4096, MSG_DONTWAIT)
37 except BlockingIOError:
41 if __name__ == "__main__":