git: 762ad964ee34 - stable/13 - epair: Simplify the transmit path and address lost wakeups

From: Mark Johnston <markj_at_FreeBSD.org>
Date: Mon, 13 Mar 2023 15:53:30 UTC
The branch stable/13 has been updated by markj:

URL: https://cgit.FreeBSD.org/src/commit/?id=762ad964ee346cffdbf3eaa6ff87fa5b32d30738

commit 762ad964ee346cffdbf3eaa6ff87fa5b32d30738
Author:     Mark Johnston <markj@FreeBSD.org>
AuthorDate: 2023-03-01 20:21:30 +0000
Commit:     Mark Johnston <markj@FreeBSD.org>
CommitDate: 2023-03-13 15:53:16 +0000

    epair: Simplify the transmit path and address lost wakeups
    
    epairs currently shuttle all transmitted packets through a single global
    taskqueue thread.  To hand packets over to the taskqueue thread, each
    epair maintains a pair of ring buffers and a lockless scheme for
    notifying the thread of pending work.  The implementation can lead to
    lost wakeups, causing to-be-transmitted packets to end up stuck in the
    queue.
    
    Rather than extending the existing scheme, simply replace it with a
    linked list protected by a mutex, and use the mutex to synchronize
    wakeups of the taskqueue thread.  This appears to give equivalent or
    better throughput with >= 16 producer threads and eliminates the lost
    wakeups.
    
    Reviewed by:    kp
    MFC after:      1 week
    Sponsored by:   Klara, Inc.
    Sponsored by:   Modirum MDPay
    Differential Revision:  https://reviews.freebsd.org/D38843
    
    (cherry picked from commit df7bbd8c354a907d2c2f85a6e18f356f76458f57)
---
 sys/net/if_epair.c | 162 ++++++++++++++++++++++++++---------------------------
 1 file changed, 79 insertions(+), 83 deletions(-)

diff --git a/sys/net/if_epair.c b/sys/net/if_epair.c
index 05667633fa42..68b68c11af06 100644
--- a/sys/net/if_epair.c
+++ b/sys/net/if_epair.c
@@ -104,15 +104,16 @@ static unsigned int next_index = 0;
 #define	EPAIR_LOCK()			mtx_lock(&epair_n_index_mtx)
 #define	EPAIR_UNLOCK()			mtx_unlock(&epair_n_index_mtx)
 
-#define BIT_QUEUE_TASK		0
-#define BIT_MBUF_QUEUED		1
-
 struct epair_softc;
 struct epair_queue {
+	struct mtx		 mtx;
+	struct mbufq		 q;
 	int			 id;
-	struct buf_ring		*rxring[2];
-	volatile int		 ridx;		/* 0 || 1 */
-	volatile long		 state;		/* taskqueue coordination */
+	enum {
+		EPAIR_QUEUE_IDLE,
+		EPAIR_QUEUE_WAKING,
+		EPAIR_QUEUE_RUNNING,
+	}			 state;
 	struct task		 tx_task;
 	struct epair_softc	*sc;
 };
@@ -148,44 +149,49 @@ epair_clear_mbuf(struct mbuf *m)
 }
 
 static void
