1 """ Send junk to the collector """
3 from random import Random
4 from socket import getaddrinfo, socket, AF_INET, SOCK_STREAM
8 from .common import send_and_drain, TestWithServers
13 class Fuzz(TestWithServers):
14 def setUp(self, *args: str, **kwargs: Any) -> None:
15 super().setUp("collector")
17 for fam, typ, pro, cnm, skadr in getaddrinfo(
19 self.conf.getint("collector", "port"),
23 break # Just take the first element
24 self.sock = socket(AF_INET, SOCK_STREAM)
25 self.sock.connect(skadr)
def tearDown(self) -> None:
    """Wind the fuzz connection down after a test.

    Pauses so the collector can work through what it was sent, hands
    ``None`` to ``send_and_drain`` on the test socket, then pauses again
    so the server can close its end of the connection.
    """
    pause_seconds = 1
    # Give the collector some time before draining the socket.
    sleep(pause_seconds)
    send_and_drain(self.sock, None)
    # Let the server close its side.
    sleep(pause_seconds)
def test_stream(self) -> None:
    """Send REPEAT chunks of unstructured random bytes to the collector.

    Each chunk is between 1 and 5000 bytes long (inclusive) and is drawn
    from ``self.rnd``; it is written to ``self.sock`` via
    ``send_and_drain``.
    """
    rng = self.rnd
    conn = self.sock
    for _ in range(REPEAT):
        # Pick a length first, then generate that many random bytes.
        payload = rng.randbytes(rng.randint(1, 5000))
        send_and_drain(conn, payload)
def test_msgs(self) -> None:
    """Send REPEAT random payloads framed as ``xx`` ... CR LF.

    Every message is the two bytes ``xx``, followed by 0 to 300
    (inclusive) random bytes from ``self.rnd``, followed by ``\\r\\n``;
    it is written to ``self.sock`` via ``send_and_drain``.
    """
    rng = self.rnd
    conn = self.sock
    for _ in range(REPEAT):
        body = rng.randbytes(rng.randint(0, 300))
        # Fixed two-byte prefix + random body + CR LF terminator.
        frame = b"".join((b"xx", body, b"\r\n"))
        send_and_drain(conn, frame)
47 if __name__ == "__main__":