socsvn commit: r256164 - in soc2013/ambarisha/head/usr.bin: dmget dms

ambarisha at FreeBSD.org ambarisha at FreeBSD.org
Mon Aug 19 20:39:15 UTC 2013


Author: ambarisha
Date: Mon Aug 19 20:39:15 2013
New Revision: 256164
URL: http://svnweb.FreeBSD.org/socsvn/?view=rev&rev=256164

Log:
  Duplicate request handing.
  
  When it receives a request, DMS downloads the requested file into
  a temporary file. It then uses this file to serve both the original
  request that started this download and any duplicate requests received
  in the mean time.
  

Modified:
  soc2013/ambarisha/head/usr.bin/dmget/utils.c
  soc2013/ambarisha/head/usr.bin/dms/Makefile
  soc2013/ambarisha/head/usr.bin/dms/dms.c
  soc2013/ambarisha/head/usr.bin/dms/dms.h
  soc2013/ambarisha/head/usr.bin/dms/utils.c
  soc2013/ambarisha/head/usr.bin/dms/worker.c

Modified: soc2013/ambarisha/head/usr.bin/dmget/utils.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dmget/utils.c	Mon Aug 19 20:31:57 2013	(r256163)
+++ soc2013/ambarisha/head/usr.bin/dmget/utils.c	Mon Aug 19 20:39:15 2013	(r256164)
@@ -185,18 +185,27 @@
 {
 	int bufsize = 0;
 	int err;
-	struct dmmsg *msg;
+	struct dmmsg *msg = (struct dmmsg *) Malloc(sizeof(struct dmmsg));
 	err = Read(sock, &bufsize, sizeof(bufsize));
 	if (err == 0) {
 		/* set dms_error */
+#if DEBUG
+		fprintf(stderr, "recv_msg: remote end closed connection\n");
+#endif
+		free(msg);
 		return (NULL);
 	}
 
+
 	bufsize -= sizeof(bufsize);
 
 	err = Read(sock, &(msg->op), sizeof(msg->op));
 	if (err == 0) {
 		/* set dms_error */
+#if DEBUG
+		fprintf(stderr, "recv_msg: remote end closed connection\n");
+#endif
+		free(msg);
 		return (NULL);
 	}
 	bufsize -= sizeof(msg->op);
@@ -209,6 +218,10 @@
 		free(msg->buf);
 		msg->len = 0;
 		/* set dms_error */
+#if DEBUG
+		fprintf(stderr, "recv_msg: remote end closed connection\n");
+#endif
+		free(msg);
 		return (NULL);
 	}
 
@@ -218,6 +231,7 @@
 void
 free_msg(struct dmmsg **msg)
 {
+
 	free((*msg)->buf);
 	free(*msg);
 	*msg = NULL;

Modified: soc2013/ambarisha/head/usr.bin/dms/Makefile
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/Makefile	Mon Aug 19 20:31:57 2013	(r256163)
+++ soc2013/ambarisha/head/usr.bin/dms/Makefile	Mon Aug 19 20:39:15 2013	(r256164)
@@ -11,5 +11,6 @@
 DPADD=		${LIBFETCH}
 LDADD=		-lfetch
 .endif
+CFLAGS+=	-g
 
 .include <bsd.prog.mk>

Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.c	Mon Aug 19 20:31:57 2013	(r256163)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.c	Mon Aug 19 20:39:15 2013	(r256164)
@@ -200,6 +200,7 @@
 		dmjob = mk_dmjob(dmreq, csock);
 		jobs = add_job(jobs, dmjob);
 		pthread_create(&(dmjob->worker), NULL, run_worker, dmjob);
+		pthread_detach(dmjob->worker);
 		break;
 	default:
 		goto error;
@@ -211,7 +212,7 @@
 error:
 	ret = -1;	
 done:
-	free_msg(msg);
+	free_msg(&msg);
 	return ret;
 }
 
@@ -222,16 +223,15 @@
 	exit(1); // Temporary
 }
 
