git: 458f475df8e5 - main - unix/dgram: smart socket buffers for one-to-many sockets

From: Gleb Smirnoff <glebius_at_FreeBSD.org>
Date: Fri, 24 Jun 2022 16:10:39 UTC
The branch main has been updated by glebius:

URL: https://cgit.FreeBSD.org/src/commit/?id=458f475df8e5912609c14208c189414a8255c738

commit 458f475df8e5912609c14208c189414a8255c738
Author:     Gleb Smirnoff <glebius@FreeBSD.org>
AuthorDate: 2022-06-24 16:09:11 +0000
Commit:     Gleb Smirnoff <glebius@FreeBSD.org>
CommitDate: 2022-06-24 16:09:11 +0000

    unix/dgram: smart socket buffers for one-to-many sockets
    
    A one-to-many unix/dgram socket is a socket that has been bound
    with bind(2) and can have multiple peers connected to it.  A typical
    example is /var/run/log bound by syslogd(8) and receiving multiple
    connections from the libc syslog(3) API.  Until now all of these
    connections shared the same receive socket buffer of the bound
    socket.  This made the socket vulnerable to an overflow attack: a
    single sender could exhaust the shared buffer and starve all others.
    See 240d5a9b1ce for a historical attempt to work around the problem.
    
    This commit creates a per-connection socket buffer for every single
    connected socket and eliminates the problem.  The new behavior
    favors infrequent writers over frequent writers.  See the added test
    case scenarios and code comments for a more detailed description of
    the new behavior.
    
    Reviewed by:            markj
    Differential revision:  https://reviews.freebsd.org/D35303
---
 share/man/man4/unix.4       |  61 ++++++++++++-
 sys/kern/uipc_usrreq.c      | 203 +++++++++++++++++++++++++++++++++++++++-----
 sys/sys/sockbuf.h           |  27 +++++-
 tests/sys/kern/unix_dgram.c |  85 ++++++++++++++++---
 4 files changed, 337 insertions(+), 39 deletions(-)

diff --git a/share/man/man4/unix.4 b/share/man/man4/unix.4
index cae33f9e8710..3de5f875fd09 100644
--- a/share/man/man4/unix.4
+++ b/share/man/man4/unix.4
@@ -28,7 +28,7 @@
 .\"     @(#)unix.4	8.1 (Berkeley) 6/9/93
 .\" $FreeBSD$
 .\"
-.Dd August 7, 2021
+.Dd June 24, 2022
 .Dt UNIX 4
 .Os
 .Sh NAME
@@ -393,6 +393,62 @@ socket refer to the
 .Dv LOCAL_CREDS
 socket option.
 .El
+.Sh BUFFERING
+Due to the local nature of the
+.Ux Ns -domain
+sockets, they do not implement send buffers.
+The
+.Xr send 2
+and
+.Xr write 2
+families of system calls attempt to write data to the receive buffer of the
+destination socket.
+.Pp
+The default buffer sizes for
+.Dv SOCK_STREAM
+and
+.Dv SOCK_SEQPACKET
+.Ux Ns -domain
+sockets can be configured with
+.Va net.local.stream
+and
+.Va net.local.seqpacket
+branches of the
+.Xr sysctl 8
+MIB, respectively.
+Note that setting the send buffer size (sendspace) affects only the maximum
+write size.
+.Pp
+The
+.Ux Ns -domain
+sockets of type
+.Dv SOCK_DGRAM
+are unreliable and always non-blocking for write operations.
+The default receive buffer can be configured with
+.Va net.local.dgram.recvspace .
+The maximum allowed datagram size is limited by
+.Va net.local.dgram.maxdgram .
+A
+.Dv SOCK_DGRAM
+socket that has been bound with
+.Xr bind 2
+can have multiple peers connected
+at the same time.
+The modern
+.Fx
+implementation will allocate
+.Va net.local.dgram.recvspace
+sized private buffers in the receive buffer of the bound socket for every
+connected socket, preventing a situation where a single writer can exhaust
+all of the buffer space.
+Messages coming from unconnected sends using
+.Xr sendto 2
+land on the shared buffer of the receiving socket, which has the same
+size limit.
+A side effect of the implementation is that it does not guarantee
+that writes from different senders will arrive at the receiver in the
+chronological order in which they were sent.
+The order is preserved for writes coming through a particular connection.
 .Sh SEE ALSO
 .Xr connect 2 ,
 .Xr dup 2 ,
