Application layer classifier for ipfw
Daniel Dias Gonçalves
ddg at yan.com.br
Wed Aug 20 02:45:25 UTC 2008
Ermal Luçi escreveu:
> On Sat, Aug 2, 2008 at 3:00 PM, Mike Makonnen <mtm at wubethiopia.com> wrote:
>
>> Mike Makonnen wrote:
>>
>>> Patrick Tracanelli wrote:
>>>
>>>> To let you know of my current (real world) tests:
>>>>
>>>> - Wireless Internet Provider 1:
>>>> - 4Mbit/s of Internet Traffic
>>>> - Classifying default protocols + soulseek + ssh
>>>> - Classifying 100Mbit/s of dump over ssh
>>>>
>>>> Results in:
>>>> No latency added, very low CPU usage, no packets dropping.
>>>>
>>>> - Wireless ISP 2:
>>>> - 21 Mbit/s of Internet Traffic
>>>> - Classifying default protocols + soulseek + ssh
>>>>
>>>> Results in:
>>>> No tcp or udp traffic at all; everything that gets diverted never
>>>> comes out of the divert socket, and ipfw-classifyd logs
>>>>
>>>> Aug 1 12:07:35 ourofino last message repeated 58 times
>>>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: bittorrent
>>>> (rule 50000)
>>>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: edonkey (rule
>>>> 50000)
>>>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: fasttrack (rule
>>>> 50000)
>>>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: gnutella (rule
>>>> 1000)
>>>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: soulseek (rule
>>>> 50000)
>>>> Aug 1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: ssh (rule
>>>> 50000)
>>>> Aug 1 12:18:28 ourofino ipfw-classifyd: unable to write to divert
>>>> socket: Operation not permitted
>>>> Aug 1 12:18:50 ourofino last message repeated 90 times
>>>>
>>> Hmmm... this part means that the call to sendto(2) to write the packet
>>> back into network stack failed. This explains why you are not seein g any
>>> traffic comming back out of the divert socket, but I don't see why it would
>>> suddenly fail with a permission error. Could this be a kernel bug?
>>>
>>>> Aug 1 12:18:51 ourofino ipfw-classifyd: packet dropped: input queue full
>>>> Aug 1 12:19:11 ourofino last message repeated 94 times
>>>>
>>>> Raised queue len a lot (up to 40960), when the application starts it uses
>>>> up to 25% CPU and a second after that, CPU usage gets lower the 0.1%.
>>>>
>>> This looks like a deadlock. If it weren't able to process packets fast
>>> enough the cpu usage should be high even as it's spewing "packet dropped"
>>> messages. Can you send me some more information like memory usage and the
>>> firewall script you are using? How much of the 21Mbits/s of traffic is P2P?
>>> If you reduce the number of protocols you are trying to match against does
>>> the behavior change? Using netstat -w1 -I<interface> can you tell me how
>>> many packets per second we're talking about for 4Mbits/s and 21Mbit/s? Also,
>>> the timestamps from the log file seem to show that the daemon is running for
>>> approx. 34 sec. before the first "unable to write to write to divert socket"
>>> message. Is it passing traffic during this time? Thanks.
>>>
>>> I've uploaded a newer version. Can you try that also please. It includes:
>>> o SIGHUP forces it to re-read its configuration file
>>> o rc.d script
>>> o minor optimization (calls pthread_cond_signal with the mutex unlocked)
>>> o code cleanup
>>>
>>> Also, for your convenience I have attached a patch against the earlier
>>> version that removes a debugging printf that should remove spammage to your
>>> log files (the current version has it removed already).
>>>
>>>
>> Ooops, a few minutes after I sent this email I found a couple of bugs (one
>> major, and one minor). They were in the original tarball as well as the
>> newer one I uploaded earlier today. I've uploaded a fixed version of the
>> code. Can you try that instead please.
>>
>> Also, to help track down performance issues I've modified the Makefile to
>> build a profiled version of the application so you can use gprof(1) to
>> figure out where any problems lie.
>>
>>
>
> Does this sound about right for implementing the GC and implementing syntax as
> $protocol = dnpipe 20
> $protocol2 = dnqueue 30
> it has some extra goos for pf(4) and altq(4)
> $protocol3 = queue $queue name
> $protocol4 = tag TAGNAME
> $protocol5 = action block
>
> It adds 2 new options -e seconds for seconds before a flow is
> considered expired and -n #packets proccessed before kicking the GC.
>
> --- classifyd_old.c 2008-08-09 00:33:04.000000000 +0000
> +++ classifyd.c 2008-08-09 00:33:34.000000000 +0000
> @@ -28,13 +28,17 @@
>
> #include <sys/types.h>
> #include <sys/socket.h>
> +#include <sys/ioctl.h>
> +#include <sys/time.h>
>
> +#include <net/if.h>
> #include <arpa/inet.h>
> #include <netinet/in.h>
> #include <netinet/in_systm.h>
> #include <netinet/ip.h>
> #include <netinet/tcp.h>
> #include <netinet/udp.h>
> +#include <net/pfvar.h>
>
> #include <assert.h>
> #include <err.h>
> @@ -53,6 +57,7 @@
> #include <unistd.h>
>
> #include "hashtable.h"
> +#include "hashtable_private.h"
> #include "pathnames.h"
> #include "protocols.h"
>
> @@ -94,6 +99,7 @@
> uint32_t if_datalen; /* length in bytes of if_data */
> uint16_t if_pktcount; /* number of packets concatenated */
> uint16_t if_fwrule; /* ipfw(4) rule associated with flow */
> + time_t expire; /* flow expire time */
> };
>
> /*
> @@ -126,7 +132,7 @@
> static struct ic_queue outQ;
>
> /* divert(4) socket */
> -static int dvtS;
> +static int dvtS = 0;
>
> /* config file path */
> static const char *conf = IC_CONFIG_PATH;
> @@ -137,12 +143,25 @@
> /* List of protocols available to the system */
> struct ic_protocols *fp;
>
> +/* Our hashtables */
> +struct hashtable *sh = NULL,
> + *th = NULL,
> + *uh = NULL;
> +
> +/* signaled to kick garbage collector */
> +static pthread_cond_t gq_condvar;
> +
> +/* number of packets before kicking garbage collector */
> +static unsigned int npackets = 250;
> +
> +static time_t time_expire = 40; /* 40 seconds */
> /*
> * Forward function declarations.
> */
> void *classify_pthread(void *);
> void *read_pthread(void *);
> void *write_pthread(void *);
> +void *garbage_pthread(void *);
> static int equalkeys(void *, void *);
> static unsigned int hashfromkey(void *);
> static void test_re(void);
> @@ -155,7 +174,7 @@
> {
> struct sockaddr_in addr;
> struct sigaction sa;
> - pthread_t classifytd, readtd, writetd;
> + pthread_t classifytd, readtd, writetd, garbagectd;
> const char *errstr;
> long long num;
> uint16_t port, qmaxsz;
> @@ -164,13 +183,27 @@
> tflag = 0;
> port = IC_DPORT;
> qmaxsz = IC_QMAXSZ;
> - while ((ch = getopt(argc, argv, "htc:P:p:q:")) != -1) {
> + while ((ch = getopt(argc, argv, "n:e:htc:P:p:q:")) != -1) {
> switch(ch) {
> case 'c':
> conf = strdup(optarg);
> if (conf == NULL)
> err(EX_TEMPFAIL, "config file path");
> break;
> + case 'e':
> + num = strtonum((const char *)optarg, 1, 400, &errstr);
> + if (num == 0 && errstr != NULL) {
> + errx(EX_USAGE, "invalud expire seconds: %s", errstr);
> + }
> + time_expire = (time_t)num;
> + break;
> + case 'n':
> + num = strtonum((const char *)optarg, 1,
> 65535, &errstr);
> + if (num == 0 && errstr != NULL) {
> + errx(EX_USAGE, "invalud expire
> seconds: %s", errstr);
> + }
> + npackets = (unsigned int)num;
> + break;
> case 'P':
> protoDir = strdup(optarg);
> if (protoDir == NULL)
> @@ -230,6 +263,9 @@
> error = pthread_cond_init(&outQ.fq_condvar, NULL);
> if (error != 0)
> err(EX_OSERR, "unable to initialize output queue condvar");
> + error = pthread_cond_init(&gq_condvar, NULL);
> + if (error != 0)
> + err(EX_OSERR, "unable to initialize garbage collector
> condvar");
>
> /*
> * Create and bind the divert(4) socket.
> @@ -276,32 +312,80 @@
> if (error == -1)
> err(EX_OSERR, "unable to set signal handler");
>
> + /*
> + * There are 3 tables: udp, tcp, and tcp syn.
> + * The tcp syn table tracks connections for which a
> + * SYN packet has been sent but no reply has been returned
> + * yet. Once the SYN ACK reply is detected it is moved to
> + * the regular tcp connection tracking table.
> + */
> + sh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
> + if (sh == NULL) {
> + syslog(LOG_ERR, "unable to create TCP (SYN) tracking table");
> + error = EX_SOFTWARE;
> + goto cleanup;
> + }
> + th = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
> + if (th == NULL) {
> + syslog(LOG_ERR, "unable to create TCP tracking table");
> + error = EX_SOFTWARE;
> + goto cleanup;
> + }
> + uh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
> + if (uh == NULL) {
> + syslog(LOG_ERR, "unable to create UDP tracking table");
> + error = EX_SOFTWARE;
> + goto cleanup;
> + }
> +
> /*
> * Create the various threads.
> */
> error = pthread_create(&readtd, NULL, read_pthread, NULL);
> - if (error != 0)
> - err(EX_OSERR, "unable to create reader thread");
> + if (error != 0) {
> + syslog(LOG_ERR, "unable to create reader thread");
> + error = EX_OSERR;
> + goto cleanup;
> + }
> error = pthread_create(&classifytd, NULL, classify_pthread, NULL);
> - if (error != 0)
> - err(EX_OSERR, "unable to create classifier thread");
> + if (error != 0) {
> + syslog(LOG_ERR, "unable to create classifier thread");
> + error = EX_OSERR;
> + goto cleanup;
> + }
> error = pthread_create(&writetd, NULL, write_pthread, NULL);
> - if (error != 0)
> - err(EX_OSERR, "unable to create writer thread");
> -
> + if (error != 0) {
> + syslog(LOG_ERR, "unable to create writer thread");
> + error = EX_OSERR;
> + goto cleanup;
> + }
> + error = pthread_create(&garbagectd, NULL, garbage_pthread, NULL);
> + if (error != 0) {
> + syslog(LOG_ERR, "unable to create garbage collect thread");
> + error = EX_OSERR;
> + goto cleanup;
> + }
> /*
> * Wait for our threads to exit.
> */
> pthread_join(readtd, NULL);
> pthread_join(classifytd, NULL);
> pthread_join(writetd, NULL);
> -
> + pthread_join(garbagectd, NULL);
> /*
> * Cleanup
> */
> - close(dvtS);
> +cleanup:
> + if (dvtS > 0)
> + close(dvtS);
> + if (sh != NULL)
> + hashtable_destroy(sh, 1);
> + if (th != NULL)
> + hashtable_destroy(th, 1);
> + if (uh != NULL)
> + hashtable_destroy(uh, 1);
>
> - return (0);
> + return (error);
> }
>
> void *
> @@ -310,6 +394,7 @@
> struct ic_pkt *pkt;
> struct ip *ipp;
> int len;
> + unsigned int pcktcnt = 0;
>
> while (1) {
> pkt = (struct ic_pkt *)malloc(sizeof(struct ic_pkt));
> @@ -353,6 +438,10 @@
> STAILQ_INSERT_HEAD(&inQ.fq_pkthead, pkt, fp_link);
> inQ.fq_size++;
> pthread_mutex_unlock(&inQ.fq_mtx);
> + if (++pcktcnt > npackets) {
> + pcktcnt = 0;
> + pthread_cond_signal(&gq_condvar);
> + }
> pthread_cond_signal(&inQ.fq_condvar);
> }
>
> @@ -420,39 +509,19 @@
> struct tcphdr *tcp;
> struct udphdr *udp;
> struct ic_pkt *pkt;
> - struct hashtable *sh, *th, *uh;
> struct protocol *proto;
> + struct timeval tv;
> regmatch_t pmatch;
> u_char *data, *payload;
> uint16_t trycount;
> int datalen, error;
>
> - /*
> - * There are 3 tables: udp, tcp, and tcp syn.
> - * The tcp syn table tracks connections for which a
> - * SYN packet has been sent but no reply has been returned
> - * yet. Once the SYN ACK reply is detected it is moved to
> - * the regular tcp connection tracking table.
> - */
> - sh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
> - if (sh == NULL) {
> - syslog(LOG_ERR, "unable to create TCP (SYN) tracking table");
> - exit(EX_SOFTWARE);
> - }
> - th = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
> - if (th == NULL) {
> - syslog(LOG_ERR, "unable to create TCP tracking table");
> - exit(EX_SOFTWARE);
> - }
> - uh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
> - if (uh == NULL) {
> - syslog(LOG_ERR, "unable to create UDP tracking table");
> - exit(EX_SOFTWARE);
> - }
> -
> flow = NULL;
> key = NULL;
> while(1) {
> + while(gettimeofday(&tv, NULL) != 0)
> + ;
> +
> pthread_mutex_lock(&inQ.fq_mtx);
> pkt = STAILQ_LAST(&inQ.fq_pkthead, ic_pkt, fp_link);
> while (pkt == NULL) {
> @@ -528,6 +597,8 @@
> free(pkt);
> continue;
> }
> +
> + flow->expire = tv.tv_sec;
> goto enqueue;
> /*
> * Handle session tear-down.
> @@ -583,8 +654,11 @@
> * collecting IC_PKTMAXMATCH packets, just pass it through.
> */
> } else if (flow->if_pktcount >= IC_PKTMAXMATCH &&
> - flow->if_fwrule == 0)
> + flow->if_fwrule == 0) {
> + flow->expire = tv.tv_sec;
> goto enqueue;
> + }
> + flow->expire = tv.tv_sec;
> goto classify;
> }
>
> @@ -630,6 +704,7 @@
> free(pkt);
> continue;
> }
> + flow->expire = tv.tv_sec;
> goto classify;
> }
>
> @@ -688,6 +763,7 @@
> flow->if_datalen = datalen;
> flow->if_pktcount = 1;
> flow->if_fwrule = 0;
> + flow->expire = tv.tv_sec;
> if (hashtable_insert(uh, (void *)key, (void *)flow) == 0) {
> syslog(LOG_WARNING,
> "packet dropped: unable to insert into table");
> @@ -715,19 +791,26 @@
> flow->if_data = data;
> flow->if_datalen += datalen;
> flow->if_pktcount++;
> + flow->expire = tv.tv_sec;
> /*
> * If we haven't been able to classify this flow after
> * collecting IC_PKTMAXMATCH packets, just pass it through.
> */
> } else if (flow->if_pktcount >= IC_PKTMAXMATCH &&
> - flow->if_fwrule == 0)
> + flow->if_fwrule == 0) {
> + flow->expire = tv.tv_sec;
> goto enqueue;
> + }
> } else
> /* Not an TCP or UDP packet. */
> goto enqueue;
>
> classify:
> - assert(flow != NULL);
> + if (flow == NULL) {
> + syslog(LOG_ERR, "flow is null argghhhhhhh");
> + goto enqueue;
> + }
> + //assert(flow != NULL);
>
> /*
> * Inform divert(4) what rule to send it to by
> @@ -823,6 +906,80 @@
> return (NULL);
> }
>
> +void *
> +garbage_pthread(void *arg __unused)
> +{
> + char errbuf[LINE_MAX];
> + struct entry *e, *f;
> + unsigned int i, flows_expired, error;
> + struct timeval tv;
> +
> + while (1) {
> + flows_expired = 0;
> + while (gettimeofday(&tv, NULL) != 0)
> + ;
> + tv.tv_sec -= time_expire;
> +
> + pthread_mutex_lock(&inQ.fq_mtx);
> + error = pthread_cond_wait(&gq_condvar, &inQ.fq_mtx);
> + if (error != 0) {
> + strerror_r(error, errbuf, sizeof(errbuf));
> + syslog(EX_OSERR, "unable to wait on garbage
> collection: %s",
> + errbuf);
> + exit(EX_OSERR);
> + }
> +
> + for (i = 0; i < sh->tablelength; i++) {
> + e = sh->table[i];
> + while (e != NULL) {
> + f = e; e = e->next;
> + if (((struct ip_flow *)f->v)->expire < tv.tv_sec) {
> + freekey(f->k);
> + sh->entrycount--;
> + if (f->v != NULL)
> + free(f->v);
> + free(f);
> + flows_expired++;
> + }
> + }
> + }
> + for (i = 0; i < th->tablelength; i++) {
> + e = th->table[i];
> + while (e != NULL) {
> + f = e; e = e->next;
> + if (((struct ip_flow *)f->v)->expire
> < tv.tv_sec) {
> + freekey(f->k);
> + th->entrycount--;
> + if (f->v != NULL)
> + free(f->v);
> + free(f);
> + flows_expired++;
> + }
> + }
> + }
> + for (i = 0; i < uh->tablelength; i++) {
> + e = uh->table[i];
> + while (e != NULL) {
> + f = e; e = e->next;
> + if (((struct ip_flow *)f->v)->expire
> < tv.tv_sec) {
> + freekey(f->k);
> + uh->entrycount--;
> + if (f->v != NULL)
> + free(f->v);
> + free(f);
> + flows_expired++;
> + }
> + }
> + }
> +
> + pthread_mutex_unlock(&inQ.fq_mtx);
> +
> + syslog(LOG_WARNING, "expired %u flows", flows_expired);
> + }
> +
> + return (NULL);
> +}
> +
> /*
> * NOTE: The protocol list (plist) passed as an argument is a global
> * variable. It is accessed from 3 functions: classify_pthread,
> @@ -840,12 +997,20 @@
> static int
> read_config(const char *file, struct ic_protocols *plist)
> {
> + enum { bufsize = 2048 };
> struct protocol *proto;
> properties props;
> - const char *errmsg, *name, *value;
> - int fd;
> + const char *errmsg, *name;
> + char *value;
> + int fd, fdpf;
> uint16_t rule;
> + char **ap, *argv[bufsize];
>
> + fdpf = open("/dev/pf", O_RDONLY);
> + if (fdpf == -1) {
> + syslog(LOG_ERR, "unable to open /dev/pf");
> + return (EX_OSERR);
> + }
> fd = open(file, O_RDONLY);
> if (fd == -1) {
> syslog(LOG_ERR, "unable to open configuration file");
> @@ -863,10 +1028,48 @@
> /* Do not match traffic against this pattern */
> if (value == NULL)
> continue;
> - rule = strtonum(value, 1, 65535, &errmsg);
> - if (rule == 0) {
> + for (ap = argv; (*ap = strsep(&value, " \t")) != NULL;)
> + if (**ap != '\0')
> + if (++ap >= &argv[bufsize])
> + break;
> + if (!strncmp(argv[0], "queue", strlen("queue"))) {
> + if (ioctl(fdpf, DIOCGETNAMEDALTQ, &rule)) {
> + syslog(LOG_WARNING,
> + "could not get ALTQ translation for"
> + " queue %s", argv[1]);
> + continue;
> + }
> + if (rule == 0) {
> + syslog(LOG_WARNING,
> + "queue %s does not exists!", argv[1]);
> + continue;
> + }
> + } else if (!strncmp(argv[0], "dnqueue", strlen("dnqueue")))
> + rule = strtonum(argv[1], 1, 65535, &errmsg);
> + else if (!strncmp(argv[0], "dnpipe", strlen("dnpipe")))
> + rule = strtonum(argv[1], 1, 65535, &errmsg);
> + else if (!strncmp(argv[0], "tag", strlen("tag"))) {
> + if (ioctl(fdpf, DIOCGETNAMEDTAG, &rule)) {
> + syslog(LOG_WARNING,
> + "could not get tag translation for"
> + " queue %s", argv[1]);
> + continue;
> + }
> + if (rule == 0) {
> + syslog(LOG_WARNING,
> + "tag %s does not exists!", argv[1]);
> + continue;
> + }
> + } else if (!strncmp(argv[0], "action", strlen("action"))) {
> + if (strncmp(argv[1], "block", strlen("block")))
> + rule = PF_DROP;
> + else if (strncmp(argv[1], "allow", strlen("allow")))
> + rule = PF_PASS;
> + else
> + continue;
> + } else {
> syslog(LOG_WARNING,
> - "invalid rule number for %s protocol: %s",
> + "invalid action specified for %s protocol: %s",
> proto->p_name, errmsg);
> continue;
> }
> @@ -953,10 +1156,14 @@
> static void
> usage(const char *arg0)
> {
> - printf("usage: %s [-h] [-c file] [-p port] [-P dir] [-q length]\n",
> basename(arg0));
> + printf("usage: %s [-h] [-c file] [-e seconds] [-n packets] "
> + "[-p port] [-P dir] [-q length]\n", basename(arg0));
> printf("usage: %s -t -P dir\n", basename(arg0));
> printf( " -c file : path to configuration file\n"
> + " -e secs : number of seconds before a flow is expired\n"
> " -h : this help screen\n"
> + " -n packets: number of packets before the garbage collector"
> + " tries to expire flows\n"
> " -P dir : directory containing protocol patterns\n"
> " -p port : port number of divert socket\n"
> " -q length : max length (in packets) of in/out queues\n"
>
>
Some progress in solution of presented problems ?
--
Daniel
More information about the freebsd-net
mailing list