git: 660bd40a598a - main - netlink: use domain specific send buffer

From: Gleb Smirnoff <glebius_at_FreeBSD.org>
Date: Tue, 02 Jan 2024 21:06:54 UTC
The branch main has been updated by glebius:

URL: https://cgit.FreeBSD.org/src/commit/?id=660bd40a598a498ad850911772fcff3f2511875a

commit 660bd40a598a498ad850911772fcff3f2511875a
Author:     Gleb Smirnoff <glebius@FreeBSD.org>
AuthorDate: 2024-01-02 21:03:21 +0000
Commit:     Gleb Smirnoff <glebius@FreeBSD.org>
CommitDate: 2024-01-02 21:03:21 +0000

    netlink: use domain specific send buffer
    
    Instead of using generic socket code, create Netlink specific socket
    buffer.  It is a simple TAILQ of writes that came from userland.  This
    saves us one memory allocation that could fail and one memory copy.
    
    Reviewed by:            melifaro
    Differential Revision:  https://reviews.freebsd.org/D42522
---
 sys/netlink/netlink_domain.c | 102 +++++++++++++++++++++++--------
 sys/netlink/netlink_io.c     | 139 ++++++++++++-------------------------------
 sys/netlink/netlink_var.h    |  12 +++-
 sys/sys/sockbuf.h            |   6 ++
 4 files changed, 129 insertions(+), 130 deletions(-)

diff --git a/sys/netlink/netlink_domain.c b/sys/netlink/netlink_domain.c
index 38f4a2dfed94..ecd110d62c1f 100644
--- a/sys/netlink/netlink_domain.c
+++ b/sys/netlink/netlink_domain.c
@@ -47,6 +47,7 @@
 #include <sys/sysent.h>
 #include <sys/syslog.h>
 #include <sys/priv.h> /* priv_check */
+#include <sys/uio.h>
 
 #include <netlink/netlink.h>
 #include <netlink/netlink_ctl.h>
@@ -330,6 +331,8 @@ nl_pru_attach(struct socket *so, int proto, struct thread *td)
 		free(nlp, M_PCB);
 		return (error);
 	}
+	so->so_rcv.sb_mtx = &so->so_rcv_mtx;	/* rcv side: regular sockbuf */
+	TAILQ_INIT(&so->so_snd.nl_queue);	/* queue of struct nl_buf */
 	so->so_pcb = nlp;
 	nlp->nl_socket = so;
 	/* Copy so_cred to avoid having socket_var.h in every header */
@@ -337,7 +340,6 @@ nl_pru_attach(struct socket *so, int proto, struct thread *td)
 	nlp->nl_proto = proto;
 	nlp->nl_process_id = curproc->p_pid;
 	nlp->nl_linux = is_linux;
-	nlp->nl_active = true;
 	nlp->nl_unconstrained_vnet = !jailed_without_vnet(so->so_cred);
 	nlp->nl_need_thread_setup = true;
 	NLP_LOCK_INIT(nlp);
@@ -491,6 +493,7 @@ nl_close(struct socket *so)
 	struct nl_control *ctl = atomic_load_ptr(&V_nl_ctl);
 	MPASS(sotonlpcb(so) != NULL);
 	struct nlpcb *nlp;
+	struct nl_buf *nb;	/* used to drain so_snd.nl_queue */
 
 	NL_LOG(LOG_DEBUG2, "detaching socket %p, PID %d", so, curproc->p_pid);
 	nlp = sotonlpcb(so);
@@ -498,7 +501,6 @@ nl_close(struct socket *so)
 
 	/* Mark as inactive so no new work can be enqueued */
 	NLP_LOCK(nlp);
 	bool was_bound = nlp->nl_bound;
-	nlp->nl_active = false;
 	NLP_UNLOCK(nlp);
 
 	/* Wait till all scheduled work has been completed  */
@@ -518,6 +520,12 @@ nl_close(struct socket *so)
 
 	so->so_pcb = NULL;
 
+	while ((nb = TAILQ_FIRST(&so->so_snd.nl_queue)) != NULL) {
+		TAILQ_REMOVE(&so->so_snd.nl_queue, nb, tailq);
+		free(nb, M_NETLINK);	/* discard still-queued writes */
+	}
+	sbdestroy(so, SO_RCV);
+
 	NL_LOG(LOG_DEBUG3, "socket %p, detached", so);
 
 	/* XXX: is delayed free needed? */
@@ -556,36 +564,79 @@ nl_sockaddr(struct socket *so, struct sockaddr *sa)
 }
 
 static int
