From 718dd83f618a0ee2674450162aba85cfed447512 Mon Sep 17 00:00:00 2001 From: Eugene Crosser Date: Tue, 23 Aug 2022 16:08:48 +0200 Subject: [PATCH] test: add fuzzer for beesure protocol --- loctrkd/beesure.py | 13 +++++++++--- test/common.py | 2 +- test/test_fuzz_bs.py | 47 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 4 deletions(-) create mode 100644 test/test_fuzz_bs.py diff --git a/loctrkd/beesure.py b/loctrkd/beesure.py index 9e9e22b..dcf16ba 100755 --- a/loctrkd/beesure.py +++ b/loctrkd/beesure.py @@ -116,7 +116,7 @@ class Stream: else: msgs.append( f"Packet does not end with ']'" - f" at {self.datalen+20}: {self.buffer=!r}" + f" at {self.datalen+20}: {self.buffer[:64]=!r}" ) self.buffer = self.buffer[self.datalen + 21 :] self.datalen = 0 @@ -602,8 +602,15 @@ def proto_handled(proto: str) -> bool: return proto.startswith(PROTO_PREFIX) +def _local_proto(packet: bytes) -> str: + try: + return packet[20:-1].split(b",")[0].decode() + except UnicodeDecodeError: + return "UNKNOWN" + + def proto_of_message(packet: bytes) -> str: - return PROTO_PREFIX + packet[20:-1].split(b",")[0].decode() + return PROTO_PREFIX + _local_proto(packet) def imei_from_packet(packet: bytes) -> Optional[str]: @@ -618,7 +625,7 @@ def is_goodbye_packet(packet: bytes) -> bool: def inline_response(packet: bytes) -> Optional[bytes]: - proto = packet[20:-1].split(b",")[0].decode() + proto = _local_proto(packet) if proto in CLASSES: cls = CLASSES[proto] if cls.RESPOND is Respond.INL: diff --git a/test/common.py b/test/common.py index 131169c..a31ff09 100644 --- a/test/common.py +++ b/test/common.py @@ -46,7 +46,7 @@ class TestWithServers(TestCase): _, self.tmpfilebase = mkstemp() self.conf = ConfigParser() self.conf["common"] = { - "protocols": "zx303proto", + "protocols": "zx303proto,beesure", } self.conf["collector"] = { "port": str(freeports[0]), diff --git a/test/test_fuzz_bs.py b/test/test_fuzz_bs.py new file mode 100644 index 0000000..ca63c0f --- /dev/null +++ b/test/test_fuzz_bs.py @@ -0,0 +1,47 @@ +""" Send junk to the collector """ + +import unittest +from .common import send_and_drain, TestWithServers, Fuzz + +REPEAT: int = 1000000 + + +class FuzzMsgs(Fuzz): + def test_msgs(self) -> None: + imeis = list( + self.rnd.randint(1000000000, 9999999999) for _ in range(3) + ) + for _ in range(REPEAT): + base_size = self.rnd.randint(0, 5000) + size = base_size + ( + (self.rnd.randint(-5, 5) // 5) * self.rnd.randint(1, 20) + ) + if size < 0: + size = 0 + imei = f"{self.rnd.choice(imeis):d}".encode("ascii") + commapos = self.rnd.randint(0, 10) + if commapos > 0 and commapos < base_size: + payload = ( + self.rnd.randbytes(commapos - 1) + + b"," + + self.rnd.randbytes(base_size - commapos) + ) + else: + payload = self.rnd.randbytes(base_size) + # print(imei, base_size, size) + # "\[(\w\w)\*(\d{10})\*([0-9a-fA-F]{4})\*" + buf = ( + b"[TS*" + + imei + + b"*" + + f"{size:04x}".encode("ascii") + + b"*" + + self.rnd.randbytes(base_size) + + b"]" + ) + # print(buf[:64], "len:", len(buf)) + send_and_drain(self.sock, buf) + + +if __name__ == "__main__": + unittest.main() -- 2.43.0