git: 5716d902ae1d - main - Revert "unix: new implementation of unix/stream & unix/seqpacket"

From: Gleb Smirnoff <glebius@FreeBSD.org>
Date: Tue, 09 Apr 2024 20:19:34 UTC
The branch main has been updated by glebius:

URL: https://cgit.FreeBSD.org/src/commit/?id=5716d902ae1d2dbd396f7808c12d7b0d285f2c9d

commit 5716d902ae1d2dbd396f7808c12d7b0d285f2c9d
Author:     Gleb Smirnoff <glebius@FreeBSD.org>
AuthorDate: 2024-04-09 20:15:16 +0000
Commit:     Gleb Smirnoff <glebius@FreeBSD.org>
CommitDate: 2024-04-09 20:15:47 +0000

    Revert "unix: new implementation of unix/stream & unix/seqpacket"
    
    The regressions in aio(4) and kernel RPC aren't a 5-minute problem.
    
    This reverts commit d80a97def9a1db6f07f5d2e68f7ad62b27918947.
    This reverts commit d1cbb17a873c787a527316bbb27551e97d5ad30c.
    This reverts commit fb8a8333b481cc4256d0b3f0b5b4feaa4594e01f.
---
 sys/kern/uipc_usrreq.c | 959 +++++++++++++++++--------------------------------
 sys/sys/sockbuf.h      |   7 -
 2 files changed, 320 insertions(+), 646 deletions(-)

diff --git a/sys/kern/uipc_usrreq.c b/sys/kern/uipc_usrreq.c
index 4a1c480c43fa..6e83e2be6f05 100644
--- a/sys/kern/uipc_usrreq.c
+++ b/sys/kern/uipc_usrreq.c
@@ -5,7 +5,7 @@
  *	The Regents of the University of California. All Rights Reserved.
  * Copyright (c) 2004-2009 Robert N. M. Watson All Rights Reserved.
  * Copyright (c) 2018 Matthew Macy
- * Copyright (c) 2022-2024 Gleb Smirnoff <glebius@FreeBSD.org>
+ * Copyright (c) 2022 Gleb Smirnoff <glebius@FreeBSD.org>
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions
@@ -141,14 +141,11 @@ static struct timeout_task unp_gc_task;
 static struct task	unp_defer_task;
 
 /*
- * SOCK_STREAM and SOCK_SEQPACKET unix(4) sockets fully bypass the send
- * buffer, but the notion of a send buffer still makes sense for them.  Its
- * size is the amount of space that a send(2) syscall may copyin(9) before
- * checking against the receive buffer of the peer.  Although the data is not
- * linked anywhere yet, only pointed to by a stack variable, it is effectively
- * a buffer that needs to be sized.
+ * Both send and receive buffers are allocated PIPSIZ bytes of buffering for
+ * stream sockets, although the total for sender and receiver is actually
+ * only PIPSIZ.
  *
- * SOCK_DGRAM sockets really use the sendspace as the maximum datagram size,
+ * Datagram sockets really use the sendspace as the maximum datagram size,
  * and don't really want to reserve the sendspace.  Their recvspace should be
  * large enough for at least one max-size datagram plus address.
  */
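
The capacity described by both versions of this comment can be observed from
userland.  A minimal sketch, not part of the commit, assuming the stock
PIPSIZ of 8192: a non-blocking writer on a SOCK_STREAM pair should buffer
roughly that many bytes before hitting EWOULDBLOCK.

#include <sys/socket.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int
main(void)
{
	int sv[2];
	char byte = 0;
	size_t total = 0;

	/* A connected pair of PF_LOCAL stream sockets. */
	if (socketpair(PF_LOCAL, SOCK_STREAM, 0, sv) != 0)
		return (1);
	/* Non-blocking writes, so we stop instead of sleeping in sbwait(). */
	if (fcntl(sv[0], F_SETFL, O_NONBLOCK) == -1)
		return (1);
	while (write(sv[0], &byte, 1) == 1)
		total++;
	/* Expect roughly PIPSIZ bytes buffered before EWOULDBLOCK. */
	printf("buffered %zu bytes, then errno %d\n", total, errno);
	return (0);
}
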
@@ -159,7 +156,7 @@ static u_long	unpst_sendspace = PIPSIZ;
 static u_long	unpst_recvspace = PIPSIZ;
 static u_long	unpdg_maxdgram = 8*1024;	/* support 8KB syslog msgs */
 static u_long	unpdg_recvspace = 16*1024;
-static u_long	unpsp_sendspace = PIPSIZ;
+static u_long	unpsp_sendspace = PIPSIZ;	/* really max datagram size */
 static u_long	unpsp_recvspace = PIPSIZ;
 
 static SYSCTL_NODE(_net, PF_LOCAL, local, CTLFLAG_RW | CTLFLAG_MPSAFE, 0,
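
These defaults are runtime-tunable: the variables above are exported under
the net.local sysctl tree.  A small userland sketch reading one of them (the
name net.local.stream.sendspace is assumed from the stock declarations):

#include <sys/types.h>
#include <sys/sysctl.h>
#include <stdio.h>

int
main(void)
{
	u_long v;
	size_t len = sizeof(v);

	/* Read the stream send buffer default restored above. */
	if (sysctlbyname("net.local.stream.sendspace", &v, &len, NULL, 0) != 0)
		return (1);
	printf("net.local.stream.sendspace: %lu\n", v);
	return (0);
}
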
@@ -303,12 +300,13 @@ static void	unp_gc(__unused void *, int);
 static void	unp_scan(struct mbuf *, void (*)(struct filedescent **, int));
 static void	unp_discard(struct file *);
 static void	unp_freerights(struct filedescent **, int);
-static int	unp_internalize(struct mbuf *, struct mchain *,
-		    struct thread *);
+static int	unp_internalize(struct mbuf **, struct thread *,
+		    struct mbuf **, u_int *, u_int *);
 static void	unp_internalize_fp(struct file *);
 static int	unp_externalize(struct mbuf *, struct mbuf **, int);
 static int	unp_externalize_fp(struct file *);
-static void	unp_addsockcred(struct thread *, struct mchain *, int);
+static struct mbuf	*unp_addsockcred(struct thread *, struct mbuf *,
+		    int, struct mbuf **, u_int *, u_int *);
 static void	unp_process_defers(void * __unused, int);
 
 static void
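
For context on the prototypes above: unp_internalize() turns SCM_RIGHTS
control messages from userland into kernel file pointers, and
unp_externalize() performs the reverse on receive.  A minimal userland sketch
of the message format they process (helper name hypothetical, not part of
the commit):

#include <sys/socket.h>
#include <string.h>

/* Send one descriptor over a unix(4) socket as SCM_RIGHTS. */
static ssize_t
send_fd(int sock, int fd)
{
	struct msghdr msg;
	struct cmsghdr *cmsg;
	struct iovec iov;
	char cbuf[CMSG_SPACE(sizeof(int))];
	char byte = 0;

	memset(&msg, 0, sizeof(msg));
	memset(cbuf, 0, sizeof(cbuf));
	iov.iov_base = &byte;		/* At least one byte of real data. */
	iov.iov_len = 1;
	msg.msg_iov = &iov;
	msg.msg_iovlen = 1;
	msg.msg_control = cbuf;
	msg.msg_controllen = sizeof(cbuf);
	cmsg = CMSG_FIRSTHDR(&msg);
	cmsg->cmsg_level = SOL_SOCKET;
	cmsg->cmsg_type = SCM_RIGHTS;
	cmsg->cmsg_len = CMSG_LEN(sizeof(int));
	memcpy(CMSG_DATA(cmsg), &fd, sizeof(int));
	return (sendmsg(sock, &msg, 0));
}
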
@@ -451,7 +449,6 @@ uipc_attach(struct socket *so, int proto, struct thread *td)
 		case SOCK_STREAM:
 			sendspace = unpst_sendspace;
 			recvspace = unpst_recvspace;
-			STAILQ_INIT(&so->so_rcv.sb_mbq);
 			break;
 
 		case SOCK_DGRAM:
@@ -469,7 +466,6 @@ uipc_attach(struct socket *so, int proto, struct thread *td)
 		case SOCK_SEQPACKET:
 			sendspace = unpsp_sendspace;
 			recvspace = unpsp_recvspace;
-			STAILQ_INIT(&so->so_rcv.sb_mbq);
 			break;
 
 		default:
@@ -801,10 +797,6 @@ uipc_detach(struct socket *so)
 		taskqueue_enqueue_timeout(taskqueue_thread, &unp_gc_task, -1);
 
 	switch (so->so_type) {
-	case SOCK_STREAM:
-	case SOCK_SEQPACKET:
-		MPASS(STAILQ_EMPTY(&so->so_rcv.sb_mbq));
-		break;
 	case SOCK_DGRAM:
 		/*
 		 * Everything should have been unlinked/freed by unp_dispose()
@@ -860,10 +852,6 @@ uipc_listen(struct socket *so, int backlog, struct thread *td)
 	error = solisten_proto_check(so);
 	if (error == 0) {
 		cru2xt(td, &unp->unp_peercred);
-		(void)chgsbsize(so->so_cred->cr_uidinfo, &so->so_snd.sb_hiwat,
-		    0, RLIM_INFINITY);
-		(void)chgsbsize(so->so_cred->cr_uidinfo, &so->so_rcv.sb_hiwat,
-		    0, RLIM_INFINITY);
 		solisten_proto(so, backlog);
 	}
 	SOCK_UNLOCK(so);
@@ -897,562 +885,187 @@ uipc_peeraddr(struct socket *so, struct sockaddr *ret)
 	return (0);
 }
 
-/*
- * A pr_sosend() called with an mbuf instead of a uio comes from a kernel
- * thread: NFS, netgraph(4) and other subsystems can call into socket code.
- * The function conditions the mbuf so that it can be safely put onto a
- * socket buffer, and calculates its char count and mbuf count.
- *
- * Note: we don't support receiving control data from a kernel thread.  Our
- * pr_sosend methods have MPASS() checks for that.  This may change.
- */
-static void
-uipc_reset_kernel_mbuf(struct mbuf *m, struct mchain *mc)
-{
-
-	M_ASSERTPKTHDR(m);
-
-	m_clrprotoflags(m);
-	m_tag_delete_chain(m, NULL);
-	m->m_pkthdr.rcvif = NULL;
-	m->m_pkthdr.flowid = 0;
-	m->m_pkthdr.csum_flags = 0;
-	m->m_pkthdr.fibnum = 0;
-	m->m_pkthdr.rsstype = 0;
-
-	mc_init_m(mc, m);
-	MPASS(m->m_pkthdr.len == mc->mc_len);
-}
-
-#ifdef SOCKBUF_DEBUG
-static inline void
-uipc_stream_sbcheck(struct sockbuf *sb)
+static int
+uipc_rcvd(struct socket *so, int flags)
 {
-	struct mbuf *d;
-	u_int dcc, dctl, dmbcnt;
-
-	dcc = dctl = dmbcnt = 0;
-	STAILQ_FOREACH(d, &sb->sb_mbq, m_stailq) {
-		if (d->m_type == MT_CONTROL)
-			dctl += d->m_len;
-		else if (d->m_type == MT_DATA)
-			dcc +=  d->m_len;
-		else
-			MPASS(0);
-		dmbcnt += MSIZE;
-		if (d->m_flags & M_EXT)
-			dmbcnt += d->m_ext.ext_size;
-		if (d->m_stailq.stqe_next == NULL)
-			MPASS(sb->sb_mbq.stqh_last == &d->m_stailq.stqe_next);
-	}
-	MPASS(dcc == sb->sb_acc);
-	MPASS(dcc == sb->sb_ccc);
-	MPASS(dctl == sb->sb_ctl);
-	MPASS(dmbcnt == sb->sb_mbcnt);
-}
-#define	UIPC_STREAM_SBCHECK(sb)	uipc_stream_sbcheck(sb)
-#else
-#define	UIPC_STREAM_SBCHECK(sb)	do {} while (0)
-#endif
+	struct unpcb *unp, *unp2;
+	struct socket *so2;
+	u_int mbcnt, sbcc;
 
-/*
- * uipc_stream_sbspace() returns how much a writer can send, limited by char
- * count or mbuf memory use, whichever runs out first.
- *
- * XXXGL: sb_mbcnt may overcommit sb_mbmax in case a previous write observed
- * 'space < mbspace', but the mchain allocated to hold 'space' bytes of data
- * ended up with 'mc_mlen > mbspace'.  A typical scenario would be a full
- * buffer with a writer trying to push in a large write, and a slow reader
- * that reads just a few bytes at a time.  In that case the writer will keep
- * creating new mbufs with mc_split().  These mbufs will carry few chars, but
- * will all point at the same cluster, each adding the cluster size to
- * sb_mbcnt.  This means we may count the same cluster many times, potentially
- * underutilizing the socket buffer.  We aren't optimizing for ineffective
- * readers.  The classic socket buffer had the same "feature".
- */
-static inline u_int
-uipc_stream_sbspace(struct sockbuf *sb)
-{
-	u_int space, mbspace;
+	unp = sotounpcb(so);
+	KASSERT(unp != NULL, ("%s: unp == NULL", __func__));
+	KASSERT(so->so_type == SOCK_STREAM || so->so_type == SOCK_SEQPACKET,
+	    ("%s: socktype %d", __func__, so->so_type));
 
-	MPASS(sb->sb_hiwat >= sb->sb_ccc + sb->sb_ctl);
-	space = sb->sb_hiwat - sb->sb_ccc - sb->sb_ctl;
-	if (__predict_true(sb->sb_mbmax >= sb->sb_mbcnt))
-		mbspace = sb->sb_mbmax - sb->sb_mbcnt;
-	else
+	/*
+	 * Adjust backpressure on the sender and wake up any waiting writers.
+	 *
+	 * The unp lock is acquired to maintain the validity of the unp_conn
+	 * pointer; no lock on unp2 is required as unp2->unp_socket will be
+	 * static as long as we don't permit unp2 to disconnect from unp,
+	 * which is prevented by the lock on unp.  We cache values from
+	 * so_rcv to avoid holding the so_rcv lock over the entire
+	 * transaction on the remote so_snd.
+	 */
+	SOCKBUF_LOCK(&so->so_rcv);
+	mbcnt = so->so_rcv.sb_mbcnt;
+	sbcc = sbavail(&so->so_rcv);
+	SOCKBUF_UNLOCK(&so->so_rcv);
+	/*
+	 * There is a benign race condition at this point.  If we're planning to
+	 * clear SB_STOP, but uipc_send is called on the connected socket at
+	 * this instant, it might add data to the sockbuf and set SB_STOP.  Then
+	 * we would erroneously clear SB_STOP below, even though the sockbuf is
+	 * full.  The race is benign because the only ill effect is to allow the
+	 * sockbuf to exceed its size limit, and the size limits are not
+	 * strictly guaranteed anyway.
+	 */
+	UNP_PCB_LOCK(unp);
+	unp2 = unp->unp_conn;
+	if (unp2 == NULL) {
+		UNP_PCB_UNLOCK(unp);
 		return (0);
-
-	return (min(space, mbspace));
+	}
+	so2 = unp2->unp_socket;
+	SOCKBUF_LOCK(&so2->so_snd);
+	if (sbcc < so2->so_snd.sb_hiwat && mbcnt < so2->so_snd.sb_mbmax)
+		so2->so_snd.sb_flags &= ~SB_STOP;
+	sowwakeup_locked(so2);
+	UNP_PCB_UNLOCK(unp);
+	return (0);
 }
 
 static int
-uipc_sosend_stream_or_seqpacket(struct socket *so, struct sockaddr *addr,
-    struct uio *uio, struct mbuf *m, struct mbuf *c, int flags,
-    struct thread *td)
+uipc_send(struct socket *so, int flags, struct mbuf *m, struct sockaddr *nam,
+    struct mbuf *control, struct thread *td)
 {
 	struct unpcb *unp, *unp2;
 	struct socket *so2;
-	struct sockbuf *sb;
-	struct mchain mc, cmc;
-	ssize_t resid, sent;
-	bool nonblock, eor;
+	u_int mbcnt, sbcc;
 	int error;
 
-	MPASS((uio != NULL && m == NULL) || (m != NULL && uio == NULL));
-	MPASS(m == NULL || c == NULL);
-
-	if (__predict_false(flags & MSG_OOB))
-		return (EOPNOTSUPP);
-
-	nonblock = (so->so_state & SS_NBIO) ||
-	    (flags & (MSG_DONTWAIT | MSG_NBIO));
-	eor = flags & MSG_EOR;
+	unp = sotounpcb(so);
+	KASSERT(unp != NULL, ("%s: unp == NULL", __func__));
+	KASSERT(so->so_type == SOCK_STREAM || so->so_type == SOCK_SEQPACKET,
+	    ("%s: socktype %d", __func__, so->so_type));
 
-	mc = MCHAIN_INITIALIZER(&mc);
-	cmc = MCHAIN_INITIALIZER(&cmc);
-	sent = 0;
+	error = 0;
+	if (flags & PRUS_OOB) {
+		error = EOPNOTSUPP;
+		goto release;
+	}
+	if (control != NULL &&
+	    (error = unp_internalize(&control, td, NULL, NULL, NULL)))
+		goto release;
 
-	if (m == NULL) {
-		if (c != NULL && (error = unp_internalize(c, &cmc, td)))
+	unp2 = NULL;
+	if ((so->so_state & SS_ISCONNECTED) == 0) {
+		if (nam != NULL) {
+			if ((error = unp_connect(so, nam, td)) != 0)
+				goto out;
+		} else {
+			error = ENOTCONN;
 			goto out;
-		/*
-		 * Optimization for the case when our send fits into the
-		 * receive buffer: do the copyin before taking any locks,
-		 * sized to our send buffer.  Later copyins will also take
-		 * into account space in the peer's receive buffer.
-		 */
-		resid = uio->uio_resid;
-		error = mc_uiotomc(&mc, uio, so->so_snd.sb_hiwat, 0, M_WAITOK,
-		    eor ? M_EOR : 0);
-		if (__predict_false(error))
-			goto out2;
-	} else
-		uipc_reset_kernel_mbuf(m, &mc);
-
-	error = SOCK_IO_SEND_LOCK(so, SBLOCKWAIT(flags));
-	if (error)
-		goto out2;
+		}
+	}
 
-	unp = sotounpcb(so);
 	UNP_PCB_LOCK(unp);
-	unp2 = unp_pcb_lock_peer(unp);
-	if (__predict_false(so->so_error != 0)) {
-		error = so->so_error;
-		so->so_error = 0;
-		UNP_PCB_UNLOCK(unp);
-		if (unp2 != NULL)
-			UNP_PCB_UNLOCK(unp2);
-		goto out3;
-	}
-	if (__predict_false(unp2 == NULL)) {
-		/*
-		 * Use a different error code for a previously connected and
-		 * a never-connected socket.  SS_ISDISCONNECTED is set in
-		 * unp_soisdisconnected() and is synchronized by the pcb lock.
-		 */
-		error = so->so_state & SS_ISDISCONNECTED ? EPIPE : ENOTCONN;
+	if ((unp2 = unp_pcb_lock_peer(unp)) == NULL) {
 		UNP_PCB_UNLOCK(unp);
-		goto out3;
+		error = ENOTCONN;
+		goto out;
+	} else if (so->so_snd.sb_state & SBS_CANTSENDMORE) {
+		unp_pcb_unlock_pair(unp, unp2);
+		error = EPIPE;
+		goto out;
 	}
 	UNP_PCB_UNLOCK(unp);
-
+	if ((so2 = unp2->unp_socket) == NULL) {
+		UNP_PCB_UNLOCK(unp2);
+		error = ENOTCONN;
+		goto out;
+	}
+	SOCKBUF_LOCK(&so2->so_rcv);
 	if (unp2->unp_flags & UNP_WANTCRED_MASK) {
 		/*
 		 * Credentials are passed only once on SOCK_STREAM and
 		 * SOCK_SEQPACKET (LOCAL_CREDS => WANTCRED_ONESHOT), or
 		 * forever (LOCAL_CREDS_PERSISTENT => WANTCRED_ALWAYS).
 		 */
-		unp_addsockcred(td, &cmc, unp2->unp_flags);
+		control = unp_addsockcred(td, control, unp2->unp_flags, NULL,
+		    NULL, NULL);
 		unp2->unp_flags &= ~UNP_WANTCRED_ONESHOT;
 	}
 
 	/*
-	 * Cycle through the data to send and the available space in the
-	 * peer's receive buffer.  Put a reference on the peer socket, so that
-	 * it doesn't get freed while we sbwait().  If the peer goes away, we
-	 * will observe SBS_CANTRCVMORE and our sorele() will finalize the
-	 * peer's socket destruction.
+	 * Send to paired receive port and wake up readers.  Don't
+	 * check for space available in the receive buffer if we're
+	 * attaching ancillary data; Unix domain sockets only check
+	 * for space in the sending sockbuf, and that check is
+	 * performed one level up the stack.  At that level we cannot
+	 * precisely account for the amount of buffer space used
+	 * (e.g., because control messages are not yet internalized).
 	 */
-	so2 = unp2->unp_socket;
-	soref(so2);
-	UNP_PCB_UNLOCK(unp2);
-	sb = &so2->so_rcv;
-	while (mc.mc_len + cmc.mc_len > 0) {
-		struct mchain mcnext = MCHAIN_INITIALIZER(&mcnext);
-		u_int space;
+	switch (so->so_type) {
+	case SOCK_STREAM:
+		if (control != NULL) {
+			sbappendcontrol_locked(&so2->so_rcv,
+			    m->m_len > 0 ?  m : NULL, control, flags);
+			control = NULL;
+		} else
+			sbappend_locked(&so2->so_rcv, m, flags);
+		break;
 
-		SOCK_RECVBUF_LOCK(so2);
-restart:
-		UIPC_STREAM_SBCHECK(sb);
-		if (__predict_false(cmc.mc_len > sb->sb_hiwat)) {
-			SOCK_RECVBUF_UNLOCK(so2);
-			error = EMSGSIZE;
-			goto out4;
-		}
-		if (__predict_false(sb->sb_state & SBS_CANTRCVMORE)) {
-			SOCK_RECVBUF_UNLOCK(so2);
-			error = EPIPE;
-			goto out4;
-		}
-		/*
-		 * Wait on the peer socket's receive buffer until there is
-		 * enough space to put at least the control.  Data is a stream
-		 * and can be put partially, but control is really a datagram.
-		 */
-		space = uipc_stream_sbspace(sb);
-		if (space < sb->sb_lowat || space < cmc.mc_len) {
-			if (nonblock) {
-				SOCK_RECVBUF_UNLOCK(so2);
-				error = EWOULDBLOCK;
-				goto out4;
-			}
-			if ((error = sbwait(so2, SO_RCV)) != 0) {
-				SOCK_RECVBUF_UNLOCK(so2);
-				goto out4;
-			} else
-				goto restart;
-		}
-		MPASS(space >= cmc.mc_len);
-		space -= cmc.mc_len;
-		if (space == 0) {
-			/* There is space only to send control. */
-			MPASS(!STAILQ_EMPTY(&cmc.mc_q));
-			mcnext = mc;
-			mc = MCHAIN_INITIALIZER(&mc);
-		} else if (space < mc.mc_len) {
-			/* Not enough space. */
-			if (__predict_false(mc_split(&mc, &mcnext, space,
-			    M_NOWAIT) == ENOMEM)) {
-				/*
-				 * If allocation failed, use M_WAITOK and merge
-				 * the chain back.  Next time, mc_split() will
-				 * easily split at the same place.  Only if we
-				 * race with setsockopt(SO_RCVBUF) shrinking
-				 * sb_hiwat can this happen more than once.
-				 */
-				SOCK_RECVBUF_UNLOCK(so2);
-				(void)mc_split(&mc, &mcnext, space, M_WAITOK);
-				mc_concat(&mc, &mcnext);
-				SOCK_RECVBUF_LOCK(so2);
-				goto restart;
-			}
-			MPASS(mc.mc_len == space);
-		}
-		if (!STAILQ_EMPTY(&cmc.mc_q)) {
-			STAILQ_CONCAT(&sb->sb_mbq, &cmc.mc_q);
-			sb->sb_ctl += cmc.mc_len;
-			sb->sb_mbcnt += cmc.mc_mlen;
-			cmc.mc_len = 0;
-		}
-		sent += mc.mc_len;
-		sb->sb_acc += mc.mc_len;
-		sb->sb_ccc += mc.mc_len;
-		sb->sb_mbcnt += mc.mc_mlen;
-		STAILQ_CONCAT(&sb->sb_mbq, &mc.mc_q);
-		UIPC_STREAM_SBCHECK(sb);
-		space = uipc_stream_sbspace(sb);
-		sorwakeup_locked(so2);
-		mc = mcnext;
-		if (STAILQ_EMPTY(&mc.mc_q) &&
-		    uio != NULL && uio->uio_resid > 0) {
-			/*
-			 * Copyin sum of peer's receive buffer space and our
-			 * sb_hiwat, which is our virtual send buffer size.
-			 * See comment above unpst_sendspace declaration.
-			 * We are reading sb_hiwat locklessly, cause a) we
-			 * don't care about an application that does send(2)
-			 * and setsockopt(2) racing internally, and for an
-			 * application that does this in sequence we will see
-			 * the correct value cause sbsetopt() uses buffer lock
-			 * and we also have already acquired it at least once.
-			 */
-			error = mc_uiotomc(&mc, uio, space +
-			    atomic_load_int(&so->so_snd.sb_hiwat), 0, M_WAITOK,
-			    eor ? M_EOR : 0);
-			if (__predict_false(error))
-				goto out4;
-		}
+	case SOCK_SEQPACKET:
+		if (sbappendaddr_nospacecheck_locked(&so2->so_rcv,
+		    &sun_noname, m, control))
+			control = NULL;
+		break;
 	}
 
-	MPASS(STAILQ_EMPTY(&mc.mc_q));
-
-	td->td_ru.ru_msgsnd++;
-out4:
-	sorele(so2);
-out3:
-	SOCK_IO_SEND_UNLOCK(so);
-out2:
-	if (!mc_empty(&cmc))
-		unp_scan(mc_first(&cmc), unp_freerights);
-out:
-	mc_freem(&mc);
-	mc_freem(&cmc);
-
-	if (uio != NULL)
-		uio->uio_resid = resid - sent;
-
-	return (error);
-}
-
-static int
-uipc_soreceive_stream_or_seqpacket(struct socket *so, struct sockaddr **psa,
-    struct uio *uio, struct mbuf **mp0, struct mbuf **controlp, int *flagsp)
-{
-	struct sockbuf *sb = &so->so_rcv;
-	struct mbuf *control, *m, *first, *last, *next;
-	u_int ctl, space, datalen, mbcnt, lastlen;
-	int error, flags;
-	bool nonblock, waitall, peek;
-
-	MPASS(mp0 == NULL);
-
-	if (psa != NULL)
-		*psa = NULL;
-	if (controlp != NULL)
-		*controlp = NULL;
-
-	flags = flagsp != NULL ? *flagsp : 0;
-	nonblock = (so->so_state & SS_NBIO) ||
-	    (flags & (MSG_DONTWAIT | MSG_NBIO));
-	peek = flags & MSG_PEEK;
-	waitall = (flags & MSG_WAITALL) && !peek;
+	mbcnt = so2->so_rcv.sb_mbcnt;
+	sbcc = sbavail(&so2->so_rcv);
+	if (sbcc)
+		sorwakeup_locked(so2);
+	else
+		SOCKBUF_UNLOCK(&so2->so_rcv);
 
 	/*
-	 * This check may fail only on a socket that never went through
-	 * connect(2).  We can check this locklessly because: a) for a newborn
-	 * socket we don't care about applications that may race internally
-	 * between connect(2) and recv(2), and b) for a dying socket, if we
-	 * miss an update by unp_soisdisconnected(), we still get the check
-	 * right.  For a dying socket we would observe SBS_CANTRCVMORE later.
+	 * The PCB lock on unp2 protects the SB_STOP flag.  Without it,
+	 * it would be possible for uipc_rcvd to be called at this
+	 * point, drain the receiving sockbuf, clear SB_STOP, and then
+	 * we would set SB_STOP below.  That could lead to an empty
+	 * sockbuf having SB_STOP set.
 	 */
-	if (__predict_false((atomic_load_short(&so->so_state) &
-	    (SS_ISCONNECTED|SS_ISDISCONNECTED)) == 0))
-		return (ENOTCONN);
-
-	error = SOCK_IO_RECV_LOCK(so, SBLOCKWAIT(flags));
-	if (__predict_false(error))
-		return (error);
-
-restart:
-	SOCK_RECVBUF_LOCK(so);
-	UIPC_STREAM_SBCHECK(sb);
-	while (sb->sb_acc < sb->sb_lowat &&
-	    (sb->sb_ctl == 0 || controlp == NULL)) {
-		if (so->so_error) {
-			error = so->so_error;
-			if (!peek)
-				so->so_error = 0;
-			SOCK_RECVBUF_UNLOCK(so);
-			SOCK_IO_RECV_UNLOCK(so);
-			return (error);
-		}
-		if (sb->sb_state & SBS_CANTRCVMORE) {
-			SOCK_RECVBUF_UNLOCK(so);
-			SOCK_IO_RECV_UNLOCK(so);
-			return (0);
-		}
-		if (nonblock) {
-			SOCK_RECVBUF_UNLOCK(so);
-			SOCK_IO_RECV_UNLOCK(so);
-			return (EWOULDBLOCK);
-		}
-		error = sbwait(so, SO_RCV);
-		if (error) {
-			SOCK_RECVBUF_UNLOCK(so);
-			SOCK_IO_RECV_UNLOCK(so);
-			return (error);
-		}
-	}
-
-	MPASS(STAILQ_FIRST(&sb->sb_mbq));
-	MPASS(sb->sb_acc > 0 || sb->sb_ctl > 0);
-
-	mbcnt = 0;
-	ctl = 0;
-	first = STAILQ_FIRST(&sb->sb_mbq);
-	if (first->m_type == MT_CONTROL) {
-		control = first;
-		STAILQ_FOREACH_FROM(first, &sb->sb_mbq, m_stailq) {
-			if (first->m_type != MT_CONTROL)
-				break;
-			ctl += first->m_len;
-			mbcnt += MSIZE;
-			if (first->m_flags & M_EXT)
-				mbcnt += first->m_ext.ext_size;
-		}
-	} else
-		control = NULL;
-
+	SOCKBUF_LOCK(&so->so_snd);
+	if (sbcc >= so->so_snd.sb_hiwat || mbcnt >= so->so_snd.sb_mbmax)
+		so->so_snd.sb_flags |= SB_STOP;
+	SOCKBUF_UNLOCK(&so->so_snd);
+	UNP_PCB_UNLOCK(unp2);
+	m = NULL;
+out:
 	/*
-	 * Find split point for the next copyout.  On exit from the loop:
-	 * last == NULL - socket to be flushed
-	 * last != NULL
-	 *   lastlen < last->m_len - uio to be filled, last to be adjusted
-	 *   lastlen == 0          - MT_CONTROL or M_EOR encountered
+	 * PRUS_EOF is equivalent to pr_send followed by pr_shutdown.
 	 */
-	space = uio->uio_resid;
-	datalen = 0;
-	for (m = first, last = NULL; m != NULL; m = STAILQ_NEXT(m, m_stailq)) {
-		if (m->m_type != MT_DATA) {
-			last = m;
-			lastlen = 0;
-			break;
-		}
-		if (space >= m->m_len) {
-			space -= m->m_len;
-			datalen += m->m_len;
-			mbcnt += MSIZE;
-			if (m->m_flags & M_EXT)
-				mbcnt += m->m_ext.ext_size;
-			if (m->m_flags & M_EOR) {
-				last = STAILQ_NEXT(m, m_stailq);
-				lastlen = 0;
-				flags |= MSG_EOR;
-				break;
-			}
-		} else {
-			datalen += space;
-			last = m;
-			lastlen = space;
-			break;
-		}
-	}
-
-	UIPC_STREAM_SBCHECK(sb);
-	if (!peek) {
-		if (last == NULL)
-			STAILQ_INIT(&sb->sb_mbq);
-		else {
-			STAILQ_FIRST(&sb->sb_mbq) = last;
-			MPASS(last->m_len > lastlen);
-			last->m_len -= lastlen;
-			last->m_data += lastlen;
-		}
-		MPASS(sb->sb_acc >= datalen);
-		sb->sb_acc -= datalen;
-		sb->sb_ccc -= datalen;
-		MPASS(sb->sb_ctl >= ctl);
-		sb->sb_ctl -= ctl;
-		MPASS(sb->sb_mbcnt >= mbcnt);
-		sb->sb_mbcnt -= mbcnt;
-		UIPC_STREAM_SBCHECK(sb);
-		/* Mind the name.  We are waking the writer here, not the reader. */
-		sorwakeup_locked(so);
-	} else
-		SOCK_RECVBUF_UNLOCK(so);
-
-	while (control != NULL && control->m_type == MT_CONTROL) {
-		if (!peek) {
-			struct mbuf *c;
-
-			/*
-			 * An unp_externalize() failure must abort the entire
-			 * read(2).  Such a failure should also free the
-			 * problematic control, so that the socket is not left
-			 * in a state where it can't progress with reading.
-			 * The probability of such a failure is really low, so
-			 * it is fine that we need to perform a pretty complex
-			 * operation here to reconstruct the buffer.
-			 * XXXGL: unp_externalize() used to be the
-			 * dom_externalize() KBI and it frees the whole chain,
-			 * so we need to feed it mbufs one by one.
-			 */
-			c = control;
-			control = STAILQ_NEXT(c, m_stailq);
-			STAILQ_NEXT(c, m_stailq) = NULL;
-			error = unp_externalize(c, controlp, flags);
-			if (__predict_false(error)) {
-				SOCK_RECVBUF_LOCK(so);
-				UIPC_STREAM_SBCHECK(sb);
-				MPASS(!(sb->sb_state & SBS_CANTRCVMORE));
-				/* XXXGL: STAILQ_PREPEND */
-				if (STAILQ_EMPTY(&sb->sb_mbq) &&
-				    control != NULL)
-					STAILQ_INSERT_HEAD(&sb->sb_mbq,
-					    control, m_stailq);
-				else
-					STAILQ_FIRST(&sb->sb_mbq) = control;
-				sb->sb_ctl = sb->sb_acc = sb->sb_ccc =
-				    sb->sb_mbcnt = 0;
-				STAILQ_FOREACH(m, &sb->sb_mbq, m_stailq) {
-					if (m->m_type == MT_DATA) {
-						sb->sb_acc += m->m_len;
-						sb->sb_ccc += m->m_len;
-					} else {
-						sb->sb_ctl += m->m_len;
-					}
-					sb->sb_mbcnt += MSIZE;
-					if (m->m_flags & M_EXT)
-						sb->sb_mbcnt +=
-						    m->m_ext.ext_size;
-				}
-				UIPC_STREAM_SBCHECK(sb);
-				SOCK_RECVBUF_UNLOCK(so);
-				SOCK_IO_RECV_UNLOCK(so);
-				return (error);
-			}
-			if (controlp != NULL) {
-				while (*controlp != NULL)
-					controlp = &(*controlp)->m_next;
-			}
-		} else {
-			/*
-			 * XXXGL
-			 *
-			 * In the MSG_PEEK case control is not externalized.
-			 * This means we are leaking some kernel pointers to
-			 * userland.  They are useless to a law-abiding
-			 * application, but may be useful to malware.  This
-			 * is what the historical implementation in
-			 * soreceive_generic() did.  To be improved?
-			 */
-			if (controlp != NULL) {
-				*controlp = m_copym(control, 0, control->m_len,
-				    M_WAITOK);
-				controlp = &(*controlp)->m_next;
-			}
-			control = STAILQ_NEXT(control, m_stailq);
-		}
+	if (flags & PRUS_EOF) {
+		UNP_PCB_LOCK(unp);
+		socantsendmore(so);
+		unp_shutdown(unp);
+		UNP_PCB_UNLOCK(unp);
 	}
+	if (control != NULL && error != 0)
+		unp_scan(control, unp_freerights);
 
-	for (m = first; m != last; m = next) {
-		next = STAILQ_NEXT(m, m_stailq);
-		error = uiomove(mtod(m, char *), m->m_len, uio);
-		if (__predict_false(error)) {
-			SOCK_IO_RECV_UNLOCK(so);
-			if (!peek)
-				for (; m != last; m = next) {
-					next = STAILQ_NEXT(m, m_stailq);
-					m_free(m);
-				}
-			return (error);
-		}
-		if (!peek)
-			m_free(m);
-	}
-	if (last != NULL && lastlen > 0) {
-		if (!peek) {
-			MPASS(!(m->m_flags & M_PKTHDR));
-			MPASS(last->m_data - M_START(last) >= lastlen);
-			error = uiomove(mtod(last, char *) - lastlen,
-			    lastlen, uio);
-		} else
-			error = uiomove(mtod(last, char *), lastlen, uio);
-		if (__predict_false(error)) {
-			SOCK_IO_RECV_UNLOCK(so);
-			return (error);
-		}
-	}
-	if (waitall && !(flags & MSG_EOR) && uio->uio_resid > 0)
-		goto restart;
-	SOCK_IO_RECV_UNLOCK(so);
-
-	if (flagsp != NULL)
-		*flagsp |= flags;
-
-	uio->uio_td->td_ru.ru_msgrcv++;
-
-	return (0);
+release:
+	if (control != NULL)
+		m_freem(control);
+	/*
+	 * In case of PRUS_NOTREADY, uipc_ready() is responsible
+	 * for freeing memory.
+	 */
+	if (m != NULL && (flags & PRUS_NOTREADY) == 0)
+		m_freem(m);
+	return (error);
 }
 
 /* PF_UNIX/SOCK_DGRAM version of sbspace() */
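
The UNP_WANTCRED_MASK block restored in the hunk above implements
LOCAL_CREDS (credentials passed once) and LOCAL_CREDS_PERSISTENT
(credentials passed with every message).  A hedged userland sketch of the
receive side, with option and cmsg names per unix(4) and the helper name
hypothetical:

#include <sys/socket.h>
#include <sys/un.h>
#include <stdio.h>
#include <string.h>

/* Receive one message and print the sender's uid from SCM_CREDS. */
static void
recv_creds(int sock)
{
	struct msghdr msg;
	struct cmsghdr *cmsg;
	struct iovec iov;
	char data[64];
	char cbuf[CMSG_SPACE(SOCKCREDSIZE(16))];	/* 16: CMGROUP_MAX */
	int on = 1;

	(void)setsockopt(sock, SOL_LOCAL, LOCAL_CREDS, &on, sizeof(on));
	memset(&msg, 0, sizeof(msg));
	iov.iov_base = data;
	iov.iov_len = sizeof(data);
	msg.msg_iov = &iov;
	msg.msg_iovlen = 1;
	msg.msg_control = cbuf;
	msg.msg_controllen = sizeof(cbuf);
	if (recvmsg(sock, &msg, 0) < 0)
		return;
	for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL;
	    cmsg = CMSG_NXTHDR(&msg, cmsg))
		if (cmsg->cmsg_level == SOL_SOCKET &&
		    cmsg->cmsg_type == SCM_CREDS) {
			struct sockcred *sc =
			    (struct sockcred *)CMSG_DATA(cmsg);

			printf("peer uid %d\n", (int)sc->sc_uid);
		}
}
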
@@ -1498,8 +1111,7 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
 	const struct sockaddr *from;
 	struct socket *so2;
 	struct sockbuf *sb;
-	struct mchain cmc = MCHAIN_INITIALIZER(&cmc);
-	struct mbuf *f;
+	struct mbuf *f, *clast;
 	u_int cc, ctl, mbcnt;
 	u_int dcc __diagused, dctl __diagused, dmbcnt __diagused;
 	int error;
@@ -1508,6 +1120,7 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
 
 	error = 0;
 	f = NULL;
+	ctl = 0;
 
 	if (__predict_false(flags & MSG_OOB)) {
 		error = EOPNOTSUPP;
@@ -1526,14 +1139,16 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
 		f = m_gethdr(M_WAITOK, MT_SONAME);
 		cc = m->m_pkthdr.len;
 		mbcnt = MSIZE + m->m_pkthdr.memlen;
-		if (c != NULL && (error = unp_internalize(c, &cmc, td)))
+		if (c != NULL &&
+		    (error = unp_internalize(&c, td, &clast, &ctl, &mbcnt)))
 			goto out;
 	} else {
-		struct mchain mc;
+		/* pr_sosend() with an mbuf usually comes from a kernel thread. */
+
+		M_ASSERTPKTHDR(m);
+		if (__predict_false(c != NULL))
+			panic("%s: control from a kernel thread", __func__);
 
-		uipc_reset_kernel_mbuf(m, &mc);
-		cc = mc.mc_len;
-		mbcnt = mc.mc_mlen;
 		if (__predict_false(m->m_pkthdr.len > unpdg_maxdgram)) {
 			error = EMSGSIZE;
 			goto out;
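
The else-branch restored above serves mbuf-based sends from kernel threads.
A sketch of such a caller, assuming the standard sosend(9) KPI; the wrapper
name is hypothetical:

#include <sys/param.h>
#include <sys/mbuf.h>
#include <sys/socket.h>
#include <sys/socketvar.h>

/* A kernel consumer hands a fully formed record to the socket layer. */
static int
kernel_send_record(struct socket *so, struct mbuf *m, struct thread *td)
{
	M_ASSERTPKTHDR(m);	/* The branch above requires a pkthdr mbuf. */
	/* No uio and no control: only the mbuf chain is passed down. */
	return (sosend(so, NULL, NULL, m, NULL, 0, td));
}
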
@@ -1542,6 +1157,22 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
 			error = ENOBUFS;
 			goto out;
 		}
+		/* Condition the foreign mbuf to our standards. */
+		m_clrprotoflags(m);
+		m_tag_delete_chain(m, NULL);
+		m->m_pkthdr.rcvif = NULL;
+		m->m_pkthdr.flowid = 0;
+		m->m_pkthdr.csum_flags = 0;
+		m->m_pkthdr.fibnum = 0;
+		m->m_pkthdr.rsstype = 0;
+
+		cc = m->m_pkthdr.len;
+		mbcnt = MSIZE;
+		for (struct mbuf *mb = m; mb != NULL; mb = mb->m_next) {
+			mbcnt += MSIZE;
+			if (mb->m_flags & M_EXT)
+				mbcnt += mb->m_ext.ext_size;
+		}
 	}
 
 	unp = sotounpcb(so);
@@ -1593,7 +1224,8 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
 	}
 
 	if (unp2->unp_flags & UNP_WANTCRED_MASK)
-		unp_addsockcred(td, &cmc, unp2->unp_flags);
+		c = unp_addsockcred(td, c, unp2->unp_flags, &clast, &ctl,
+		    &mbcnt);
 	if (unp->unp_addr != NULL)
 		from = (struct sockaddr *)unp->unp_addr;
 	else
@@ -1601,21 +1233,25 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
 	f->m_len = from->sa_len;
 	MPASS(from->sa_len <= MLEN);
 	bcopy(from, mtod(f, void *), from->sa_len);
+	ctl += f->m_len;
 
 	/*
 	 * Concatenate mbufs: from -> control -> data.
 	 * Save overall cc and mbcnt in "from" mbuf.
 	 */
-	if (!STAILQ_EMPTY(&cmc.mc_q)) {
-		f->m_next = mc_first(&cmc);
-		mc_last(&cmc)->m_next = m;
-		/* XXXGL: This is dirty, as is the rollback after ENOBUFS. */
-		STAILQ_INIT(&cmc.mc_q);
+	if (c != NULL) {
+#ifdef INVARIANTS
+		struct mbuf *mc;
+
+		for (mc = c; mc->m_next != NULL; mc = mc->m_next);
+		MPASS(mc == clast);
+#endif
+		f->m_next = c;
+		clast->m_next = m;
+		c = NULL;
 	} else
 		f->m_next = m;
 	m = NULL;
-	ctl = f->m_len + cmc.mc_len;
-	mbcnt += cmc.mc_mlen;
 #ifdef INVARIANTS
 	dcc = dctl = dmbcnt = 0;
 	for (struct mbuf *mb = f; mb != NULL; mb = mb->m_next) {
@@ -1681,7 +1317,7 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
 		soroverflow_locked(so2);
 		error = ENOBUFS;
 		if (f->m_next->m_type == MT_CONTROL) {
-			STAILQ_FIRST(&cmc.mc_q) = f->m_next;
+			c = f->m_next;
 			f->m_next = NULL;
 		}
 	}
@@ -1696,12 +1332,13 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
 out3:
 	SOCK_IO_SEND_UNLOCK(so);
 out2:
-	if (!mc_empty(&cmc))
-		unp_scan(mc_first(&cmc), unp_freerights);
+	if (c)
+		unp_scan(c, unp_freerights);
 out:
 	if (f)
 		m_freem(f);
-	mc_freem(&cmc);
+	if (c)
+		m_freem(c);
 	if (m)
 		m_freem(m);
 
@@ -1942,7 +1579,6 @@ uipc_soreceive_dgram(struct socket *so, struct sockaddr **psa, struct uio *uio,
 	return (0);
 }
 
-#if 0	/* No sendfile support. */
 static bool
 uipc_ready_scan(struct socket *so, struct mbuf *m, int count, int *errorp)
 {
@@ -2022,7 +1658,6 @@ uipc_ready(struct socket *so, struct mbuf *m, int count)
 	}
 	return (error);
 }
-#endif
 
 static int
 uipc_sense(struct socket *so, struct stat *sb)
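
Un-ifdef'ing uipc_ready() above re-enables the PRUS_NOTREADY protocol used by
sendfile(2).  A simplified sketch of that contract, assuming the pr_send/
pr_ready KPI as declared in sys/protosw.h; the wrapper is hypothetical and
error handling is trimmed:

#include <sys/param.h>
#include <sys/mbuf.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/protosw.h>

static int
send_when_ready(struct socket *so, struct mbuf *m, int count,
    struct thread *td)
{
	int error;

	/* Append data that readers may not consume yet. */
	error = so->so_proto->pr_send(so, PRUS_NOTREADY, m, NULL, NULL, td);
	if (error != 0)
		return (error);
	/* Backing I/O done: let uipc_ready() unblock the readers. */
	return (so->so_proto->pr_ready(so, m, count));
}
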
@@ -2461,19 +2096,6 @@ unp_connect2(struct socket *so, struct socket *so2)
 	}
 }
 
-static void
-unp_soisdisconnected(struct socket *so)
-{
-	SOCK_LOCK(so);
-	MPASS(!SOLISTENING(so));
-	so->so_state |= SS_ISDISCONNECTED;
-	so->so_state &= ~SS_ISCONNECTED;
-	SOCK_RECVBUF_LOCK(so);
-	socantrcvmore_locked(so);
-	SOCK_UNLOCK(so);
-	wakeup(&so->so_timeo);	/* XXXGL: is this needed? */
-}
-
*** 398 LINES SKIPPED ***