-epair_if_input(struct epair_softc *sc, struct epair_queue *q, int ridx)
+epair_tx_start_deferred(void *arg, int pending)
 {
-	struct ifnet *ifp;
-	struct mbuf *m;
+	struct epair_queue *q = (struct epair_queue *)arg;
+	if_t ifp;
+	struct mbuf *m, *n;
+	bool resched;
+
+	ifp = q->sc->ifp;
 
-	ifp = sc->ifp;
+	if_ref(ifp);
 	CURVNET_SET(ifp->if_vnet);
-	while (! buf_ring_empty(q->rxring[ridx])) {
-		m = buf_ring_dequeue_mc(q->rxring[ridx]);
-		if (m == NULL)
-			continue;
 
-		MPASS((m->m_pkthdr.csum_flags & CSUM_SND_TAG) == 0);
-		(*ifp->if_input)(ifp, m);
+	mtx_lock(&q->mtx);
+	m = mbufq_flush(&q->q);
+	q->state = EPAIR_QUEUE_RUNNING;
+	mtx_unlock(&q->mtx);
 
+	while (m != NULL) {
+		n = STAILQ_NEXT(m, m_stailqpkt);
+		m->m_nextpkt = NULL;
+		if_input(ifp, m);
+		m = n;
 	}
-	CURVNET_RESTORE();
-}
 
-static void
-epair_tx_start_deferred(void *arg, int pending)
-{
-	struct epair_queue *q = (struct epair_queue *)arg;
-	struct epair_softc *sc = q->sc;
-	int ridx, nidx;
-
-	if_ref(sc->ifp);
-	ridx = atomic_load_int(&q->ridx);
-	do {
-		nidx = (ridx == 0) ? 1 : 0;
-	} while (!atomic_fcmpset_int(&q->ridx, &ridx, nidx));
-	epair_if_input(sc, q, ridx);
-
-	atomic_clear_long(&q->state, (1 << BIT_QUEUE_TASK));
-	if (atomic_testandclear_long(&q->state, BIT_MBUF_QUEUED))
+	/*
+	 * Avoid flushing the queue more than once per task.  We can otherwise
+	 * end up starving ourselves in a multi-epair routing configuration.
+	 */
+	mtx_lock(&q->mtx);
+	if (mbufq_len(&q->q) > 0) {
+		resched = true;
+		q->state = EPAIR_QUEUE_WAKING;
+	} else {
+		resched = false;
+		q->state = EPAIR_QUEUE_IDLE;
+	}
+	mtx_unlock(&q->mtx);
+
+	if (resched)
 		taskqueue_enqueue(epair_tasks.tq[q->id], &q->tx_task);
 
-	if_rele(sc->ifp);
+	CURVNET_RESTORE();
+	if_rele(ifp);
 }
 
 static struct epair_queue *
@@ -239,9 +245,9 @@ epair_prepare_mbuf(struct mbuf *m, struct ifnet *src_ifp)
 static void
 epair_menq(struct mbuf *m, struct epair_softc *osc)
 {
+	struct epair_queue *q;
 	struct ifnet *ifp, *oifp;
-	int len, ret;
-	int ridx;
+	int error, len;
 	bool mcast;
 
 	/*
@@ -257,32 +263,26 @@ epair_menq(struct mbuf *m, struct epair_softc *osc)
 	len = m->m_pkthdr.len;
 	mcast = (m->m_flags & (M_BCAST | M_MCAST)) != 0;
 
-	struct epair_queue *q = epair_select_queue(osc, m);
+	q = epair_select_queue(osc, m);
 
-	atomic_set_long(&q->state, (1 << BIT_MBUF_QUEUED));
-	ridx = atomic_load_int(&q->ridx);
-	ret = buf_ring_enqueue(q->rxring[ridx], m);
-	if (ret != 0) {
-		/* Ring is full. */
-		if_inc_counter(ifp, IFCOUNTER_OQDROPS, 1);
-		m_freem(m);
-		return;
+	mtx_lock(&q->mtx);
+	if (q->state == EPAIR_QUEUE_IDLE) {
+		q->state = EPAIR_QUEUE_WAKING;
+		taskqueue_enqueue(epair_tasks.tq[q->id], &q->tx_task);
 	}
+	error = mbufq_enqueue(&q->q, m);
+	mtx_unlock(&q->mtx);
 
-	if_inc_counter(ifp, IFCOUNTER_OPACKETS, 1);
-	/*
-	 * IFQ_HANDOFF_ADJ/ip_handoff() update statistics,
-	 * but as we bypass all this we have to duplicate
-	 * the logic another time.
-	 */
-	if_inc_counter(ifp, IFCOUNTER_OBYTES, len);
-	if (mcast)
-		if_inc_counter(ifp, IFCOUNTER_OMCASTS, 1);
-	/* Someone else received the packet. */
-	if_inc_counter(oifp, IFCOUNTER_IPACKETS, 1);
-
-	if (!atomic_testandset_long(&q->state, BIT_QUEUE_TASK))
-		taskqueue_enqueue(epair_tasks.tq[q->id], &q->tx_task);
+	if (error != 0) {
+		m_freem(m);
+		if_inc_counter(ifp, IFCOUNTER_OQDROPS, 1);
+	} else {
+		if_inc_counter(ifp, IFCOUNTER_OPACKETS, 1);
+		if_inc_counter(ifp, IFCOUNTER_OBYTES, len);
+		if (mcast)
+			if_inc_counter(ifp, IFCOUNTER_OMCASTS, 1);
+		if_inc_counter(oifp, IFCOUNTER_IPACKETS, 1);
+	}
 }
 
 static void
@@ -561,10 +561,9 @@ epair_clone_create(struct if_clone *ifc, char *name, size_t len, caddr_t params)
 	for (int i = 0; i < sca->num_queues; i++) {
 		struct epair_queue *q = &sca->queues[i];
 		q->id = i;
-		q->rxring[0] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
-		q->rxring[1] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
-		q->ridx = 0;
-		q->state = 0;
+		q->state = EPAIR_QUEUE_IDLE;
+		mtx_init(&q->mtx, "epaiq", NULL, MTX_DEF | MTX_NEW);
+		mbufq_init(&q->q, RXRSIZE);
 		q->sc = sca;
 		NET_TASK_INIT(&q->tx_task, 0, epair_tx_start_deferred, q);
 	}
@@ -584,10 +583,9 @@ epair_clone_create(struct if_clone *ifc, char *name, size_t len, caddr_t params)
 	for (int i = 0; i < scb->num_queues; i++) {
 		struct epair_queue *q = &scb->queues[i];
 		q->id = i;
-		q->rxring[0] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
-		q->rxring[1] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
-		q->ridx = 0;
-		q->state = 0;
+		q->state = EPAIR_QUEUE_IDLE;
+		mtx_init(&q->mtx, "epaiq", NULL, MTX_DEF | MTX_NEW);
+		mbufq_init(&q->q, RXRSIZE);
 		q->sc = scb;
 		NET_TASK_INIT(&q->tx_task, 0, epair_tx_start_deferred, q);
 	}
@@ -707,18 +705,18 @@ epair_clone_create(struct if_clone *ifc, char *name, size_t len, caddr_t params)
 static void
 epair_drain_rings(struct epair_softc *sc)
 {
-	int ridx;
-	struct mbuf *m;
+	for (int i = 0; i < sc->num_queues; i++) {
+		struct epair_queue *q;
+		struct mbuf *m, *n;
 
-	for (ridx = 0; ridx < 2; ridx++) {
-		for (int i = 0; i < sc->num_queues; i++) {
-			struct epair_queue *q = &sc->queues[i];
-			do {
-				m = buf_ring_dequeue_sc(q->rxring[ridx]);
-				if (m == NULL)
-					break;
-				m_freem(m);
-			} while (1);
+		q = &sc->queues[i];
+		mtx_lock(&q->mtx);
+		m = mbufq_flush(&q->q);
+		mtx_unlock(&q->mtx);
+
+		for (; m != NULL; m = n) {
+			n = m->m_nextpkt;
+			m_freem(m);
 		}
 	}
 }
@@ -764,8 +762,7 @@ epair_clone_destroy(struct if_clone *ifc, struct ifnet *ifp)
 	ifmedia_removeall(&scb->media);
 	for (int i = 0; i < scb->num_queues; i++) {
 		struct epair_queue *q = &scb->queues[i];
-		buf_ring_free(q->rxring[0], M_EPAIR);
-		buf_ring_free(q->rxring[1], M_EPAIR);
+		mtx_destroy(&q->mtx);
 	}
 	free(scb->queues, M_EPAIR);
 	free(scb, M_EPAIR);
@@ -776,8 +773,7 @@ epair_clone_destroy(struct if_clone *ifc, struct ifnet *ifp)
 	ifmedia_removeall(&sca->media);
 	for (int i = 0; i < sca->num_queues; i++) {
 		struct epair_queue *q = &sca->queues[i];
-		buf_ring_free(q->rxring[0], M_EPAIR);
-		buf_ring_free(q->rxring[1], M_EPAIR);
+		mtx_destroy(&q->mtx);
 	}
 	free(sca->queues, M_EPAIR);
 	free(sca, M_EPAIR);