-static int
+static state_t
 service_job(struct dmjob *job, fd_set *fdset)
 {
 	int ret = 0;
-	if (FD_ISSET(job->client, fdset))
-		pthread_kill(job->worker, SIGUSR1);
-
-	if (job->state == DONE)
-		ret = 1;
-	return (ret);
+	if (FD_ISSET(job->client, fdset)) {
+		/* TODO: Worker can't handle this signal yet */
+		//pthread_kill(job->worker, SIGUSR1);
+	}
+	return (job->state);
 }
 
 static void
@@ -248,6 +248,7 @@
 
 		/* Prepare fdset and make select call */
 		FD_ZERO(&fdset);
+		maxfd = socket;
 		FD_SET(socket, &fdset);
 
 		cur = jobs;
@@ -257,37 +258,37 @@
 				maxfd = cur->client;
 			cur = cur->next;
 		}
-		
+
 		Select(maxfd + 1, &fdset, NULL, NULL, NULL);
 
-		if (FD_ISSET(socket, &fdset)) {
-			struct sockaddr_un cliaddr;
-			size_t cliaddrlen = sizeof(cliaddr);
-			int csock = Accept(socket, (struct sockaddr *) &cliaddr,
-					&cliaddrlen);
-			handle_request(csock);
-		}
-		
 		cur = jobs;
 		while (cur != NULL) {
 			ret = service_job(cur, &fdset);
-			if (ret == 1) {
+			if (ret == DONE) {
 				close(cur->client);
-				pthread_join(cur->worker, &retptr);
 				jobs = rm_job(jobs, cur);
 			}
 			cur = cur->next;
 		}
-			
+
+		if (FD_ISSET(socket, &fdset)) {
+			struct sockaddr_un cliaddr;
+			size_t cliaddrlen = sizeof(cliaddr);
+			int csock = Accept(socket, (struct sockaddr *) &cliaddr,
+					&cliaddrlen);
+			handle_request(csock);
+		}
 	}
 
+	/* Notify all running workers that we've to wrap up */
 	cur = jobs;
-	while (cur != NULL) {	
-		close(cur->client);
-		ret = service_job(cur, &fdset);
-		/* TODO: Force the worker to quit as well */
+	while (cur != NULL) {
+		if (cur->state == RUNNING)
+			pthread_kill(cur->worker, SIGINT);
+
+		rm_dmreq(&(cur->request));	
 		jobs = rm_job(jobs, cur);
-		cur = jobs;
+		cur = cur->next;
 	}
 }
 

Modified: soc2013/ambarisha/head/usr.bin/dms/dms.h
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.h	Mon Aug 19 20:31:57 2013	(r256163)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.h	Mon Aug 19 20:39:15 2013	(r256164)
@@ -3,7 +3,7 @@
 
 #include <sys/types.h>
 
