1 """ Send junk to the collector """
3 from random import Random
4 from socket import getaddrinfo, socket, AF_INET, SOCK_STREAM
7 from .common import send_and_drain, TestWithServers
12 class Fuzz(TestWithServers):
13 def setUp(self, *args: str) -> None:
14 super().setUp("collector")
16 for fam, typ, pro, cnm, skadr in getaddrinfo(
18 self.conf.getint("collector", "port"),
22 break # Just take the first element
23 self.sock = socket(AF_INET, SOCK_STREAM)
24 self.sock.connect(skadr)
26 def tearDown(self) -> None:
27 sleep(1) # give collector some time
28 send_and_drain(self.sock, None)
30 sleep(1) # Let the server close their side
def test_stream(self) -> None:
    """Push REPEAT chunks of unstructured random bytes at the collector.

    Every chunk gets its length from ``self.rnd.randint(1, 5000)`` and
    its content from ``self.rnd.randbytes``, and is handed to
    ``send_and_drain`` together with the test socket.
    """
    # NOTE(review): self.rnd is presumably a random.Random created in
    # setUp -- confirm there.
    for _round in range(REPEAT):
        length = self.rnd.randint(1, 5000)
        send_and_drain(self.sock, self.rnd.randbytes(length))
def test_msgs(self) -> None:
    """Push REPEAT framed junk messages at the collector.

    Each message is the literal prefix ``b"xx"``, then random bytes
    whose count comes from ``self.rnd.randint(0, 300)``, then a
    ``b"\\r\\n"`` terminator. It is passed to ``send_and_drain`` together
    with the test socket.
    """
    # NOTE(review): the b"xx" / CRLF framing presumably mimics the wire
    # format the collector expects -- confirm against the protocol code.
    for _round in range(REPEAT):
        length = self.rnd.randint(0, 300)
        junk = self.rnd.randbytes(length)
        frame = b"xx" + junk + b"\r\n"
        send_and_drain(self.sock, frame)
46 if __name__ == "__main__":