from multiprocessing import Process
from os import kill, unlink
from signal import SIGINT
-from socket import AF_INET6, SOCK_DGRAM, SOL_SOCKET, SO_REUSEADDR, socket
+from socket import (
+ AF_INET6,
+ MSG_DONTWAIT,
+ SOCK_DGRAM,
+ SOL_SOCKET,
+ SO_REUSEADDR,
+ socket,
+ SocketType,
+)
from tempfile import mkstemp
from time import sleep
+from typing import Optional
from unittest import TestCase
if p.pid is not None:
kill(p.pid, SIGINT)
p.join()
- print(srvname, "terminated with return code", p.exitcode)
+ self.assertEqual(
+ p.exitcode,
+ 0,
+ srvname + " terminated with non-zero return code",
+ )
for sfx in (".pub", ".pul"):
unlink(self.tmpfilebase + sfx)
+
+
+def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
+ if buf is not None:
+ sock.send(buf)
+ try:
+ sock.recv(4096, MSG_DONTWAIT)
+ except BlockingIOError:
+ pass
""" Send junk to the collector """
from random import Random
-from socket import getaddrinfo, socket, AF_INET6, MSG_DONTWAIT, SOCK_STREAM
+from socket import getaddrinfo, socket, AF_INET6, SOCK_STREAM
from time import sleep
-from typing import Optional
import unittest
-from .common import TestWithServers
+from .common import send_and_drain, TestWithServers
REPEAT: int = 1000000
def tearDown(self) -> None:
sleep(1) # give collector some time
- self._send_and_drain(None)
+ send_and_drain(self.sock, None)
self.sock.close()
print("finished fuzzing")
super().tearDown()
- def _send_and_drain(self, buf: Optional[bytes]) -> None:
- if buf is not None:
- self.sock.send(buf)
- try:
- self.sock.recv(4096, MSG_DONTWAIT)
- except BlockingIOError:
- pass
-
def test_stream(self) -> None:
for _ in range(REPEAT):
size = self.rnd.randint(1, 5000)
buf = self.rnd.randbytes(size)
- self._send_and_drain(buf)
+ send_and_drain(self.sock, buf)
def test_msgs(self) -> None:
for _ in range(REPEAT):
size = self.rnd.randint(0, 300)
buf = b"xx" + self.rnd.randbytes(size) + b"\r\n"
- self._send_and_drain(buf)
+ send_and_drain(self.sock, buf)
if __name__ == "__main__":