git: df7bbd8c354a - main - epair: Simplify the transmit path and address lost wakeups
- Go to: [ bottom of page ] [ top of archives ] [ this month ]
Date: Mon, 06 Mar 2023 17:52:51 UTC
The branch main has been updated by markj: URL: https://cgit.FreeBSD.org/src/commit/?id=df7bbd8c354a907d2c2f85a6e18f356f76458f57 commit df7bbd8c354a907d2c2f85a6e18f356f76458f57 Author: Mark Johnston <markj@FreeBSD.org> AuthorDate: 2023-03-01 20:21:30 +0000 Commit: Mark Johnston <markj@FreeBSD.org> CommitDate: 2023-03-06 17:49:28 +0000 epair: Simplify the transmit path and address lost wakeups epairs currently shuttle all transmitted packets through a single global taskqueue thread. To hand packets over to the taskqueue thread, each epair maintains a pair of ring buffers and a lockless scheme for notifying the thread of pending work. The implementation can lead to lost wakeups, causing to-be-transmitted packets to end up stuck in the queue. Rather than extending the existing scheme, simply replace it with a linked list protected by a mutex, and use the mutex to synchronize wakeups of the taskqueue thread. This appears to give equivalent or better throughput with >= 16 producer threads and eliminates the lost wakeups. Reviewed by: kp MFC after: 1 week Sponsored by: Klara, Inc. Sponsored by: Modirum MDPay Differential Revision: https://reviews.freebsd.org/D38843 --- sys/net/if_epair.c | 152 ++++++++++++++++++++++++++--------------------------- 1 file changed, 75 insertions(+), 77 deletions(-) diff --git a/sys/net/if_epair.c b/sys/net/if_epair.c index 3fa4c47a0bca..32fd3afe309b 100644 --- a/sys/net/if_epair.c +++ b/sys/net/if_epair.c @@ -101,15 +101,16 @@ static unsigned int next_index = 0; #define EPAIR_LOCK() mtx_lock(&epair_n_index_mtx) #define EPAIR_UNLOCK() mtx_unlock(&epair_n_index_mtx) -#define BIT_QUEUE_TASK 0 -#define BIT_MBUF_QUEUED 1 - struct epair_softc; struct epair_queue { + struct mtx mtx; + struct mbufq q; int id; - struct buf_ring *rxring[2]; - volatile int ridx; /* 0 || 1 */ - volatile long state; /* taskqueue coordination */ + enum { + EPAIR_QUEUE_IDLE, + EPAIR_QUEUE_WAKING, + EPAIR_QUEUE_RUNNING, + } state; struct task tx_task; struct epair_softc *sc; }; @@ -145,44 +146,49 @@ epair_clear_mbuf(struct mbuf *m) } static void -epair_if_input(struct epair_softc *sc, struct epair_queue *q, int ridx) +epair_tx_start_deferred(void *arg, int pending) { - struct ifnet *ifp; - struct mbuf *m; + struct epair_queue *q = (struct epair_queue *)arg; + if_t ifp; + struct mbuf *m, *n; + bool resched; - ifp = sc->ifp; + ifp = q->sc->ifp; + + if_ref(ifp); CURVNET_SET(ifp->if_vnet); - while (! buf_ring_empty(q->rxring[ridx])) { - m = buf_ring_dequeue_mc(q->rxring[ridx]); - if (m == NULL) - continue; - MPASS((m->m_pkthdr.csum_flags & CSUM_SND_TAG) == 0); - (*ifp->if_input)(ifp, m); + mtx_lock(&q->mtx); + m = mbufq_flush(&q->q); + q->state = EPAIR_QUEUE_RUNNING; + mtx_unlock(&q->mtx); + while (m != NULL) { + n = STAILQ_NEXT(m, m_stailqpkt); + m->m_nextpkt = NULL; + if_input(ifp, m); + m = n; } - CURVNET_RESTORE(); -} -static void -epair_tx_start_deferred(void *arg, int pending) -{ - struct epair_queue *q = (struct epair_queue *)arg; - struct epair_softc *sc = q->sc; - int ridx, nidx; - - if_ref(sc->ifp); - ridx = atomic_load_int(&q->ridx); - do { - nidx = (ridx == 0) ? 1 : 0; - } while (!atomic_fcmpset_int(&q->ridx, &ridx, nidx)); - epair_if_input(sc, q, ridx); - - atomic_clear_long(&q->state, (1 << BIT_QUEUE_TASK)); - if (atomic_testandclear_long(&q->state, BIT_MBUF_QUEUED)) + /* + * Avoid flushing the queue more than once per task. We can otherwise + * end up starving ourselves in a multi-epair routing configuration. + */ + mtx_lock(&q->mtx); + if (mbufq_len(&q->q) > 0) { + resched = true; + q->state = EPAIR_QUEUE_WAKING; + } else { + resched = false; + q->state = EPAIR_QUEUE_IDLE; + } + mtx_unlock(&q->mtx); + + if (resched) taskqueue_enqueue(epair_tasks.tq[q->id], &q->tx_task); - if_rele(sc->ifp); + CURVNET_RESTORE(); + if_rele(ifp); } static struct epair_queue * @@ -236,9 +242,9 @@ epair_prepare_mbuf(struct mbuf *m, struct ifnet *src_ifp) static void epair_menq(struct mbuf *m, struct epair_softc *osc) { + struct epair_queue *q; struct ifnet *ifp, *oifp; - int len, ret; - int ridx; + int error, len; bool mcast; /* @@ -254,32 +260,26 @@ epair_menq(struct mbuf *m, struct epair_softc *osc) len = m->m_pkthdr.len; mcast = (m->m_flags & (M_BCAST | M_MCAST)) != 0; - struct epair_queue *q = epair_select_queue(osc, m); + q = epair_select_queue(osc, m); - atomic_set_long(&q->state, (1 << BIT_MBUF_QUEUED)); - ridx = atomic_load_int(&q->ridx); - ret = buf_ring_enqueue(q->rxring[ridx], m); - if (ret != 0) { - /* Ring is full. */ - if_inc_counter(ifp, IFCOUNTER_OQDROPS, 1); - m_freem(m); - return; + mtx_lock(&q->mtx); + if (q->state == EPAIR_QUEUE_IDLE) { + q->state = EPAIR_QUEUE_WAKING; + taskqueue_enqueue(epair_tasks.tq[q->id], &q->tx_task); } + error = mbufq_enqueue(&q->q, m); + mtx_unlock(&q->mtx); - if_inc_counter(ifp, IFCOUNTER_OPACKETS, 1); - /* - * IFQ_HANDOFF_ADJ/ip_handoff() update statistics, - * but as we bypass all this we have to duplicate - * the logic another time. - */ - if_inc_counter(ifp, IFCOUNTER_OBYTES, len); - if (mcast) - if_inc_counter(ifp, IFCOUNTER_OMCASTS, 1); - /* Someone else received the packet. */ - if_inc_counter(oifp, IFCOUNTER_IPACKETS, 1); - - if (!atomic_testandset_long(&q->state, BIT_QUEUE_TASK)) - taskqueue_enqueue(epair_tasks.tq[q->id], &q->tx_task); + if (error != 0) { + m_freem(m); + if_inc_counter(ifp, IFCOUNTER_OQDROPS, 1); + } else { + if_inc_counter(ifp, IFCOUNTER_OPACKETS, 1); + if_inc_counter(ifp, IFCOUNTER_OBYTES, len); + if (mcast) + if_inc_counter(ifp, IFCOUNTER_OMCASTS, 1); + if_inc_counter(oifp, IFCOUNTER_IPACKETS, 1); + } } static void @@ -514,10 +514,9 @@ epair_alloc_sc(struct if_clone *ifc) for (int i = 0; i < sc->num_queues; i++) { struct epair_queue *q = &sc->queues[i]; q->id = i; - q->rxring[0] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL); - q->rxring[1] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL); - q->ridx = 0; - q->state = 0; + q->state = EPAIR_QUEUE_IDLE; + mtx_init(&q->mtx, "epairq", NULL, MTX_DEF | MTX_NEW); + mbufq_init(&q->q, RXRSIZE); q->sc = sc; NET_TASK_INIT(&q->tx_task, 0, epair_tx_start_deferred, q); } @@ -610,8 +609,7 @@ epair_free_sc(struct epair_softc *sc) ifmedia_removeall(&sc->media); for (int i = 0; i < sc->num_queues; i++) { struct epair_queue *q = &sc->queues[i]; - buf_ring_free(q->rxring[0], M_EPAIR); - buf_ring_free(q->rxring[1], M_EPAIR); + mtx_destroy(&q->mtx); } free(sc->queues, M_EPAIR); free(sc, M_EPAIR); @@ -756,18 +754,18 @@ epair_clone_create(struct if_clone *ifc, char *name, size_t len, static void epair_drain_rings(struct epair_softc *sc) { - int ridx; - struct mbuf *m; + for (int i = 0; i < sc->num_queues; i++) { + struct epair_queue *q; + struct mbuf *m, *n; - for (ridx = 0; ridx < 2; ridx++) { - for (int i = 0; i < sc->num_queues; i++) { - struct epair_queue *q = &sc->queues[i]; - do { - m = buf_ring_dequeue_sc(q->rxring[ridx]); - if (m == NULL) - break; - m_freem(m); - } while (1); + q = &sc->queues[i]; + mtx_lock(&q->mtx); + m = mbufq_flush(&q->q); + mtx_unlock(&q->mtx); + + for (; m != NULL; m = n) { + n = m->m_nextpkt; + m_freem(m); } } }