-nl_pru_output(struct mbuf *m, struct socket *so, ...)
+nl_sosend(struct socket *so, struct sockaddr *addr, struct uio *uio,
+    struct mbuf *m, struct mbuf *control, int flags, struct thread *td)
 {
+	struct nlpcb *nlp = sotonlpcb(so);
+	struct sockbuf *sb = &so->so_snd;
+	struct nl_buf *nb;
+	u_int len;
+	int error;
 
-	if (__predict_false(m == NULL ||
-	    ((m->m_len < sizeof(struct nlmsghdr)) &&
-		(m = m_pullup(m, sizeof(struct nlmsghdr))) == NULL)))
-		return (ENOBUFS);
-	MPASS((m->m_flags & M_PKTHDR) != 0);
-
-	NL_LOG(LOG_DEBUG3, "sending message to kernel async processing");
-	nl_receive_async(m, so);
-	return (0);
-}
-
+	MPASS(m == NULL && uio != NULL);
 
-static int
-nl_pru_send(struct socket *so, int flags, struct mbuf *m, struct sockaddr *sa,
-    struct mbuf *control, struct thread *td)
-{
         NL_LOG(LOG_DEBUG2, "sending message to kernel");
 
 	if (__predict_false(control != NULL)) {
-		if (control->m_len) {
-			m_freem(control);
-			return (EINVAL);
-		}
 		m_freem(control);
+		return (EINVAL);
 	}
 
-	return (nl_pru_output(m, so));
+	if (__predict_false(flags & MSG_OOB))	/* XXXGL: or just ignore? */
+		return (EOPNOTSUPP);
+	if (__predict_false(uio->uio_resid < sizeof(struct nlmsghdr)))
+		return (ENOBUFS);		/* XXXGL: any better error? */
+	if (__predict_false(uio->uio_resid > sb->sb_hiwat))
+		return (EMSGSIZE);	/* as sosend_generic(); bounds malloc */
+
+	NL_LOG(LOG_DEBUG3, "sending message to kernel async processing");
+	if ((error = SOCK_IO_SEND_LOCK(so, SBLOCKWAIT(flags))) != 0)
+		return (error);
+
+	/* Message body, then parser scratch space (larger under Linux). */
+	len = roundup2(uio->uio_resid, 8) + SCRATCH_BUFFER_SIZE;
+	if (nlp->nl_linux)
+		len += roundup2(uio->uio_resid, 8);
+	/* Zeroed, as the per-packet parse buffer this replaces was. */
+	nb = malloc(sizeof(*nb) + len, M_NETLINK, M_WAITOK | M_ZERO);
+	nb->datalen = uio->uio_resid;
+	nb->buflen = len;
+	nb->offset = 0;
+	error = uiomove(&nb->data[0], uio->uio_resid, uio);
+	if (__predict_false(error))
+		goto out;
+
+	SOCK_SENDBUF_LOCK(so);
+	/* Sum, not sb_hiwat - sb_ccc: that wraps if hiwat drops below ccc. */
+	while (sb->sb_ccc + nb->datalen > sb->sb_hiwat) {
+		if (__predict_false(nb->datalen > sb->sb_hiwat))
+			error = EMSGSIZE;	/* SO_SNDBUF was lowered */
+		else if ((so->so_state & SS_NBIO) ||
+		    (flags & (MSG_NBIO | MSG_DONTWAIT)) != 0)
+			error = EWOULDBLOCK;
+		else
+			error = sbwait(so, SO_SND);
+		if (error != 0)
+			break;
+	}
+	if (error == 0) {
+		TAILQ_INSERT_TAIL(&sb->nl_queue, nb, tailq);
+		sb->sb_acc += nb->datalen;
+		sb->sb_ccc += nb->datalen;
+		NL_LOG(LOG_DEBUG3, "enqueue %u bytes", nb->datalen);
+		nb = NULL;		/* owned by the queue now */
+	}
+	SOCK_SENDBUF_UNLOCK(so);
+
+	if (error == 0) {
+		NLP_LOCK(nlp);
+		nl_schedule_taskqueue(nlp);
+		NLP_UNLOCK(nlp);
+	}
+out:
+	SOCK_IO_SEND_UNLOCK(so);
+	free(nb, M_NETLINK);
+	return (error);
 }
 
 static int
@@ -747,14 +798,15 @@ nl_setsbopt(struct socket *so, struct sockopt *sopt)
 }
 
 #define	NETLINK_PROTOSW						\
