svn commit: r197775 - head/sys/kern
Robert Watson
rwatson at FreeBSD.org
Mon Oct 5 14:49:17 UTC 2009
Author: rwatson
Date: Mon Oct 5 14:49:16 2009
New Revision: 197775
URL: http://svn.freebsd.org/changeset/base/197775
Log:
First cut at implementing SOCK_SEQPACKET support for UNIX (local) domain
sockets. This allows for reliable bi-directional datagram communication
over UNIX domain sockets, in contrast to SOCK_DGRAM (M:N, unreliable) or
SOCK_STREAM (bi-directional bytestream). Largely, this reuses existing
UNIX domain socket code. This allows applications requiring record-
oriented semantics to communicate reliably via local IPC.
Some implementation notes (also present in XXX comments):
- Currently we lack an sbappend variant able to do datagrams and control
data without doing addresses, so we mark SOCK_SEQPACKET as PR_ADDR.
Adding a new variant will solve this problem.
- UNIX domain sockets on FreeBSD provide back-pressure/flow control
notification for stream sockets by manipulating the send socket
buffer's size during pru_send and pru_rcvd. This trick works less well
for SOCK_SEQPACKET as sosend_generic() uses sb_hiwat not just to
manage blocking, but also to determine maximum datagram size. Fixing
this requires rethinking how back-pressure is done for SOCK_SEQPACKET;
in the meantime, it's possible to get EMSGSIZE when buffers fill,
instead of blocking.
Discussed with: benl
Reviewed by: bz, rpaulo
MFC after: 3 months
Sponsored by: Google
Modified:
head/sys/kern/uipc_usrreq.c
Modified: head/sys/kern/uipc_usrreq.c
==============================================================================
--- head/sys/kern/uipc_usrreq.c Mon Oct 5 14:46:56 2009 (r197774)
+++ head/sys/kern/uipc_usrreq.c Mon Oct 5 14:49:16 2009 (r197775)
@@ -50,7 +50,8 @@
* garbage collector to find and tear down cycles of disconnected sockets.
*
* TODO:
- * SEQPACKET, RDM
+ * RDM
+ * distinguish datagram size limits from flow control limits in SEQPACKET
* rethink name space problems
* need a proper out-of-band
*/
@@ -112,6 +113,7 @@ static ino_t unp_ino; /* Prototype for
static int unp_rights; /* (g) File descriptors in flight. */
static struct unp_head unp_shead; /* (l) List of stream sockets. */
static struct unp_head unp_dhead; /* (l) List of datagram sockets. */
+static struct unp_head unp_sphead; /* (l) List of seqpacket sockets. */
static const struct sockaddr sun_noname = { sizeof(sun_noname), AF_LOCAL };
@@ -139,10 +141,14 @@ static u_long unpst_sendspace = PIPSIZ;
static u_long unpst_recvspace = PIPSIZ;
static u_long unpdg_sendspace = 2*1024; /* really max datagram size */
static u_long unpdg_recvspace = 4*1024;
+static u_long unpsp_sendspace = PIPSIZ; /* really max datagram size */
+static u_long unpsp_recvspace = PIPSIZ;
SYSCTL_NODE(_net, PF_LOCAL, local, CTLFLAG_RW, 0, "Local domain");
SYSCTL_NODE(_net_local, SOCK_STREAM, stream, CTLFLAG_RW, 0, "SOCK_STREAM");
SYSCTL_NODE(_net_local, SOCK_DGRAM, dgram, CTLFLAG_RW, 0, "SOCK_DGRAM");
+SYSCTL_NODE(_net_local, SOCK_SEQPACKET, seqpacket, CTLFLAG_RW, 0,
+ "SOCK_SEQPACKET");
SYSCTL_ULONG(_net_local_stream, OID_AUTO, sendspace, CTLFLAG_RW,
&unpst_sendspace, 0, "Default stream send space.");
@@ -152,6 +158,10 @@ SYSCTL_ULONG(_net_local_dgram, OID_AUTO,
&unpdg_sendspace, 0, "Default datagram send space.");
SYSCTL_ULONG(_net_local_dgram, OID_AUTO, recvspace, CTLFLAG_RW,
&unpdg_recvspace, 0, "Default datagram receive space.");
+SYSCTL_ULONG(_net_local_seqpacket, OID_AUTO, maxseqpacket, CTLFLAG_RW,
+ &unpsp_sendspace, 0, "Default seqpacket send space.");
+SYSCTL_ULONG(_net_local_seqpacket, OID_AUTO, recvspace, CTLFLAG_RW,
+ &unpsp_recvspace, 0, "Default seqpacket receive space.");
SYSCTL_INT(_net_local, OID_AUTO, inflight, CTLFLAG_RD, &unp_rights, 0,
"File descriptors in flight.");
@@ -257,6 +267,7 @@ static struct mbuf *unp_addsockcred(stru
*/
static struct domain localdomain;
static struct pr_usrreqs uipc_usrreqs_dgram, uipc_usrreqs_stream;
+static struct pr_usrreqs uipc_usrreqs_seqpacket;
static struct protosw localsw[] = {
{
.pr_type = SOCK_STREAM,
@@ -271,6 +282,19 @@ static struct protosw localsw[] = {
.pr_flags = PR_ATOMIC|PR_ADDR|PR_RIGHTS,
.pr_usrreqs = &uipc_usrreqs_dgram
},
+{
+ .pr_type = SOCK_SEQPACKET,
+ .pr_domain = &localdomain,
+
+ /*
+ * XXXRW: For now, PR_ADDR because soreceive will bump into them
+ * due to our use of sbappendaddr. A new sbappend variants is needed
+ * that supports both atomic record writes and control data.
+ */
+ .pr_flags = PR_ADDR|PR_ATOMIC|PR_CONNREQUIRED|PR_WANTRCVD|
+ PR_RIGHTS,
+ .pr_usrreqs = &uipc_usrreqs_seqpacket,
+},
};
static struct domain localdomain = {
@@ -353,6 +377,11 @@ uipc_attach(struct socket *so, int proto
recvspace = unpdg_recvspace;
break;
+ case SOCK_SEQPACKET:
+ sendspace = unpsp_sendspace;
+ recvspace = unpsp_recvspace;
+ break;
+
default:
panic("uipc_attach");
}
@@ -372,8 +401,22 @@ uipc_attach(struct socket *so, int proto
UNP_LIST_LOCK();
unp->unp_gencnt = ++unp_gencnt;
unp_count++;
- LIST_INSERT_HEAD(so->so_type == SOCK_DGRAM ? &unp_dhead : &unp_shead,
- unp, unp_link);
+ switch (so->so_type) {
+ case SOCK_STREAM:
+ LIST_INSERT_HEAD(&unp_shead, unp, unp_link);
+ break;
+
+ case SOCK_DGRAM:
+ LIST_INSERT_HEAD(&unp_dhead, unp, unp_link);
+ break;
+
+ case SOCK_SEQPACKET:
+ LIST_INSERT_HEAD(&unp_sphead, unp, unp_link);
+ break;
+
+ default:
+ panic("uipc_attach");
+ }
UNP_LIST_UNLOCK();
return (0);
@@ -705,11 +748,8 @@ uipc_rcvd(struct socket *so, int flags)
unp = sotounpcb(so);
KASSERT(unp != NULL, ("uipc_rcvd: unp == NULL"));
- if (so->so_type == SOCK_DGRAM)
- panic("uipc_rcvd DGRAM?");
-
- if (so->so_type != SOCK_STREAM)
- panic("uipc_rcvd unknown socktype");
+ if (so->so_type != SOCK_STREAM && so->so_type != SOCK_SEQPACKET)
+ panic("uipc_rcvd socktype %d", so->so_type);
/*
* Adjust backpressure on sender and wakeup any waiting to write.
@@ -824,6 +864,7 @@ uipc_send(struct socket *so, int flags,
break;
}
+ case SOCK_SEQPACKET:
case SOCK_STREAM:
if ((so->so_state & SS_ISCONNECTED) == 0) {
if (nam != NULL) {
@@ -875,11 +916,33 @@ uipc_send(struct socket *so, int flags,
* Send to paired receive port, and then reduce send buffer
* hiwater marks to maintain backpressure. Wake up readers.
*/
- if (control != NULL) {
- if (sbappendcontrol_locked(&so2->so_rcv, m, control))
+ switch (so->so_type) {
+ case SOCK_STREAM:
+ if (control != NULL) {
+ if (sbappendcontrol_locked(&so2->so_rcv, m,
+ control))
+ control = NULL;
+ } else
+ sbappend_locked(&so2->so_rcv, m);
+ break;
+
+ case SOCK_SEQPACKET: {
+ const struct sockaddr *from;
+
+ from = &sun_noname;
+ if (sbappendaddr_locked(&so2->so_rcv, from, m,
+ control))
control = NULL;
- } else
- sbappend_locked(&so2->so_rcv, m);
+ break;
+ }
+ }
+
+ /*
+ * XXXRW: While fine for SOCK_STREAM, this conflates maximum
+ * datagram size and back-pressure for SOCK_SEQPACKET, which
+ * can lead to undesired return of EMSGSIZE on send instead
+ * of more desirable blocking.
+ */
mbcnt_delta = so2->so_rcv.sb_mbcnt - unp2->unp_mbcnt;
unp2->unp_mbcnt = so2->so_rcv.sb_mbcnt;
sbcc = so2->so_rcv.sb_cc;
@@ -939,7 +1002,8 @@ uipc_sense(struct socket *so, struct sta
UNP_LINK_RLOCK();
UNP_PCB_LOCK(unp);
unp2 = unp->unp_conn;
- if (so->so_type == SOCK_STREAM && unp2 != NULL) {
+ if ((so->so_type == SOCK_STREAM || so->so_type == SOCK_SEQPACKET) &&
+ unp2 != NULL) {
so2 = unp2->unp_socket;
sb->st_blksize += so2->so_rcv.sb_cc;
}
@@ -1009,6 +1073,26 @@ static struct pr_usrreqs uipc_usrreqs_dg
.pru_close = uipc_close,
};
+static struct pr_usrreqs uipc_usrreqs_seqpacket = {
+ .pru_abort = uipc_abort,
+ .pru_accept = uipc_accept,
+ .pru_attach = uipc_attach,
+ .pru_bind = uipc_bind,
+ .pru_connect = uipc_connect,
+ .pru_connect2 = uipc_connect2,
+ .pru_detach = uipc_detach,
+ .pru_disconnect = uipc_disconnect,
+ .pru_listen = uipc_listen,
+ .pru_peeraddr = uipc_peeraddr,
+ .pru_rcvd = uipc_rcvd,
+ .pru_send = uipc_send,
+ .pru_sense = uipc_sense,
+ .pru_shutdown = uipc_shutdown,
+ .pru_sockaddr = uipc_sockaddr,
+ .pru_soreceive = soreceive_generic, /* XXX: or...? */
+ .pru_close = uipc_close,
+};
+
static struct pr_usrreqs uipc_usrreqs_stream = {
.pru_abort = uipc_abort,
.pru_accept = uipc_accept,
@@ -1306,6 +1390,7 @@ unp_connect2(struct socket *so, struct s
break;
case SOCK_STREAM:
+ case SOCK_SEQPACKET:
unp2->unp_conn = unp;
if (req == PRU_CONNECT &&
((unp->unp_flags | unp2->unp_flags) & UNP_CONNWAIT))
@@ -1343,6 +1428,7 @@ unp_disconnect(struct unpcb *unp, struct
break;
case SOCK_STREAM:
+ case SOCK_SEQPACKET:
soisdisconnected(unp->unp_socket);
unp2->unp_conn = NULL;
soisdisconnected(unp2->unp_socket);
@@ -1368,7 +1454,22 @@ unp_pcblist(SYSCTL_HANDLER_ARGS)
struct unp_head *head;
struct xunpcb *xu;
- head = ((intptr_t)arg1 == SOCK_DGRAM ? &unp_dhead : &unp_shead);
+ switch ((intptr_t)arg1) {
+ case SOCK_STREAM:
+ head = &unp_shead;
+ break;
+
+ case SOCK_DGRAM:
+ head = &unp_dhead;
+ break;
+
+ case SOCK_SEQPACKET:
+ head = &unp_sphead;
+ break;
+
+ default:
+ panic("unp_pcblist: arg1 %d", (intptr_t)arg1);
+ }
/*
* The process of preparing the PCB list is too time-consuming and
@@ -1481,6 +1582,9 @@ SYSCTL_PROC(_net_local_dgram, OID_AUTO,
SYSCTL_PROC(_net_local_stream, OID_AUTO, pcblist, CTLFLAG_RD,
(caddr_t)(long)SOCK_STREAM, 0, unp_pcblist, "S,xunpcb",
"List of active local stream sockets");
+SYSCTL_PROC(_net_local_seqpacket, OID_AUTO, pcblist, CTLFLAG_RD,
+ (caddr_t)(long)SOCK_SEQPACKET, 0, unp_pcblist, "S,xunpcb",
+ "List of active local seqpacket sockets");
static void
unp_shutdown(struct unpcb *unp)
@@ -1492,7 +1596,8 @@ unp_shutdown(struct unpcb *unp)
UNP_PCB_LOCK_ASSERT(unp);
unp2 = unp->unp_conn;
- if (unp->unp_socket->so_type == SOCK_STREAM && unp2 != NULL) {
+ if ((unp->unp_socket->so_type == SOCK_STREAM ||
+ (unp->unp_socket->so_type == SOCK_SEQPACKET)) && unp2 != NULL) {
so = unp2->unp_socket;
if (so != NULL)
socantrcvmore(so);
@@ -1658,6 +1763,7 @@ unp_init(void)
NULL, EVENTHANDLER_PRI_ANY);
LIST_INIT(&unp_dhead);
LIST_INIT(&unp_shead);
+ LIST_INIT(&unp_sphead);
TASK_INIT(&unp_gc_task, 0, unp_gc, NULL);
UNP_LINK_LOCK_INIT();
UNP_LIST_LOCK_INIT();
@@ -1974,7 +2080,8 @@ SYSCTL_INT(_net_local, OID_AUTO, taskcou
static void
unp_gc(__unused void *arg, int pending)
{
- struct unp_head *heads[] = { &unp_dhead, &unp_shead, NULL };
+ struct unp_head *heads[] = { &unp_dhead, &unp_shead, &unp_sphead,
+ NULL };
struct unp_head **head;
struct file **unref;
struct unpcb *unp;
More information about the svn-src-head
mailing list