Application layer classifier for ipfw
Ermal Luçi
ermal.luci at gmail.com
Fri Aug 8 23:26:34 UTC 2008
On Sat, Aug 2, 2008 at 3:00 PM, Mike Makonnen <mtm at wubethiopia.com> wrote:
> Mike Makonnen wrote:
>>
>> Patrick Tracanelli wrote:
>>>
>>> To let you know of my current (real world) tests:
>>>
>>> - Wireless Internet Provider 1:
>>> - 4Mbit/s of Internet Traffic
>>> - Classifying default protocols + soulseek + ssh
>>> - Classifying 100Mbit/s of dump over ssh
>>>
>>> Results in:
>>> No latency added, very low CPU usage, no packets dropping.
>>>
>>> - Wireless ISP 2:
>>> - 21 Mbit/s of Internet Traffic
>>> - Classifying default protocols + soulseek + ssh
>>>
>>> Results in:
>>> No tcp or udp traffic at all; everything that gets diverted never
>>> comes out of the divert socket, and ipfw-classifyd logs
>>>
>>> Aug 1 12:07:35 ourofino last message repeated 58 times
>>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: bittorrent
>>> (rule 50000)
>>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: edonkey (rule
>>> 50000)
>>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: fasttrack (rule
>>> 50000)
>>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: gnutella (rule
>>> 1000)
>>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: soulseek (rule
>>> 50000)
>>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: ssh (rule
>>> 50000)
>>> Aug 1 12:18:28 ourofino ipfw-classifyd: unable to write to divert
>>> socket: Operation not permitted
>>> Aug 1 12:18:50 ourofino last message repeated 90 times
>>
>> Hmmm... this part means that the call to sendto(2) to write the packet
>> back into the network stack failed. This explains why you are not seeing any
>> traffic coming back out of the divert socket, but I don't see why it would
>> suddenly fail with a permission error. Could this be a kernel bug?
>>>
>>> Aug 1 12:18:51 ourofino ipfw-classifyd: packet dropped: input queue full
>>> Aug 1 12:19:11 ourofino last message repeated 94 times
>>>
>>> Raised the queue length a lot (up to 40960). When the application starts it
>>> uses up to 25% CPU, and a second after that CPU usage drops below 0.1%.
>>
>> This looks like a deadlock. If it weren't able to process packets fast
>> enough the cpu usage should be high even as it's spewing "packet dropped"
>> messages. Can you send me some more information like memory usage and the
>> firewall script you are using? How much of the 21Mbits/s of traffic is P2P?
>> If you reduce the number of protocols you are trying to match against does
>> the behavior change? Using netstat -w1 -I<interface> can you tell me how
>> many packets per second we're talking about for 4Mbits/s and 21Mbit/s? Also,
>> the timestamps from the log file seem to show that the daemon is running for
>> approx. 34 sec. before the first "unable to write to divert socket"
>> message. Is it passing traffic during this time? Thanks.
>>
>> I've uploaded a newer version. Can you try that also please. It includes:
>> o SIGHUP forces it to re-read its configuration file
>> o rc.d script
>> o minor optimization (calls pthread_cond_signal with the mutex unlocked)
>> o code cleanup
>>
>> Also, for your convenience I have attached a patch against the earlier
>> version that removes a debugging printf that should remove spammage to your
>> log files (the current version has it removed already).
>>
>
> Ooops, a few minutes after I sent this email I found a couple of bugs (one
> major, and one minor). They were in the original tarball as well as the
> newer one I uploaded earlier today. I've uploaded a fixed version of the
> code. Can you try that instead please.
>
> Also, to help track down performance issues I've modified the Makefile to
> build a profiled version of the application so you can use gprof(1) to
> figure out where any problems lie.
>
Does this sound about right for implementing the GC, together with a
configuration syntax such as:
$protocol = dnpipe 20
$protocol2 = dnqueue 30
It also has some extra goodies for pf(4) and altq(4):
$protocol3 = queue $queue name
$protocol4 = tag TAGNAME
$protocol5 = action block
It adds two new options: -e seconds, the number of seconds before a flow is
considered expired, and -n packets, the number of packets processed before
kicking the GC.
--- classifyd_old.c 2008-08-09 00:33:04.000000000 +0000
+++ classifyd.c 2008-08-09 00:33:34.000000000 +0000
@@ -28,13 +28,17 @@
#include <sys/types.h>
#include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <sys/time.h>
+#include <net/if.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/in_systm.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <netinet/udp.h>
+#include <net/pfvar.h>
#include <assert.h>
#include <err.h>
@@ -53,6 +57,7 @@
#include <unistd.h>
#include "hashtable.h"
+#include "hashtable_private.h"
#include "pathnames.h"
#include "protocols.h"
@@ -94,6 +99,7 @@
uint32_t if_datalen; /* length in bytes of if_data */
uint16_t if_pktcount; /* number of packets concatenated */
uint16_t if_fwrule; /* ipfw(4) rule associated with flow */
+ time_t expire; /* flow expire time */
};
/*
@@ -126,7 +132,7 @@
static struct ic_queue outQ;
/* divert(4) socket */
-static int dvtS;
+static int dvtS = 0;
/* config file path */
static const char *conf = IC_CONFIG_PATH;
@@ -137,12 +143,25 @@
/* List of protocols available to the system */
struct ic_protocols *fp;
+/* Our hashtables */
+struct hashtable *sh = NULL,
+ *th = NULL,
+ *uh = NULL;
+
+/* signaled to kick garbage collector */
+static pthread_cond_t gq_condvar;
+
+/* number of packets before kicking garbage collector */
+static unsigned int npackets = 250;
+
+static time_t time_expire = 40; /* 40 seconds */
/*
* Forward function declarations.
*/
void *classify_pthread(void *);
void *read_pthread(void *);
void *write_pthread(void *);
+void *garbage_pthread(void *);
static int equalkeys(void *, void *);
static unsigned int hashfromkey(void *);
static void test_re(void);
@@ -155,7 +174,7 @@
{
struct sockaddr_in addr;
struct sigaction sa;
- pthread_t classifytd, readtd, writetd;
+ pthread_t classifytd, readtd, writetd, garbagectd;
const char *errstr;
long long num;
uint16_t port, qmaxsz;
@@ -164,13 +183,27 @@
tflag = 0;
port = IC_DPORT;
qmaxsz = IC_QMAXSZ;
- while ((ch = getopt(argc, argv, "htc:P:p:q:")) != -1) {
+ while ((ch = getopt(argc, argv, "n:e:htc:P:p:q:")) != -1) {
switch(ch) {
case 'c':
conf = strdup(optarg);
if (conf == NULL)
err(EX_TEMPFAIL, "config file path");
break;
+ case 'e':
+ num = strtonum((const char *)optarg, 1, 400, &errstr);
+ if (num == 0 && errstr != NULL) {
+ errx(EX_USAGE, "invalud expire seconds: %s", errstr);
+ }
+ time_expire = (time_t)num;
+ break;
+ case 'n':
+ num = strtonum((const char *)optarg, 1,
65535, &errstr);
+ if (num == 0 && errstr != NULL) {
+ errx(EX_USAGE, "invalud expire
seconds: %s", errstr);
+ }
+ npackets = (unsigned int)num;
+ break;
case 'P':
protoDir = strdup(optarg);
if (protoDir == NULL)
@@ -230,6 +263,9 @@
error = pthread_cond_init(&outQ.fq_condvar, NULL);
if (error != 0)
err(EX_OSERR, "unable to initialize output queue condvar");
+ error = pthread_cond_init(&gq_condvar, NULL);
+ if (error != 0)
+ err(EX_OSERR, "unable to initialize garbage collector
condvar");
/*
* Create and bind the divert(4) socket.
@@ -276,32 +312,80 @@
if (error == -1)
err(EX_OSERR, "unable to set signal handler");
+ /*
+ * There are 3 tables: udp, tcp, and tcp syn.
+ * The tcp syn table tracks connections for which a
+ * SYN packet has been sent but no reply has been returned
+ * yet. Once the SYN ACK reply is detected it is moved to
+ * the regular tcp connection tracking table.
+ */
+ sh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
+ if (sh == NULL) {
+ syslog(LOG_ERR, "unable to create TCP (SYN) tracking table");
+ error = EX_SOFTWARE;
+ goto cleanup;
+ }
+ th = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
+ if (th == NULL) {
+ syslog(LOG_ERR, "unable to create TCP tracking table");
+ error = EX_SOFTWARE;
+ goto cleanup;
+ }
+ uh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
+ if (uh == NULL) {
+ syslog(LOG_ERR, "unable to create UDP tracking table");
+ error = EX_SOFTWARE;
+ goto cleanup;
+ }
+
/*
* Create the various threads.
*/
error = pthread_create(&readtd, NULL, read_pthread, NULL);
- if (error != 0)
- err(EX_OSERR, "unable to create reader thread");
+ if (error != 0) {
+ syslog(LOG_ERR, "unable to create reader thread");
+ error = EX_OSERR;
+ goto cleanup;
+ }
error = pthread_create(&classifytd, NULL, classify_pthread, NULL);
- if (error != 0)
- err(EX_OSERR, "unable to create classifier thread");
+ if (error != 0) {
+ syslog(LOG_ERR, "unable to create classifier thread");
+ error = EX_OSERR;
+ goto cleanup;
+ }
error = pthread_create(&writetd, NULL, write_pthread, NULL);
- if (error != 0)
- err(EX_OSERR, "unable to create writer thread");
-
+ if (error != 0) {
+ syslog(LOG_ERR, "unable to create writer thread");
+ error = EX_OSERR;
+ goto cleanup;
+ }
+ error = pthread_create(&garbagectd, NULL, garbage_pthread, NULL);
+ if (error != 0) {
+ syslog(LOG_ERR, "unable to create garbage collect thread");
+ error = EX_OSERR;
+ goto cleanup;
+ }
/*
* Wait for our threads to exit.
*/
pthread_join(readtd, NULL);
pthread_join(classifytd, NULL);
pthread_join(writetd, NULL);
-
+ pthread_join(garbagectd, NULL);
/*
* Cleanup
*/
- close(dvtS);
+cleanup:
+ if (dvtS > 0)
+ close(dvtS);
+ if (sh != NULL)
+ hashtable_destroy(sh, 1);
+ if (th != NULL)
+ hashtable_destroy(th, 1);
+ if (uh != NULL)
+ hashtable_destroy(uh, 1);
- return (0);
+ return (error);
}
void *
@@ -310,6 +394,7 @@
struct ic_pkt *pkt;
struct ip *ipp;
int len;
+ unsigned int pcktcnt = 0;
while (1) {
pkt = (struct ic_pkt *)malloc(sizeof(struct ic_pkt));
@@ -353,6 +438,10 @@
STAILQ_INSERT_HEAD(&inQ.fq_pkthead, pkt, fp_link);
inQ.fq_size++;
pthread_mutex_unlock(&inQ.fq_mtx);
+ if (++pcktcnt > npackets) {
+ pcktcnt = 0;
+ pthread_cond_signal(&gq_condvar);
+ }
pthread_cond_signal(&inQ.fq_condvar);
}
@@ -420,39 +509,19 @@
struct tcphdr *tcp;
struct udphdr *udp;
struct ic_pkt *pkt;
- struct hashtable *sh, *th, *uh;
struct protocol *proto;
+ struct timeval tv;
regmatch_t pmatch;
u_char *data, *payload;
uint16_t trycount;
int datalen, error;
- /*
- * There are 3 tables: udp, tcp, and tcp syn.
- * The tcp syn table tracks connections for which a
- * SYN packet has been sent but no reply has been returned
- * yet. Once the SYN ACK reply is detected it is moved to
- * the regular tcp connection tracking table.
- */
- sh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
- if (sh == NULL) {
- syslog(LOG_ERR, "unable to create TCP (SYN) tracking table");
- exit(EX_SOFTWARE);
- }
- th = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
- if (th == NULL) {
- syslog(LOG_ERR, "unable to create TCP tracking table");
- exit(EX_SOFTWARE);
- }
- uh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
- if (uh == NULL) {
- syslog(LOG_ERR, "unable to create UDP tracking table");
- exit(EX_SOFTWARE);
- }
-
flow = NULL;
key = NULL;
while(1) {
+ while(gettimeofday(&tv, NULL) != 0)
+ ;
+
pthread_mutex_lock(&inQ.fq_mtx);
pkt = STAILQ_LAST(&inQ.fq_pkthead, ic_pkt, fp_link);
while (pkt == NULL) {
@@ -528,6 +597,8 @@
free(pkt);
continue;
}
+
+ flow->expire = tv.tv_sec;
goto enqueue;
/*
* Handle session tear-down.
@@ -583,8 +654,11 @@
* collecting IC_PKTMAXMATCH packets, just pass it through.
*/
} else if (flow->if_pktcount >= IC_PKTMAXMATCH &&
- flow->if_fwrule == 0)
+ flow->if_fwrule == 0) {
+ flow->expire = tv.tv_sec;
goto enqueue;
+ }
+ flow->expire = tv.tv_sec;
goto classify;
}
@@ -630,6 +704,7 @@
free(pkt);
continue;
}
+ flow->expire = tv.tv_sec;
goto classify;
}
@@ -688,6 +763,7 @@
flow->if_datalen = datalen;
flow->if_pktcount = 1;
flow->if_fwrule = 0;
+ flow->expire = tv.tv_sec;
if (hashtable_insert(uh, (void *)key, (void *)flow) == 0) {
syslog(LOG_WARNING,
"packet dropped: unable to insert into table");
@@ -715,19 +791,26 @@
flow->if_data = data;
flow->if_datalen += datalen;
flow->if_pktcount++;
+ flow->expire = tv.tv_sec;
/*
* If we haven't been able to classify this flow after
* collecting IC_PKTMAXMATCH packets, just pass it through.
*/
} else if (flow->if_pktcount >= IC_PKTMAXMATCH &&
- flow->if_fwrule == 0)
+ flow->if_fwrule == 0) {
+ flow->expire = tv.tv_sec;
goto enqueue;
+ }
} else
/* Not an TCP or UDP packet. */
goto enqueue;
classify:
- assert(flow != NULL);
+ if (flow == NULL) {
+ syslog(LOG_ERR, "flow is null argghhhhhhh");
+ goto enqueue;
+ }
+ //assert(flow != NULL);
/*
* Inform divert(4) what rule to send it to by
@@ -823,6 +906,80 @@
return (NULL);
}
+void *
+garbage_pthread(void *arg __unused)
+{
+ char errbuf[LINE_MAX];
+ struct entry *e, *f;
+ unsigned int i, flows_expired, error;
+ struct timeval tv;
+
+ while (1) {
+ flows_expired = 0;
+ while (gettimeofday(&tv, NULL) != 0)
+ ;
+ tv.tv_sec -= time_expire;
+
+ pthread_mutex_lock(&inQ.fq_mtx);
+ error = pthread_cond_wait(&gq_condvar, &inQ.fq_mtx);
+ if (error != 0) {
+ strerror_r(error, errbuf, sizeof(errbuf));
+ syslog(EX_OSERR, "unable to wait on garbage
collection: %s",
+ errbuf);
+ exit(EX_OSERR);
+ }
+
+ for (i = 0; i < sh->tablelength; i++) {
+ e = sh->table[i];
+ while (e != NULL) {
+ f = e; e = e->next;
+ if (((struct ip_flow *)f->v)->expire < tv.tv_sec) {
+ freekey(f->k);
+ sh->entrycount--;
+ if (f->v != NULL)
+ free(f->v);
+ free(f);
+ flows_expired++;
+ }
+ }
+ }
+ for (i = 0; i < th->tablelength; i++) {
+ e = th->table[i];
+ while (e != NULL) {
+ f = e; e = e->next;
+ if (((struct ip_flow *)f->v)->expire
< tv.tv_sec) {
+ freekey(f->k);
+ th->entrycount--;
+ if (f->v != NULL)
+ free(f->v);
+ free(f);
+ flows_expired++;
+ }
+ }
+ }
+ for (i = 0; i < uh->tablelength; i++) {
+ e = uh->table[i];
+ while (e != NULL) {
+ f = e; e = e->next;
+ if (((struct ip_flow *)f->v)->expire
< tv.tv_sec) {
+ freekey(f->k);
+ uh->entrycount--;
+ if (f->v != NULL)
+ free(f->v);
+ free(f);
+ flows_expired++;
+ }
+ }
+ }
+
+ pthread_mutex_unlock(&inQ.fq_mtx);
+
+ syslog(LOG_WARNING, "expired %u flows", flows_expired);
+ }
+
+ return (NULL);
+}
+
/*
* NOTE: The protocol list (plist) passed as an argument is a global
* variable. It is accessed from 3 functions: classify_pthread,
@@ -840,12 +997,20 @@
static int
read_config(const char *file, struct ic_protocols *plist)
{
+ enum { bufsize = 2048 };
struct protocol *proto;
properties props;
- const char *errmsg, *name, *value;
- int fd;
+ const char *errmsg, *name;
+ char *value;
+ int fd, fdpf;
uint16_t rule;
+ char **ap, *argv[bufsize];
+ fdpf = open("/dev/pf", O_RDONLY);
+ if (fdpf == -1) {
+ syslog(LOG_ERR, "unable to open /dev/pf");
+ return (EX_OSERR);
+ }
fd = open(file, O_RDONLY);
if (fd == -1) {
syslog(LOG_ERR, "unable to open configuration file");
@@ -863,10 +1028,48 @@
/* Do not match traffic against this pattern */
if (value == NULL)
continue;
- rule = strtonum(value, 1, 65535, &errmsg);
- if (rule == 0) {
+ for (ap = argv; (*ap = strsep(&value, " \t")) != NULL;)
+ if (**ap != '\0')
+ if (++ap >= &argv[bufsize])
+ break;
+ if (!strncmp(argv[0], "queue", strlen("queue"))) {
+ if (ioctl(fdpf, DIOCGETNAMEDALTQ, &rule)) {
+ syslog(LOG_WARNING,
+ "could not get ALTQ translation for"
+ " queue %s", argv[1]);
+ continue;
+ }
+ if (rule == 0) {
+ syslog(LOG_WARNING,
+ "queue %s does not exists!", argv[1]);
+ continue;
+ }
+ } else if (!strncmp(argv[0], "dnqueue", strlen("dnqueue")))
+ rule = strtonum(argv[1], 1, 65535, &errmsg);
+ else if (!strncmp(argv[0], "dnpipe", strlen("dnpipe")))
+ rule = strtonum(argv[1], 1, 65535, &errmsg);
+ else if (!strncmp(argv[0], "tag", strlen("tag"))) {
+ if (ioctl(fdpf, DIOCGETNAMEDTAG, &rule)) {
+ syslog(LOG_WARNING,
+ "could not get tag translation for"
+ " queue %s", argv[1]);
+ continue;
+ }
+ if (rule == 0) {
+ syslog(LOG_WARNING,
+ "tag %s does not exists!", argv[1]);
+ continue;
+ }
+ } else if (!strncmp(argv[0], "action", strlen("action"))) {
+ if (strncmp(argv[1], "block", strlen("block")))
+ rule = PF_DROP;
+ else if (strncmp(argv[1], "allow", strlen("allow")))
+ rule = PF_PASS;
+ else
+ continue;
+ } else {
syslog(LOG_WARNING,
- "invalid rule number for %s protocol: %s",
+ "invalid action specified for %s protocol: %s",
proto->p_name, errmsg);
continue;
}
@@ -953,10 +1156,14 @@
static void
usage(const char *arg0)
{
- printf("usage: %s [-h] [-c file] [-p port] [-P dir] [-q length]\n",
basename(arg0));
+ printf("usage: %s [-h] [-c file] [-e seconds] [-n packets] "
+ "[-p port] [-P dir] [-q length]\n", basename(arg0));
printf("usage: %s -t -P dir\n", basename(arg0));
printf( " -c file : path to configuration file\n"
+ " -e secs : number of seconds before a flow is expired\n"
" -h : this help screen\n"
+ " -n packets: number of packets before the garbage collector"
+ " tries to expire flows\n"
" -P dir : directory containing protocol patterns\n"
" -p port : port number of divert socket\n"
" -q length : max length (in packets) of in/out queues\n"
--
Ermal
More information about the freebsd-net
mailing list