git: c62ae124cc78 - main - rpc: limited multithread support for svc_nl
- Go to: [ bottom of page ] [ top of archives ] [ this month ]
Date: Sat, 01 Feb 2025 09:02:27 UTC
The branch main has been updated by glebius: URL: https://cgit.FreeBSD.org/src/commit/?id=c62ae124cc783bc43ae9fa539e58683c46cc35c1 commit c62ae124cc783bc43ae9fa539e58683c46cc35c1 Author: Gleb Smirnoff <glebius@FreeBSD.org> AuthorDate: 2025-02-01 01:03:18 +0000 Commit: Gleb Smirnoff <glebius@FreeBSD.org> CommitDate: 2025-02-01 09:00:28 +0000 rpc: limited multithread support for svc_nl The rpc(3) itself was not designed with multithreading in mind, but we can actually achieve some parallelism without modifying the library and the framework. This transport will allow to process RPCs in threads, with some hacks on the application side (documented in code). We make reentrable only one method - SVC_REPLY(). Reading and parsing of incoming calls is still done synchronously. But the actual processing of the calls can be offloaded to a thread, and once finished the thread can safely execute svc_sendreply() and the reply would be sent with the correct xid. Differential Revision: https://reviews.freebsd.org/D48569 --- include/rpc/svc.h | 7 ++++ lib/libc/rpc/svc_nl.c | 100 +++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 93 insertions(+), 14 deletions(-) diff --git a/include/rpc/svc.h b/include/rpc/svc.h index deb626ad7c1d..99e2fcee4121 100644 --- a/include/rpc/svc.h +++ b/include/rpc/svc.h @@ -461,6 +461,13 @@ extern SVCXPRT *svcunixfd_create(int, u_int, u_int); */ extern SVCXPRT *svc_nl_create(const char *); +/* + * Arguments to SVC_CONTROL(svc_nl) + */ +enum { + SVCNL_GET_XIDKEY = 1, /* obtain pthread specific key for xid */ +}; + /* * Memory based rpc (for speed check and testing) */ diff --git a/lib/libc/rpc/svc_nl.c b/lib/libc/rpc/svc_nl.c index f866acaf3015..5cce44d9d98f 100644 --- a/lib/libc/rpc/svc_nl.c +++ b/lib/libc/rpc/svc_nl.c @@ -28,6 +28,7 @@ #include <stdlib.h> #include <unistd.h> #include <errno.h> +#include <pthread.h> #include <rpc/rpc.h> #include <rpc/clnt_nl.h> @@ -37,6 +38,7 @@ #include <netlink/netlink_snl_generic.h> #include 
"rpc_com.h" +#include "libc_private.h" /* * RPC server to serve a kernel RPC client(s) over netlink(4). See clnt_nl.c @@ -54,6 +56,7 @@ static bool_t svc_nl_reply(SVCXPRT *, struct rpc_msg *); static enum xprt_stat svc_nl_stat(SVCXPRT *); static bool_t svc_nl_getargs(SVCXPRT *, xdrproc_t, void *); static bool_t svc_nl_freeargs(SVCXPRT *, xdrproc_t, void *); +static bool_t svc_nl_control(SVCXPRT *, const u_int, void *); static struct xp_ops nl_ops = { .xp_recv = svc_nl_recv, @@ -63,11 +66,15 @@ static struct xp_ops nl_ops = { .xp_freeargs = svc_nl_freeargs, .xp_destroy = svc_nl_destroy, }; +static struct xp_ops2 nl_ops2 = { + .xp_control = svc_nl_control, +}; struct nl_softc { struct snl_state snl; XDR xdrs; struct nlmsghdr *hdr; + pthread_key_t xidkey; size_t mlen; enum xprt_stat stat; uint32_t xid; @@ -108,9 +115,15 @@ svc_nl_create(const char *service) sc->stat = XPRT_IDLE, sc->family = family; + if (__isthreaded && + (pthread_key_create(&sc->xidkey, NULL) != 0 || + pthread_setspecific(sc->xidkey, &sc->xid) != 0)) + goto fail; + xprt->xp_fd = sc->snl.fd, xprt->xp_p1 = sc, xprt->xp_ops = &nl_ops, + xprt->xp_ops2 = &nl_ops2, xprt->xp_rtaddr = (struct netbuf){ .maxlen = sizeof(struct sockaddr_nl), .len = sizeof(struct sockaddr_nl), @@ -208,26 +221,62 @@ svc_nl_recv(SVCXPRT *xprt, struct rpc_msg *msg) return (FALSE); } +/* + * Reenterable reply method. Note that both the softc and xprt are declared + * const. The qualifier for xprt is commented out to match the library + * prototype. If doing any substantial changes to the function please + * temporarily uncomment the const for xprt and check your changes. + * + * Applications that want to use svc_nl_reply in a spawned thread context + * should do the following hacks in self: + * 1) - Create xprt with svc_nl_create() with libc in threaded mode, e.g. + * at least one pthread_create() shall happen before svc_nl_create(). 
+ * - After xprt creation query it for the pthread_key_t with the + * SVCNL_GET_XIDKEY control and save this key. + * 2) In the RPC function body that wants to become multithreaded: + * - Make a copy of the arguments and of the xid with help of + * pthread_getspecific() using the key. + * - pthread_create() the worker function, pointing it at the copy of + * arguments and xid. + * - return FALSE, so that RPC generated code doesn't do anything. + * 3) In the spawned thread: + * - Use arguments provided in the copy by the parent. + * - Allocate appropriately typed result on stack. + * - *** do the actual work *** + * - Populate the on-stack result same way as pointed result is populated + * in a regular RPC function. + * - Point the thread specific storage to the copy of xid provided by the + * parent with help of pthread_setspecific(). + * - Call svc_sendreply() just like the rpcgen(1) generated code does. + * + * If all is done correctly svc_nl_reply() will use thread specific xid for + * a call that was processed asynchronously and most recent xid when entered + * synchronously. So you can make only some methods of your application + * reentrant, keeping others as is. + */ + static bool_t -svc_nl_reply(SVCXPRT *xprt, struct rpc_msg *msg) +svc_nl_reply(/* const */ SVCXPRT *xprt, struct rpc_msg *msg) { - struct nl_softc *sc = xprt->xp_p1; + const struct nl_softc *sc = xprt->xp_p1; struct snl_state snl; struct snl_writer nw; + XDR xdrs; struct nlattr *body; bool_t rv; - msg->rm_xid = sc->xid; + msg->rm_xid = __isthreaded ?
+ *(uint32_t *)pthread_getspecific(sc->xidkey) : + sc->xid; if (__predict_false(!snl_clone(&snl, &sc->snl))) return (FALSE); - snl_init_writer(&sc->snl, &nw); + snl_init_writer(&snl, &nw); snl_create_genl_msg_request(&nw, sc->family, RPCNL_REPLY); snl_add_msg_attr_u32(&nw, RPCNL_REPLY_GROUP, sc->group); body = snl_reserve_msg_attr_raw(&nw, RPCNL_REPLY_BODY, RPC_MAXDATASIZE); - xdrmem_create(&sc->xdrs, (char *)(body + 1), RPC_MAXDATASIZE, - XDR_ENCODE); + xdrmem_create(&xdrs, (char *)(body + 1), RPC_MAXDATASIZE, XDR_ENCODE); if (msg->rm_reply.rp_stat == MSG_ACCEPTED && msg->rm_reply.rp_acpt.ar_stat == SUCCESS) { @@ -240,28 +289,28 @@ svc_nl_reply(SVCXPRT *xprt, struct rpc_msg *msg) msg->acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; msg->acpted_rply.ar_results.where = NULL; - pos = xdr_getpos(&sc->xdrs); - if (!xdr_replymsg(&sc->xdrs, msg) || - !SVCAUTH_WRAP(&SVC_AUTH(xprt), &sc->xdrs, xdr_proc, + pos = xdr_getpos(&xdrs); + if (!xdr_replymsg(&xdrs, msg) || + !SVCAUTH_WRAP(&SVC_AUTH(xprt), &xdrs, xdr_proc, xdr_where)) { - xdr_setpos(&sc->xdrs, pos); + xdr_setpos(&xdrs, pos); rv = FALSE; } else rv = TRUE; } else - rv = xdr_replymsg(&sc->xdrs, msg); + rv = xdr_replymsg(&xdrs, msg); if (rv) { /* snl_finalize_msg() really doesn't work for us */ - body->nla_len = sizeof(struct nlattr) + xdr_getpos(&sc->xdrs); + body->nla_len = sizeof(struct nlattr) + xdr_getpos(&xdrs); nw.hdr->nlmsg_len = ((char *)body - (char *)nw.hdr) + body->nla_len; nw.hdr->nlmsg_type = sc->family; nw.hdr->nlmsg_flags = NLM_F_REQUEST; - nw.hdr->nlmsg_seq = sc->xid; + nw.hdr->nlmsg_seq = msg->rm_xid; if (write(xprt->xp_fd, nw.hdr, nw.hdr->nlmsg_len) != nw.hdr->nlmsg_len) - DIE(sc); + DIE(__DECONST(struct nl_softc *, sc)); } snl_free(&snl); @@ -269,6 +318,29 @@ svc_nl_reply(SVCXPRT *xprt, struct rpc_msg *msg) return (rv); } +static bool_t +svc_nl_control(SVCXPRT *xprt, const u_int req, void *v) +{ + struct nl_softc *sc = xprt->xp_p1; + + switch (req) { + case SVCNL_GET_XIDKEY: + if (!__isthreaded) 
{ + /* + * Report to application that it had created xprt not + * in threaded mode, but definitely plans to use it with + * threads. If it tries so, it would very likely crash. + */ + errno = EDOOFUS; + DIE(sc); + }; + *(pthread_key_t *)v = sc->xidkey; + return (TRUE); + default: + return (FALSE); + } +} + static enum xprt_stat svc_nl_stat(SVCXPRT *xprt) {