-	.pr_flags = PR_ATOMIC | PR_ADDR | PR_WANTRCVD,		\
+	.pr_flags = PR_ATOMIC | PR_ADDR | PR_WANTRCVD |		\
+	    PR_SOCKBUF, /* so_snd holds an nl_buf queue */		\
 	.pr_ctloutput = nl_ctloutput,				\
 	.pr_setsbopt = nl_setsbopt,				\
 	.pr_attach = nl_pru_attach,				\
 	.pr_bind = nl_pru_bind,					\
 	.pr_connect = nl_pru_connect,				\
 	.pr_disconnect = nl_pru_disconnect,			\
-	.pr_send = nl_pru_send,					\
+	.pr_sosend = nl_sosend,					\
 	.pr_rcvd = nl_pru_rcvd,					\
 	.pr_shutdown = nl_pru_shutdown,				\
 	.pr_sockaddr = nl_sockaddr,				\
diff --git a/sys/netlink/netlink_io.c b/sys/netlink/netlink_io.c
index 3fe01bb443a1..7e2e098e4a9a 100644
--- a/sys/netlink/netlink_io.c
+++ b/sys/netlink/netlink_io.c
@@ -58,8 +58,7 @@ static const struct sockaddr_nl _nl_empty_src = {
 };
 static const struct sockaddr *nl_empty_src = (const struct sockaddr *)&_nl_empty_src;
 
-static struct mbuf *nl_process_mbuf(struct mbuf *m, struct nlpcb *nlp);
-
+static bool nl_process_nbuf(struct nl_buf *nb, struct nlpcb *nlp);
 
 static void
 queue_push(struct nl_io_queue *q, struct mbuf *mq)
@@ -74,15 +73,6 @@ queue_push(struct nl_io_queue *q, struct mbuf *mq)
 	}
 }
 
-static void
-queue_push_head(struct nl_io_queue *q, struct mbuf *m)
-{
-	MPASS(m->m_nextpkt == NULL);
-
-	q->length += m_length(m, NULL);
-	STAILQ_INSERT_HEAD(&q->head, m, m_stailqpkt);
-}
-
 static struct mbuf *
 queue_pop(struct nl_io_queue *q)
 {
@@ -172,7 +162,7 @@ extract_msg_info(struct mbuf *m)
 	return (NULL);
 }
 
-static void
+void	/* not static: nl_sosend() calls it */
 nl_schedule_taskqueue(struct nlpcb *nlp)
 {
 	if (!nlp->nl_task_pending) {
@@ -184,32 +174,6 @@ nl_schedule_taskqueue(struct nlpcb *nlp)
 	}
 }
 