@@ -404,7 +460,8 @@ socket option.
 .Xr setsockopt 2 ,
 .Xr socket 2 ,
 .Xr CMSG_DATA 3 ,
-.Xr intro 4
+.Xr intro 4 ,
+.Xr sysctl 8
 .Rs
 .%T "An Introductory 4.3 BSD Interprocess Communication Tutorial"
 .%B PS1
diff --git a/sys/kern/uipc_usrreq.c b/sys/kern/uipc_usrreq.c
index 05fde7675b9f..2eb9db632a11 100644
--- a/sys/kern/uipc_usrreq.c
+++ b/sys/kern/uipc_usrreq.c
@@ -532,8 +532,14 @@ uipc_attach(struct socket *so, int proto, struct thread *td)
 
 		case SOCK_DGRAM:
 			STAILQ_INIT(&so->so_rcv.uxdg_mb);
-			sendspace = unpdg_maxdgram;
-			recvspace = unpdg_recvspace;
+			STAILQ_INIT(&so->so_snd.uxdg_mb);
+			TAILQ_INIT(&so->so_rcv.uxdg_conns);
+			/*
+			 * Since send buffer is either bypassed or is a part
+			 * of one-to-many receive buffer, we assign both space
+			 * limits to unpdg_recvspace.
+			 */
+			sendspace = recvspace = unpdg_recvspace;
 			break;
 
 		case SOCK_SEQPACKET:
@@ -858,9 +864,13 @@ uipc_detach(struct socket *so)
 	switch (so->so_type) {
 	case SOCK_DGRAM:
 		/*
-		 * Everything should have been unlinked/freed by unp_dispose().
+		 * Everything should have been unlinked/freed by unp_dispose()
+		 * and/or unp_disconnect().
 		 */
+		MPASS(so->so_rcv.uxdg_peeked == NULL);
 		MPASS(STAILQ_EMPTY(&so->so_rcv.uxdg_mb));
+		MPASS(TAILQ_EMPTY(&so->so_rcv.uxdg_conns));
+		MPASS(STAILQ_EMPTY(&so->so_snd.uxdg_mb));
 	}
 }
 
@@ -1130,6 +1140,24 @@ release:
 	return (error);
 }
 
+/* PF_UNIX/SOCK_DGRAM version of sbspace(): does a cc/mbcnt datagram fit? */
+static inline bool
+uipc_dgram_sbspace(struct sockbuf *sb, u_int cc, u_int mbcnt)
+{
+	u_int bleft, mleft;
+
+	/* Limits may drop below usage via setsockopt(SO_SNDBUF/SO_RCVBUF). */
+	if (__predict_false(sb->sb_hiwat < sb->uxdg_cc ||
+	    sb->sb_mbmax < sb->uxdg_mbcnt ||
+	    (sb->sb_state & SBS_CANTRCVMORE) != 0))
+		return (false);
+
+	bleft = sb->sb_hiwat - sb->uxdg_cc;
+	mleft = sb->sb_mbmax - sb->uxdg_mbcnt;
+
+	return (bleft >= cc && mleft >= mbcnt);
+}
+
 /*
  * PF_UNIX/SOCK_DGRAM send
  *
@@ -1310,15 +1338,46 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
 	f->m_pkthdr.memlen = mbcnt;
 	f->m_pkthdr.ctllen = ctl;
 
+	/*
+	 * Destination socket buffer selection.
+	 *
+	 * Unconnected sends, when !(so->so_state & SS_ISCONNECTED) and the
+	 * destination address is supplied, create a temporary connection for
+	 * the run time of the function (see call to unp_connectat() above and
+	 * to unp_disconnect() below).  We distinguish them by the condition
+	 * (addr != NULL).  We intentionally avoid adding 'bool connected' for
+	 * that condition, since, again, through the run time of this code we
+	 * are always connected.  For such "unconnected" sends, the destination
+	 * buffer would be the receive buffer of destination socket so2.
+	 *
+	 * For connected sends, data lands on the send buffer of the sender's
+	 * socket "so".  Then, if we just added the very first datagram
+	 * on this send buffer, we need to add the send buffer onto the
+	 * receiving socket's buffer list.  We put ourselves on top of the
+	 * list.  Such logic gives infrequent senders priority over frequent
+	 * senders.
+	 *
+	 * Note on byte count management.  As long as event methods kevent(2)
+	 * and select(2) are not protocol-specific (yet), we need to maintain
+	 * meaningful values on the receive buffer.  So, the receive buffer
+	 * would accumulate counters from all connected buffers potentially
+	 * having sb_ccc > sb_hiwat or sb_mbcnt > sb_mbmax.
+	 */
 	so2 = unp2->unp_socket;
