svn commit: r267786 - in stable/9/sys: nfs rpc
Alexander Motin
mav at FreeBSD.org
Mon Jun 23 13:09:51 UTC 2014
Author: mav
Date: Mon Jun 23 13:09:49 2014
New Revision: 267786
URL: http://svnweb.freebsd.org/changeset/base/267786
Log:
MFC r267221, r267278:
Introduce new per-thread lock to protect the list of requests.
This allows to slightly simplify svc_run_internal() code: if we processed
all the requests in a queue, then we know that new one will not appear.
Modified:
stable/9/sys/nfs/nfs_fha.c
stable/9/sys/rpc/svc.c
stable/9/sys/rpc/svc.h
Directory Properties:
stable/9/ (props changed)
stable/9/sys/ (props changed)
Modified: stable/9/sys/nfs/nfs_fha.c
==============================================================================
--- stable/9/sys/nfs/nfs_fha.c Mon Jun 23 12:43:30 2014 (r267785)
+++ stable/9/sys/nfs/nfs_fha.c Mon Jun 23 13:09:49 2014 (r267786)
@@ -288,11 +288,7 @@ fha_hash_entry_add_op(struct fha_hash_en
* Get the service thread currently associated with the fhe that is
* appropriate to handle this operation.
*/
-SVCTHREAD *
-fha_hash_entry_choose_thread(struct fha_params *softc,
- struct fha_hash_entry *fhe, struct fha_info *i, SVCTHREAD *this_thread);
-
-SVCTHREAD *
+static SVCTHREAD *
fha_hash_entry_choose_thread(struct fha_params *softc,
struct fha_hash_entry *fhe, struct fha_info *i, SVCTHREAD *this_thread)
{
@@ -428,13 +424,13 @@ fha_assign(SVCTHREAD *this_thread, struc
* Grab the pool lock here to not let chosen thread go away before
* the new request inserted to its queue while we drop fhe lock.
*/
- mtx_lock(&(*softc->pool)->sp_lock);
+ mtx_lock(&thread->st_lock);
mtx_unlock(fhe->mtx);
return (thread);
thist:
req->rq_p1 = NULL;
- mtx_lock(&(*softc->pool)->sp_lock);
+ mtx_lock(&this_thread->st_lock);
return (this_thread);
}
Modified: stable/9/sys/rpc/svc.c
==============================================================================
--- stable/9/sys/rpc/svc.c Mon Jun 23 12:43:30 2014 (r267785)
+++ stable/9/sys/rpc/svc.c Mon Jun 23 13:09:49 2014 (r267786)
@@ -1072,7 +1072,6 @@ svc_request_space_available(SVCPOOL *poo
static void
svc_run_internal(SVCPOOL *pool, bool_t ismaster)
{
- struct svc_reqlist reqs;
SVCTHREAD *st, *stpref;
SVCXPRT *xprt;
enum xprt_stat stat;
@@ -1081,11 +1080,11 @@ svc_run_internal(SVCPOOL *pool, bool_t i
int error;
st = mem_alloc(sizeof(*st));
+ mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
st->st_pool = pool;
st->st_xprt = NULL;
STAILQ_INIT(&st->st_reqs);
cv_init(&st->st_cond, "rpcsvc");
- STAILQ_INIT(&reqs);
mtx_lock(&pool->sp_lock);
LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);
@@ -1119,7 +1118,7 @@ svc_run_internal(SVCPOOL *pool, bool_t i
}
xprt = st->st_xprt;
- if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
+ if (!xprt) {
/*
* Enforce maxthreads count.
*/
@@ -1161,8 +1160,7 @@ svc_run_internal(SVCPOOL *pool, bool_t i
if (!ismaster
&& (pool->sp_threadcount
> pool->sp_minthreads)
- && !st->st_xprt
- && STAILQ_EMPTY(&st->st_reqs))
+ && !st->st_xprt)
break;
} else if (error) {
mtx_unlock(&pool->sp_lock);
@@ -1172,93 +1170,71 @@ svc_run_internal(SVCPOOL *pool, bool_t i
}
continue;
}
+ mtx_unlock(&pool->sp_lock);
- if (xprt) {
- /*
- * Drain the transport socket and queue up any
- * RPCs.
- */
- xprt->xp_lastactive = time_uptime;
- do {
- if (!svc_request_space_available(pool))
- break;
- mtx_unlock(&pool->sp_lock);
- rqstp = NULL;
- stat = svc_getreq(xprt, &rqstp);
- if (rqstp) {
- svc_change_space_used(pool, rqstp->rq_size);
- /*
- * See if the application has
- * a preference for some other
- * thread.
- */
- stpref = st;
- if (pool->sp_assign)
- stpref = pool->sp_assign(st,
- rqstp);
- else
- mtx_lock(&pool->sp_lock);
-
+ /*
+ * Drain the transport socket and queue up any RPCs.
+ */
+ xprt->xp_lastactive = time_uptime;
+ do {
+ if (!svc_request_space_available(pool))
+ break;
+ rqstp = NULL;
+ stat = svc_getreq(xprt, &rqstp);
+ if (rqstp) {
+ svc_change_space_used(pool, rqstp->rq_size);
+ /*
+ * See if the application has a preference
+ * for some other thread.
+ */
+ if (pool->sp_assign) {
+ stpref = pool->sp_assign(st, rqstp);
rqstp->rq_thread = stpref;
STAILQ_INSERT_TAIL(&stpref->st_reqs,
rqstp, rq_link);
-
- /*
- * If we assigned the request
- * to another thread, make
- * sure its awake and continue
- * reading from the
- * socket. Otherwise, try to
- * find some other thread to
- * read from the socket and
- * execute the request
- * immediately.
- */
- if (stpref == st)
- break;
- if (stpref->st_idle) {
- LIST_REMOVE(stpref, st_ilink);
- stpref->st_idle = FALSE;
- cv_signal(&stpref->st_cond);
- }
- } else
- mtx_lock(&pool->sp_lock);
- } while (stat == XPRT_MOREREQS
- && pool->sp_state != SVCPOOL_CLOSING);
-
- /*
- * Move this transport to the end of the
- * active list to ensure fairness when
- * multiple transports are active. If this was
- * the last queued request, svc_getreq will
- * end up calling xprt_inactive to remove from
- * the active list.
- */
- xprt->xp_thread = NULL;
- st->st_xprt = NULL;
- if (xprt->xp_active) {
- if (!svc_request_space_available(pool) ||
- !xprt_assignthread(xprt))
- TAILQ_INSERT_TAIL(&pool->sp_active,
- xprt, xp_alink);
+ mtx_unlock(&stpref->st_lock);
+ if (stpref != st)
+ rqstp = NULL;
+ } else {
+ rqstp->rq_thread = st;
+ STAILQ_INSERT_TAIL(&st->st_reqs,
+ rqstp, rq_link);
+ }
}
- STAILQ_CONCAT(&reqs, &st->st_reqs);
- mtx_unlock(&pool->sp_lock);
- SVC_RELEASE(xprt);
- } else {
- STAILQ_CONCAT(&reqs, &st->st_reqs);
- mtx_unlock(&pool->sp_lock);
+ } while (rqstp == NULL && stat == XPRT_MOREREQS
+ && pool->sp_state != SVCPOOL_CLOSING);
+
+ /*
+ * Move this transport to the end of the active list to
+ * ensure fairness when multiple transports are active.
+ * If this was the last queued request, svc_getreq will end
+ * up calling xprt_inactive to remove from the active list.
+ */
+ mtx_lock(&pool->sp_lock);
+ xprt->xp_thread = NULL;
+ st->st_xprt = NULL;
+ if (xprt->xp_active) {
+ if (!svc_request_space_available(pool) ||
+ !xprt_assignthread(xprt))
+ TAILQ_INSERT_TAIL(&pool->sp_active,
+ xprt, xp_alink);
}
+ mtx_unlock(&pool->sp_lock);
+ SVC_RELEASE(xprt);
/*
* Execute what we have queued.
*/
sz = 0;
- while ((rqstp = STAILQ_FIRST(&reqs)) != NULL) {
- STAILQ_REMOVE_HEAD(&reqs, rq_link);
+ mtx_lock(&st->st_lock);
+ while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
+ STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
+ mtx_unlock(&st->st_lock);
sz += rqstp->rq_size;
svc_executereq(rqstp);
+ mtx_lock(&st->st_lock);
}
+ mtx_unlock(&st->st_lock);
svc_change_space_used(pool, -sz);
mtx_lock(&pool->sp_lock);
}
@@ -1275,6 +1251,7 @@ svc_run_internal(SVCPOOL *pool, bool_t i
mtx_unlock(&pool->sp_lock);
+ mtx_destroy(&st->st_lock);
cv_destroy(&st->st_cond);
mem_free(st, sizeof(*st));
Modified: stable/9/sys/rpc/svc.h
==============================================================================
--- stable/9/sys/rpc/svc.h Mon Jun 23 12:43:30 2014 (r267785)
+++ stable/9/sys/rpc/svc.h Mon Jun 23 13:09:49 2014 (r267786)
@@ -291,6 +291,7 @@ STAILQ_HEAD(svc_reqlist, svc_req);
* thread to read and execute pending RPCs.
*/
typedef struct __rpc_svcthread {
+ struct mtx_padalign st_lock; /* protects st_reqs field */
struct __rpc_svcpool *st_pool;
SVCXPRT *st_xprt; /* transport we are processing */
struct svc_reqlist st_reqs; /* RPC requests to execute */
More information about the svn-src-stable-9
mailing list