-typedef enum {RUNNING=0, DONE=1} state_t;
+typedef enum {RUNNING=0, DONE, DUPLICATE} state_t;
 
 struct dmjob {
 	int		 ofd;
@@ -13,9 +13,10 @@
 	int	 	 sigalrm;
 	int	 	 siginfo;
 	int	 	 siginfo_en;
+	unsigned	 timeout;
+	pthread_t	 worker;
 	struct dmreq 	*request;
 	struct url	*url;
-	pthread_t	*worker;
 
 	struct dmjob 	*next;
 	struct dmjob	*prev;

Modified: soc2013/ambarisha/head/usr.bin/dms/utils.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/utils.c	Mon Aug 19 20:31:57 2013	(r256163)
+++ soc2013/ambarisha/head/usr.bin/dms/utils.c	Mon Aug 19 20:39:15 2013	(r256164)
@@ -183,30 +183,29 @@
 struct dmmsg *
 recv_msg(int sock)
 {
-	printf("in recv_msg\n");
 	int bufsize = 0;
 	int err;
-	struct dmmsg *msg;
+	struct dmmsg *msg = (struct dmmsg *) Malloc(sizeof(struct dmmsg));
 	err = Read(sock, &bufsize, sizeof(bufsize));
 	if (err == 0) {
 		/* set dms_error */
 #if DEBUG
 		fprintf(stderr, "recv_msg: remote end closed connection\n");
 #endif
+		free(msg);
 		return (NULL);
 	}
 
-	printf("bufsize = %d\n", bufsize);
 
 	bufsize -= sizeof(bufsize);
 
-	printf("sock = %d\n", sock);
 	err = Read(sock, &(msg->op), sizeof(msg->op));
 	if (err == 0) {
 		/* set dms_error */
 #if DEBUG
 		fprintf(stderr, "recv_msg: remote end closed connection\n");
 #endif
+		free(msg);
 		return (NULL);
 	}
 	bufsize -= sizeof(msg->op);
@@ -222,6 +221,7 @@
 #if DEBUG
 		fprintf(stderr, "recv_msg: remote end closed connection\n");
 #endif
+		free(msg);
 		return (NULL);
 	}
 
@@ -231,6 +231,7 @@
 void
 free_msg(struct dmmsg **msg)
 {
+
 	free((*msg)->buf);
 	free(*msg);
 	*msg = NULL;

Modified: soc2013/ambarisha/head/usr.bin/dms/worker.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/worker.c	Mon Aug 19 20:31:57 2013	(r256163)
+++ soc2013/ambarisha/head/usr.bin/dms/worker.c	Mon Aug 19 20:39:15 2013	(r256164)
@@ -6,12 +6,17 @@
 #include <string.h>
 #include <errno.h>
 #include <signal.h>
+#include <fcntl.h>
+#include <unistd.h>
 
 #include "dms.h"
 #include "dm.h"
 
+
 extern struct dmjob 	*jobs;
 
+#define	TMP_EXT		".tmp"
+
 static int
 authenticate(struct url *url)
 {
@@ -53,6 +58,12 @@
 	free_msg(&rcvmsg);
 }
 
+static int
+compare_jobs(struct dmjob *j1, struct dmjob *j2)
+{
+	return strcmp(j1->request->URL, j2->request->URL);
+}
+
 static void
 stat_send(int csock, struct xferstat *xs, int force)
 {
@@ -143,29 +154,14 @@
 }
 
 static int
-fetch(struct dmjob *dmjob)
+mk_url(struct dmjob *dmjob, char *flags)
 {
-	struct url_stat us;
-	struct stat sb, nsb;
-	struct xferstat xs;
-	FILE *f, *of;
-	size_t size, readcnt, wr;
-	off_t count;
-	char flags[8];
-	const char *slash;
-	char *tmppath;
-	int r;
-	unsigned timeout;
-	char *ptr;
-	char *buf;
 	struct dmreq *dmreq = dmjob->request;
+	struct stat sb;
+	int r;
 
-	f = of = NULL;
-	tmppath = NULL;
-
-	timeout = 0;
-	*flags = 0;
-	count = 0;
+	if (dmjob->url != NULL)
+		return 0;
 
 	/* set verbosity level */
 	if (dmreq->v_level > 1)
@@ -174,7 +170,6 @@
 		fetchDebug = 1;
 
 	/* parse URL */
-	dmjob->url = NULL;
 	if (*dmreq->URL == '\0') {
 		warnx("empty URL");
 		goto failure;
@@ -212,7 +207,7 @@
 			strcat(flags, "d");
 		if (dmreq->flags & U_FLAG)
 			strcat(flags, "l");
-		timeout = dmreq->T_secs ? dmreq->T_secs : dmreq->ftp_timeout;
+		dmjob->timeout = dmreq->T_secs ? dmreq->T_secs : dmreq->ftp_timeout;
 	}
 
 	/* HTTP specific flags */
@@ -222,7 +217,7 @@
 			strcat(flags, "d");
 		if ((dmreq->flags & A_FLAG))
 			strcat(flags, "A");
-		timeout = dmreq->T_secs ? dmreq->T_secs : dmreq->http_timeout;
+		dmjob->timeout = dmreq->T_secs ? dmreq->T_secs : dmreq->http_timeout;
 		if (dmreq->flags & i_FLAG) {
 			if (stat(dmreq->i_filename, &sb)) {
 				warn("%s: stat()", dmreq->i_filename);
@@ -234,27 +229,58 @@
 	}
 
 	/* set the protocol timeout. */
-	fetchTimeout = timeout;
+	fetchTimeout = dmjob->timeout;
+	goto success;
 
-	/* just print size */
-	if (dmreq->flags & s_FLAG) {
-	//	if (timeout)
-	//			alarm(timeout);
-		r = fetchStat(dmjob->url, &us, flags);
-		if (timeout)
-			alarm(0);
-		if (dmjob->sigalrm || dmjob->sigint)
-			goto signal;
-		if (r == -1) {
-			warnx("%s", fetchLastErrString);
-			goto failure;
-		}
-		if (us.size == -1)
-			printf("Unknown\n");
-		else
-			printf("%jd\n", (intmax_t)us.size);
-		goto success;
-	}
+signal:
+	/* report that we were timedout/interrupted */
+failure:
+	free(dmjob->url->doc);
+	free(dmjob->url);
+	dmjob->url = NULL;
+success:
+	return (r);
+}
+
+static int
+fetch(struct dmjob *dmjob, FILE *f, struct url_stat us)
+{
+	struct stat sb, nsb;
+	struct xferstat xs;
+	FILE *of;
+	size_t size, readcnt, wr;
+	off_t count;
+	char flags[8];
+	const char *slash;
+	char *tmppath;
+	int r;
+	char *ptr;
+	char *buf;
+	struct dmreq *dmreq = dmjob->request;
+
+	of = NULL;
+	tmppath = NULL;
+
+	dmjob->timeout = 0;
+	*flags = 0;
+	count = 0;
+	
+	r = mk_url(dmjob, flags);
+
+	/* Initialize signal flags */
+	dmjob->sigint = 0;
+	dmjob->sigalrm = 0;
+	dmjob->siginfo = 0;
+
+	/* Set timeout */
+	if (strcmp(dmjob->url->scheme, SCHEME_FTP) == 0)
+		dmjob->timeout = dmreq->T_secs ? dmreq->T_secs : dmreq->ftp_timeout;
+	else if (strcmp(dmjob->url->scheme, SCHEME_HTTP) == 0 ||
+	    strcmp(dmjob->url->scheme, SCHEME_HTTPS) == 0) 
+		dmjob->timeout = dmreq->T_secs ? dmreq->T_secs : dmreq->http_timeout;
+
+	/* set the protocol timeout. */
+	fetchTimeout = dmjob->timeout;
 
 	/*
 	 * If the -r flag was specified, we have to compare the local
@@ -289,24 +315,12 @@
 	}
 
 	/* start the transfer */
-	if (timeout)
-		alarm(timeout);
-	f = fetchXGet(dmjob->url, &us, flags);
-	if (timeout)
-		alarm(0);
+	if (dmjob->timeout)
+		alarm(dmjob->timeout);
+
 	if (dmjob->sigalrm || dmjob->sigint)
 		goto signal;
-	if (f == NULL) {
-		warnx("%s: %s", dmreq->URL, fetchLastErrString);
-		if ((dmreq->flags & i_FLAG) && strcmp(dmjob->url->scheme, SCHEME_HTTP) == 0
-		    && fetchLastErrCode == FETCH_OK
-		    && strcmp(fetchLastErrString, "Not Modified") == 0) {
-			/* HTTP Not Modified Response, return OK. */
-			r = 0;
-			goto done;
-		} else
-			goto failure;
-	}
+	
 	if (dmjob->sigint)
 		goto signal;
 
@@ -422,14 +436,11 @@
 			 * from scratch if we want the whole file
 			 */
 			dmjob->url->offset = 0;
-			if ((f = fetchXGet(dmjob->url, &us, flags)) == NULL) {
-				warnx("%s: %s", dmreq->URL, fetchLastErrString);
-				goto failure;
-			}
 			if (dmjob->sigint)
 				goto signal;
 		}
 
+
 		/* construct a temp file name */
 		if (sb.st_size != -1 && S_ISREG(sb.st_mode)) {
 			if ((slash = strrchr(dmreq->path, '/')) == NULL)
@@ -448,6 +459,7 @@
 				chmod(tmppath, sb.st_mode & ALLPERMS);
 			}
 		}
+
 		if (of == NULL)
 			of = fdopen(dmjob->ofd, "w");
 		if (of == NULL) {
@@ -485,10 +497,12 @@
 		if ((readcnt = fread(buf, 1, size, f)) < size) {
 			if (ferror(f) && errno == EINTR && !dmjob->sigint)
 				clearerr(f);
-			else if (readcnt == 0)
+			else if (readcnt == 0) {
 				break;
+			}	
 		}
 
+
 		stat_update(&xs, count += readcnt, dmjob);
 		for (ptr = buf; readcnt > 0; ptr += wr, readcnt -= wr)
 			if ((wr = fwrite(ptr, 1, readcnt, of)) < readcnt) {
@@ -500,6 +514,7 @@
 		if (readcnt != 0)
 			break;
 	}
+
 	if (!dmjob->sigalrm)
 		dmjob->sigalrm = ferror(f) && errno == ETIMEDOUT;
 	dmjob->siginfo_en = 0;
@@ -589,9 +604,89 @@
 		fetchFreeURL(dmjob->url);
 	if (tmppath != NULL)
 		free(tmppath);
+
 	return (r);
 }
 
+FILE *
+dmXGet(struct dmjob *dmjob, struct url_stat *us) 
+{
+	char flags[8];
+	int ret;
+	struct dmjob tmpjob;
+	struct dmreq tmpreq;
+	struct dmreq *dmreq = dmjob->request;
+
+	/* populate tmpjob */
+
+	/* TODO : Modify stat_* to udpate jobs of progress,
+	 * right now we just put the msgs on stderr
+	 * */
+	tmpjob.client = STDERR_FILENO;
+
+	tmpjob.request = &tmpreq;
+	tmpreq.v_level = dmreq->v_level;
+ 	tmpreq.ftp_timeout = dmjob->request->ftp_timeout;
+	tmpreq.http_timeout = dmjob->request->http_timeout;
+	tmpreq.B_size = dmjob->request->B_size;
+	tmpreq.S_size = dmjob->request->S_size;
+	tmpreq.T_secs = dmjob->request->T_secs;
+	tmpreq.flags = dmjob->request->flags;
+	tmpreq.family = dmjob->request->family;
+
+	tmpreq.i_filename = (char *) Malloc(strlen(dmreq->i_filename));
+	strcpy(tmpreq.i_filename, dmreq->i_filename);
+
+	tmpreq.URL = (char *) Malloc(strlen(dmreq->URL));
+	strcpy(tmpreq.URL, dmreq->URL);
+	
+	tmpjob.url = NULL;
+	ret = mk_url(&tmpjob, flags);
+	if (ret <= 0) {
+
+	}
+
+	/* special case : -s flag
+	if (tmpreq.flags & s_FLAG) {
+		if (dmjob->timeout)
+			alarm(dmjob->timeout); 
+		r = fetchStat(dmjob->url, us, flags);
+		if (dmjob->timeout)
+			alarm(0);
+		if (dmjob->sigalrm || dmjob->sigint)
+			goto signal;
+		if (r == -1) {
+			warnx("%s", fetchLastErrString);
+			goto failure;
+		}
+		if (us->size == -1)
+			printf("Unknown\n");
+		else
+			printf("%jd\n", (intmax_t)us->size);
+		goto success;
+	}
+	*/
+	tmpreq.path = (char *) Malloc(strlen(dmreq->path) + strlen(TMP_EXT));
+	strcpy(tmpreq.path, dmreq->path);
+	strcat(tmpreq.path, TMP_EXT);
+
+	tmpjob.ofd = open(tmpreq.path, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR);
+
+	FILE *f = fetchXGet(tmpjob.url, us, flags);
+	fetch(&tmpjob, f, *us);
+	fclose(f);
+
+	f = fopen(tmpreq.path, "r");
+
+	free(tmpjob.url->doc);
+	free(tmpjob.url);
+	free(tmpreq.i_filename);
+	free(tmpreq.URL);
+	free(tmpreq.path);
+	
+	return f;
+}
+
 static void
 send_report(int sock, struct dmrep report, char op)
 {
@@ -621,6 +716,9 @@
 	free(buf);
 }
 
+/* TODO: This handler isn't registered as SIGUSR1 interrupts the download
+ * 	 Figure out a proper way to handle this
+ * */
 void
 sig_handler(int sig)
 {
@@ -630,7 +728,7 @@
 	pthread_t tid = pthread_self();
 	if (sig == SIGUSR1) {
 		while (tmp != NULL) {
-			if (pthread_equal(tid, *(tmp->worker)) != 0)
+			if (pthread_equal(tid, tmp->worker) != 0)
 				break;
 			tmp = tmp->next;
 		}
@@ -641,8 +739,9 @@
 			tmp->sigint = 1;
 		else if (*clisig == SIGINFO)
 			tmp->siginfo = 1;
-		else if (*clisig == SIGALRM)
-			tmp->siginfo = 1;
+		else if (*clisig == SIGALRM) {
+			tmp->sigalrm = 1;
+		}
 	}
 }
 
@@ -650,13 +749,61 @@
 run_worker(struct dmjob *dmjob)
 {
 	struct dmrep report;
+	struct dmjob *tmp;
+	struct url_stat us;
+	int err;
+	FILE *f;
+	char *tmppath;
+	char flags[8];
+
+	/* check if this is a duplicate */
+	mk_url(dmjob, flags);	
+	tmp = jobs;
+	while (tmp != NULL) {
+		if (tmp != dmjob && compare_jobs(tmp, dmjob) == 0) {
+			dmjob->state = DUPLICATE;
+			dmjob->worker = tmp->worker;
+			return NULL;
+		}
+		tmp = tmp->next;
+	}
+
+	/* fetch the remote file into a local tmp file */
+	f = dmXGet(dmjob, &us);
+
+	/* Serve any outstanding requests from the local tmp file */
+	tmp = jobs;
+	while (tmp != NULL) {
+		if (compare_jobs(tmp, dmjob) != 0) {
+			tmp = tmp->next;
+			continue;
+		}
 
-	int err = fetch(dmjob);
-	report.status = err;
-	report.errcode = fetchLastErrCode;
-	report.errstr = fetchLastErrString;	
-	send_report(dmjob->client, report, DMRESP);
-	dmjob->state = RUNNING;
+		if (f == NULL) {
+			err = -1;
+		} else {
+			fseek(f, 0, SEEK_SET);
+			err = fetch(dmjob, f, us);
+		}
+
+		report.status = err;
+		report.errcode = fetchLastErrCode;
+		report.errstr = fetchLastErrString;	
+		send_report(dmjob->client, report, DMRESP);
+
+		tmp->state = DONE;
+		tmp = tmp->next;
+	}
+	/* remove the local tmp file */
+	if (f != NULL) {
+		tmppath = (char *) Malloc(strlen(dmjob->request->path) + strlen(TMP_EXT));
+		strcpy(tmppath, dmjob->request->path);
+		strcat(tmppath, TMP_EXT);
 
-	return NULL;
+		remove(tmppath);
+		free(tmppath);
+	}
+	
+	/* TODO : What is this? Yew!! */
+	sleep(10);
 }


More information about the svn-soc-all mailing list