git: 0607df059582 - stable/13 - pf: test rules evaluation in the face of multiple IPv6 fragment headers

From: Kristof Provost <kp_at_FreeBSD.org>
Date: Fri, 04 Aug 2023 14:08:25 UTC
The branch stable/13 has been updated by kp:

URL: https://cgit.FreeBSD.org/src/commit/?id=0607df059582abc970b21cc3ce6e297dec1d5eeb

commit 0607df059582abc970b21cc3ce6e297dec1d5eeb
Author:     Kristof Provost <kp@FreeBSD.org>
AuthorDate: 2023-07-13 06:34:54 +0000
Commit:     Kristof Provost <kp@FreeBSD.org>
CommitDate: 2023-08-04 14:08:05 +0000

    pf: test rules evaluation in the face of multiple IPv6 fragment headers
    
    Send an ICMPv6 echo request packet with multiple IPv6 fragment headers.
    Set rules to pass all packets, except for ICMPv6 echo requests.
    
    pf ought to drop the echo request, but doesn't because it reassembles
    the packet, and then doesn't handle the second fragment header. In other
    words: it fails to detect the ICMPv6 echo header.
    
    Reported by:    Enrico Bassetti bassetti@di.uniroma1.it (NetSecurityLab @ Sapienza University of Rome)
    MFC after:      instant
    Sponsored by:   Rubicon Communications, LLC ("Netgate")
    
    (cherry picked from commit b23dbabb7f3edb3f323a64f03e37be2c9a8b2a45)
---
 tests/atf_python/sys/net/tools.py | 21 ++++++++++++++
 tests/sys/netpfil/pf/Makefile     |  2 ++
 tests/sys/netpfil/pf/frag6.py     | 60 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 83 insertions(+)

diff --git a/tests/atf_python/sys/net/tools.py b/tests/atf_python/sys/net/tools.py
index 567d9d4b21ac..44bd74d8578f 100644
--- a/tests/atf_python/sys/net/tools.py
+++ b/tests/atf_python/sys/net/tools.py
@@ -1,6 +1,7 @@
 #!/usr/local/bin/python3
 import json
 import os
+import subprocess
 
 
 class ToolsHelper(object):
@@ -13,6 +14,26 @@ class ToolsHelper(object):
             print("run: '{}'".format(cmd))
         return os.popen(cmd).read()
 
+    @classmethod
+    def pf_rules(cls, rules, verbose=True):
+        pf_conf = ""
+        for r in rules:
+            pf_conf = pf_conf + r + "\n"
+
+        if verbose:
+            print("Set rules:")
+            print(pf_conf)
+
+        ps = subprocess.Popen("/sbin/pfctl -g -f -", shell=True,
+            stdin=subprocess.PIPE)
+        ps.communicate(bytes(pf_conf, 'utf-8'))
+        ret = ps.wait()
+        if ret != 0:
+            raise Exception("Failed to set pf rules %d" % ret)
+
+        if verbose:
+            cls.print_output("/sbin/pfctl -sr")
+
     @classmethod
     def print_output(cls, cmd: str, verbose=True):
         if verbose:
diff --git a/tests/sys/netpfil/pf/Makefile b/tests/sys/netpfil/pf/Makefile
index 8f6b57797e23..26e5710d6f81 100644
--- a/tests/sys/netpfil/pf/Makefile
+++ b/tests/sys/netpfil/pf/Makefile
@@ -32,6 +32,8 @@ ATF_TESTS_SH+=	altq \
 		table \
 		tos
 
+ATF_TESTS_PYTEST+=	frag6.py
+
 # Tests reuse jail names and so cannot run in parallel.
 TEST_METADATA+=	is_exclusive=true
 
diff --git a/tests/sys/netpfil/pf/frag6.py b/tests/sys/netpfil/pf/frag6.py
new file mode 100644
index 000000000000..28b1829d418c
--- /dev/null
+++ b/tests/sys/netpfil/pf/frag6.py
@@ -0,0 +1,60 @@
+import pytest
+import logging
+import threading
+import time
+logging.getLogger("scapy").setLevel(logging.CRITICAL)
+from atf_python.sys.net.tools import ToolsHelper
+from atf_python.sys.net.vnet import VnetTestTemplate
+
+class DelayedSend(threading.Thread):
+    def __init__(self, packet):
+        threading.Thread.__init__(self)
+        self._packet = packet
+
+        self.start()
+
+    def run(self):
+        import scapy.all as sp
+        time.sleep(1)
+        sp.send(self._packet)
+
+class TestFrag6(VnetTestTemplate):
+    REQUIRED_MODULES = ["pf"]
+    TOPOLOGY = {
+        "vnet1": {"ifaces": ["if1"]},
+        "vnet2": {"ifaces": ["if1"]},
+        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
+    }
+
+    def vnet2_handler(self, vnet):
+        ToolsHelper.print_output("/sbin/pfctl -e")
+        ToolsHelper.pf_rules([
+            "scrub fragment reassemble",
+            "pass",
+            "block in inet6 proto icmp6 icmp6-type echoreq",
+        ])
+
+    def check_ping_reply(self, packet):
+        print(packet)
+        return False
+
+    @pytest.mark.require_user("root")
+    def test_dup_frag_hdr(self):
+        "Test packets with duplicate fragment headers"
+        srv_vnet = self.vnet_map["vnet2"]
+
+        # Import in the correct vnet, so at to not confuse Scapy
+        import scapy.all as sp
+
+        packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \
+            / sp.IPv6ExtHdrFragment(offset = 0, m = 0) \
+            / sp.IPv6ExtHdrFragment(offset = 0, m = 0) \
+            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 128))
+
+        # Delay the send so the sniffer is running when we transmit.
+        s = DelayedSend(packet)
+
+        packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name,
+            timeout=3)
+        for p in packets:
+            assert not p.getlayer(sp.ICMPv6EchoReply)