PERFORCE change 139439 for review
Aaron Meihm
alm at FreeBSD.org
Sun Apr 6 00:28:55 UTC 2008
http://perforce.freebsd.org/chv.cgi?CH=139439
Change 139439 by alm at alm_praetorian on 2008/04/06 00:27:56
Additional code working towards new threaded producer/consumer model.
Affected files ...
.. //depot/projects/trustedbsd/netauditd/grammar.y#3 edit
.. //depot/projects/trustedbsd/netauditd/netauditd.h#13 edit
.. //depot/projects/trustedbsd/netauditd/reader.c#2 edit
.. //depot/projects/trustedbsd/netauditd/reader.h#2 edit
.. //depot/projects/trustedbsd/netauditd/writer.c#2 edit
.. //depot/projects/trustedbsd/netauditd/writer.h#2 edit
Differences ...
==== //depot/projects/trustedbsd/netauditd/grammar.y#3 (text+ko) ====
@@ -131,6 +131,7 @@
new->ac_name = $2;
new->ac_path = $3;
new->ac_init_func = writer_init_trail;
+ new->ac_write_func = writer_write_trail;
writer_q_init(new);
$$ = new;
}
==== //depot/projects/trustedbsd/netauditd/netauditd.h#13 (text+ko) ====
@@ -38,12 +38,11 @@
struct audit_record {
void *ar_buf;
u_int32_t ar_record_len;
- int ar_refcount;
};
struct au_queue_ent {
TAILQ_ENTRY(au_queue_ent) aq_glue;
- struct audit_record *aq_record;
+ struct audit_record aq_record;
u_int32_t aq_remain;
};
@@ -51,12 +50,12 @@
struct au_qpair {
au_q_t qp_a, qp_b;
- int qp_ready;
- au_q_t *qp_read, *qp_write;
pthread_mutex_t qp_lock;
- pthread_cond_t qp_cond;
- u_int32_t qp_read_size;
- time_t qp_time;
+ au_q_t *qp_store;
+ au_q_t *qp_hold;
+ au_q_t *qp_free;
+ u_int32_t qp_store_len;
+ int qp_store_n;
};
struct au_cmpnt {
@@ -76,6 +75,7 @@
int (*ac_init_func)(struct au_cmpnt *);
int (*ac_read_func)(struct au_cmpnt *);
+ int (*ac_write_func)(struct au_cmpnt *);
};
struct au_src_buffer {
==== //depot/projects/trustedbsd/netauditd/reader.c#2 (text+ko) ====
@@ -26,6 +26,7 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/queue.h>
+#include <sys/endian.h>
#include <stdio.h>
#include <stdlib.h>
@@ -43,22 +44,27 @@
#include "reader.h"
#include "writer.h"
-#define ROTATE(x) if (x.qp_read == &x.qp_a) { \
- x.qp_read = &x.qp_b; \
- x.qp_write = &x.qp_a; \
- } \
- else { \
- x.qp_read = &x.qp_a; \
- x.qp_write = &x.qp_b; \
- } \
- x.qp_read_size = 0; \
- x.qp_time = time(NULL);
-
#define SRC_BUFFER_INIT(x) x = malloc(sizeof(struct au_src_buffer)); \
assert (x != NULL); \
bzero(x, sizeof(struct au_src_buffer));
-static int srcs_online; /* All source components online */
+#define WRITER_SIGNAL(x) (void) pthread_mutex_lock(&ready_lock); \
+ records_waiting += x; \
+ x = 0; \
+ (void) pthread_cond_signal(&ready_cond); \
+ (void) pthread_mutex_unlock(&ready_lock)
+
+#define ROTATE assert(ac->ac_q.qp_free != NULL); \
+ ac->ac_q.qp_hold = ac->ac_q.qp_store; \
+ ac->ac_q.qp_store = ac->ac_q.qp_free; \
+ ac->ac_q.qp_free = NULL; \
+ ac->ac_q.qp_store_len = 0
+
+pthread_mutex_t ready_lock;
+pthread_cond_t ready_cond;
+u_int32_t records_waiting;
+
+static fd_set rfds;
int
reader_accept_client(struct au_cmpnt *ac)
@@ -82,6 +88,7 @@
bcopy(&addr, &new->as_addr, sizeof(new->as_addr));
new->as_addrlen = addrlen;
TAILQ_INSERT_TAIL(&ac->ac_sbuffers, new, as_glue);
+ reader_build_rfds(&rfds);
return (0);
}
@@ -89,25 +96,30 @@
reader_build_rfds(fd_set *rfds)
{
struct au_cmpnt *ac;
+ struct au_src_buffer *s;
FD_ZERO(rfds);
TAILQ_FOREACH(ac, &ac_list_src, ac_glue) {
- if (ac->ac_flags & FLAG_ONLINE)
- FD_SET(ac->ac_fd, rfds);
+ FD_SET(ac->ac_fd, rfds);
+ if (ac->ac_type == COMPONENT_NET) {
+ TAILQ_FOREACH(s, &ac->ac_sbuffers, as_glue)
+ FD_SET(s->as_fd, rfds);
+ }
}
}
void
reader_handler(fd_set *rfds)
{
- struct au_cmpnt *ac, *tmp;
+ struct au_src_buffer *as, *tmp;
+ struct au_cmpnt *ac;
fd_set lrfds;
struct timeval tv;
int ret, ret2;
lrfds = *rfds;
bzero(&tv, sizeof(struct timeval));
- tv.tv_sec = 1;
+ tv.tv_sec = 5;
ret = select(FD_SETSIZE, &lrfds, NULL, NULL, &tv);
if (ret == -1) {
if (errno == EINTR)
@@ -115,15 +127,22 @@
else
exit(2);
}
- else if (ret == 0)
+ else if (ret == 0) {
+ reader_timeout();
return;
- TAILQ_FOREACH_SAFE(ac, &ac_list_src, ac_glue, tmp) {
+ }
+ TAILQ_FOREACH(ac, &ac_list_src, ac_glue) {
if (FD_ISSET(ac->ac_fd, &lrfds)) {
ret2 = ac->ac_read_func(ac);
- if (ret2 == -1) {
- (void) close(ac->ac_fd);
- ac->ac_flags &= FLAG_ONLINE;
- }
+ if (ret2 == -1)
+ exit(2);
+ }
+ if (ac->ac_type == COMPONENT_NET) {
+ TAILQ_FOREACH_SAFE(as, &ac->ac_sbuffers, as_glue,
+ tmp)
+ if (FD_ISSET(as->as_fd, &lrfds)) {
+ ret2 = reader_read_socket(as);
+ }
}
}
}
@@ -131,30 +150,13 @@
void
reader_init()
{
- time_t t;
struct au_cmpnt *ac;
- srcs_online = 1;
- TAILQ_FOREACH(ac, &ac_list_src, ac_glue) {
- if (ac->ac_flags & FLAG_ONLINE)
- continue;
- t = time(NULL);
- if (ac->ac_failed != 0)
- if ((ac->ac_failed + READER_RETRY) > t) {
- srcs_online = 0;
- continue;
- }
- dprintf("reader_init: %s", ac->ac_name);
- if (ac->ac_init_func(ac) == -1) {
- srcs_online = 0;
- ac->ac_failed = time(NULL);
- }
- else {
- dprintf("reader_init: %s online", ac->ac_name);
- ac->ac_failed = 0;
- ac->ac_flags |= FLAG_ONLINE;
- }
- }
+ (void) pthread_mutex_init(&ready_lock, NULL);
+ (void) pthread_cond_init(&ready_cond, NULL);
+ TAILQ_FOREACH(ac, &ac_list_src, ac_glue)
+ if (ac->ac_init_func(ac) == -1)
+ exit(2);
}
int
@@ -205,50 +207,51 @@
for (i = 0; i < ac->ac_ndsts; i++)
reader_q_record_cmpnt(ar, ac->ac_dsts[i]);
+ /* Once we have copied the record to all of this component's consumers
+ * we can discard it. */
+ free(ar->ar_buf);
+ free(ar);
}
void
reader_q_record_cmpnt(struct audit_record *ar, struct au_cmpnt *ac)
{
struct au_queue_ent *new;
- int rotate = 0;
- time_t t;
- t = time(NULL);
- if (ac->ac_q.qp_time == 0)
- ac->ac_q.qp_time = t;
- else if ((ac->ac_q.qp_time + WRITER_ROTATE_TIMEOUT) <= t) {
- if (!TAILQ_EMPTY(ac->ac_q.qp_read))
- rotate = 1;
- }
- if (ac->ac_q.qp_read_size >= WRITER_LOW_WATER)
- rotate = 1;
- if (rotate) {
- if (pthread_mutex_lock(&ac->ac_q.qp_lock) != 0)
- exit(2);
- /* If the writer is still processing the other buffer, the
- * record is dropped. */
- if (!ac->ac_q.qp_ready) {
- if (pthread_mutex_unlock(&ac->ac_q.qp_lock) != 0)
- exit(2);
- return;
- }
- dprintf("reader_q_record_cmpnt: %s: rotate", ac->ac_name);
- ROTATE(ac->ac_q);
- if (pthread_cond_signal(&ac->ac_q.qp_cond) != 0)
- exit(2);
- if (pthread_mutex_unlock(&ac->ac_q.qp_lock) != 0)
- exit(2);
- }
- dprintf("reader_q_record: %p for %s", ar, ac->ac_name);
new = malloc(sizeof(struct au_queue_ent));
assert(new != NULL);
bzero(new, sizeof(struct au_queue_ent));
- new->aq_record = ar;
+ new->aq_record.ar_buf = malloc(ar->ar_record_len);
+ assert(new->aq_record.ar_buf != NULL);
+ bcopy(ar->ar_buf, new->aq_record.ar_buf, ar->ar_record_len);
+ new->aq_record.ar_record_len = ar->ar_record_len;
new->aq_remain = ar->ar_record_len;
- ar->ar_refcount++;
- TAILQ_INSERT_TAIL(ac->ac_q.qp_read, new, aq_glue);
- ac->ac_q.qp_read_size += ar->ar_record_len;
+ (void) pthread_mutex_lock(&ac->ac_q.qp_lock);
+ if (ac->ac_q.qp_store_len < WRITER_MAX) {
+ dprintf("queueing record for %s (%d bytes in queue)",
+ ac->ac_name, ac->ac_q.qp_store_len);
+ (void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
+ TAILQ_INSERT_TAIL(ac->ac_q.qp_store, new, aq_glue);
+ ac->ac_q.qp_store_len += ar->ar_record_len;
+ ac->ac_q.qp_store_n++;
+ return;
+ }
+ if (ac->ac_q.qp_hold != NULL) {
+ /* This consumer is still processing its queue, so the record
+ * is dropped. */
+ (void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
+ dprintf("dropping record for %s", ac->ac_name);
+ free(new->aq_record.ar_buf);
+ free(new);
+ return;
+ }
+ dprintf("rotating queues for %s", ac->ac_name);
+ ROTATE;
+ WRITER_SIGNAL(ac->ac_q.qp_store_n);
+ (void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
+ TAILQ_INSERT_TAIL(ac->ac_q.qp_store, new, aq_glue);
+ ac->ac_q.qp_store_len += ar->ar_record_len;
+ ac->ac_q.qp_store_n++;
}
int
@@ -279,16 +282,96 @@
return (0);
}
+int
+reader_read_socket(struct au_src_buffer *asb)
+{
+ u_char *bufptr, *recbufptr;
+ int ret, left;
+ u_int32_t hdr_remain, val, need;
+ u_char as_buf[2048];
+
+ ret = read(asb->as_fd, as_buf, sizeof(as_buf));
+ if (ret == -1) {
+ if (errno != EINTR)
+ return (-1);
+ else
+ return (0);
+ }
+ else if (ret == 0)
+ return (-1);
+ left = ret;
+ bufptr = as_buf;
+ while (left > 0) {
+ if (asb->as_record == NULL) {
+ hdr_remain = sizeof(asb->as_header) -
+ asb->as_nread;
+ if (left >= hdr_remain) {
+ (void) memcpy(asb->as_header + asb->as_nread,
+ bufptr, hdr_remain);
+ asb->as_nread += hdr_remain;
+ left -= hdr_remain;
+ bufptr += hdr_remain;
+ (void) memcpy(&val, asb->as_header + 1,
+ sizeof(val));
+ asb->as_record =
+ malloc(sizeof(struct audit_record));
+ assert(asb->as_record != NULL);
+ asb->as_record->ar_record_len = be32toh(val);
+ asb->as_record->ar_buf = \
+ malloc(asb->as_record->ar_record_len);
+ assert(asb->as_record->ar_buf != NULL);
+ (void) memcpy(asb->as_record->ar_buf,
+ asb->as_header, sizeof(asb->as_header));
+ continue;
+ }
+ else {
+ (void) memcpy(asb->as_header + asb->as_nread,
+ bufptr, left);
+ asb->as_nread += left;
+ return (0);
+ }
+ }
+ need = asb->as_record->ar_record_len - asb->as_nread;
+ recbufptr = asb->as_record->ar_buf + asb->as_nread;
+ if (left < need) {
+ (void) memcpy(recbufptr, bufptr, left);
+ asb->as_nread += left;
+ return (0);
+ }
+ else {
+ (void) memcpy(recbufptr, bufptr, need);
+ left -= need;
+ bufptr += need;
+ reader_q_record(asb->as_record, asb->as_parent);
+ asb->as_record = NULL;
+ asb->as_nread = 0;
+ }
+ }
+ return (0);
+}
+
void
reader_start()
{
- fd_set rfds;
+ reader_init();
+ reader_build_rfds(&rfds);
+ for (;;)
+ reader_handler(&rfds);
+}
+
+void
+reader_timeout()
+{
+ struct au_cmpnt *ac;
- for (;;) {
- if (!srcs_online) {
- reader_init();
- reader_build_rfds(&rfds);
+ TAILQ_FOREACH(ac, &ac_list_dst, ac_glue) {
+ (void) pthread_mutex_lock(&ac->ac_q.qp_lock);
+ if ((ac->ac_q.qp_hold != NULL) || (ac->ac_q.qp_store_n == 0)) {
+ (void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
+ continue;
}
- reader_handler(&rfds);
+ ROTATE;
+ WRITER_SIGNAL(ac->ac_q.qp_store_n);
+ (void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
}
}
==== //depot/projects/trustedbsd/netauditd/reader.h#2 (text+ko) ====
@@ -24,7 +24,9 @@
* SUCH DAMAGE.
*/
-#define READER_RETRY 30
+extern pthread_mutex_t ready_lock;
+extern pthread_cond_t ready_cond;
+u_int32_t records_waiting;
int reader_accept_client(struct au_cmpnt *);
void reader_build_rfds(fd_set *);
@@ -35,4 +37,6 @@
void reader_q_record(struct audit_record *, struct au_cmpnt *);
void reader_q_record_cmpnt(struct audit_record *, struct au_cmpnt *);
int reader_read_pipe(struct au_cmpnt *);
+int reader_read_socket(struct au_src_buffer *);
void reader_start(void);
+void reader_timeout(void);
==== //depot/projects/trustedbsd/netauditd/writer.c#2 (text+ko) ====
@@ -61,13 +61,14 @@
void
writer_handler(fd_set *wfds)
{
+ struct au_cmpnt *ac;
fd_set lwfds;
struct timeval *tv = NULL;
- int ret;
+ int ret, have_records = 1;
lwfds = *wfds;
if (!dsts_online) {
- dprintf("writer_handler: applying select timeout");
+ dprintf("writer applying select timeout");
tv = malloc(sizeof(struct timeval));
bzero(tv, sizeof(struct timeval));
tv->tv_sec = 1;
@@ -79,6 +80,34 @@
else
exit(2);
}
+ (void) pthread_mutex_lock(&ready_lock);
+ if (records_waiting == 0) {
+ have_records = 0;
+ if (dsts_online) {
+ while (records_waiting == 0) {
+ dprintf("writer waiting for records");
+ (void) pthread_cond_wait(&ready_cond,
+ &ready_lock);
+ }
+ have_records = 1;
+ (void) pthread_mutex_unlock(&ready_lock);
+ }
+ }
+ (void) pthread_mutex_unlock(&ready_lock);
+ if (!have_records)
+ return;
+ TAILQ_FOREACH(ac, &ac_list_dst, ac_glue) {
+ if (!FD_ISSET(ac->ac_fd, &lwfds))
+ continue;
+ (void) pthread_mutex_lock(&ac->ac_q.qp_lock);
+ if (ac->ac_q.qp_hold != NULL) {
+ (void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
+ ret = ac->ac_write_func(ac);
+ if (ret == 1) /* Queue has been drained */
+ writer_q_drained(ac);
+ }
+ (void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
+ }
}
void
@@ -153,16 +182,25 @@
}
void
+writer_q_drained(struct au_cmpnt *ac)
+{
+ (void) pthread_mutex_lock(&ac->ac_q.qp_lock);
+ assert(ac->ac_q.qp_hold != NULL);
+ ac->ac_q.qp_free = ac->ac_q.qp_hold;
+ ac->ac_q.qp_hold = NULL;
+ (void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
+}
+
+void
writer_q_init(struct au_cmpnt *ac)
{
TAILQ_INIT(&ac->ac_q.qp_a);
TAILQ_INIT(&ac->ac_q.qp_b);
- ac->ac_q.qp_read = &ac->ac_q.qp_a;
- ac->ac_q.qp_write = &ac->ac_q.qp_b;
+ ac->ac_q.qp_store = &ac->ac_q.qp_a;
+ ac->ac_q.qp_free = &ac->ac_q.qp_b;
+ ac->ac_q.qp_hold = NULL;
if (pthread_mutex_init(&ac->ac_q.qp_lock, NULL) != 0)
exit(2);
- if (pthread_cond_init(&ac->ac_q.qp_cond, NULL) != 0)
- exit(2);
}
void *
@@ -179,3 +217,42 @@
}
return (NULL);
}
+
+int
+writer_write_trail(struct au_cmpnt *ac)
+{
+ struct au_queue_ent *aq, *tmp;
+ struct audit_record *ar;
+ u_int32_t offset;
+ int ret;
+
+ TAILQ_FOREACH_SAFE(aq, ac->ac_q.qp_hold, aq_glue, tmp) {
+ ar = &aq->aq_record;
+ offset = ar->ar_record_len - aq->aq_remain;
+ dprintf("write offset %d", offset);
+ ret = write(ac->ac_fd, ar->ar_buf + offset, aq->aq_remain);
+ if (ret == -1) {
+ if ((errno == EINTR) || (errno == EAGAIN))
+ return (0);
+ else
+ return (-1);
+ }
+ else if (ret == aq->aq_remain) {
+ dprintf("wrote %d bytes to %s (completed)", ret,
+ ac->ac_name);
+ (void) pthread_mutex_lock(&ready_lock);
+ records_waiting--;
+ dprintf("%d records waiting", records_waiting);
+ (void) pthread_mutex_unlock(&ready_lock);
+ TAILQ_REMOVE(ac->ac_q.qp_hold, aq, aq_glue);
+ free(aq->aq_record.ar_buf);
+ free(aq);
+ }
+ else {
+ dprintf("partial write");
+ aq->aq_remain -= ret;
+ return (0);
+ }
+ }
+ return (1);
+}
==== //depot/projects/trustedbsd/netauditd/writer.h#2 (text+ko) ====
@@ -24,7 +24,7 @@
* SUCH DAMAGE.
*/
-#define WRITER_LOW_WATER 1024000
+#define WRITER_MAX 32768
#define WRITER_ROTATE_TIMEOUT 5
#define WRITER_RETRY 60
@@ -33,5 +33,7 @@
void writer_init(void);
int writer_init_net(struct au_cmpnt *);
int writer_init_trail(struct au_cmpnt *);
+void writer_q_drained(struct au_cmpnt *);
void writer_q_init(struct au_cmpnt *);
void *writer_start(void *);
+int writer_write_trail(struct au_cmpnt *);
More information about the p4-projects
mailing list