-int
-nl_receive_async(struct mbuf *m, struct socket *so)
-{
-	struct nlpcb *nlp = sotonlpcb(so);
-	int error = 0;
-
-	m->m_nextpkt = NULL;
-
-	NLP_LOCK(nlp);
-
-	if ((__predict_true(nlp->nl_active))) {
-		sbappend(&so->so_snd, m, 0);
-		NL_LOG(LOG_DEBUG3, "enqueue %u bytes", m_length(m, NULL));
-		nl_schedule_taskqueue(nlp);
-	} else {
-		NL_LOG(LOG_DEBUG, "ignoring %u bytes on non-active socket",
-		    m_length(m, NULL));
-		m_free(m);
-		error = EINVAL;
-	}
-
-	NLP_UNLOCK(nlp);
-
-	return (error);
-}
-
 static bool
 tx_check_locked(struct nlpcb *nlp)
 {
@@ -252,6 +216,9 @@ tx_check_locked(struct nlpcb *nlp)
 static bool
 nl_process_received_one(struct nlpcb *nlp)
 {
+	struct socket *so = nlp->nl_socket;
+	struct sockbuf *sb = &so->so_snd;
+	struct nl_buf *nb;
 	bool reschedule = false;
 
 	NLP_LOCK(nlp);
@@ -263,39 +230,28 @@ nl_process_received_one(struct nlpcb *nlp)
 		return (false);
 	}
 
-	if (queue_empty(&nlp->rx_queue)) {
-		/*
-		 * Grab all data we have from the socket TX queue
-		 * and store it the internal queue, so it can be worked on
-		 * w/o holding socket lock.
-		 */
-		struct sockbuf *sb = &nlp->nl_socket->so_snd;
-
-		SOCKBUF_LOCK(sb);
-		unsigned int avail = sbavail(sb);
-		if (avail > 0) {
-			NL_LOG(LOG_DEBUG3, "grabbed %u bytes", avail);
-			queue_push(&nlp->rx_queue, sbcut_locked(sb, avail));
-		}
-		SOCKBUF_UNLOCK(sb);
-	} else {
-		/* Schedule another pass to read from the socket queue */
-		reschedule = true;
-	}
-
 	int prev_hiwat = nlp->tx_queue.hiwat;
 	NLP_UNLOCK(nlp);
 
-	while (!queue_empty(&nlp->rx_queue)) {
-		struct mbuf *m = queue_pop(&nlp->rx_queue);
-
-		m = nl_process_mbuf(m, nlp);
-		if (m != NULL) {
-			queue_push_head(&nlp->rx_queue, m);
-			reschedule = false;
+	SOCK_SENDBUF_LOCK(so);
+	while ((nb = TAILQ_FIRST(&sb->nl_queue)) != NULL) {
+		TAILQ_REMOVE(&sb->nl_queue, nb, tailq);
+		SOCK_SENDBUF_UNLOCK(so);	/* parse without the lock */
+		reschedule = nl_process_nbuf(nb, nlp);	/* true: nb done */
+		SOCK_SENDBUF_LOCK(so);
+		if (reschedule) {
+			sb->sb_acc -= nb->datalen;	/* uncharge so_snd */
+			sb->sb_ccc -= nb->datalen;
+			/* XXXGL: potentially can reduce lock&unlock count. */
+			sowwakeup_locked(so);	/* unlocks the send buffer */
+			free(nb, M_NETLINK);
+			SOCK_SENDBUF_LOCK(so);
+		} else {	/* requeue; parsing resumes at nb->offset */
+			TAILQ_INSERT_HEAD(&sb->nl_queue, nb, tailq);
 			break;
 		}
 	}
+	SOCK_SENDBUF_UNLOCK(so);
 	if (nlp->tx_queue.hiwat > prev_hiwat) {
 		NLP_LOG(LOG_DEBUG, nlp, "TX override peaked to %d", nlp->tx_queue.hiwat);
 
@@ -323,14 +279,12 @@ nl_process_received(struct nlpcb *nlp)
 void
 nl_init_io(struct nlpcb *nlp)
 {
-	STAILQ_INIT(&nlp->rx_queue.head);
 	STAILQ_INIT(&nlp->tx_queue.head);
 }
 
 void
 nl_free_io(struct nlpcb *nlp)
 {
-	queue_free(&nlp->rx_queue);
 	queue_free(&nlp->tx_queue);
 }
 
@@ -529,70 +483,51 @@ npt_clear(struct nl_pstate *npt)
 /*
  * Processes an incoming packet, which can contain multiple netlink messages
  */
-static struct mbuf *
-nl_process_mbuf(struct mbuf *m, struct nlpcb *nlp)
+static bool	/* true: caller may free nb; false: resume at nb->offset */
+nl_process_nbuf(struct nl_buf *nb, struct nlpcb *nlp)
 {
-	int offset, buffer_length;
 	struct nlmsghdr *hdr;
-	char *buffer;
 	int error;
 
-	NL_LOG(LOG_DEBUG3, "RX netlink mbuf %p on %p", m, nlp->nl_socket);
+	NL_LOG(LOG_DEBUG3, "RX netlink buf %p on %p", nb, nlp->nl_socket);
 
 	struct nl_writer nw = {};
 	if (!nlmsg_get_unicast_writer(&nw, NLMSG_SMALL, nlp)) {
-		m_freem(m);
 		NL_LOG(LOG_DEBUG, "error allocating socket writer");
-		return (NULL);
+		return (true);		/* remaining data is dropped */
 	}
 
 	nlmsg_ignore_limit(&nw);
-	/* TODO: alloc this buf once for nlp */
-	int data_length = m_length(m, NULL);
-	buffer_length = roundup2(data_length, 8) + SCRATCH_BUFFER_SIZE;
-	if (nlp->nl_linux)
-		buffer_length += roundup2(data_length, 8);
-	buffer = malloc(buffer_length, M_NETLINK, M_NOWAIT | M_ZERO);
-	if (buffer == NULL) {
-		m_freem(m);
-		nlmsg_flush(&nw);
-		NL_LOG(LOG_DEBUG, "Unable to allocate %d bytes of memory",
-		    buffer_length);
-		return (NULL);
-	}
-	m_copydata(m, 0, data_length, buffer);
 
 	struct nl_pstate npt = {
 		.nlp = nlp,
-		.lb.base = &buffer[roundup2(data_length, 8)],
-		.lb.size = buffer_length - roundup2(data_length, 8),
+		.lb.base = &nb->data[roundup2(nb->datalen, 8)],
+		.lb.size = nb->buflen - roundup2(nb->datalen, 8),
 		.nw = &nw,
 		.strict = nlp->nl_flags & NLF_STRICT,
 	};
 
-	for (offset = 0; offset + sizeof(struct nlmsghdr) <= data_length;) {
-		hdr = (struct nlmsghdr *)&buffer[offset];
+	while (nb->offset + sizeof(struct nlmsghdr) <= nb->datalen) {
+		hdr = (struct nlmsghdr *)&nb->data[nb->offset];
 		/* Save length prior to calling handler */
 		int msglen = NLMSG_ALIGN(hdr->nlmsg_len);
-		NL_LOG(LOG_DEBUG3, "parsing offset %d/%d", offset, data_length);
+		NL_LOG(LOG_DEBUG3, "parsing offset %u/%u",
+		    nb->offset, nb->datalen);
 		npt_clear(&npt);
-		error = nl_receive_message(hdr, data_length - offset, nlp, &npt);
-		offset += msglen;
+		error = nl_receive_message(hdr, nb->datalen - nb->offset, nlp,
+		    &npt);
+		nb->offset += msglen;
 		if (__predict_false(error != 0 || nlp->nl_tx_blocked))
 			break;
 	}
 	NL_LOG(LOG_DEBUG3, "packet parsing done");
-	free(buffer, M_NETLINK);
 	nlmsg_flush(&nw);
 
 	if (nlp->nl_tx_blocked) {
 		NLP_LOCK(nlp);
 		nlp->nl_tx_blocked = false;
 		NLP_UNLOCK(nlp);
-		m_adj(m, offset);
-		return (m);
-	} else {
-		m_freem(m);
-		return (NULL);
-	}
+		return (false);	/* keep nb; the rest is parsed later */
+	}
+	return (true);		/* parsed to the end or stopped on error */
 }
diff --git a/sys/netlink/netlink_var.h b/sys/netlink/netlink_var.h
index 36b7c61974c9..ec174e17d1a2 100644
--- a/sys/netlink/netlink_var.h
+++ b/sys/netlink/netlink_var.h
@@ -49,6 +49,14 @@ struct nl_io_queue {
 	int			hiwat;
 };
 
+struct nl_buf {		/* one queued userland write */
+	TAILQ_ENTRY(nl_buf)	tailq;		/* on so_snd.nl_queue */
+	u_int			buflen;		/* size of data[] */
+	u_int			datalen;	/* length of the user write */
+	u_int			offset;		/* parse position in data[] */
+	char			data[];		/* write, then scratch space */
+};
+
 #define	NLP_MAX_GROUPS		128
 
 struct nlpcb {
@@ -58,14 +66,12 @@ struct nlpcb {
 	uint32_t	        nl_flags;
 	uint32_t	        nl_process_id;
         int                     nl_proto;
-        bool			nl_active;
 	bool			nl_bound;
         bool			nl_task_pending;
 	bool			nl_tx_blocked; /* No new requests accepted */
 	bool			nl_linux; /* true if running under compat */
 	bool			nl_unconstrained_vnet; /* true if running under VNET jail (or without jail) */
 	bool			nl_need_thread_setup;
-	struct nl_io_queue	rx_queue;
 	struct nl_io_queue	tx_queue;
 	struct taskqueue	*nl_taskqueue;
 	struct task		nl_task;
@@ -141,7 +147,7 @@ void nl_init_io(struct nlpcb *nlp);
 void nl_free_io(struct nlpcb *nlp);
 
 void nl_taskqueue_handler(void *_arg, int pending);
-int nl_receive_async(struct mbuf *m, struct socket *so);
+void nl_schedule_taskqueue(struct nlpcb *nlp);	/* called under NLP_LOCK */
 void nl_process_receive_locked(struct nlpcb *nlp);
 void nl_set_source_metadata(struct mbuf *m, int num_messages);
 void nl_add_msg_info(struct mbuf *m);
diff --git a/sys/sys/sockbuf.h b/sys/sys/sockbuf.h
index 92b9964072fb..c6093883be4a 100644
--- a/sys/sys/sockbuf.h
+++ b/sys/sys/sockbuf.h
@@ -163,6 +163,12 @@ struct sockbuf {
 			u_int uxdg_ctl;
 			u_int uxdg_mbcnt;
 		};
+		/*
+		 * Netlink socket.
+		 */
+		struct {
+			TAILQ_HEAD(, nl_buf)	nl_queue;
+		};
 	};
 };