git: fd8f61d6e970 - main - cxgbei: Dispatch sent PDUs to the NIC asynchronously.
- Go to: [ bottom of page ] [ top of archives ] [ this month ]
Date: Tue, 08 Feb 2022 00:21:16 UTC
The branch main has been updated by jhb: URL: https://cgit.FreeBSD.org/src/commit/?id=fd8f61d6e970fa443d393d330ae70c54c9a523a4 commit fd8f61d6e970fa443d393d330ae70c54c9a523a4 Author: John Baldwin <jhb@FreeBSD.org> AuthorDate: 2022-02-08 00:20:06 +0000 Commit: John Baldwin <jhb@FreeBSD.org> CommitDate: 2022-02-08 00:20:06 +0000 cxgbei: Dispatch sent PDUs to the NIC asynchronously. Previously the driver was called to send PDUs to the NIC synchronously from the icl_conn_pdu_queue_cb callback. However, this performed a fair bit of work while holding the icl connection lock. Instead, change the callback to add sent PDUs to a STAILQ and defer dispatching of PDUs to the NIC to a helper thread similar to the scheme used in the TCP iSCSI backend. - Replace rx_flags int and the sole RXF_ACTIVE flag with a simple rx_active bool. - Add a pool of transmit worker threads for cxgbei. - Fix worker thread exit to depend on the wakeup in kthread_exit() to fix a race with module unload. Reported by: mav Sponsored by: Chelsio Communications --- sys/dev/cxgbe/cxgbei/cxgbei.c | 187 +++++++++++++++++++++----------------- sys/dev/cxgbe/cxgbei/cxgbei.h | 21 +++-- sys/dev/cxgbe/cxgbei/icl_cxgbei.c | 172 +++++++++++++++++++++++++++++------ 3 files changed, 260 insertions(+), 120 deletions(-) diff --git a/sys/dev/cxgbe/cxgbei/cxgbei.c b/sys/dev/cxgbe/cxgbei/cxgbei.c index 4a8df99b3d48..c06e39005197 100644 --- a/sys/dev/cxgbe/cxgbei/cxgbei.c +++ b/sys/dev/cxgbe/cxgbei/cxgbei.c @@ -95,8 +95,9 @@ __FBSDID("$FreeBSD$"); #include "cxgbei.h" static int worker_thread_count; -static struct cxgbei_worker_thread_softc *cwt_softc; -static struct proc *cxgbei_proc; +static struct cxgbei_worker_thread *cwt_rx_threads, *cwt_tx_threads; + +static void cwt_queue_for_rx(struct icl_cxgbei_conn *icc); static void read_pdu_limits(struct adapter *sc, uint32_t *max_tx_data_len, @@ -585,17 +586,9 @@ do_rx_iscsi_ddp(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) icl_cxgbei_new_pdu_set_conn(ip, ic); STAILQ_INSERT_TAIL(&icc->rcvd_pdus, ip, ip_next); - if ((icc->rx_flags & RXF_ACTIVE) == 0) { - struct cxgbei_worker_thread_softc *cwt = &cwt_softc[icc->cwt]; - - mtx_lock(&cwt->cwt_lock); - icc->rx_flags |= RXF_ACTIVE; - TAILQ_INSERT_TAIL(&cwt->rx_head, icc, rx_link); - if (cwt->cwt_state == CWT_SLEEPING) { - cwt->cwt_state = CWT_RUNNING; - cv_signal(&cwt->cwt_cv); - } - mtx_unlock(&cwt->cwt_lock); + if (!icc->rx_active) { + icc->rx_active = true; + cwt_queue_for_rx(icc); } SOCKBUF_UNLOCK(sb); INP_WUNLOCK(inp); @@ -836,17 +829,9 @@ do_rx_iscsi_cmp(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) /* Enqueue the PDU to the received pdus queue. */ STAILQ_INSERT_TAIL(&icc->rcvd_pdus, ip, ip_next); - if ((icc->rx_flags & RXF_ACTIVE) == 0) { - struct cxgbei_worker_thread_softc *cwt = &cwt_softc[icc->cwt]; - - mtx_lock(&cwt->cwt_lock); - icc->rx_flags |= RXF_ACTIVE; - TAILQ_INSERT_TAIL(&cwt->rx_head, icc, rx_link); - if (cwt->cwt_state == CWT_SLEEPING) { - cwt->cwt_state = CWT_RUNNING; - cv_signal(&cwt->cwt_cv); - } - mtx_unlock(&cwt->cwt_lock); + if (!icc->rx_active) { + icc->rx_active = true; + cwt_queue_for_rx(icc); } SOCKBUF_UNLOCK(sb); INP_WUNLOCK(inp); @@ -944,9 +929,9 @@ static struct uld_info cxgbei_uld_info = { }; static void -cwt_main(void *arg) +cwt_rx_main(void *arg) { - struct cxgbei_worker_thread_softc *cwt = arg; + struct cxgbei_worker_thread *cwt = arg; struct icl_cxgbei_conn *icc = NULL; struct icl_conn *ic; struct icl_pdu *ip; @@ -962,8 +947,8 @@ cwt_main(void *arg) while (__predict_true(cwt->cwt_state != CWT_STOP)) { cwt->cwt_state = CWT_RUNNING; - while ((icc = TAILQ_FIRST(&cwt->rx_head)) != NULL) { - TAILQ_REMOVE(&cwt->rx_head, icc, rx_link); + while ((icc = TAILQ_FIRST(&cwt->icc_head)) != NULL) { + TAILQ_REMOVE(&cwt->icc_head, icc, rx_link); mtx_unlock(&cwt->cwt_lock); ic = &icc->ic; @@ -979,7 +964,7 @@ cwt_main(void *arg) */ parse_pdus(icc, sb); } - MPASS(icc->rx_flags & RXF_ACTIVE); + MPASS(icc->rx_active); if (__predict_true(!(sb->sb_state & SBS_CANTRCVMORE))) { MPASS(STAILQ_EMPTY(&rx_pdus)); STAILQ_SWAP(&icc->rcvd_pdus, &rx_pdus, icl_pdu); @@ -994,11 +979,16 @@ cwt_main(void *arg) SOCKBUF_LOCK(sb); MPASS(STAILQ_EMPTY(&rx_pdus)); } - MPASS(icc->rx_flags & RXF_ACTIVE); + MPASS(icc->rx_active); if (STAILQ_EMPTY(&icc->rcvd_pdus) || __predict_false(sb->sb_state & SBS_CANTRCVMORE)) { - icc->rx_flags &= ~RXF_ACTIVE; + icc->rx_active = false; + SOCKBUF_UNLOCK(sb); + + mtx_lock(&cwt->cwt_lock); } else { + SOCKBUF_UNLOCK(sb); + /* * More PDUs were received while we were busy * handing over the previous batch to ICL. @@ -1006,13 +996,9 @@ cwt_main(void *arg) * queue. */ mtx_lock(&cwt->cwt_lock); - TAILQ_INSERT_TAIL(&cwt->rx_head, icc, + TAILQ_INSERT_TAIL(&cwt->icc_head, icc, rx_link); - mtx_unlock(&cwt->cwt_lock); } - SOCKBUF_UNLOCK(sb); - - mtx_lock(&cwt->cwt_lock); } /* Inner loop doesn't check for CWT_STOP, do that first. */ @@ -1022,84 +1008,121 @@ cwt_main(void *arg) cv_wait(&cwt->cwt_cv, &cwt->cwt_lock); } - MPASS(TAILQ_FIRST(&cwt->rx_head) == NULL); - mtx_assert(&cwt->cwt_lock, MA_OWNED); - cwt->cwt_state = CWT_STOPPED; - cv_signal(&cwt->cwt_cv); + MPASS(TAILQ_FIRST(&cwt->icc_head) == NULL); mtx_unlock(&cwt->cwt_lock); kthread_exit(); } +static void +cwt_queue_for_rx(struct icl_cxgbei_conn *icc) +{ + struct cxgbei_worker_thread *cwt = &cwt_rx_threads[icc->cwt]; + + mtx_lock(&cwt->cwt_lock); + TAILQ_INSERT_TAIL(&cwt->icc_head, icc, rx_link); + if (cwt->cwt_state == CWT_SLEEPING) { + cwt->cwt_state = CWT_RUNNING; + cv_signal(&cwt->cwt_cv); + } + mtx_unlock(&cwt->cwt_lock); +} + +void +cwt_queue_for_tx(struct icl_cxgbei_conn *icc) +{ + struct cxgbei_worker_thread *cwt = &cwt_tx_threads[icc->cwt]; + + mtx_lock(&cwt->cwt_lock); + TAILQ_INSERT_TAIL(&cwt->icc_head, icc, tx_link); + if (cwt->cwt_state == CWT_SLEEPING) { + cwt->cwt_state = CWT_RUNNING; + cv_signal(&cwt->cwt_cv); + } + mtx_unlock(&cwt->cwt_lock); +} + static int start_worker_threads(void) { + struct proc *cxgbei_proc; int i, rc; - struct cxgbei_worker_thread_softc *cwt; + struct cxgbei_worker_thread *cwt; worker_thread_count = min(mp_ncpus, 32); - cwt_softc = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE, + cwt_rx_threads = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE, + M_WAITOK | M_ZERO); + cwt_tx_threads = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE, M_WAITOK | M_ZERO); - MPASS(cxgbei_proc == NULL); - for (i = 0, cwt = &cwt_softc[0]; i < worker_thread_count; i++, cwt++) { + for (i = 0, cwt = &cwt_rx_threads[0]; i < worker_thread_count; + i++, cwt++) { + mtx_init(&cwt->cwt_lock, "cwt lock", NULL, MTX_DEF); + cv_init(&cwt->cwt_cv, "cwt cv"); + TAILQ_INIT(&cwt->icc_head); + } + + for (i = 0, cwt = &cwt_tx_threads[0]; i < worker_thread_count; + i++, cwt++) { mtx_init(&cwt->cwt_lock, "cwt lock", NULL, MTX_DEF); cv_init(&cwt->cwt_cv, "cwt cv"); - TAILQ_INIT(&cwt->rx_head); - rc = kproc_kthread_add(cwt_main, cwt, &cxgbei_proc, NULL, 0, 0, - "cxgbei", "%d", i); + TAILQ_INIT(&cwt->icc_head); + } + + cxgbei_proc = NULL; + for (i = 0, cwt = &cwt_rx_threads[0]; i < worker_thread_count; + i++, cwt++) { + rc = kproc_kthread_add(cwt_rx_main, cwt, &cxgbei_proc, + &cwt->cwt_td, 0, 0, "cxgbei", "rx %d", i); if (rc != 0) { - printf("cxgbei: failed to start thread #%d/%d (%d)\n", + printf("cxgbei: failed to start rx thread #%d/%d (%d)\n", i + 1, worker_thread_count, rc); - mtx_destroy(&cwt->cwt_lock); - cv_destroy(&cwt->cwt_cv); - bzero(cwt, sizeof(*cwt)); - if (i == 0) { - free(cwt_softc, M_CXGBE); - worker_thread_count = 0; - - return (rc); - } - - /* Not fatal, carry on with fewer threads. */ - worker_thread_count = i; - rc = 0; - break; + return (rc); } + } - /* Wait for thread to start before moving on to the next one. */ - mtx_lock(&cwt->cwt_lock); - while (cwt->cwt_state == 0) - cv_wait(&cwt->cwt_cv, &cwt->cwt_lock); - mtx_unlock(&cwt->cwt_lock); + for (i = 0, cwt = &cwt_tx_threads[0]; i < worker_thread_count; + i++, cwt++) { + rc = kproc_kthread_add(cwt_tx_main, cwt, &cxgbei_proc, + &cwt->cwt_td, 0, 0, "cxgbei", "tx %d", i); + if (rc != 0) { + printf("cxgbei: failed to start tx thread #%d/%d (%d)\n", + i + 1, worker_thread_count, rc); + return (rc); + } } - MPASS(cwt_softc != NULL); - MPASS(worker_thread_count > 0); return (0); } static void -stop_worker_threads(void) +stop_worker_threads1(struct cxgbei_worker_thread *threads) { + struct cxgbei_worker_thread *cwt; int i; - struct cxgbei_worker_thread_softc *cwt = &cwt_softc[0]; - MPASS(worker_thread_count >= 0); - - for (i = 0, cwt = &cwt_softc[0]; i < worker_thread_count; i++, cwt++) { + for (i = 0, cwt = &threads[0]; i < worker_thread_count; i++, cwt++) { mtx_lock(&cwt->cwt_lock); - MPASS(cwt->cwt_state == CWT_RUNNING || - cwt->cwt_state == CWT_SLEEPING); - cwt->cwt_state = CWT_STOP; - cv_signal(&cwt->cwt_cv); - do { - cv_wait(&cwt->cwt_cv, &cwt->cwt_lock); - } while (cwt->cwt_state != CWT_STOPPED); + if (cwt->cwt_td != NULL) { + MPASS(cwt->cwt_state == CWT_RUNNING || + cwt->cwt_state == CWT_SLEEPING); + cwt->cwt_state = CWT_STOP; + cv_signal(&cwt->cwt_cv); + mtx_sleep(cwt->cwt_td, &cwt->cwt_lock, 0, "cwtstop", 0); + } mtx_unlock(&cwt->cwt_lock); mtx_destroy(&cwt->cwt_lock); cv_destroy(&cwt->cwt_cv); } - free(cwt_softc, M_CXGBE); + free(threads, M_CXGBE); +} + +static void +stop_worker_threads(void) +{ + + MPASS(worker_thread_count >= 0); + stop_worker_threads1(cwt_rx_threads); + stop_worker_threads1(cwt_tx_threads); } /* Select a worker thread for a connection. */ diff --git a/sys/dev/cxgbe/cxgbei/cxgbei.h b/sys/dev/cxgbe/cxgbei/cxgbei.h index 58a5dac6d63b..b078f3110d62 100644 --- a/sys/dev/cxgbe/cxgbei/cxgbei.h +++ b/sys/dev/cxgbe/cxgbei/cxgbei.h @@ -36,23 +36,19 @@ enum { CWT_SLEEPING = 1, CWT_RUNNING = 2, CWT_STOP = 3, - CWT_STOPPED = 4, }; -struct cxgbei_worker_thread_softc { +struct cxgbei_worker_thread { struct mtx cwt_lock; struct cv cwt_cv; volatile int cwt_state; + struct thread *cwt_td; - TAILQ_HEAD(, icl_cxgbei_conn) rx_head; + TAILQ_HEAD(, icl_cxgbei_conn) icc_head; } __aligned(CACHE_LINE_SIZE); #define CXGBEI_CONN_SIGNATURE 0x56788765 -enum { - RXF_ACTIVE = 1 << 0, /* In the worker thread's queue */ -}; - struct cxgbei_cmp { LIST_ENTRY(cxgbei_cmp) link; @@ -71,16 +67,21 @@ struct icl_cxgbei_conn { int ulp_submode; struct adapter *sc; struct toepcb *toep; + u_int cwt; /* Receive related. */ - u_int rx_flags; /* protected by so_rcv lock */ - u_int cwt; + bool rx_active; /* protected by so_rcv lock */ STAILQ_HEAD(, icl_pdu) rcvd_pdus; /* protected by so_rcv lock */ TAILQ_ENTRY(icl_cxgbei_conn) rx_link; /* protected by cwt lock */ struct cxgbei_cmp_head *cmp_table; /* protected by cmp_lock */ struct mtx cmp_lock; unsigned long cmp_hash_mask; + + /* Transmit related. */ + bool tx_active; /* protected by ic lock */ + STAILQ_HEAD(, icl_pdu) sent_pdus; /* protected by ic lock */ + TAILQ_ENTRY(icl_cxgbei_conn) tx_link; /* protected by cwt lock */ }; static inline struct icl_cxgbei_conn * @@ -134,8 +135,10 @@ struct cxgbei_data { /* cxgbei.c */ u_int cxgbei_select_worker_thread(struct icl_cxgbei_conn *); +void cwt_queue_for_tx(struct icl_cxgbei_conn *); /* icl_cxgbei.c */ +void cwt_tx_main(void *); int icl_cxgbei_mod_load(void); int icl_cxgbei_mod_unload(void); struct icl_pdu *icl_cxgbei_new_pdu(int); diff --git a/sys/dev/cxgbe/cxgbei/icl_cxgbei.c b/sys/dev/cxgbe/cxgbei/icl_cxgbei.c index f66a959f6311..516ab931a49c 100644 --- a/sys/dev/cxgbe/cxgbei/icl_cxgbei.c +++ b/sys/dev/cxgbe/cxgbei/icl_cxgbei.c @@ -421,6 +421,128 @@ finalize_pdu(struct icl_cxgbei_conn *icc, struct icl_cxgbei_pdu *icp) return (m); } +static void +cwt_push_pdus(struct icl_cxgbei_conn *icc, struct socket *so, struct mbufq *mq) +{ + struct epoch_tracker et; + struct icl_conn *ic = &icc->ic; + struct toepcb *toep = icc->toep; + struct inpcb *inp; + + /* + * Do not get inp from toep->inp as the toepcb might have + * detached already. + */ + inp = sotoinpcb(so); + CURVNET_SET(toep->vnet); + NET_EPOCH_ENTER(et); + INP_WLOCK(inp); + + ICL_CONN_UNLOCK(ic); + if (__predict_false(inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT)) || + __predict_false((toep->flags & TPF_ATTACHED) == 0)) { + mbufq_drain(mq); + } else { + mbufq_concat(&toep->ulp_pduq, mq); + t4_push_pdus(icc->sc, toep, 0); + } + INP_WUNLOCK(inp); + NET_EPOCH_EXIT(et); + CURVNET_RESTORE(); + + ICL_CONN_LOCK(ic); +} + +void +cwt_tx_main(void *arg) +{ + struct cxgbei_worker_thread *cwt = arg; + struct icl_cxgbei_conn *icc; + struct icl_conn *ic; + struct icl_pdu *ip; + struct socket *so; + struct mbuf *m; + struct mbufq mq; + STAILQ_HEAD(, icl_pdu) tx_pdus = STAILQ_HEAD_INITIALIZER(tx_pdus); + + MPASS(cwt != NULL); + + mtx_lock(&cwt->cwt_lock); + MPASS(cwt->cwt_state == 0); + cwt->cwt_state = CWT_RUNNING; + cv_signal(&cwt->cwt_cv); + + mbufq_init(&mq, INT_MAX); + while (__predict_true(cwt->cwt_state != CWT_STOP)) { + cwt->cwt_state = CWT_RUNNING; + while ((icc = TAILQ_FIRST(&cwt->icc_head)) != NULL) { + TAILQ_REMOVE(&cwt->icc_head, icc, tx_link); + mtx_unlock(&cwt->cwt_lock); + + ic = &icc->ic; + + ICL_CONN_LOCK(ic); + MPASS(icc->tx_active); + STAILQ_SWAP(&icc->sent_pdus, &tx_pdus, icl_pdu); + ICL_CONN_UNLOCK(ic); + + while ((ip = STAILQ_FIRST(&tx_pdus)) != NULL) { + STAILQ_REMOVE_HEAD(&tx_pdus, ip_next); + + m = finalize_pdu(icc, ip_to_icp(ip)); + M_ASSERTPKTHDR(m); + MPASS((m->m_pkthdr.len & 3) == 0); + + mbufq_enqueue(&mq, m); + } + + ICL_CONN_LOCK(ic); + so = ic->ic_socket; + if (__predict_false(ic->ic_disconnecting) || + __predict_false(so == NULL)) { + mbufq_drain(&mq); + icc->tx_active = false; + ICL_CONN_UNLOCK(ic); + + mtx_lock(&cwt->cwt_lock); + continue; + } + + cwt_push_pdus(icc, so, &mq); + + MPASS(icc->tx_active); + if (STAILQ_EMPTY(&icc->sent_pdus)) { + icc->tx_active = false; + ICL_CONN_UNLOCK(ic); + + mtx_lock(&cwt->cwt_lock); + } else { + ICL_CONN_UNLOCK(ic); + + /* + * More PDUs were queued while we were + * busy sending the previous batch. + * Re-add this connection to the end + * of the queue. + */ + mtx_lock(&cwt->cwt_lock); + TAILQ_INSERT_TAIL(&cwt->icc_head, icc, + tx_link); + } + } + + /* Inner loop doesn't check for CWT_STOP, do that first. */ + if (__predict_false(cwt->cwt_state == CWT_STOP)) + break; + cwt->cwt_state = CWT_SLEEPING; + cv_wait(&cwt->cwt_cv, &cwt->cwt_lock); + } + + MPASS(TAILQ_FIRST(&cwt->icc_head) == NULL); + mtx_unlock(&cwt->cwt_lock); + kthread_exit(); +} + int icl_cxgbei_conn_pdu_append_data(struct icl_conn *ic, struct icl_pdu *ip, const void *addr, size_t len, int flags) @@ -534,13 +656,9 @@ void icl_cxgbei_conn_pdu_queue_cb(struct icl_conn *ic, struct icl_pdu *ip, icl_pdu_cb cb) { - struct epoch_tracker et; struct icl_cxgbei_conn *icc = ic_to_icc(ic); struct icl_cxgbei_pdu *icp = ip_to_icp(ip); struct socket *so = ic->ic_socket; - struct toepcb *toep = icc->toep; - struct inpcb *inp; - struct mbuf *m; MPASS(ic == ip->ip_conn); MPASS(ip->ip_bhs_mbuf != NULL); @@ -557,28 +675,11 @@ icl_cxgbei_conn_pdu_queue_cb(struct icl_conn *ic, struct icl_pdu *ip, return; } - m = finalize_pdu(icc, icp); - M_ASSERTPKTHDR(m); - MPASS((m->m_pkthdr.len & 3) == 0); - - /* - * Do not get inp from toep->inp as the toepcb might have detached - * already. - */ - inp = sotoinpcb(so); - CURVNET_SET(toep->vnet); - NET_EPOCH_ENTER(et); - INP_WLOCK(inp); - if (__predict_false(inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT)) || - __predict_false((toep->flags & TPF_ATTACHED) == 0)) - m_freem(m); - else { - mbufq_enqueue(&toep->ulp_pduq, m); - t4_push_pdus(icc->sc, toep, 0); + STAILQ_INSERT_TAIL(&icc->sent_pdus, ip, ip_next); + if (!icc->tx_active) { + icc->tx_active = true; + cwt_queue_for_tx(icc); } - INP_WUNLOCK(inp); - NET_EPOCH_EXIT(et); - CURVNET_RESTORE(); } static struct icl_conn * @@ -593,6 +694,7 @@ icl_cxgbei_new_conn(const char *name, struct mtx *lock) M_WAITOK | M_ZERO); icc->icc_signature = CXGBEI_CONN_SIGNATURE; STAILQ_INIT(&icc->rcvd_pdus); + STAILQ_INIT(&icc->sent_pdus); icc->cmp_table = hashinit(64, M_CXGBEI, &icc->cmp_hash_mask); mtx_init(&icc->cmp_lock, "cxgbei_cmp", NULL, MTX_DEF); @@ -935,21 +1037,33 @@ icl_cxgbei_conn_close(struct icl_conn *ic) if (toep != NULL) { /* NULL if connection was never offloaded. */ toep->ulpcb = NULL; + /* + * Wait for the cwt threads to stop processing this + * connection for transmit. + */ + while (icc->tx_active) + rw_sleep(inp, &inp->inp_lock, 0, "conclo", 1); + /* Discard PDUs queued for TX. */ + while (!STAILQ_EMPTY(&icc->sent_pdus)) { + ip = STAILQ_FIRST(&icc->sent_pdus); + STAILQ_REMOVE_HEAD(&icc->sent_pdus, ip_next); + icl_cxgbei_pdu_done(ip, ENOTCONN); + } mbufq_drain(&toep->ulp_pduq); /* * Wait for the cwt threads to stop processing this - * connection. + * connection for receive. */ SOCKBUF_LOCK(sb); - if (icc->rx_flags & RXF_ACTIVE) { - volatile u_int *p = &icc->rx_flags; + if (icc->rx_active) { + volatile bool *p = &icc->rx_active; SOCKBUF_UNLOCK(sb); INP_WUNLOCK(inp); - while (*p & RXF_ACTIVE) + while (*p) pause("conclo", 1); INP_WLOCK(inp);