-	sb = &so2->so_rcv;
+	sb = (addr == NULL) ? &so->so_snd : &so2->so_rcv;
 	SOCK_RECVBUF_LOCK(so2);
-	if (cc <= sbspace(sb)) {
+	if (uipc_dgram_sbspace(sb, cc + ctl, mbcnt)) {
+		if (addr == NULL && STAILQ_EMPTY(&sb->uxdg_mb))
+			TAILQ_INSERT_HEAD(&so2->so_rcv.uxdg_conns, &so->so_snd,
+			    uxdg_clist);
 		STAILQ_INSERT_TAIL(&sb->uxdg_mb, f, m_stailqpkt);
-		sb->sb_acc += cc + ctl;
-		sb->sb_ccc += cc + ctl;
-		sb->sb_ctl += ctl;
-		sb->sb_mbcnt += mbcnt;
+		sb->uxdg_cc += cc + ctl;
+		sb->uxdg_ctl += ctl;
+		sb->uxdg_mbcnt += mbcnt;
+		so2->so_rcv.sb_acc += cc + ctl;
+		so2->so_rcv.sb_ccc += cc + ctl;
+		so2->so_rcv.sb_ctl += ctl;
+		so2->so_rcv.sb_mbcnt += mbcnt;
 		sorwakeup_locked(so2);
 		f = NULL;
 	} else {
@@ -1350,19 +1409,23 @@ out:
 }
 
 /*
- * PF_UNIX/SOCK_DGRAM receive with MSG_PEEK
+ * PF_UNIX/SOCK_DGRAM receive with MSG_PEEK.
+ * The mbuf has already been unlinked from the uxdg_mb of socket buffer
+ * and needs to be linked onto uxdg_peeked of receive socket buffer.
  */
 static int
-uipc_peek_dgram(struct socket *so, struct sockaddr **psa, struct uio *uio,
-    struct mbuf **controlp, int *flagsp)
+uipc_peek_dgram(struct socket *so, struct mbuf *m, struct sockaddr **psa,
+    struct uio *uio, struct mbuf **controlp, int *flagsp)
 {
-	struct mbuf *m;
 	ssize_t len;
 	int error;
 
+	so->so_rcv.uxdg_peeked = m;
+	so->so_rcv.uxdg_cc += m->m_pkthdr.len;
+	so->so_rcv.uxdg_ctl += m->m_pkthdr.ctllen;
+	so->so_rcv.uxdg_mbcnt += m->m_pkthdr.memlen;
 	SOCK_RECVBUF_UNLOCK(so);
 
-	m = STAILQ_FIRST(&so->so_rcv.uxdg_mb);
 	KASSERT(m->m_type == MT_SONAME, ("m->m_type == %d", m->m_type));
 	if (psa != NULL)
 		*psa = sodupsockaddr(mtod(m, struct sockaddr *), M_WAITOK);
@@ -1409,6 +1472,7 @@ static int
 uipc_soreceive_dgram(struct socket *so, struct sockaddr **psa, struct uio *uio,
     struct mbuf **mp0, struct mbuf **controlp, int *flagsp)
 {
+	struct sockbuf *sb = NULL;
 	struct mbuf *m;
 	int flags, error;
 	ssize_t len;
@@ -1430,13 +1494,15 @@ uipc_soreceive_dgram(struct socket *so, struct sockaddr **psa, struct uio *uio,
 		return (error);
 
 	/*
-	 * Loop blocking while waiting for a datagram.
+	 * Loop blocking while waiting for a datagram.  Prioritize connected
+	 * peers over unconnected sends.  Set sb to selected socket buffer
+	 * containing an mbuf on exit from the wait loop.  A datagram that
+	 * had already been peeked at has top priority.
 	 */
 	SOCK_RECVBUF_LOCK(so);
-	while ((m = STAILQ_FIRST(&so->so_rcv.uxdg_mb)) == NULL) {
-		KASSERT(sbavail(&so->so_rcv) == 0,
-		    ("soreceive_dgram: sb_mb NULL but sbavail %u",
-		    sbavail(&so->so_rcv)));
+	while ((m = so->so_rcv.uxdg_peeked) == NULL &&
+	    (sb = TAILQ_FIRST(&so->so_rcv.uxdg_conns)) == NULL &&
+	    (m = STAILQ_FIRST(&so->so_rcv.uxdg_mb)) == NULL) {
 		if (so->so_error) {
 			error = so->so_error;
 			so->so_error = 0;
@@ -1463,16 +1529,34 @@ uipc_soreceive_dgram(struct socket *so, struct sockaddr **psa, struct uio *uio,
 		}
 	}
 
+	if (sb == NULL)
+		sb = &so->so_rcv;
+	else if (m == NULL)
+		m = STAILQ_FIRST(&sb->uxdg_mb);
+	else
+		MPASS(m == so->so_rcv.uxdg_peeked);
+
+	MPASS(sb->uxdg_cc > 0);
 	M_ASSERTPKTHDR(m);
 	KASSERT(m->m_type == MT_SONAME, ("m->m_type == %d", m->m_type));
 
 	if (uio->uio_td)
 		uio->uio_td->td_ru.ru_msgrcv++;
 
+	if (__predict_true(m != so->so_rcv.uxdg_peeked)) {
+		STAILQ_REMOVE_HEAD(&sb->uxdg_mb, m_stailqpkt);
+		if (STAILQ_EMPTY(&sb->uxdg_mb) && sb != &so->so_rcv)
+			TAILQ_REMOVE(&so->so_rcv.uxdg_conns, sb, uxdg_clist);
+	} else
+		so->so_rcv.uxdg_peeked = NULL;
+
+	sb->uxdg_cc -= m->m_pkthdr.len;
+	sb->uxdg_ctl -= m->m_pkthdr.ctllen;
+	sb->uxdg_mbcnt -= m->m_pkthdr.memlen;
+
 	if (__predict_false(flags & MSG_PEEK))
-		return (uipc_peek_dgram(so, psa, uio, controlp, flagsp));
+		return (uipc_peek_dgram(so, m, psa, uio, controlp, flagsp));
 
-	STAILQ_REMOVE_HEAD(&so->so_rcv.uxdg_mb, m_stailqpkt);
 	so->so_rcv.sb_acc -= m->m_pkthdr.len;
 	so->so_rcv.sb_ccc -= m->m_pkthdr.len;
 	so->so_rcv.sb_ctl -= m->m_pkthdr.ctllen;
@@ -2103,6 +2187,7 @@ static void
 unp_disconnect(struct unpcb *unp, struct unpcb *unp2)
 {
 	struct socket *so, *so2;
+	struct mbuf *m = NULL;
 #ifdef INVARIANTS
 	struct unpcb *unptmp;
 #endif
@@ -2117,6 +2202,39 @@ unp_disconnect(struct unpcb *unp, struct unpcb *unp2)
 	so2 = unp2->unp_socket;
 	switch (unp->unp_socket->so_type) {
 	case SOCK_DGRAM:
+		/*
+		 * Remove our send socket buffer from the peer's receive buffer.
+		 * Move the data to the receive buffer only if it is empty.
+		 * This is a protection against a scenario where a peer
+		 * connects, floods and disconnects, effectively blocking
+		 * sendto() from unconnected sockets.
+		 */
+		SOCK_RECVBUF_LOCK(so2);
+		if (!STAILQ_EMPTY(&so->so_snd.uxdg_mb)) {
+			TAILQ_REMOVE(&so2->so_rcv.uxdg_conns, &so->so_snd,
+			    uxdg_clist);
+			if (__predict_true((so2->so_rcv.sb_state &
+			    SBS_CANTRCVMORE) == 0) &&
+			    STAILQ_EMPTY(&so2->so_rcv.uxdg_mb)) {
+				STAILQ_CONCAT(&so2->so_rcv.uxdg_mb,
+				    &so->so_snd.uxdg_mb);
+				so2->so_rcv.uxdg_cc += so->so_snd.uxdg_cc;
+				so2->so_rcv.uxdg_ctl += so->so_snd.uxdg_ctl;
+				so2->so_rcv.uxdg_mbcnt += so->so_snd.uxdg_mbcnt;
+			} else {
+				m = STAILQ_FIRST(&so->so_snd.uxdg_mb);
+				STAILQ_INIT(&so->so_snd.uxdg_mb);
+				so2->so_rcv.sb_acc -= so->so_snd.uxdg_cc;
+				so2->so_rcv.sb_ccc -= so->so_snd.uxdg_cc;
+				so2->so_rcv.sb_ctl -= so->so_snd.uxdg_ctl;
+				so2->so_rcv.sb_mbcnt -= so->so_snd.uxdg_mbcnt;
+			}
+			/* Note: so may reconnect. */
+			so->so_snd.uxdg_cc = 0;
+			so->so_snd.uxdg_ctl = 0;
+			so->so_snd.uxdg_mbcnt = 0;
+		}
+		SOCK_RECVBUF_UNLOCK(so2);
 		UNP_REF_LIST_LOCK();
 #ifdef INVARIANTS
 		LIST_FOREACH(unptmp, &unp2->unp_refs, unp_reflink) {
@@ -2156,6 +2274,11 @@ unp_disconnect(struct unpcb *unp, struct unpcb *unp2)
 		if (!unp_pcb_rele(unp2))
 			UNP_PCB_UNLOCK(unp2);
 	}
+
+	if (m != NULL) {
+		unp_scan(m, unp_freerights);
+		m_freem(m);
+	}
 }
 
 /*
@@ -2980,6 +3103,7 @@ unp_restore_undead_ref(struct filedescent **fdep, int fdcount)
 static void
 unp_scan_socket(struct socket *so, void (*op)(struct filedescent **, int))
 {
+	struct sockbuf *sb;
 
 	SOCK_LOCK_ASSERT(so);
 
@@ -2990,6 +3114,9 @@ unp_scan_socket(struct socket *so, void (*op)(struct filedescent **, int))
 	switch (so->so_type) {
 	case SOCK_DGRAM:
 		unp_scan(STAILQ_FIRST(&so->so_rcv.uxdg_mb), op);
+		unp_scan(so->so_rcv.uxdg_peeked, op);
+		TAILQ_FOREACH(sb, &so->so_rcv.uxdg_conns, uxdg_clist)
+			unp_scan(STAILQ_FIRST(&sb->uxdg_mb), op);
 		break;
 	case SOCK_STREAM:
 	case SOCK_SEQPACKET:
@@ -3168,7 +3295,7 @@ unp_gc(__unused void *arg, int pending)
 static void
 unp_dispose(struct socket *so)
 {
-	struct sockbuf *sb = &so->so_rcv;
+	struct sockbuf *sb;
 	struct unpcb *unp;
 	struct mbuf *m;
 
@@ -3185,14 +3312,46 @@ unp_dispose(struct socket *so)
 	SOCK_RECVBUF_LOCK(so);
 	switch (so->so_type) {
 	case SOCK_DGRAM:
+		while ((sb = TAILQ_FIRST(&so->so_rcv.uxdg_conns)) != NULL) {
+			STAILQ_CONCAT(&so->so_rcv.uxdg_mb, &sb->uxdg_mb);
+			TAILQ_REMOVE(&so->so_rcv.uxdg_conns, sb, uxdg_clist);
+			/* Note: socket of sb may reconnect. */
+			sb->uxdg_cc = sb->uxdg_ctl = sb->uxdg_mbcnt = 0;
+		}
+		sb = &so->so_rcv;
+		if (sb->uxdg_peeked != NULL) {
+			STAILQ_INSERT_HEAD(&sb->uxdg_mb, sb->uxdg_peeked,
+			    m_stailqpkt);
+			sb->uxdg_peeked = NULL;
+		}
 		m = STAILQ_FIRST(&sb->uxdg_mb);
 		STAILQ_INIT(&sb->uxdg_mb);
 		/* XXX: our shortened sbrelease() */
 		(void)chgsbsize(so->so_cred->cr_uidinfo, &sb->sb_hiwat, 0,
 		    RLIM_INFINITY);
+		/*
+		 * XXXGL Mark sb with SBS_CANTRCVMORE.  This is needed to
+		 * prevent uipc_sosend_dgram() or unp_disconnect() adding more
+		 * data to the socket.
+		 * We are now in dom_dispose and it could be a call from
+		 * soshutdown() or from the final sofree().  The sofree() case
+		 * is simple as it guarantees that no more sends will happen,
+		 * however we can race with unp_disconnect() from our peer.
+		 * The shutdown(2) case is more exotic.  It would call into
+		 * dom_dispose() only if socket is SS_ISCONNECTED.  This is
+		 * possible if we did connect(2) on this socket and we also
+		 * had it bound with bind(2) and received connections from other
+		 * sockets.  Because soshutdown() violates POSIX (see comment
+		 * there) we will end up here shutting down our receive side.
+		 * Of course this will have an effect not only on the peer we
+		 * connect(2)ed to, but also on all of the peers who had
+		 * connect(2)ed to us.  Their sends would end up with ENOBUFS.
+		 */
+		sb->sb_state |= SBS_CANTRCVMORE;
 		break;
 	case SOCK_STREAM:
 	case SOCK_SEQPACKET:
+		sb = &so->so_rcv;
 		m = sbcut_locked(sb, sb->sb_ccc);
 		KASSERT(sb->sb_ccc == 0 && sb->sb_mb == 0 && sb->sb_mbcnt == 0,
 		    ("%s: ccc %u mb %p mbcnt %u", __func__,
diff --git a/sys/sys/sockbuf.h b/sys/sys/sockbuf.h
index a1fd65d1a9e5..7075cab650da 100644
--- a/sys/sys/sockbuf.h
+++ b/sys/sys/sockbuf.h
@@ -136,10 +136,35 @@ struct sockbuf {
 		/*
 		 * PF_UNIX/SOCK_DGRAM
 		 *
-		 * Local protocol, thus any socket buffer is a receive buffer.
+		 * Local protocol, thus we should buffer on the receive side
+		 * only.  However, in a one-to-many configuration we don't want
+		 * a single receive buffer to be shared, so we link send
+		 * buffers onto the receive buffer.  All the fields are locked
+		 * by the receive buffer lock.
 		 */
 		struct {
+			/*
+			 * For receive buffer: own queue of this buffer for
+			 * unconnected sends.  For send buffer: queue lent
+			 * to the peer's receive buffer, to isolate ourselves
+			 * from other senders.
+			 */
 			STAILQ_HEAD(, mbuf)	uxdg_mb;
+			/* For receive buffer: datagram seen via MSG_PEEK. */
+			struct mbuf		*uxdg_peeked;
+			/*
+			 * For receive buffer: queue of send buffers of
+			 * connected peers.  For send buffer: linkage on
+			 * connected peer receive buffer queue.
+			 */
+			union {
+				TAILQ_HEAD(, sockbuf)	uxdg_conns;
+				TAILQ_ENTRY(sockbuf)	uxdg_clist;
+			};
+			/* Counters for this buffer uxdg_mb chain + peeked. */
+			u_int uxdg_cc;
+			u_int uxdg_ctl;
+			u_int uxdg_mbcnt;
 		};
 	};
 };
diff --git a/tests/sys/kern/unix_dgram.c b/tests/sys/kern/unix_dgram.c
index 2279eed7b604..891cc4a58f5c 100644
--- a/tests/sys/kern/unix_dgram.c
+++ b/tests/sys/kern/unix_dgram.c
@@ -170,15 +170,16 @@ ATF_TC_BODY(basic, tc)
 ATF_TC_WITHOUT_HEAD(one2many);
 ATF_TC_BODY(one2many, tc)
 {
-	int one, many[2], two;
-	char buf[1024];
+	int one, many[3], two;
+#define	BUFSIZE	1024
+	char buf[BUFSIZE], goodboy[BUFSIZE], flooder[BUFSIZE], notconn[BUFSIZE];
 
 	/* Establish one to many connection. */
 	ATF_REQUIRE((one = socket(PF_UNIX, SOCK_DGRAM, 0)) > 0);
 	ATF_REQUIRE(bind(one, (struct sockaddr *)&sun, sizeof(sun)) == 0);
 	/* listen(2) shall fail. */
 	ATF_REQUIRE(listen(one, -1) != 0);
-	for (int i = 0; i < 2; i++) {
+	for (int i = 0; i < 3; i++) {
 		ATF_REQUIRE((many[i] = socket(PF_UNIX, SOCK_DGRAM, 0)) > 0);
 		ATF_REQUIRE(connect(many[i], (struct sockaddr *)&sun,
 		    sizeof(sun)) == 0);
@@ -198,22 +199,78 @@ ATF_TC_BODY(one2many, tc)
 	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == 42);
 
 	/*
-	 * Sending from an unconnected socket to a bound socket.  Connection is
-	 * created for the duration of the syscall.
+	 * Interaction between concurrent senders. New feature in FreeBSD 14.
+	 *
+	 * One sender can not fill the receive side.  Other senders can
+	 * continue operation.  Senders who don't fill their buffers are
+	 * prioritized over flooders.  Connected senders are prioritized over
+	 * unconnected.
+	 *
+	 * Disconnecting a sender that has queued data optionally preserves
+	 * the data.  Allow the data to migrate to the peer's buffer only if
+	 * the latter is empty.  Otherwise discard it, to protect against a
+	 * connect-fill-close attack.
 	 */
+#define	FLOODER	13	/* for connected flooder on many[0] */
+#define	GOODBOY	42	/* for a good boy on many[1] */
+#define	NOTCONN	66	/* for sendto(2) via two */
+	goodboy[0] = GOODBOY;
+	flooder[0] = FLOODER;
+	notconn[0] = NOTCONN;
+
+	/* Connected priority over sendto(2). */
 	ATF_REQUIRE((two = socket(PF_UNIX, SOCK_DGRAM, 0)) > 0);
-	ATF_REQUIRE(sendto(two, buf, 43, 0, (struct sockaddr *)&sun,
-	    sizeof(sun)) == 43);
-	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == 43);
+	ATF_REQUIRE(sendto(two, notconn, BUFSIZE, 0, (struct sockaddr *)&sun,
+	    sizeof(sun)) == BUFSIZE);
+	ATF_REQUIRE(send(many[1], goodboy, BUFSIZE, 0) == BUFSIZE);
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == GOODBOY);	/* message from good boy comes first */
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == NOTCONN);	/* only then message from sendto(2) */
 
-	/* One sender can fill the receive side.
-	 * Current behavior which needs improvement.
-	 */
-	fill(many[0], buf, sizeof(buf));
-	ATF_REQUIRE(send(many[1], buf, sizeof(buf), 0) == -1);
+	/* Casual sender priority over a flooder. */
+	fill(many[0], flooder, sizeof(flooder));
+	ATF_REQUIRE(send(many[0], flooder, BUFSIZE, 0) == -1);
 	ATF_REQUIRE(errno == ENOBUFS);
+	ATF_REQUIRE(send(many[1], goodboy, BUFSIZE, 0) == BUFSIZE);
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == GOODBOY);	/* message from good boy comes first */
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == FLOODER);	/* only then message from flooder */
+
+	/* Once seen, a message can't be deprioritized by any other message. */
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), MSG_PEEK) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == FLOODER); /* message from the flooder seen */
+	ATF_REQUIRE(send(many[1], goodboy, BUFSIZE, 0) == BUFSIZE);
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), MSG_PEEK) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == FLOODER); /* should be the same message */
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == FLOODER); /* now we read it out... */
 	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
