Date: Mon, 8 Apr 2024 20:29:48 GMT
Message-Id: <202404082029.438KTmpR041900@gitrepo.freebsd.org>
To: src-committers@FreeBSD.org, dev-commits-src-all@FreeBSD.org,
	dev-commits-src-main@FreeBSD.org
From: Gleb Smirnoff
Subject: git: d80a97def9a1 - main - unix: new implementation of unix/stream & unix/seqpacket
List-Id: Commit messages for all branches of the src repository
List-Archive: https://lists.freebsd.org/archives/dev-commits-src-all
X-Git-Committer: glebius
X-Git-Repository: src
X-Git-Refname: refs/heads/main
X-Git-Reftype: branch
X-Git-Commit: d80a97def9a1db6f07f5d2e68f7ad62b27918947

The branch main has been updated by glebius:

URL: https://cgit.FreeBSD.org/src/commit/?id=d80a97def9a1db6f07f5d2e68f7ad62b27918947

commit d80a97def9a1db6f07f5d2e68f7ad62b27918947
Author:     Gleb Smirnoff
AuthorDate: 2024-04-08 20:16:51 +0000
Commit:     Gleb Smirnoff
CommitDate: 2024-04-08 20:16:51 +0000

    unix: new implementation of unix/stream & unix/seqpacket

    Provide protocol-specific pr_sosend and pr_soreceive for PF_UNIX
    SOCK_STREAM sockets and implement SOCK_SEQPACKET sockets as an
    extension of SOCK_STREAM.  The change meets three goals: get rid of
    unix(4)-specific code in the generic socket layer, provide faster and
    more robust unix/stream sockets, and bring unix/seqpacket much closer
    to the specification.  Highlights follow:

    - The send buffer is now truly bypassed.  Previously it was always
      empty, but send(2) still needed to acquire its lock and perform a
      variety of tricks to be woken up at the right time while sleeping
      on it.  Now the only two things we care about in the send buffer
      are the I/O sx(9) lock that serializes operations and the value of
      so_snd.sb_hiwat, which we can read without taking a lock.  A
      send(2) that has to wait sleeps on the mutex of the peer's receive
      buffer.  A bulk send/recv of data with large socket buffers makes
      both syscalls just bounce between owning the receive buffer lock
      and copyin(9)/copyout(9); no other locks are involved.

    - The implementation uses the new mchain structure to manipulate
      mbuf chains (a simplified model is sketched after the diff).  Note
      that this required converting two functions shared with unix/dgram
      to mchain, unp_internalize() and unp_addsockcred(), as well as
      adding a new shared one, uipc_process_kernel_mbuf().  This induces
      some non-functional changes in the unix/dgram code as well.  There
      is room for improvement here, as right now the code is a mix of
      mchain and manually managed mbuf chains.

    - unix/seqpacket, previously marked as PR_ADDR & PR_ATOMIC and thus
      treated as a datagram socket by the generic socket code, now
      becomes a true stream socket with record markers (see the userland
      sketch after the diff).

    - unix/stream loses sendfile(2) support.  This can be brought back,
      but requires some work.  Let's first see if there is any interest
      in this feature beyond the purely academic.

    Reviewed by:	markj, tuexen
    Differential Revision:	https://reviews.freebsd.org/D44151
---
 sys/kern/uipc_usrreq.c | 956 +++++++++++++++++++++++++++++++++----------------
 sys/sys/sockbuf.h      |   7 +
 2 files changed, 645 insertions(+), 318 deletions(-)

diff --git a/sys/kern/uipc_usrreq.c b/sys/kern/uipc_usrreq.c
index 6e83e2be6f05..1b9416269696 100644
--- a/sys/kern/uipc_usrreq.c
+++ b/sys/kern/uipc_usrreq.c
@@ -5,7 +5,7 @@
  *	The Regents of the University of California.  All Rights Reserved.
  * Copyright (c) 2004-2009 Robert N. M. Watson All Rights Reserved.
* Copyright (c) 2018 Matthew Macy - * Copyright (c) 2022 Gleb Smirnoff + * Copyright (c) 2022-2024 Gleb Smirnoff * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -141,11 +141,14 @@ static struct timeout_task unp_gc_task; static struct task unp_defer_task; /* - * Both send and receive buffers are allocated PIPSIZ bytes of buffering for - * stream sockets, although the total for sender and receiver is actually - * only PIPSIZ. + * SOCK_STREAM and SOCK_SEQPACKET unix(4) sockets fully bypass the send buffer, + * however the notion of send buffer still makes sense with them. Its size is + * the amount of space that a send(2) syscall may copyin(9) before checking + * with the receive buffer of a peer. Although not linked anywhere yet, + * pointed to by a stack variable, effectively it is a buffer that needs to be + * sized. * - * Datagram sockets really use the sendspace as the maximum datagram size, + * SOCK_DGRAM sockets really use the sendspace as the maximum datagram size, * and don't really want to reserve the sendspace. Their recvspace should be * large enough for at least one max-size datagram plus address. */ @@ -156,7 +159,7 @@ static u_long unpst_sendspace = PIPSIZ; static u_long unpst_recvspace = PIPSIZ; static u_long unpdg_maxdgram = 8*1024; /* support 8KB syslog msgs */ static u_long unpdg_recvspace = 16*1024; -static u_long unpsp_sendspace = PIPSIZ; /* really max datagram size */ +static u_long unpsp_sendspace = PIPSIZ; static u_long unpsp_recvspace = PIPSIZ; static SYSCTL_NODE(_net, PF_LOCAL, local, CTLFLAG_RW | CTLFLAG_MPSAFE, 0, @@ -300,13 +303,12 @@ static void unp_gc(__unused void *, int); static void unp_scan(struct mbuf *, void (*)(struct filedescent **, int)); static void unp_discard(struct file *); static void unp_freerights(struct filedescent **, int); -static int unp_internalize(struct mbuf **, struct thread *, - struct mbuf **, u_int *, u_int *); +static int unp_internalize(struct mbuf *, struct mchain *, + struct thread *); static void unp_internalize_fp(struct file *); static int unp_externalize(struct mbuf *, struct mbuf **, int); static int unp_externalize_fp(struct file *); -static struct mbuf *unp_addsockcred(struct thread *, struct mbuf *, - int, struct mbuf **, u_int *, u_int *); +static void unp_addsockcred(struct thread *, struct mchain *, int); static void unp_process_defers(void * __unused, int); static void @@ -449,6 +451,7 @@ uipc_attach(struct socket *so, int proto, struct thread *td) case SOCK_STREAM: sendspace = unpst_sendspace; recvspace = unpst_recvspace; + STAILQ_INIT(&so->so_rcv.sb_mbq); break; case SOCK_DGRAM: @@ -466,6 +469,7 @@ uipc_attach(struct socket *so, int proto, struct thread *td) case SOCK_SEQPACKET: sendspace = unpsp_sendspace; recvspace = unpsp_recvspace; + STAILQ_INIT(&so->so_rcv.sb_mbq); break; default: @@ -797,6 +801,10 @@ uipc_detach(struct socket *so) taskqueue_enqueue_timeout(taskqueue_thread, &unp_gc_task, -1); switch (so->so_type) { + case SOCK_STREAM: + case SOCK_SEQPACKET: + MPASS(STAILQ_EMPTY(&so->so_rcv.sb_mbq)); + break; case SOCK_DGRAM: /* * Everything should have been unlinked/freed by unp_dispose() @@ -852,6 +860,10 @@ uipc_listen(struct socket *so, int backlog, struct thread *td) error = solisten_proto_check(so); if (error == 0) { cru2xt(td, &unp->unp_peercred); + (void)chgsbsize(so->so_cred->cr_uidinfo, &so->so_snd.sb_hiwat, + 0, RLIM_INFINITY); + (void)chgsbsize(so->so_cred->cr_uidinfo, &so->so_rcv.sb_hiwat, + 0, 
RLIM_INFINITY); solisten_proto(so, backlog); } SOCK_UNLOCK(so); @@ -885,187 +897,563 @@ uipc_peeraddr(struct socket *so, struct sockaddr *ret) return (0); } -static int -uipc_rcvd(struct socket *so, int flags) +/* + * pr_sosend() called with mbuf instead of uio is a kernel thread. NFS, + * netgraph(4) and other subsystems can call into socket code. The + * function will condition the mbuf so that it can be safely put onto socket + * buffer and calculate its char count and mbuf count. + * + * Note: we don't support receiving control data from a kernel thread. Our + * pr_sosend methods have MPASS() to check that. This may change. + */ +static void +uipc_reset_kernel_mbuf(struct mbuf *m, struct mchain *mc) { - struct unpcb *unp, *unp2; - struct socket *so2; - u_int mbcnt, sbcc; - unp = sotounpcb(so); - KASSERT(unp != NULL, ("%s: unp == NULL", __func__)); - KASSERT(so->so_type == SOCK_STREAM || so->so_type == SOCK_SEQPACKET, - ("%s: socktype %d", __func__, so->so_type)); + M_ASSERTPKTHDR(m); - /* - * Adjust backpressure on sender and wakeup any waiting to write. - * - * The unp lock is acquired to maintain the validity of the unp_conn - * pointer; no lock on unp2 is required as unp2->unp_socket will be - * static as long as we don't permit unp2 to disconnect from unp, - * which is prevented by the lock on unp. We cache values from - * so_rcv to avoid holding the so_rcv lock over the entire - * transaction on the remote so_snd. - */ - SOCKBUF_LOCK(&so->so_rcv); - mbcnt = so->so_rcv.sb_mbcnt; - sbcc = sbavail(&so->so_rcv); - SOCKBUF_UNLOCK(&so->so_rcv); - /* - * There is a benign race condition at this point. If we're planning to - * clear SB_STOP, but uipc_send is called on the connected socket at - * this instant, it might add data to the sockbuf and set SB_STOP. Then - * we would erroneously clear SB_STOP below, even though the sockbuf is - * full. The race is benign because the only ill effect is to allow the - * sockbuf to exceed its size limit, and the size limits are not - * strictly guaranteed anyway. - */ - UNP_PCB_LOCK(unp); - unp2 = unp->unp_conn; - if (unp2 == NULL) { - UNP_PCB_UNLOCK(unp); - return (0); + m_clrprotoflags(m); + m_tag_delete_chain(m, NULL); + m->m_pkthdr.rcvif = NULL; + m->m_pkthdr.flowid = 0; + m->m_pkthdr.csum_flags = 0; + m->m_pkthdr.fibnum = 0; + m->m_pkthdr.rsstype = 0; + + mc_init_m(mc, m); + MPASS(m->m_pkthdr.len == mc->mc_len); +} + +#ifdef SOCKBUF_DEBUG +static inline void +uipc_stream_sbcheck(struct sockbuf *sb) +{ + struct mbuf *d; + u_int dcc, dctl, dmbcnt; + + dcc = dctl = dmbcnt = 0; + STAILQ_FOREACH(d, &sb->sb_mbq, m_stailq) { + if (d->m_type == MT_CONTROL) + dctl += d->m_len; + else if (d->m_type == MT_DATA) + dcc += d->m_len; + else + MPASS(0); + dmbcnt += MSIZE; + if (d->m_flags & M_EXT) + dmbcnt += d->m_ext.ext_size; + if (d->m_stailq.stqe_next == NULL) + MPASS(sb->sb_mbq.stqh_last == &d->m_stailq.stqe_next); } - so2 = unp2->unp_socket; - SOCKBUF_LOCK(&so2->so_snd); - if (sbcc < so2->so_snd.sb_hiwat && mbcnt < so2->so_snd.sb_mbmax) - so2->so_snd.sb_flags &= ~SB_STOP; - sowwakeup_locked(so2); - UNP_PCB_UNLOCK(unp); - return (0); + MPASS(dcc == sb->sb_acc); + MPASS(dcc == sb->sb_ccc); + MPASS(dctl == sb->sb_ctl); + MPASS(dmbcnt == sb->sb_mbcnt); +} +#define UIPC_STREAM_SBCHECK(sb) uipc_stream_sbcheck(sb) +#else +#define UIPC_STREAM_SBCHECK(sb) do {} while (0) +#endif + +/* + * uipc_stream_sbspace() returns how much a writer can send, limited by char + * count or mbuf memory use, whatever ends first. 
+ * + * XXXGL: sb_mbcnt may overcommit sb_mbmax in case if previous write observed + * 'space < mbspace', but mchain allocated to hold 'space' bytes of data ended + * up with 'mc_mlen > mbspace'. A typical scenario would be a full buffer with + * writer trying to push in a large write, and a slow reader, that reads just + * a few bytes at a time. In that case writer will keep creating new mbufs + * with mc_split(). These mbufs will carry little chars, but will all point at + * the same cluster, thus each adding cluster size to sb_mbcnt. This means we + * will count same cluster many times potentially underutilizing socket buffer. + * We aren't optimizing towards ineffective readers. Classic socket buffer had + * the same "feature". + */ +static inline u_int +uipc_stream_sbspace(struct sockbuf *sb) +{ + u_int space, mbspace; + + MPASS(sb->sb_hiwat >= sb->sb_ccc + sb->sb_ctl); + space = sb->sb_hiwat - sb->sb_ccc - sb->sb_ctl; + if (__predict_true(sb->sb_mbmax >= sb->sb_mbcnt)) + mbspace = sb->sb_mbmax - sb->sb_mbcnt; + else + return (0); + + return (min(space, mbspace)); } static int -uipc_send(struct socket *so, int flags, struct mbuf *m, struct sockaddr *nam, - struct mbuf *control, struct thread *td) +uipc_sosend_stream_or_seqpacket(struct socket *so, struct sockaddr *addr, + struct uio *uio, struct mbuf *m, struct mbuf *c, int flags, + struct thread *td) { struct unpcb *unp, *unp2; struct socket *so2; - u_int mbcnt, sbcc; + struct sockbuf *sb; + struct mchain mc, cmc; + ssize_t resid, sent; + bool nonblock, eor; int error; - unp = sotounpcb(so); - KASSERT(unp != NULL, ("%s: unp == NULL", __func__)); - KASSERT(so->so_type == SOCK_STREAM || so->so_type == SOCK_SEQPACKET, - ("%s: socktype %d", __func__, so->so_type)); + MPASS((uio != NULL && m == NULL) || (m != NULL && uio == NULL)); + MPASS(m == NULL || c == NULL); - error = 0; - if (flags & PRUS_OOB) { + if (__predict_false(flags & MSG_OOB)) { error = EOPNOTSUPP; - goto release; + goto out; } - if (control != NULL && - (error = unp_internalize(&control, td, NULL, NULL, NULL))) - goto release; - unp2 = NULL; - if ((so->so_state & SS_ISCONNECTED) == 0) { - if (nam != NULL) { - if ((error = unp_connect(so, nam, td)) != 0) - goto out; - } else { - error = ENOTCONN; + nonblock = (so->so_state & SS_NBIO) || + (flags & (MSG_DONTWAIT | MSG_NBIO)); + eor = flags & MSG_EOR; + + mc = MCHAIN_INITIALIZER(&mc); + cmc = MCHAIN_INITIALIZER(&cmc); + sent = 0; + + if (m == NULL) { + if (c != NULL && (error = unp_internalize(c, &cmc, td))) goto out; - } - } + /* + * Optimization for a case when our send fits into the receive + * buffer - do the copyin before taking any locks, sized to our + * send buffer. Later copyins will also take into account + * space in the peer's receive buffer. + */ + resid = uio->uio_resid; + error = mc_uiotomc(&mc, uio, so->so_snd.sb_hiwat, 0, M_WAITOK, + eor ? 
M_EOR : 0); + if (__predict_false(error)) + goto out2; + } else + uipc_reset_kernel_mbuf(m, &mc); + + error = SOCK_IO_SEND_LOCK(so, SBLOCKWAIT(flags)); + if (error) + goto out2; + unp = sotounpcb(so); UNP_PCB_LOCK(unp); - if ((unp2 = unp_pcb_lock_peer(unp)) == NULL) { + unp2 = unp_pcb_lock_peer(unp); + if (__predict_false(so->so_error != 0)) { + error = so->so_error; + so->so_error = 0; UNP_PCB_UNLOCK(unp); - error = ENOTCONN; - goto out; - } else if (so->so_snd.sb_state & SBS_CANTSENDMORE) { - unp_pcb_unlock_pair(unp, unp2); - error = EPIPE; - goto out; + if (unp2 != NULL) + UNP_PCB_UNLOCK(unp2); + goto out3; } - UNP_PCB_UNLOCK(unp); - if ((so2 = unp2->unp_socket) == NULL) { - UNP_PCB_UNLOCK(unp2); - error = ENOTCONN; - goto out; + if (__predict_false(unp2 == NULL)) { + /* + * Different error code for a previously connected socket and + * a never connected one. The SS_ISDISCONNECTED is set in the + * unp_soisdisconnected() and is synchronized by the pcb lock. + */ + error = so->so_state & SS_ISDISCONNECTED ? EPIPE : ENOTCONN; + UNP_PCB_UNLOCK(unp); + goto out3; } - SOCKBUF_LOCK(&so2->so_rcv); + UNP_PCB_UNLOCK(unp); + if (unp2->unp_flags & UNP_WANTCRED_MASK) { /* * Credentials are passed only once on SOCK_STREAM and * SOCK_SEQPACKET (LOCAL_CREDS => WANTCRED_ONESHOT), or * forever (LOCAL_CREDS_PERSISTENT => WANTCRED_ALWAYS). */ - control = unp_addsockcred(td, control, unp2->unp_flags, NULL, - NULL, NULL); + unp_addsockcred(td, &cmc, unp2->unp_flags); unp2->unp_flags &= ~UNP_WANTCRED_ONESHOT; } /* - * Send to paired receive port and wake up readers. Don't - * check for space available in the receive buffer if we're - * attaching ancillary data; Unix domain sockets only check - * for space in the sending sockbuf, and that check is - * performed one level up the stack. At that level we cannot - * precisely account for the amount of buffer space used - * (e.g., because control messages are not yet internalized). + * Cycle through the data to send and available space in the peer's + * receive buffer. Put a reference on the peer socket, so that it + * doesn't get freed while we sbwait(). If peer goes away, we will + * observe the SBS_CANTRCVMORE and our sorele() will finalize peer's + * socket destruction. */ - switch (so->so_type) { - case SOCK_STREAM: - if (control != NULL) { - sbappendcontrol_locked(&so2->so_rcv, - m->m_len > 0 ? m : NULL, control, flags); - control = NULL; - } else - sbappend_locked(&so2->so_rcv, m, flags); - break; + so2 = unp2->unp_socket; + soref(so2); + UNP_PCB_UNLOCK(unp2); + sb = &so2->so_rcv; + while (mc.mc_len + cmc.mc_len > 0) { + struct mchain mcnext = MCHAIN_INITIALIZER(&mcnext); + u_int space; - case SOCK_SEQPACKET: - if (sbappendaddr_nospacecheck_locked(&so2->so_rcv, - &sun_noname, m, control)) - control = NULL; - break; + SOCK_RECVBUF_LOCK(so2); +restart: + UIPC_STREAM_SBCHECK(sb); + if (__predict_false(cmc.mc_len > sb->sb_hiwat)) { + SOCK_RECVBUF_UNLOCK(so2); + error = EMSGSIZE; + goto out4; + } + if (__predict_false(sb->sb_state & SBS_CANTRCVMORE)) { + SOCK_RECVBUF_UNLOCK(so2); + error = EPIPE; + goto out4; + } + /* + * Wait on the peer socket receive buffer until we have enough + * space to put at least control. The data is a stream and can + * be put partially, but control is really a datagram. 
+ */ + space = uipc_stream_sbspace(sb); + if (space < sb->sb_lowat || space < cmc.mc_len) { + if (nonblock) { + SOCK_RECVBUF_UNLOCK(so2); + error = EWOULDBLOCK; + goto out4; + } + if ((error = sbwait(so2, SO_RCV)) != 0) { + SOCK_RECVBUF_UNLOCK(so2); + goto out4; + } else + goto restart; + } + MPASS(space >= cmc.mc_len); + space -= cmc.mc_len; + if (space == 0) { + /* There is space only to send control. */ + MPASS(!STAILQ_EMPTY(&cmc.mc_q)); + mcnext = mc; + mc = MCHAIN_INITIALIZER(&mc); + } else if (space < mc.mc_len) { + /* Not enough space. */ + if (__predict_false(mc_split(&mc, &mcnext, space, + M_NOWAIT) == ENOMEM)) { + /* + * If allocation failed use M_WAITOK and merge + * the chain back. Next time mc_split() will + * easily split at the same place. Only if we + * race with setsockopt(SO_RCVBUF) shrinking + * sb_hiwat can this happen more than once. + */ + SOCK_RECVBUF_UNLOCK(so2); + (void)mc_split(&mc, &mcnext, space, M_WAITOK); + mc_concat(&mc, &mcnext); + SOCK_RECVBUF_LOCK(so2); + goto restart; + } + MPASS(mc.mc_len == space); + } + if (!STAILQ_EMPTY(&cmc.mc_q)) { + STAILQ_CONCAT(&sb->sb_mbq, &cmc.mc_q); + sb->sb_ctl += cmc.mc_len; + sb->sb_mbcnt += cmc.mc_mlen; + cmc.mc_len = 0; + } + sent += mc.mc_len; + sb->sb_acc += mc.mc_len; + sb->sb_ccc += mc.mc_len; + sb->sb_mbcnt += mc.mc_mlen; + STAILQ_CONCAT(&sb->sb_mbq, &mc.mc_q); + UIPC_STREAM_SBCHECK(sb); + space = uipc_stream_sbspace(sb); + sorwakeup_locked(so2); + mc = mcnext; + if (STAILQ_EMPTY(&mc.mc_q) && + uio != NULL && uio->uio_resid > 0) { + /* + * Copyin sum of peer's receive buffer space and our + * sb_hiwat, which is our virtual send buffer size. + * See comment above unpst_sendspace declaration. + * We are reading sb_hiwat locklessly, cause a) we + * don't care about an application that does send(2) + * and setsockopt(2) racing internally, and for an + * application that does this in sequence we will see + * the correct value cause sbsetopt() uses buffer lock + * and we also have already acquired it at least once. + */ + error = mc_uiotomc(&mc, uio, space + + atomic_load_int(&so->so_snd.sb_hiwat), 0, M_WAITOK, + eor ? M_EOR : 0); + if (__predict_false(error)) + goto out4; + } } - mbcnt = so2->so_rcv.sb_mbcnt; - sbcc = sbavail(&so2->so_rcv); - if (sbcc) - sorwakeup_locked(so2); - else - SOCKBUF_UNLOCK(&so2->so_rcv); + MPASS(STAILQ_EMPTY(&mc.mc_q)); - /* - * The PCB lock on unp2 protects the SB_STOP flag. Without it, - * it would be possible for uipc_rcvd to be called at this - * point, drain the receiving sockbuf, clear SB_STOP, and then - * we would set SB_STOP below. 
That could lead to an empty - * sockbuf having SB_STOP set - */ - SOCKBUF_LOCK(&so->so_snd); - if (sbcc >= so->so_snd.sb_hiwat || mbcnt >= so->so_snd.sb_mbmax) - so->so_snd.sb_flags |= SB_STOP; - SOCKBUF_UNLOCK(&so->so_snd); - UNP_PCB_UNLOCK(unp2); - m = NULL; + td->td_ru.ru_msgsnd++; +out4: + sorele(so2); +out3: + SOCK_IO_SEND_UNLOCK(so); +out2: + if (!mc_empty(&cmc)) + unp_scan(mc_first(&cmc), unp_freerights); out: + mc_freem(&mc); + mc_freem(&cmc); + + if (uio != NULL) + uio->uio_resid = resid - sent; + + return (error); +} + +static int +uipc_soreceive_stream_or_seqpacket(struct socket *so, struct sockaddr **psa, + struct uio *uio, struct mbuf **mp0, struct mbuf **controlp, int *flagsp) +{ + struct sockbuf *sb = &so->so_rcv; + struct mbuf *control, *m, *first, *last, *next; + u_int ctl, space, datalen, mbcnt, lastlen; + int error, flags; + bool nonblock, waitall, peek; + + MPASS(mp0 == NULL); + + if (psa != NULL) + *psa = NULL; + if (controlp != NULL) + *controlp = NULL; + + flags = flagsp != NULL ? *flagsp : 0; + nonblock = (so->so_state & SS_NBIO) || + (flags & (MSG_DONTWAIT | MSG_NBIO)); + peek = flags & MSG_PEEK; + waitall = (flags & MSG_WAITALL) && !peek; + /* - * PRUS_EOF is equivalent to pr_send followed by pr_shutdown. + * This check may fail only on a socket that never went through + * connect(2). We can check this locklessly, cause: a) for a new born + * socket we don't care about applications that may race internally + * between connect(2) and recv(2), and b) for a dying socket if we + * miss update by unp_sosidisconnected(), we would still get the check + * correct. For dying socket we would observe SBS_CANTRCVMORE later. */ - if (flags & PRUS_EOF) { - UNP_PCB_LOCK(unp); - socantsendmore(so); - unp_shutdown(unp); - UNP_PCB_UNLOCK(unp); + if (__predict_false((atomic_load_short(&so->so_state) & + (SS_ISCONNECTED|SS_ISDISCONNECTED)) == 0)) + return (ENOTCONN); + + error = SOCK_IO_RECV_LOCK(so, SBLOCKWAIT(flags)); + if (__predict_false(error)) + return (error); + +restart: + SOCK_RECVBUF_LOCK(so); + UIPC_STREAM_SBCHECK(sb); + while (sb->sb_acc < sb->sb_lowat && + (sb->sb_ctl == 0 || controlp == NULL)) { + if (so->so_error) { + error = so->so_error; + if (!peek) + so->so_error = 0; + SOCK_RECVBUF_UNLOCK(so); + SOCK_IO_RECV_UNLOCK(so); + return (error); + } + if (sb->sb_state & SBS_CANTRCVMORE) { + SOCK_RECVBUF_UNLOCK(so); + SOCK_IO_RECV_UNLOCK(so); + return (0); + } + if (nonblock) { + SOCK_RECVBUF_UNLOCK(so); + SOCK_IO_RECV_UNLOCK(so); + return (EWOULDBLOCK); + } + error = sbwait(so, SO_RCV); + if (error) { + SOCK_RECVBUF_UNLOCK(so); + SOCK_IO_RECV_UNLOCK(so); + return (error); + } } - if (control != NULL && error != 0) - unp_scan(control, unp_freerights); -release: - if (control != NULL) - m_freem(control); + MPASS(STAILQ_FIRST(&sb->sb_mbq)); + MPASS(sb->sb_acc > 0 || sb->sb_ctl > 0); + + mbcnt = 0; + ctl = 0; + first = STAILQ_FIRST(&sb->sb_mbq); + if (first->m_type == MT_CONTROL) { + control = first; + STAILQ_FOREACH_FROM(first, &sb->sb_mbq, m_stailq) { + if (first->m_type != MT_CONTROL) + break; + ctl += first->m_len; + mbcnt += MSIZE; + if (first->m_flags & M_EXT) + mbcnt += first->m_ext.ext_size; + } + } else + control = NULL; + /* - * In case of PRUS_NOTREADY, uipc_ready() is responsible - * for freeing memory. - */ - if (m != NULL && (flags & PRUS_NOTREADY) == 0) - m_freem(m); - return (error); + * Find split point for the next copyout. 
On exit from the loop: + * last == NULL - socket to be flushed + * last != NULL + * lastlen > last->m_len - uio to be filled, last to be adjusted + * lastlen == 0 - MT_CONTROL or M_EOR encountered + */ + space = uio->uio_resid; + datalen = 0; + for (m = first, last = NULL; m != NULL; m = STAILQ_NEXT(m, m_stailq)) { + if (m->m_type != MT_DATA) { + last = m; + lastlen = 0; + break; + } + if (space >= m->m_len) { + space -= m->m_len; + datalen += m->m_len; + mbcnt += MSIZE; + if (m->m_flags & M_EXT) + mbcnt += m->m_ext.ext_size; + if (m->m_flags & M_EOR) { + last = STAILQ_NEXT(m, m_stailq); + lastlen = 0; + flags |= MSG_EOR; + break; + } + } else { + datalen += space; + last = m; + lastlen = space; + break; + } + } + + UIPC_STREAM_SBCHECK(sb); + if (!peek) { + if (last == NULL) + STAILQ_INIT(&sb->sb_mbq); + else { + STAILQ_FIRST(&sb->sb_mbq) = last; + MPASS(last->m_len > lastlen); + last->m_len -= lastlen; + last->m_data += lastlen; + } + MPASS(sb->sb_acc >= datalen); + sb->sb_acc -= datalen; + sb->sb_ccc -= datalen; + MPASS(sb->sb_ctl >= ctl); + sb->sb_ctl -= ctl; + MPASS(sb->sb_mbcnt >= mbcnt); + sb->sb_mbcnt -= mbcnt; + UIPC_STREAM_SBCHECK(sb); + /* Mind the name. We are waking writer here, not reader. */ + sorwakeup_locked(so); + } else + SOCK_RECVBUF_UNLOCK(so); + + while (control != NULL && control->m_type == MT_CONTROL) { + if (!peek) { + struct mbuf *c; + + /* + * unp_externalize() failure must abort entire read(2). + * Such failure should also free the problematic + * control, so that socket is not left in a state + * where it can't progress forward with reading. + * Probability of such a failure is really low, so it + * is fine that we need to perform pretty complex + * operation here to reconstruct the buffer. + * XXXGL: unp_externalize() used to be + * dom_externalize() KBI and it frees whole chain, so + * we need to feed it with mbufs one by one. + */ + c = control; + control = STAILQ_NEXT(c, m_stailq); + STAILQ_NEXT(c, m_stailq) = NULL; + error = unp_externalize(c, controlp, flags); + if (__predict_false(error)) { + SOCK_RECVBUF_LOCK(so); + UIPC_STREAM_SBCHECK(sb); + MPASS(!(sb->sb_state & SBS_CANTRCVMORE)); + /* XXXGL: STAILQ_PREPEND */ + if (STAILQ_EMPTY(&sb->sb_mbq)) + STAILQ_INSERT_HEAD(&sb->sb_mbq, + control, m_stailq); + else + STAILQ_FIRST(&sb->sb_mbq) = control; + sb->sb_ctl = sb->sb_acc = sb->sb_ccc = + sb->sb_mbcnt = 0; + STAILQ_FOREACH(m, &sb->sb_mbq, m_stailq) { + if (m->m_type == MT_DATA) { + sb->sb_acc += m->m_len; + sb->sb_ccc += m->m_len; + } else { + sb->sb_ctl += m->m_len; + } + sb->sb_mbcnt += MSIZE; + if (m->m_flags & M_EXT) + sb->sb_mbcnt += + m->m_ext.ext_size; + } + UIPC_STREAM_SBCHECK(sb); + SOCK_RECVBUF_UNLOCK(so); + SOCK_IO_RECV_UNLOCK(so); + return (error); + } + if (controlp != NULL) { + while (*controlp != NULL) + controlp = &(*controlp)->m_next; + } + } else { + /* + * XXXGL + * + * In MSG_PEEK case control is not externalized. This + * means we are leaking some kernel pointers to the + * userland. They are useless to a law-abiding + * application, but may be useful to a malware. This + * is what the historical implementation in the + * soreceive_generic() did. To be improved? 
+ */ + if (controlp != NULL) { + *controlp = m_copym(control, 0, control->m_len, + M_WAITOK); + controlp = &(*controlp)->m_next; + } + control = STAILQ_NEXT(control, m_stailq); + } + } + + for (m = first; m != last; m = next) { + next = STAILQ_NEXT(m, m_stailq); + error = uiomove(mtod(m, char *), m->m_len, uio); + if (__predict_false(error)) { + SOCK_IO_RECV_UNLOCK(so); + if (!peek) + for (; m != last; m = next) { + next = STAILQ_NEXT(m, m_stailq); + m_free(m); + } + return (error); + } + if (!peek) + m_free(m); + } + if (last != NULL && lastlen > 0) { + if (!peek) { + MPASS(!(m->m_flags & M_PKTHDR)); + MPASS(last->m_data - M_START(last) >= lastlen); + error = uiomove(mtod(last, char *) - lastlen, + lastlen, uio); + } else + error = uiomove(mtod(last, char *), lastlen, uio); + if (__predict_false(error)) { + SOCK_IO_RECV_UNLOCK(so); + return (error); + } + } + if (waitall && !(flags & MSG_EOR) && uio->uio_resid > 0) + goto restart; + SOCK_IO_RECV_UNLOCK(so); + + if (flagsp != NULL) + *flagsp |= flags; + + uio->uio_td->td_ru.ru_msgrcv++; + + return (0); } /* PF_UNIX/SOCK_DGRAM version of sbspace() */ @@ -1111,7 +1499,8 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio, const struct sockaddr *from; struct socket *so2; struct sockbuf *sb; - struct mbuf *f, *clast; + struct mchain cmc = MCHAIN_INITIALIZER(&cmc); + struct mbuf *f; u_int cc, ctl, mbcnt; u_int dcc __diagused, dctl __diagused, dmbcnt __diagused; int error; @@ -1120,7 +1509,6 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio, error = 0; f = NULL; - ctl = 0; if (__predict_false(flags & MSG_OOB)) { error = EOPNOTSUPP; @@ -1139,16 +1527,14 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio, f = m_gethdr(M_WAITOK, MT_SONAME); cc = m->m_pkthdr.len; mbcnt = MSIZE + m->m_pkthdr.memlen; - if (c != NULL && - (error = unp_internalize(&c, td, &clast, &ctl, &mbcnt))) + if (c != NULL && (error = unp_internalize(c, &cmc, td))) goto out; } else { - /* pr_sosend() with mbuf usually is a kernel thread. */ - - M_ASSERTPKTHDR(m); - if (__predict_false(c != NULL)) - panic("%s: control from a kernel thread", __func__); + struct mchain mc; + uipc_reset_kernel_mbuf(m, &mc); + cc = mc.mc_len; + mbcnt = mc.mc_mlen; if (__predict_false(m->m_pkthdr.len > unpdg_maxdgram)) { error = EMSGSIZE; goto out; @@ -1157,22 +1543,6 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio, error = ENOBUFS; goto out; } - /* Condition the foreign mbuf to our standards. */ - m_clrprotoflags(m); - m_tag_delete_chain(m, NULL); - m->m_pkthdr.rcvif = NULL; - m->m_pkthdr.flowid = 0; - m->m_pkthdr.csum_flags = 0; - m->m_pkthdr.fibnum = 0; - m->m_pkthdr.rsstype = 0; - - cc = m->m_pkthdr.len; - mbcnt = MSIZE; - for (struct mbuf *mb = m; mb != NULL; mb = mb->m_next) { - mbcnt += MSIZE; - if (mb->m_flags & M_EXT) - mbcnt += mb->m_ext.ext_size; - } } unp = sotounpcb(so); @@ -1224,8 +1594,7 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio, } if (unp2->unp_flags & UNP_WANTCRED_MASK) - c = unp_addsockcred(td, c, unp2->unp_flags, &clast, &ctl, - &mbcnt); + unp_addsockcred(td, &cmc, unp2->unp_flags); if (unp->unp_addr != NULL) from = (struct sockaddr *)unp->unp_addr; else @@ -1233,25 +1602,21 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio, f->m_len = from->sa_len; MPASS(from->sa_len <= MLEN); bcopy(from, mtod(f, void *), from->sa_len); - ctl += f->m_len; /* * Concatenate mbufs: from -> control -> data. 
* Save overall cc and mbcnt in "from" mbuf. */ - if (c != NULL) { -#ifdef INVARIANTS - struct mbuf *mc; - - for (mc = c; mc->m_next != NULL; mc = mc->m_next); - MPASS(mc == clast); -#endif - f->m_next = c; - clast->m_next = m; - c = NULL; + if (!STAILQ_EMPTY(&cmc.mc_q)) { + f->m_next = mc_first(&cmc); + mc_last(&cmc)->m_next = m; + /* XXXGL: This is dirty as well as rollback after ENOBUFS. */ + STAILQ_INIT(&cmc.mc_q); } else f->m_next = m; m = NULL; + ctl = f->m_len + cmc.mc_len; + mbcnt += cmc.mc_mlen; #ifdef INVARIANTS dcc = dctl = dmbcnt = 0; for (struct mbuf *mb = f; mb != NULL; mb = mb->m_next) { @@ -1317,7 +1682,7 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio, soroverflow_locked(so2); error = ENOBUFS; if (f->m_next->m_type == MT_CONTROL) { - c = f->m_next; + STAILQ_FIRST(&cmc.mc_q) = f->m_next; f->m_next = NULL; } } @@ -1332,13 +1697,12 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio, out3: SOCK_IO_SEND_UNLOCK(so); out2: - if (c) - unp_scan(c, unp_freerights); + if (!mc_empty(&cmc)) + unp_scan(mc_first(&cmc), unp_freerights); out: if (f) m_freem(f); - if (c) - m_freem(c); + mc_freem(&cmc); if (m) m_freem(m); @@ -1579,6 +1943,7 @@ uipc_soreceive_dgram(struct socket *so, struct sockaddr **psa, struct uio *uio, return (0); } *** 428 LINES SKIPPED ***
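
The record-marker behavior described in the commit message can be observed
from userland.  What follows is a minimal sketch, not part of the commit,
assuming a POSIX system with PF_UNIX SOCK_SEQPACKET support such as FreeBSD:
each send(2) produces one record, and recvmsg(2) reports the record boundary
by setting MSG_EOR in msg_flags.

#include <sys/socket.h>
#include <sys/uio.h>
#include <err.h>
#include <stdio.h>
#include <unistd.h>

int
main(void)
{
	int fds[2];
	char buf[64];
	struct iovec iov = { .iov_base = buf, .iov_len = sizeof(buf) };
	struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };
	ssize_t n;

	if (socketpair(PF_UNIX, SOCK_SEQPACKET, 0, fds) == -1)
		err(1, "socketpair");

	/* Two separate records; a plain stream socket could coalesce them. */
	if (send(fds[0], "first", 5, 0) == -1 ||
	    send(fds[0], "second", 6, 0) == -1)
		err(1, "send");

	/* Each recvmsg(2) returns exactly one record, flagged with MSG_EOR. */
	while ((n = recvmsg(fds[1], &msg, MSG_DONTWAIT)) > 0)
		printf("record: %.*s%s\n", (int)n, buf,
		    (msg.msg_flags & MSG_EOR) ? " [EOR]" : "");

	close(fds[0]);
	close(fds[1]);
	return (0);
}

Before this change those semantics came from the datagram path (PR_ADDR &
PR_ATOMIC); after it, the same behavior comes from a stream buffer carrying
record markers.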
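
The mchain structure itself is declared outside the excerpted hunks, so the
following is only a rough model reconstructed from how the code above uses
it; the struct name mchain_model and the helper mc_append_model are
hypothetical, while the field names mirror the real ones.  The invariant it
maintains is the same sb_ccc/sb_ctl/sb_mbcnt accounting that
UIPC_STREAM_SBCHECK() asserts on the socket buffer.

#include <sys/param.h>
#include <sys/mbuf.h>
#include <sys/queue.h>

/*
 * Rough model only: the real struct mchain and its helpers
 * (MCHAIN_INITIALIZER, mc_init_m, mc_split, mc_concat, ...) live in
 * the kernel headers and are not reproduced in this diff excerpt.
 */
struct mchain_model {
	STAILQ_HEAD(, mbuf) mc_q;	/* mbufs linked through m_stailq */
	u_int mc_len;			/* payload bytes in the chain */
	u_int mc_mlen;			/* mbuf memory: MSIZE + ext size */
};

/* Hypothetical helper: append one mbuf, keeping both counters exact. */
static inline void
mc_append_model(struct mchain_model *mc, struct mbuf *m)
{
	STAILQ_INSERT_TAIL(&mc->mc_q, m, m_stailq);
	mc->mc_len += m->m_len;
	mc->mc_mlen += MSIZE;
	if (m->m_flags & M_EXT)
		mc->mc_mlen += m->m_ext.ext_size;
}

Carrying both counters on the chain is what lets the send path charge the
peer's sb_ccc and sb_mbcnt with plain additions while holding only the
receive buffer lock.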
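
The UNP_WANTCRED_* handling above corresponds to two userland-visible socket
options from <sys/un.h>.  A short usage sketch, with error handling elided
and want_peer_creds() being an illustrative name, not an existing API:

#include <sys/socket.h>
#include <sys/un.h>

/*
 * LOCAL_CREDS requests sender credentials once per connection
 * (UNP_WANTCRED_ONESHOT); LOCAL_CREDS_PERSISTENT requests them with
 * every message (UNP_WANTCRED_ALWAYS).  Subsequent recvmsg(2) calls
 * then deliver SCM_CREDS (or SCM_CREDS2 for the persistent variant)
 * control messages.
 */
static int
want_peer_creds(int s, int persistent)
{
	int one = 1;

	return (setsockopt(s, SOL_LOCAL,
	    persistent ? LOCAL_CREDS_PERSISTENT : LOCAL_CREDS,
	    &one, sizeof(one)));
}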