Date: Tue, 10 Sep 2024 16:55:22 GMT
Message-Id: <202409101655.48AGtMfl083965@gitrepo.freebsd.org>
To: src-committers@FreeBSD.org, dev-commits-src-all@FreeBSD.org, dev-commits-src-main@FreeBSD.org
From: Mark Johnston
Subject: git: a1da7dc1cdad - main - socket: Implement SO_SPLICE
List-Id: Commit messages for the main branch of the src repository
List-Archive: https://lists.freebsd.org/archives/dev-commits-src-main
Sender: owner-dev-commits-src-main@FreeBSD.org
MIME-Version: 1.0
Content-Type:
text/plain; charset=utf-8
Content-Transfer-Encoding: 8bit
X-Git-Committer: markj
X-Git-Repository: src
X-Git-Refname: refs/heads/main
X-Git-Reftype: branch
X-Git-Commit: a1da7dc1cdad8c000622a7b23ff5994ccfe9cac6
Auto-Submitted: auto-generated

The branch main has been updated by markj:

URL: https://cgit.FreeBSD.org/src/commit/?id=a1da7dc1cdad8c000622a7b23ff5994ccfe9cac6

commit a1da7dc1cdad8c000622a7b23ff5994ccfe9cac6
Author:     Mark Johnston
AuthorDate: 2024-09-10 16:50:30 +0000
Commit:     Mark Johnston
CommitDate: 2024-09-10 16:51:37 +0000

    socket: Implement SO_SPLICE

    This is a feature which allows one to splice two TCP sockets together
    such that data which arrives on one socket is automatically pushed into
    the send buffer of the spliced socket. This can be used to make TCP
    proxying more efficient as it eliminates the need to copy data into and
    out of userspace.

    The interface is copied from OpenBSD, and this implementation aims to be
    compatible. Splicing is enabled by setting the SO_SPLICE socket option.
    When spliced, data that arrives on the receive buffer is automatically
    forwarded to the other socket. In particular, splicing is a
    unidirectional operation; to splice a socket pair in both directions,
    SO_SPLICE needs to be applied to both sockets. More concretely, when
    setting the option one passes the following struct:

        struct splice {
                int fd;
                off_t max;
                struct timeval idle;
        };

    where "fd" refers to the socket to which the first socket is to be
    spliced, and two setsockopt(SO_SPLICE) calls are required to set up a
    bi-directional splice.

    select(), poll() and kevent() do not return when data arrives in the
    receive buffer of a spliced socket, as such data is expected to be
    removed automatically once space is available in the corresponding send
    buffer. Userspace can perform I/O on spliced sockets, but it will be
    unpredictably interleaved with splice I/O.

    A splice can be configured to unsplice once a certain number of bytes
    have been transmitted, or after a given time period.
    Once unspliced, the socket behaves normally from userspace's
    perspective. The number of bytes transmitted via the splice can be
    retrieved using getsockopt(SO_SPLICE); this works after unsplicing as
    well, up until the socket is closed or spliced again. Userspace can
    also manually trigger unsplicing by splicing to -1.

    Splicing work is handled by dedicated threads, similar to KTLS. A
    worker thread is assigned at splice creation time. At some point it
    would be nice to have a direct dispatch mode, wherein the thread which
    places data into a receive buffer is also responsible for pushing it
    into the sink, but this requires tighter integration with the protocol
    stack in order to avoid reentrancy problems.

    Currently, sowakeup() and related functions will signal the worker
    thread assigned to a spliced socket. so_splice_xfer() does the hard
    work of moving data between socket buffers.

    Co-authored by: gallatin
    Reviewed by:    brooks (interface bits)
    MFC after:      3 months
    Sponsored by:   Klara, Inc.
    Sponsored by:   Stormshield
    Sponsored by:   Netflix
    Differential Revision:  https://reviews.freebsd.org/D46411
---
 lib/libsys/getsockopt.2 |  62 ++++-
 sys/kern/uipc_sockbuf.c |  34 ++-
 sys/kern/uipc_socket.c  | 702 +++++++++++++++++++++++++++++++++++++++++++++++-
 sys/sys/ktrace.h        |   2 +
 sys/sys/sockbuf.h       |   3 +-
 sys/sys/socket.h        |  12 +
 sys/sys/socketvar.h     |  49 +++-
 7 files changed, 854 insertions(+), 10 deletions(-)

diff --git a/lib/libsys/getsockopt.2 b/lib/libsys/getsockopt.2
index a74bf3a4685e..ca826528ff2f 100644
--- a/lib/libsys/getsockopt.2
+++ b/lib/libsys/getsockopt.2
@@ -25,7 +25,7 @@
 .\" OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 .\" SUCH DAMAGE.
 .\"
-.Dd February 8, 2021
+.Dd July 8, 2024
 .Dt GETSOCKOPT 2
 .Os
 .Sh NAME
@@ -191,6 +191,7 @@ The following options are recognized in
 .It Dv SO_MAX_PACING_RATE Ta "set the maximum transmit rate in bytes per second for the socket"
 .It Dv SO_NO_OFFLOAD Ta "disables protocol offloads"
 .It Dv SO_NO_DDP Ta "disables direct data placement offload"
+.It Dv SO_SPLICE Ta "splice two sockets together"
 .El
 .Pp
 .Dv SO_DEBUG
@@ -551,6 +552,56 @@ DDP is an offload supported by Chelsio network adapters that permits
 reassembled TCP data streams to be received via zero-copy in
 user-supplied buffers using
 .Xr aio_read 2 .
+.Pp
+.Dv SO_SPLICE ,
+when passed to
+.Fn setsockopt ,
+splices two sockets together using the following
+.Fa optval :
+.Bd -literal
+struct so_splice {
+	int sp_fd;
+	off_t sp_max;
+	struct timeval sp_idle;
+};
+.Ed
+.Pp
+Data received on
+.Fa s
+will automatically be transmitted from the socket specified in
+.Fa sp_fd
+without any intervention by userspace.
+Splicing is a one-way operation; a given pair of sockets may be
+spliced in one or both directions.
+Currently only connected
+.Xr tcp 4
+sockets may be spliced together.
+If
+.Fa sp_max
+is greater than zero, the socket pair will automatically be unspliced
+once that number of bytes have been transmitted.
+If
+.Fa sp_idle
+is non-zero, the socket pair will automatically be unspliced once the
+specified amount of time has elapsed since the initial call to
+.Fn setsockopt .
+If
+.Fa sp_fd
+is -1, the socket will be unspliced immediately.
+.Pp
+When passed to
+.Fn getsockopt ,
+the
+.Dv SO_SPLICE
+option returns a 64-bit integer containing the number of bytes transmitted by
+the most recent splice.
+That is, while the socket is spliced, the value returned will be the number
+of bytes spliced so far.
+When unsplicing, this value is saved and is returned until the socket is closed
+or spliced again.
+For example, if a splice transmits 100 bytes and is then unspliced, a subsequent
+.Nm getsockopt
+call will return 100 until the socket is spliced again.
 .Sh RETURN VALUES
 .Rv -std
 .Sh ERRORS
@@ -618,5 +669,14 @@ and
 .Fn setsockopt
 system calls appeared in
 .Bx 4.2 .
+The
+.Dv SO_SPLICE
+option originated in
+.Ox 4.9
+and first appeared in
+.Fx 15.0 .
+The
+.Fx
+implementation aims to be source-compatible.
 .Sh BUGS
 Several of the socket options should be handled at lower levels of the system.
diff --git a/sys/kern/uipc_sockbuf.c b/sys/kern/uipc_sockbuf.c
index 6d3050596f23..95c7241d5f13 100644
--- a/sys/kern/uipc_sockbuf.c
+++ b/sys/kern/uipc_sockbuf.c
@@ -508,6 +508,32 @@ sowakeup(struct socket *so, const sb_which which)
 	SOCK_BUF_UNLOCK_ASSERT(so, which);
 }
 
+static void
+splice_push(struct socket *so)
+{
+	struct so_splice *sp;
+
+	SOCK_RECVBUF_LOCK_ASSERT(so);
+
+	sp = so->so_splice;
+	mtx_lock(&sp->mtx);
+	SOCK_RECVBUF_UNLOCK(so);
+	so_splice_dispatch(sp);
+}
+
+static void
+splice_pull(struct socket *so)
+{
+	struct so_splice *sp;
+
+	SOCK_SENDBUF_LOCK_ASSERT(so);
+
+	sp = so->so_splice_back;
+	mtx_lock(&sp->mtx);
+	SOCK_SENDBUF_UNLOCK(so);
+	so_splice_dispatch(sp);
+}
+
 /*
  * Do we need to notify the other side when I/O is possible?
*/ @@ -522,7 +548,9 @@ void sorwakeup_locked(struct socket *so) { SOCK_RECVBUF_LOCK_ASSERT(so); - if (sb_notify(&so->so_rcv)) + if (so->so_rcv.sb_flags & SB_SPLICED) + splice_push(so); + else if (sb_notify(&so->so_rcv)) sowakeup(so, SO_RCV); else SOCK_RECVBUF_UNLOCK(so); @@ -532,7 +560,9 @@ void sowwakeup_locked(struct socket *so) { SOCK_SENDBUF_LOCK_ASSERT(so); - if (sb_notify(&so->so_snd)) + if (so->so_snd.sb_flags & SB_SPLICED) + splice_pull(so); + else if (sb_notify(&so->so_snd)) sowakeup(so, SO_SND); else SOCK_SENDBUF_UNLOCK(so); diff --git a/sys/kern/uipc_socket.c b/sys/kern/uipc_socket.c index 42c43539b484..071530925892 100644 --- a/sys/kern/uipc_socket.c +++ b/sys/kern/uipc_socket.c @@ -122,6 +122,7 @@ #include #include #include +#include #include #include #include @@ -133,7 +134,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -159,8 +162,17 @@ #include #endif +static int soreceive_generic_locked(struct socket *so, + struct sockaddr **psa, struct uio *uio, struct mbuf **mp, + struct mbuf **controlp, int *flagsp); static int soreceive_rcvoob(struct socket *so, struct uio *uio, int flags); +static int soreceive_stream_locked(struct socket *so, struct sockbuf *sb, + struct sockaddr **psa, struct uio *uio, struct mbuf **mp, + struct mbuf **controlp, int flags); +static int sosend_generic_locked(struct socket *so, struct sockaddr *addr, + struct uio *uio, struct mbuf *top, struct mbuf *control, + int flags, struct thread *td); static void so_rdknl_lock(void *); static void so_rdknl_unlock(void *); static void so_rdknl_assert_lock(void *, int); @@ -206,6 +218,21 @@ VNET_DEFINE(struct hhook_head *, socket_hhh[HHOOK_SOCKET_LAST + 1]); static inline int hhook_run_socket(struct socket *, void *, int32_t); #endif +#ifdef COMPAT_FREEBSD32 +#ifdef __amd64__ +/* off_t has 4-byte alignment on i386 but not on other 32-bit platforms. 
*/ +#define __splice32_packed __packed +#else +#define __splice32_packed +#endif +struct splice32 { + int32_t sp_fd; + int64_t sp_max; + struct timeval32 sp_idle; +} __splice32_packed; +#undef __splice32_packed +#endif + /* * Limit on the number of connections in the listen queue waiting * for accept(2). @@ -278,6 +305,371 @@ socket_zone_change(void *tag) maxsockets = uma_zone_set_max(socket_zone, maxsockets); } +static int splice_init_state; +static struct sx splice_init_lock; +SX_SYSINIT(splice_init_lock, &splice_init_lock, "splice_init"); + +static SYSCTL_NODE(_kern_ipc, OID_AUTO, splice, CTLFLAG_RW, 0, + "Settings relating to the SO_SPLICE socket option"); + +static bool splice_receive_stream = true; +SYSCTL_BOOL(_kern_ipc_splice, OID_AUTO, receive_stream, CTLFLAG_RWTUN, + &splice_receive_stream, 0, + "Use soreceive_stream() for stream splices"); + +static uma_zone_t splice_zone; +static struct proc *splice_proc; +struct splice_wq { + struct mtx mtx; + STAILQ_HEAD(, so_splice) head; + bool running; +} __aligned(CACHE_LINE_SIZE); +static struct splice_wq *splice_wq; +static uint32_t splice_index = 0; + +static void so_splice_timeout(void *arg, int pending); +static void so_splice_xfer(struct so_splice *s); +static int so_unsplice(struct socket *so, bool timeout); + +static void +splice_work_thread(void *ctx) +{ + struct splice_wq *wq = ctx; + struct so_splice *s, *s_temp; + STAILQ_HEAD(, so_splice) local_head; + int cpu; + + cpu = wq - splice_wq; + if (bootverbose) + printf("starting so_splice worker thread for CPU %d\n", cpu); + + for (;;) { + mtx_lock(&wq->mtx); + while (STAILQ_EMPTY(&wq->head)) { + wq->running = false; + mtx_sleep(wq, &wq->mtx, 0, "-", 0); + wq->running = true; + } + STAILQ_INIT(&local_head); + STAILQ_CONCAT(&local_head, &wq->head); + STAILQ_INIT(&wq->head); + mtx_unlock(&wq->mtx); + STAILQ_FOREACH_SAFE(s, &local_head, next, s_temp) { + mtx_lock(&s->mtx); + CURVNET_SET(s->src->so_vnet); + so_splice_xfer(s); + CURVNET_RESTORE(); + } + } +} + 
+static void +so_splice_dispatch_async(struct so_splice *sp) +{ + struct splice_wq *wq; + bool running; + + wq = &splice_wq[sp->wq_index]; + mtx_lock(&wq->mtx); + STAILQ_INSERT_TAIL(&wq->head, sp, next); + running = wq->running; + mtx_unlock(&wq->mtx); + if (!running) + wakeup(wq); +} + +void +so_splice_dispatch(struct so_splice *sp) +{ + mtx_assert(&sp->mtx, MA_OWNED); + + if (sp->state != SPLICE_IDLE) { + mtx_unlock(&sp->mtx); + } else { + sp->state = SPLICE_QUEUED; + mtx_unlock(&sp->mtx); + so_splice_dispatch_async(sp); + } +} + +static int +splice_zinit(void *mem, int size __unused, int flags __unused) +{ + struct so_splice *s; + + s = (struct so_splice *)mem; + mtx_init(&s->mtx, "so_splice", NULL, MTX_DEF); + return (0); +} + +static void +splice_zfini(void *mem, int size) +{ + struct so_splice *s; + + s = (struct so_splice *)mem; + mtx_destroy(&s->mtx); +} + +static int +splice_init(void) +{ + struct thread *td; + int error, i, state; + + state = atomic_load_acq_int(&splice_init_state); + if (__predict_true(state > 0)) + return (0); + if (state < 0) + return (ENXIO); + sx_xlock(&splice_init_lock); + if (splice_init_state != 0) { + sx_xunlock(&splice_init_lock); + return (0); + } + + splice_zone = uma_zcreate("splice", sizeof(struct so_splice), NULL, + NULL, splice_zinit, splice_zfini, UMA_ALIGN_CACHE, 0); + + splice_wq = mallocarray(mp_maxid + 1, sizeof(*splice_wq), M_TEMP, + M_WAITOK | M_ZERO); + + /* + * Initialize the workqueues to run the splice work. We create a + * work queue for each CPU. + */ + CPU_FOREACH(i) { + STAILQ_INIT(&splice_wq[i].head); + mtx_init(&splice_wq[i].mtx, "splice work queue", NULL, MTX_DEF); + } + + /* Start kthreads for each workqueue. 
*/ + error = 0; + CPU_FOREACH(i) { + error = kproc_kthread_add(splice_work_thread, &splice_wq[i], + &splice_proc, &td, 0, 0, "so_splice", "thr_%d", i); + if (error) { + printf("Can't add so_splice thread %d error %d\n", + i, error); + break; + } + + /* + * It's possible to create loops with SO_SPLICE; ensure that + * worker threads aren't able to starve the system too easily. + */ + thread_lock(td); + sched_prio(td, PUSER); + thread_unlock(td); + } + + splice_init_state = error != 0 ? -1 : 1; + sx_xunlock(&splice_init_lock); + + return (error); +} + +/* + * Lock a pair of socket's I/O locks for splicing. Avoid blocking while holding + * one lock in order to avoid potential deadlocks in case there is some other + * code path which acquires more than one I/O lock at a time. + */ +static void +splice_lock_pair(struct socket *so_src, struct socket *so_dst) +{ + int error; + + for (;;) { + error = SOCK_IO_SEND_LOCK(so_dst, SBL_WAIT | SBL_NOINTR); + KASSERT(error == 0, + ("%s: failed to lock send I/O lock: %d", __func__, error)); + error = SOCK_IO_RECV_LOCK(so_src, 0); + KASSERT(error == 0 || error == EWOULDBLOCK, + ("%s: failed to lock recv I/O lock: %d", __func__, error)); + if (error == 0) + break; + SOCK_IO_SEND_UNLOCK(so_dst); + + error = SOCK_IO_RECV_LOCK(so_src, SBL_WAIT | SBL_NOINTR); + KASSERT(error == 0, + ("%s: failed to lock recv I/O lock: %d", __func__, error)); + error = SOCK_IO_SEND_LOCK(so_dst, 0); + KASSERT(error == 0 || error == EWOULDBLOCK, + ("%s: failed to lock send I/O lock: %d", __func__, error)); + if (error == 0) + break; + SOCK_IO_RECV_UNLOCK(so_src); + } +} + +static void +splice_unlock_pair(struct socket *so_src, struct socket *so_dst) +{ + SOCK_IO_RECV_UNLOCK(so_src); + SOCK_IO_SEND_UNLOCK(so_dst); +} + +/* + * Move data from the source to the sink. Assumes that both of the relevant + * socket I/O locks are held. 
+ */ +static int +so_splice_xfer_data(struct socket *so_src, struct socket *so_dst, off_t max, + ssize_t *lenp) +{ + struct uio uio; + struct mbuf *m; + struct sockbuf *sb_src, *sb_dst; + ssize_t len; + long space; + int error, flags; + + SOCK_IO_RECV_ASSERT_LOCKED(so_src); + SOCK_IO_SEND_ASSERT_LOCKED(so_dst); + + error = 0; + m = NULL; + memset(&uio, 0, sizeof(uio)); + + sb_src = &so_src->so_rcv; + sb_dst = &so_dst->so_snd; + + space = sbspace(sb_dst); + if (space < 0) + space = 0; + len = MIN(max, MIN(space, sbavail(sb_src))); + if (len == 0) { + SOCK_RECVBUF_LOCK(so_src); + if ((sb_src->sb_state & SBS_CANTRCVMORE) != 0) + error = EPIPE; + SOCK_RECVBUF_UNLOCK(so_src); + } else { + flags = MSG_DONTWAIT; + uio.uio_resid = len; + if (splice_receive_stream && sb_src->sb_tls_info == NULL) { + error = soreceive_stream_locked(so_src, sb_src, NULL, + &uio, &m, NULL, flags); + } else { + error = soreceive_generic_locked(so_src, NULL, + &uio, &m, NULL, &flags); + } + if (error != 0 && m != NULL) { + m_freem(m); + m = NULL; + } + } + if (m != NULL) { + len -= uio.uio_resid; + error = sosend_generic_locked(so_dst, NULL, NULL, m, NULL, + MSG_DONTWAIT, curthread); + } else if (error == 0) { + len = 0; + SOCK_SENDBUF_LOCK(so_dst); + if ((sb_dst->sb_state & SBS_CANTSENDMORE) != 0) + error = EPIPE; + SOCK_SENDBUF_UNLOCK(so_dst); + } + if (error == 0) + *lenp = len; + return (error); +} + +/* + * Transfer data from the source to the sink. + * + * If "direct" is true, the transfer is done in the context of whichever thread + * is operating on one of the socket buffers. We do not know which locks are + * held, so we can only trylock the socket buffers; if this fails, we fall back + * to the worker thread, which invokes this routine with "direct" set to false. 
+ */ +static void +so_splice_xfer(struct so_splice *sp) +{ + struct socket *so_src, *so_dst; + off_t max; + ssize_t len; + int error; + + mtx_assert(&sp->mtx, MA_OWNED); + KASSERT(sp->state == SPLICE_QUEUED || sp->state == SPLICE_CLOSING, + ("so_splice_xfer: invalid state %d", sp->state)); + KASSERT(sp->max != 0, ("so_splice_xfer: max == 0")); + + if (sp->state == SPLICE_CLOSING) { + /* Userspace asked us to close the splice. */ + goto closing; + } + + sp->state = SPLICE_RUNNING; + so_src = sp->src; + so_dst = sp->dst; + max = sp->max > 0 ? sp->max - so_src->so_splice_sent : OFF_MAX; + if (max < 0) + max = 0; + + /* + * Lock the sockets in order to block userspace from doing anything + * sneaky. If an error occurs or one of the sockets can no longer + * transfer data, we will automatically unsplice. + */ + mtx_unlock(&sp->mtx); + splice_lock_pair(so_src, so_dst); + + error = so_splice_xfer_data(so_src, so_dst, max, &len); + + mtx_lock(&sp->mtx); + + /* + * Update our stats while still holding the socket locks. This + * synchronizes with getsockopt(SO_SPLICE), see the comment there. + */ + if (error == 0) { + KASSERT(len >= 0, ("%s: len %zd < 0", __func__, len)); + so_src->so_splice_sent += len; + } + splice_unlock_pair(so_src, so_dst); + + switch (sp->state) { + case SPLICE_CLOSING: +closing: + sp->state = SPLICE_CLOSED; + wakeup(sp); + mtx_unlock(&sp->mtx); + break; + case SPLICE_RUNNING: + if (error != 0 || + (sp->max > 0 && so_src->so_splice_sent >= sp->max)) { + sp->state = SPLICE_EXCEPTION; + soref(so_src); + mtx_unlock(&sp->mtx); + (void)so_unsplice(so_src, false); + sorele(so_src); + } else { + /* + * Locklessly check for additional bytes in the source's + * receive buffer and queue more work if possible. We + * may end up queuing needless work, but that's ok, and + * if we race with a thread inserting more data into the + * buffer and observe sbavail() == 0, the splice mutex + * ensures that splice_push() will queue more work for + * us. 
+ */ + if (sbavail(&so_src->so_rcv) > 0 && + sbspace(&so_dst->so_snd) > 0) { + sp->state = SPLICE_QUEUED; + mtx_unlock(&sp->mtx); + so_splice_dispatch_async(sp); + } else { + sp->state = SPLICE_IDLE; + mtx_unlock(&sp->mtx); + } + } + break; + default: + __assert_unreachable(); + } +} + static void socket_init(void *tag) { @@ -1213,6 +1605,219 @@ solisten_dequeue(struct socket *head, struct socket **ret, int flags) return (0); } +static struct so_splice * +so_splice_alloc(off_t max) +{ + struct so_splice *sp; + + sp = uma_zalloc(splice_zone, M_WAITOK); + sp->src = NULL; + sp->dst = NULL; + sp->max = max > 0 ? max : -1; + do { + sp->wq_index = atomic_fetchadd_32(&splice_index, 1) % + (mp_maxid + 1); + } while (CPU_ABSENT(sp->wq_index)); + sp->state = SPLICE_IDLE; + TIMEOUT_TASK_INIT(taskqueue_thread, &sp->timeout, 0, so_splice_timeout, + sp); + return (sp); +} + +static void +so_splice_free(struct so_splice *sp) +{ + KASSERT(sp->state == SPLICE_CLOSED, + ("so_splice_free: sp %p not closed", sp)); + uma_zfree(splice_zone, sp); +} + +static void +so_splice_timeout(void *arg, int pending __unused) +{ + struct so_splice *sp; + + sp = arg; + (void)so_unsplice(sp->src, true); +} + +/* + * Splice the output from so to the input of so2. + */ +static int +so_splice(struct socket *so, struct socket *so2, struct splice *splice) +{ + struct so_splice *sp; + int error; + + if (splice->sp_max < 0) + return (EINVAL); + /* Handle only TCP for now; TODO: other streaming protos */ + if (so->so_proto->pr_protocol != IPPROTO_TCP || + so2->so_proto->pr_protocol != IPPROTO_TCP) + return (EPROTONOSUPPORT); + if (so->so_vnet != so2->so_vnet) + return (EINVAL); + + /* so_splice_xfer() assumes that we're using these implementations. 
*/ + KASSERT(so->so_proto->pr_sosend == sosend_generic, + ("so_splice: sosend not sosend_generic")); + KASSERT(so2->so_proto->pr_soreceive == soreceive_generic || + so2->so_proto->pr_soreceive == soreceive_stream, + ("so_splice: soreceive not soreceive_generic/stream")); + + sp = so_splice_alloc(splice->sp_max); + so->so_splice_sent = 0; + sp->src = so; + sp->dst = so2; + + error = 0; + SOCK_LOCK(so); + if (SOLISTENING(so)) + error = EINVAL; + else if ((so->so_state & (SS_ISCONNECTED | SS_ISCONNECTING)) == 0) + error = ENOTCONN; + else if (so->so_splice != NULL) + error = EBUSY; + if (error != 0) { + SOCK_UNLOCK(so); + uma_zfree(splice_zone, sp); + return (error); + } + soref(so); + so->so_splice = sp; + SOCK_RECVBUF_LOCK(so); + so->so_rcv.sb_flags |= SB_SPLICED; + SOCK_RECVBUF_UNLOCK(so); + SOCK_UNLOCK(so); + + error = 0; + SOCK_LOCK(so2); + if (SOLISTENING(so2)) + error = EINVAL; + else if ((so2->so_state & (SS_ISCONNECTED | SS_ISCONNECTING)) == 0) + error = ENOTCONN; + else if (so2->so_splice_back != NULL) + error = EBUSY; + if (error != 0) { + SOCK_UNLOCK(so2); + SOCK_LOCK(so); + so->so_splice = NULL; + SOCK_RECVBUF_LOCK(so); + so->so_rcv.sb_flags &= ~SB_SPLICED; + SOCK_RECVBUF_UNLOCK(so); + SOCK_UNLOCK(so); + sorele(so); + uma_zfree(splice_zone, sp); + return (error); + } + soref(so2); + so2->so_splice_back = sp; + SOCK_SENDBUF_LOCK(so2); + so2->so_snd.sb_flags |= SB_SPLICED; + mtx_lock(&sp->mtx); + SOCK_SENDBUF_UNLOCK(so2); + SOCK_UNLOCK(so2); + + if (splice->sp_idle.tv_sec != 0 || splice->sp_idle.tv_usec != 0) { + taskqueue_enqueue_timeout_sbt(taskqueue_thread, &sp->timeout, + tvtosbt(splice->sp_idle), 0, C_PREL(4)); + } + + /* + * Transfer any data already present in the socket buffer. 
+	 */
+	sp->state = SPLICE_QUEUED;
+	so_splice_xfer(sp);
+	return (0);
+}
+
+static int
+so_unsplice(struct socket *so, bool timeout)
+{
+	struct socket *so2;
+	struct so_splice *sp;
+	bool drain;
+
+	/*
+	 * First unset SB_SPLICED and hide the splice structure so that
+	 * wakeup routines will stop enqueuing work. This also ensures that
+	 * only a single thread will proceed with the unsplice.
+	 */
+	SOCK_LOCK(so);
+	if (SOLISTENING(so)) {
+		SOCK_UNLOCK(so);
+		return (EINVAL);
+	}
+	SOCK_RECVBUF_LOCK(so);
+	if ((so->so_rcv.sb_flags & SB_SPLICED) == 0) {
+		SOCK_RECVBUF_UNLOCK(so);
+		SOCK_UNLOCK(so);
+		return (ENOTCONN);
+	}
+	so->so_rcv.sb_flags &= ~SB_SPLICED;
+	sp = so->so_splice;
+	so->so_splice = NULL;
+	SOCK_RECVBUF_UNLOCK(so);
+	SOCK_UNLOCK(so);
+
+	so2 = sp->dst;
+	SOCK_LOCK(so2);
+	KASSERT(!SOLISTENING(so2), ("%s: so2 is listening", __func__));
+	SOCK_SENDBUF_LOCK(so2);
+	KASSERT((so2->so_snd.sb_flags & SB_SPLICED) != 0,
+	    ("%s: so2 is not spliced", __func__));
+	KASSERT(so2->so_splice_back == sp,
+	    ("%s: so_splice_back != sp", __func__));
+	so2->so_snd.sb_flags &= ~SB_SPLICED;
+	so2->so_splice_back = NULL;
+	SOCK_SENDBUF_UNLOCK(so2);
+	SOCK_UNLOCK(so2);
+
+	/*
+	 * No new work is being enqueued. The worker thread might be
+	 * splicing data right now, in which case we want to wait for it to
+	 * finish before proceeding.
+	 */
+	mtx_lock(&sp->mtx);
+	switch (sp->state) {
+	case SPLICE_QUEUED:
+	case SPLICE_RUNNING:
+		sp->state = SPLICE_CLOSING;
+		while (sp->state == SPLICE_CLOSING)
+			msleep(sp, &sp->mtx, PSOCK, "unsplice", 0);
+		break;
+	case SPLICE_IDLE:
+	case SPLICE_EXCEPTION:
+		sp->state = SPLICE_CLOSED;
+		break;
+	default:
+		__assert_unreachable();
+	}
+	if (!timeout) {
+		drain = taskqueue_cancel_timeout(taskqueue_thread, &sp->timeout,
+		    NULL) != 0;
+	} else {
+		drain = false;
+	}
+	mtx_unlock(&sp->mtx);
+	if (drain)
+		taskqueue_drain_timeout(taskqueue_thread, &sp->timeout);
+
+	/*
+	 * Now we hold the sole reference to the splice structure.
+ * Clean up: signal userspace and release socket references. + */ + sorwakeup(so); + CURVNET_SET(so->so_vnet); + sorele(so); + sowwakeup(so2); + sorele(so2); + CURVNET_RESTORE(); + so_splice_free(sp); + return (0); +} + /* * Free socket upon release of the very last reference. */ @@ -1226,6 +1831,12 @@ sofree(struct socket *so) ("%s: so %p has references", __func__, so)); KASSERT(SOLISTENING(so) || so->so_qstate == SQ_NONE, ("%s: so %p is on listen queue", __func__, so)); + KASSERT(SOLISTENING(so) || (so->so_rcv.sb_flags & SB_SPLICED) == 0, + ("%s: so %p rcvbuf is spliced", __func__, so)); + KASSERT(SOLISTENING(so) || (so->so_snd.sb_flags & SB_SPLICED) == 0, + ("%s: so %p sndbuf is spliced", __func__, so)); + KASSERT(so->so_splice == NULL && so->so_splice_back == NULL, + ("%s: so %p has spliced data", __func__, so)); SOCK_UNLOCK(so); @@ -3318,6 +3929,59 @@ sosetopt(struct socket *so, struct sockopt *sopt) so->so_max_pacing_rate = val32; break; + case SO_SPLICE: { + struct splice splice; + +#ifdef COMPAT_FREEBSD32 + if (SV_CURPROC_FLAG(SV_ILP32)) { + struct splice32 splice32; + + error = sooptcopyin(sopt, &splice32, + sizeof(splice32), sizeof(splice32)); + if (error == 0) { + splice.sp_fd = splice32.sp_fd; + splice.sp_max = splice32.sp_max; + CP(splice32.sp_idle, splice.sp_idle, + tv_sec); + CP(splice32.sp_idle, splice.sp_idle, + tv_usec); + } + } else +#endif + { + error = sooptcopyin(sopt, &splice, + sizeof(splice), sizeof(splice)); + } + if (error) + goto bad; + ktrsplice(&splice); + + error = splice_init(); + if (error != 0) + goto bad; + + if (splice.sp_fd >= 0) { + struct file *fp; + struct socket *so2; + + if (!cap_rights_contains(sopt->sopt_rights, + &cap_recv_rights)) { + error = ENOTCAPABLE; + goto bad; + } + error = getsock(sopt->sopt_td, splice.sp_fd, + &cap_send_rights, &fp); + if (error != 0) + goto bad; + so2 = fp->f_data; + + error = so_splice(so, so2, &splice); + fdrop(fp, sopt->sopt_td); + } else { + error = so_unsplice(so, false); + } + break; + 
} default: #ifdef SOCKET_HHOOK if (V_socket_hhh[HHOOK_SOCKET_OPT]->hhh_nhooks > 0) @@ -3537,6 +4201,33 @@ integer: optval = so->so_max_pacing_rate; goto integer; + case SO_SPLICE: { + off_t n; + + /* + * Acquire the I/O lock to serialize with + * so_splice_xfer(). This is not required for + * correctness, but makes testing simpler: once a byte + * has been transmitted to the sink and observed (e.g., + * by reading from the socket to which the sink is + * connected), a subsequent getsockopt(SO_SPLICE) will + * return an up-to-date value. + */ + error = SOCK_IO_RECV_LOCK(so, SBL_WAIT); + if (error != 0) + goto bad; + SOCK_LOCK(so); + if (SOLISTENING(so)) { + n = 0; + } else { + n = so->so_splice_sent; + } + SOCK_UNLOCK(so); + SOCK_IO_RECV_UNLOCK(so); + error = sooptcopyout(sopt, &n, sizeof(n)); + break; + } *** 224 LINES SKIPPED ***
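
For readers who want to try the option from userspace, the following is a minimal sketch of how the interface described in the commit message might be driven. It is not part of the commit. It assumes the userspace structure is `struct splice` with fields `sp_fd`, `sp_max` and `sp_idle`, as used by the sosetopt() hunk above (the man page hunk shows the name `struct so_splice`, and the sys/socket.h hunk is among the skipped lines, so the exact declaration is an assumption here). The helper names `sosplice_oneway`, `sosplice_stop`, `sosplice_both` and `sosplice_bytes` are hypothetical. On systems without SO_SPLICE the helpers simply fail with ENOPROTOOPT.

```c
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <errno.h>
#include <string.h>

/*
 * Splice src -> dst in one direction (hypothetical helper).
 * max_bytes == 0 requests no byte limit; idle == NULL requests no timeout.
 * Passing dst == -1 asks the kernel to unsplice src.
 * Returns 0 on success, -1 with errno set on failure.
 */
int
sosplice_oneway(int src, int dst, off_t max_bytes, const struct timeval *idle)
{
#ifdef SO_SPLICE
	struct splice sp;	/* assumed layout: sp_fd, sp_max, sp_idle */

	memset(&sp, 0, sizeof(sp));
	sp.sp_fd = dst;
	sp.sp_max = max_bytes;
	if (idle != NULL)
		sp.sp_idle = *idle;
	return (setsockopt(src, SOL_SOCKET, SO_SPLICE, &sp, sizeof(sp)));
#else
	(void)src; (void)dst; (void)max_bytes; (void)idle;
	errno = ENOPROTOOPT;	/* option not available on this system */
	return (-1);
#endif
}

/* Manually unsplice by "splicing to -1", as the commit message puts it. */
int
sosplice_stop(int s)
{
	return (sosplice_oneway(s, -1, 0, NULL));
}

/*
 * A bi-directional splice needs two setsockopt() calls, one per direction.
 * If the second call fails, undo the first and preserve the original errno.
 */
int
sosplice_both(int a, int b)
{
	int saved;

	if (sosplice_oneway(a, b, 0, NULL) == -1)
		return (-1);
	if (sosplice_oneway(b, a, 0, NULL) == -1) {
		saved = errno;
		(void)sosplice_stop(a);
		errno = saved;
		return (-1);
	}
	return (0);
}

/* Fetch the number of bytes moved by the most recent splice on s. */
int
sosplice_bytes(int s, off_t *n)
{
#ifdef SO_SPLICE
	socklen_t len = sizeof(*n);

	return (getsockopt(s, SOL_SOCKET, SO_SPLICE, n, &len));
#else
	(void)s; (void)n;
	errno = ENOPROTOOPT;
	return (-1);
#endif
}
```

A TCP proxy would typically call sosplice_both() on its client-side and server-side sockets once both are connected, and later call sosplice_bytes() on either socket to read the per-direction byte count. Because select(), poll() and kevent() do not report readability on a spliced socket, a proxy that needs to notice completion would presumably rely on the byte limit, the idle timeout, or an explicit sosplice_stop().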