-	ATF_REQUIRE(send(many[1], buf, sizeof(buf), 0) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == GOODBOY); /* ... and next one is the good boy */
+
+	/* Disconnect in presence of data from not connected. */
+	ATF_REQUIRE(sendto(two, notconn, BUFSIZE, 0, (struct sockaddr *)&sun,
+	    sizeof(sun)) == BUFSIZE);
+	close(many[0]);
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == NOTCONN);	/* message from sendto() */
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), MSG_DONTWAIT) == -1);
+	ATF_REQUIRE(errno == EAGAIN);	/* data from many[0] discarded */
+
+	/* Disconnect in absence of data from not connected. */
+	ATF_REQUIRE(send(many[1], goodboy, BUFSIZE, 0) == BUFSIZE);
+	close(many[1]);
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == GOODBOY);	/* message from many[1] preserved */
+
+	/* Check that nothing leaks on close(2). */
+	ATF_REQUIRE(send(many[2], buf, 42, 0) == 42);
+	ATF_REQUIRE(send(many[2], buf, 42, 0) == 42);
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), MSG_PEEK) == 42);
+	ATF_REQUIRE(sendto(two, notconn, 42, 0, (struct sockaddr *)&sun,
+	    sizeof(sun)) == 42);
+	close(one);
 }
 
 /*