svn commit: r346418 - in projects/fuse2: sys/fs/fuse tests/sys/fs/fusefs
Alan Somers
asomers at FreeBSD.org
Tue Sep 3 14:07:11 UTC 2019
Author: asomers
Date: Fri Apr 19 21:50:23 2019
New Revision: 346418
URL: https://svnweb.freebsd.org/changeset/base/346418
Log:
fusefs: give priority to FUSE_INTERRUPT operations
When interrupting a FUSE operation, send the FUSE_INTERRUPT op to the daemon
ASAP, ahead of other unrelated operations.
PR: 236530
Sponsored by: The FreeBSD Foundation
Modified:
projects/fuse2/sys/fs/fuse/fuse_internal.c
projects/fuse2/sys/fs/fuse/fuse_ipc.c
projects/fuse2/sys/fs/fuse/fuse_ipc.h
projects/fuse2/sys/fs/fuse/fuse_vnops.c
projects/fuse2/tests/sys/fs/fusefs/interrupt.cc
Modified: projects/fuse2/sys/fs/fuse/fuse_internal.c
==============================================================================
--- projects/fuse2/sys/fs/fuse/fuse_internal.c Fri Apr 19 20:31:12 2019 (r346417)
+++ projects/fuse2/sys/fs/fuse/fuse_internal.c Fri Apr 19 21:50:23 2019 (r346418)
@@ -295,7 +295,7 @@ fuse_internal_fsync(struct vnode *vp,
} else {
fuse_insert_callback(fdi.tick,
fuse_internal_fsync_callback);
- fuse_insert_message(fdi.tick);
+ fuse_insert_message(fdi.tick, false);
}
if (err == ENOSYS) {
/* ENOSYS means "success, and don't call again" */
@@ -593,7 +593,7 @@ fuse_internal_forget_send(struct mount *mp,
ffi = fdi.indata;
ffi->nlookup = nlookup;
- fuse_insert_message(fdi.tick);
+ fuse_insert_message(fdi.tick, false);
fdisp_destroy(&fdi);
}
@@ -736,7 +736,7 @@ fuse_internal_send_init(struct fuse_data *data, struct
fiii->flags = FUSE_POSIX_LOCKS;
fuse_insert_callback(fdi.tick, fuse_internal_init_callback);
- fuse_insert_message(fdi.tick);
+ fuse_insert_message(fdi.tick, false);
fdisp_destroy(&fdi);
}
Modified: projects/fuse2/sys/fs/fuse/fuse_ipc.c
==============================================================================
--- projects/fuse2/sys/fs/fuse/fuse_ipc.c Fri Apr 19 20:31:12 2019 (r346417)
+++ projects/fuse2/sys/fs/fuse/fuse_ipc.c Fri Apr 19 21:50:23 2019 (r346418)
@@ -238,7 +238,8 @@ fuse_interrupt_send(struct fuse_ticket *otick, int err
fuse_insert_callback(fdi.tick, fuse_interrupt_callback);
otick->irq_unique = fdi.tick->tk_unique;
- fuse_insert_message(fdi.tick);
+ /* Interrupt ops should be delivered ASAP */
+ fuse_insert_message(fdi.tick, true);
fdisp_destroy(&fdi);
} else {
/* This ticket has already been interrupted */
@@ -660,8 +661,14 @@ fuse_insert_callback(struct fuse_ticket *ftick, fuse_h
fuse_lck_mtx_unlock(ftick->tk_data->aw_mtx);
}
+/*
+ * Insert a new upgoing ticket into the message queue
+ *
+ * If urgent is true, insert at the front of the queue. Otherwise, insert in
+ * FIFO order.
+ */
void
-fuse_insert_message(struct fuse_ticket *ftick)
+fuse_insert_message(struct fuse_ticket *ftick, bool urgent)
{
if (ftick->tk_flag & FT_DIRTY) {
panic("FUSE: ticket reused without being refreshed");
@@ -672,7 +679,10 @@ fuse_insert_message(struct fuse_ticket *ftick)
return;
}
fuse_lck_mtx_lock(ftick->tk_data->ms_mtx);
- fuse_ms_push(ftick);
+ if (urgent)
+ fuse_ms_push_head(ftick);
+ else
+ fuse_ms_push(ftick);
wakeup_one(ftick->tk_data);
selwakeuppri(&ftick->tk_data->ks_rsel, PZERO + 1);
fuse_lck_mtx_unlock(ftick->tk_data->ms_mtx);
@@ -972,7 +982,7 @@ fdisp_wait_answ(struct fuse_dispatcher *fdip)
fdip->answ_stat = 0;
fuse_insert_callback(fdip->tick, fuse_standard_handler);
- fuse_insert_message(fdip->tick);
+ fuse_insert_message(fdip->tick, false);
if ((err = fticket_wait_answer(fdip->tick))) {
fuse_lck_mtx_lock(fdip->tick->tk_aw_mtx);
Modified: projects/fuse2/sys/fs/fuse/fuse_ipc.h
==============================================================================
--- projects/fuse2/sys/fs/fuse/fuse_ipc.h Fri Apr 19 20:31:12 2019 (r346417)
+++ projects/fuse2/sys/fs/fuse/fuse_ipc.h Fri Apr 19 21:50:23 2019 (r346418)
@@ -285,6 +285,7 @@ fsess_opt_brokenio(struct mount *mp)
return (fuse_fix_broken_io || (data->dataflags & FSESS_BROKENIO));
}
+/* Insert a new upgoing message */
static inline void
fuse_ms_push(struct fuse_ticket *ftick)
{
@@ -293,6 +294,15 @@ fuse_ms_push(struct fuse_ticket *ftick)
STAILQ_INSERT_TAIL(&ftick->tk_data->ms_head, ftick, tk_ms_link);
}
+/* Insert a new upgoing message at the front of the queue */
+static inline void
+fuse_ms_push_head(struct fuse_ticket *ftick)
+{
+ mtx_assert(&ftick->tk_data->ms_mtx, MA_OWNED);
+ refcount_acquire(&ftick->tk_refcount);
+ STAILQ_INSERT_HEAD(&ftick->tk_data->ms_head, ftick, tk_ms_link);
+}
+
static inline struct fuse_ticket *
fuse_ms_pop(struct fuse_data *data)
{
@@ -345,7 +355,7 @@ fuse_aw_pop(struct fuse_data *data)
struct fuse_ticket *fuse_ticket_fetch(struct fuse_data *data);
int fuse_ticket_drop(struct fuse_ticket *ftick);
void fuse_insert_callback(struct fuse_ticket *ftick, fuse_handler_t *handler);
-void fuse_insert_message(struct fuse_ticket *ftick);
+void fuse_insert_message(struct fuse_ticket *ftick, bool irq);
static inline bool
fuse_libabi_geq(struct fuse_data *data, uint32_t abi_maj, uint32_t abi_min)
Modified: projects/fuse2/sys/fs/fuse/fuse_vnops.c
==============================================================================
--- projects/fuse2/sys/fs/fuse/fuse_vnops.c Fri Apr 19 20:31:12 2019 (r346417)
+++ projects/fuse2/sys/fs/fuse/fuse_vnops.c Fri Apr 19 21:50:23 2019 (r346418)
@@ -592,7 +592,7 @@ fuse_vnop_create(struct vop_create_args *ap)
fri->fh = fh_id;
fri->flags = flags;
fuse_insert_callback(fdip->tick, fuse_internal_forget_callback);
- fuse_insert_message(fdip->tick);
+ fuse_insert_message(fdip->tick, false);
goto out;
}
ASSERT_VOP_ELOCKED(*vpp, "fuse_vnop_create");
Modified: projects/fuse2/tests/sys/fs/fusefs/interrupt.cc
==============================================================================
--- projects/fuse2/tests/sys/fs/fusefs/interrupt.cc Fri Apr 19 20:31:12 2019 (r346417)
+++ projects/fuse2/tests/sys/fs/fusefs/interrupt.cc Fri Apr 19 21:50:23 2019 (r346418)
@@ -46,6 +46,8 @@ using namespace testing;
/* Initial size of files used by these tests */
const off_t FILESIZE = 1000;
+static sem_t *signaled_semaphore;
+
/* Don't do anything; all we care about is that the syscall gets interrupted */
void sigusr2_handler(int __unused sig) {
if (verbosity > 1) {
@@ -63,6 +65,8 @@ void* killer(void* target) {
if (verbosity > 1)
printf("Signalling! thread %p\n", target);
pthread_kill((pthread_t)target, SIGUSR2);
+ if (signaled_semaphore != NULL)
+ sem_post(signaled_semaphore);
return(NULL);
}
@@ -112,13 +116,18 @@ void expect_write(uint64_t ino, uint64_t *write_unique
}));
}
-void setup_interruptor(pthread_t self)
+void setup_interruptor(pthread_t target)
{
ASSERT_NE(SIG_ERR, signal(SIGUSR2, sigusr2_handler)) << strerror(errno);
- ASSERT_EQ(0, pthread_create(&m_child, NULL, killer, (void*)self))
+ ASSERT_EQ(0, pthread_create(&m_child, NULL, killer, (void*)target))
<< strerror(errno);
}
+void SetUp() {
+ signaled_semaphore = NULL;
+ FuseTest::SetUp();
+}
+
void TearDown() {
struct sigaction sa;
@@ -624,6 +633,98 @@ TEST_F(Interrupt, in_progress_read)
EXPECT_EQ(EINTR, errno);
/* Deliberately leak fd. close(2) will be tested in release.cc */
+}
+
+/* FUSE_INTERRUPT operations should take priority over other pending ops */
+TEST_F(Interrupt, priority)
+{
+ const char FULLPATH0[] = "mountpoint/some_file.txt";
+ const char RELPATH0[] = "some_file.txt";
+ const char FULLPATH1[] = "mountpoint/other_file.txt";
+ const char RELPATH1[] = "other_file.txt";
+ const char *CONTENTS = "ijklmnop";
+ Sequence seq;
+ ssize_t bufsize = strlen(CONTENTS);
+ uint64_t ino0 = 42, ino1 = 43;
+ int fd0, fd1;
+ uint64_t write_unique;
+ pthread_t self, th0;
+ sem_t sem0, sem1;
+
+ ASSERT_EQ(0, sem_init(&sem0, 0, 0)) << strerror(errno);
+ ASSERT_EQ(0, sem_init(&sem1, 0, 0)) << strerror(errno);
+ self = pthread_self();
+
+ expect_lookup(RELPATH0, ino0);
+ expect_open(ino0, 0, 1);
+ expect_lookup(RELPATH1, ino1);
+ expect_open(ino1, 0, 1);
+ EXPECT_CALL(*m_mock, process(
+ ResultOf([=](auto in) {
+ return (in->header.opcode == FUSE_WRITE &&
+ in->header.nodeid == ino0);
+ }, Eq(true)),
+ _)
+ ).InSequence(seq)
+ .WillOnce(Invoke(ReturnImmediate([&](auto in, auto out) {
+ write_unique = in->header.unique;
+
+ /* Let the next write proceed */
+ sem_post(&sem1);
+
+ /* Pause the daemon thread so it won't read the next op */
+ sem_wait(&sem0);
+
+ /* Finally, interrupt the original op */
+ out->header.error = -EINTR;
+ out->header.unique = write_unique;
+ out->header.len = sizeof(out->header);
+ })));
+ /*
+ * FUSE_INTERRUPT should be received before the second FUSE_WRITE, even
+ * though it was generated later
+ */
+ EXPECT_CALL(*m_mock, process(
+ ResultOf([&](auto in) {
+ return (in->header.opcode == FUSE_INTERRUPT &&
+ in->body.interrupt.unique == write_unique);
+ }, Eq(true)),
+ _)
+ ).InSequence(seq)
+ .WillOnce(Invoke(ReturnErrno(EAGAIN)));
+ EXPECT_CALL(*m_mock, process(
+ ResultOf([&](auto in) {
+ return (in->header.opcode == FUSE_WRITE &&
+ in->header.nodeid == ino1);
+ }, Eq(true)),
+ _)
+ ).InSequence(seq)
+ .WillOnce(Invoke(ReturnImmediate([=](auto in , auto out) {
+ SET_OUT_HEADER_LEN(out, write);
+ out->body.write.size = in->body.write.size;
+ })));
+
+ fd0 = open(FULLPATH0, O_WRONLY);
+ ASSERT_LE(0, fd0) << strerror(errno);
+ fd1 = open(FULLPATH1, O_WRONLY);
+ ASSERT_LE(0, fd1) << strerror(errno);
+
+ /* Use a separate thread for the first write */
+ ASSERT_EQ(0, pthread_create(&th0, NULL, write0, (void*)(intptr_t)fd0))
+ << strerror(errno);
+
+ signaled_semaphore = &sem0;
+
+ sem_wait(&sem1); /* Sequence the two writes */
+ setup_interruptor(th0);
+ ASSERT_EQ(bufsize, write(fd1, CONTENTS, bufsize)) << strerror(errno);
+
+ /* Pause before joining th0; this test does expect a FUSE_INTERRUPT */
+ usleep(250'000);
+
+ pthread_join(th0, NULL);
+ sem_destroy(&sem1);
+ sem_destroy(&sem0);
}
/*
More information about the svn-src-projects
mailing list