svn commit: r228403 - projects/nfsv4.1-client/sys/rpc
Rick Macklem
rmacklem at FreeBSD.org
Sat Dec 10 23:57:32 UTC 2011
Author: rmacklem
Date: Sat Dec 10 23:57:32 2011
New Revision: 228403
URL: http://svn.freebsd.org/changeset/base/228403
Log:
Modify the TCP RPC client code to handle the backchannel. The main
change is to recognize an RPC request (CALL) message in the receive
upcall and marshal it to the server via the xprt structure in
ct_backchannelxprt. The locking is:
- ct_lock protects the ct_backchannelxprt field along with other ct_data
fields and is held during most of the backchannel changes.
- xp_lock protects the fields inside the xprt structure and must be
held when the structure is being released in clnt_vc_destroy().
The ct_request and ct_data structures are now defined in _krpc.h.
Modified:
projects/nfsv4.1-client/sys/rpc/clnt_vc.c
Modified: projects/nfsv4.1-client/sys/rpc/clnt_vc.c
==============================================================================
--- projects/nfsv4.1-client/sys/rpc/clnt_vc.c Sat Dec 10 23:35:05 2011 (r228402)
+++ projects/nfsv4.1-client/sys/rpc/clnt_vc.c Sat Dec 10 23:57:32 2011 (r228403)
@@ -67,6 +67,7 @@ __FBSDID("$FreeBSD$");
#include <sys/protosw.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
+#include <sys/sx.h>
#include <sys/syslog.h>
#include <sys/time.h>
#include <sys/uio.h>
@@ -77,8 +78,7 @@ __FBSDID("$FreeBSD$");
#include <rpc/rpc.h>
#include <rpc/rpc_com.h>
-
-#define MCALL_MSG_SIZE 24
+#include <rpc/_krpc.h>
struct cmessage {
struct cmsghdr cmsg;
@@ -106,43 +106,6 @@ static struct clnt_ops clnt_vc_ops = {
.cl_control = clnt_vc_control
};
-/*
- * A pending RPC request which awaits a reply. Requests which have
- * received their reply will have cr_xid set to zero and cr_mrep to
- * the mbuf chain of the reply.
- */
-struct ct_request {
- TAILQ_ENTRY(ct_request) cr_link;
- uint32_t cr_xid; /* XID of request */
- struct mbuf *cr_mrep; /* reply received by upcall */
- int cr_error; /* any error from upcall */
- char cr_verf[MAX_AUTH_BYTES]; /* reply verf */
-};
-
-TAILQ_HEAD(ct_request_list, ct_request);
-
-struct ct_data {
- struct mtx ct_lock;
- int ct_threads; /* number of threads in clnt_vc_call */
- bool_t ct_closing; /* TRUE if we are closing */
- bool_t ct_closed; /* TRUE if we are closed */
- struct socket *ct_socket; /* connection socket */
- bool_t ct_closeit; /* close it on destroy */
- struct timeval ct_wait; /* wait interval in milliseconds */
- struct sockaddr_storage ct_addr; /* remote addr */
- struct rpc_err ct_error;
- uint32_t ct_xid;
- char ct_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
- size_t ct_mpos; /* pos after marshal */
- const char *ct_waitchan;
- int ct_waitflag;
- struct mbuf *ct_record; /* current reply record */
- size_t ct_record_resid; /* how much left of reply to read */
- bool_t ct_record_eor; /* true if reading last fragment */
- struct ct_request_list ct_pending;
- int ct_upcallrefs; /* Ref cnt of upcalls in prog. */
-};
-
static void clnt_vc_upcallsdone(struct ct_data *);
static const char clnt_vc_errstr[] = "%s : %s";
@@ -641,6 +604,7 @@ clnt_vc_control(CLIENT *cl, u_int reques
{
struct ct_data *ct = (struct ct_data *)cl->cl_private;
void *infop = info;
+ SVCXPRT *xprt;
mtx_lock(&ct->ct_lock);
@@ -752,6 +716,14 @@ clnt_vc_control(CLIENT *cl, u_int reques
*(int *) info = FALSE;
break;
+ case CLSET_BACKCHANNEL:
+ xprt = (SVCXPRT *)info;
+ if (ct->ct_backchannelxprt == NULL) {
+ xprt->xp_p2 = ct;
+ ct->ct_backchannelxprt = xprt;
+ }
+ break;
+
default:
mtx_unlock(&ct->ct_lock);
return (FALSE);
@@ -817,10 +789,20 @@ clnt_vc_destroy(CLIENT *cl)
{
struct ct_data *ct = (struct ct_data *) cl->cl_private;
struct socket *so = NULL;
+ SVCXPRT *xprt;
clnt_vc_close(cl);
mtx_lock(&ct->ct_lock);
+ xprt = ct->ct_backchannelxprt;
+ ct->ct_backchannelxprt = NULL;
+ if (xprt != NULL) {
+ mtx_unlock(&ct->ct_lock); /* To avoid a LOR. */
+ sx_xlock(&xprt->xp_lock);
+ mtx_lock(&ct->ct_lock);
+ xprt->xp_p2 = NULL;
+ xprt_unregister(xprt);
+ }
if (ct->ct_socket) {
if (ct->ct_closeit) {
@@ -829,6 +811,11 @@ clnt_vc_destroy(CLIENT *cl)
}
mtx_unlock(&ct->ct_lock);
+ if (xprt != NULL) {
+ sx_xunlock(&xprt->xp_lock);
+ SVC_RELEASE(xprt);
+printf("xprt rel\n");
+ }
mtx_destroy(&ct->ct_lock);
if (so) {
@@ -855,11 +842,13 @@ clnt_vc_soupcall(struct socket *so, void
{
struct ct_data *ct = (struct ct_data *) arg;
struct uio uio;
- struct mbuf *m;
+ struct mbuf *m, *m2;
struct ct_request *cr;
int error, rcvflag, foundreq;
- uint32_t xid, header;
+ uint32_t xid_plus_direction[2], header;
bool_t do_read;
+ SVCXPRT *xprt;
+ struct cf_conn *cd;
ct->ct_upcallrefs++;
uio.uio_td = curthread;
@@ -974,45 +963,91 @@ clnt_vc_soupcall(struct socket *so, void
&& ct->ct_record_eor) {
/*
* The XID is in the first uint32_t of
- * the reply.
+ * the reply and the message direction
+ * is the second one.
*/
- if (ct->ct_record->m_len < sizeof(xid) &&
+ if (ct->ct_record->m_len <
+ sizeof(xid_plus_direction) &&
m_length(ct->ct_record, NULL) <
- sizeof(xid)) {
+ sizeof(xid_plus_direction)) {
m_freem(ct->ct_record);
break;
}
- m_copydata(ct->ct_record, 0, sizeof(xid),
- (char *)&xid);
- xid = ntohl(xid);
-
- mtx_lock(&ct->ct_lock);
- foundreq = 0;
- TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
- if (cr->cr_xid == xid) {
+ m_copydata(ct->ct_record, 0,
+ sizeof(xid_plus_direction),
+ (char *)xid_plus_direction);
+ xid_plus_direction[0] =
+ ntohl(xid_plus_direction[0]);
+ xid_plus_direction[1] =
+ ntohl(xid_plus_direction[1]);
+ /* Check message direction. */
+ if (xid_plus_direction[1] == CALL) {
+ /* This is a backchannel request. */
+printf("bc req\n");
+ mtx_lock(&ct->ct_lock);
+ xprt = ct->ct_backchannelxprt;
+ if (xprt == NULL) {
+ mtx_unlock(&ct->ct_lock);
+ /* Just throw it away. */
+ m_freem(ct->ct_record);
+ ct->ct_record = NULL;
+ } else {
+ cd = (struct cf_conn *)
+ xprt->xp_p1;
+ m2 = cd->mreq;
/*
- * This one
- * matches. We leave
- * the reply mbuf in
- * cr->cr_mrep. Set
- * the XID to zero so
- * that we will ignore
- * any duplicaed
- * replies.
+ * The requests are chained
+ * in the m_nextpkt list.
*/
- cr->cr_xid = 0;
- cr->cr_mrep = ct->ct_record;
- cr->cr_error = 0;
- foundreq = 1;
- wakeup(cr);
- break;
+ while (m2 != NULL &&
+ m2->m_nextpkt != NULL)
+ /* Find end of list. */
+ m2 = m2->m_nextpkt;
+ if (m2 != NULL)
+ m2->m_nextpkt =
+ ct->ct_record;
+ else
+ cd->mreq =
+ ct->ct_record;
+ ct->ct_record->m_nextpkt =
+ NULL;
+ ct->ct_record = NULL;
+ xprt_active(xprt);
+ mtx_unlock(&ct->ct_lock);
+printf("got bmsg\n");
}
- }
- mtx_unlock(&ct->ct_lock);
+ } else {
+ mtx_lock(&ct->ct_lock);
+ foundreq = 0;
+ TAILQ_FOREACH(cr, &ct->ct_pending,
+ cr_link) {
+ if (cr->cr_xid ==
+ xid_plus_direction[0]) {
+ /*
+ * This one
+ * matches. We leave
+ * the reply mbuf in
+ * cr->cr_mrep. Set
+ * the XID to zero so
+ * that we will ignore
+ * any duplicated
+ * replies.
+ */
+ cr->cr_xid = 0;
+ cr->cr_mrep =
+ ct->ct_record;
+ cr->cr_error = 0;
+ foundreq = 1;
+ wakeup(cr);
+ break;
+ }
+ }
+ mtx_unlock(&ct->ct_lock);
- if (!foundreq)
- m_freem(ct->ct_record);
- ct->ct_record = NULL;
+ if (!foundreq)
+ m_freem(ct->ct_record);
+ ct->ct_record = NULL;
+ }
}
}
} while (m);
More information about the svn-src-projects
mailing list