socsvn commit: r256164 - in soc2013/ambarisha/head/usr.bin: dmget dms
ambarisha at FreeBSD.org
ambarisha at FreeBSD.org
Mon Aug 19 20:39:15 UTC 2013
Author: ambarisha
Date: Mon Aug 19 20:39:15 2013
New Revision: 256164
URL: http://svnweb.FreeBSD.org/socsvn/?view=rev&rev=256164
Log:
Duplicate request handing.
When it receives a request, DMS downloads the requested file into
a temporary file. It then uses this file to serve both the original
request that started this download and any duplicate requests received
in the mean time.
Modified:
soc2013/ambarisha/head/usr.bin/dmget/utils.c
soc2013/ambarisha/head/usr.bin/dms/Makefile
soc2013/ambarisha/head/usr.bin/dms/dms.c
soc2013/ambarisha/head/usr.bin/dms/dms.h
soc2013/ambarisha/head/usr.bin/dms/utils.c
soc2013/ambarisha/head/usr.bin/dms/worker.c
Modified: soc2013/ambarisha/head/usr.bin/dmget/utils.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dmget/utils.c Mon Aug 19 20:31:57 2013 (r256163)
+++ soc2013/ambarisha/head/usr.bin/dmget/utils.c Mon Aug 19 20:39:15 2013 (r256164)
@@ -185,18 +185,27 @@
{
int bufsize = 0;
int err;
- struct dmmsg *msg;
+ struct dmmsg *msg = (struct dmmsg *) Malloc(sizeof(struct dmmsg));
err = Read(sock, &bufsize, sizeof(bufsize));
if (err == 0) {
/* set dms_error */
+#if DEBUG
+ fprintf(stderr, "recv_msg: remote end closed connection\n");
+#endif
+ free(msg);
return (NULL);
}
+
bufsize -= sizeof(bufsize);
err = Read(sock, &(msg->op), sizeof(msg->op));
if (err == 0) {
/* set dms_error */
+#if DEBUG
+ fprintf(stderr, "recv_msg: remote end closed connection\n");
+#endif
+ free(msg);
return (NULL);
}
bufsize -= sizeof(msg->op);
@@ -209,6 +218,10 @@
free(msg->buf);
msg->len = 0;
/* set dms_error */
+#if DEBUG
+ fprintf(stderr, "recv_msg: remote end closed connection\n");
+#endif
+ free(msg);
return (NULL);
}
@@ -218,6 +231,7 @@
void
free_msg(struct dmmsg **msg)
{
+
free((*msg)->buf);
free(*msg);
*msg = NULL;
Modified: soc2013/ambarisha/head/usr.bin/dms/Makefile
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/Makefile Mon Aug 19 20:31:57 2013 (r256163)
+++ soc2013/ambarisha/head/usr.bin/dms/Makefile Mon Aug 19 20:39:15 2013 (r256164)
@@ -11,5 +11,6 @@
DPADD= ${LIBFETCH}
LDADD= -lfetch
.endif
+CFLAGS+= -g
.include <bsd.prog.mk>
Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.c Mon Aug 19 20:31:57 2013 (r256163)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.c Mon Aug 19 20:39:15 2013 (r256164)
@@ -200,6 +200,7 @@
dmjob = mk_dmjob(dmreq, csock);
jobs = add_job(jobs, dmjob);
pthread_create(&(dmjob->worker), NULL, run_worker, dmjob);
+ pthread_detach(dmjob->worker);
break;
default:
goto error;
@@ -211,7 +212,7 @@
error:
ret = -1;
done:
- free_msg(msg);
+ free_msg(&msg);
return ret;
}
@@ -222,16 +223,15 @@
exit(1); // Temporary
}
-static int
+static state_t
service_job(struct dmjob *job, fd_set *fdset)
{
int ret = 0;
- if (FD_ISSET(job->client, fdset))
- pthread_kill(job->worker, SIGUSR1);
-
- if (job->state == DONE)
- ret = 1;
- return (ret);
+ if (FD_ISSET(job->client, fdset)) {
+ /* TODO: Worker can't handle this signal yet */
+ //pthread_kill(job->worker, SIGUSR1);
+ }
+ return (job->state);
}
static void
@@ -248,6 +248,7 @@
/* Prepare fdset and make select call */
FD_ZERO(&fdset);
+ maxfd = socket;
FD_SET(socket, &fdset);
cur = jobs;
@@ -257,37 +258,37 @@
maxfd = cur->client;
cur = cur->next;
}
-
+
Select(maxfd + 1, &fdset, NULL, NULL, NULL);
- if (FD_ISSET(socket, &fdset)) {
- struct sockaddr_un cliaddr;
- size_t cliaddrlen = sizeof(cliaddr);
- int csock = Accept(socket, (struct sockaddr *) &cliaddr,
- &cliaddrlen);
- handle_request(csock);
- }
-
cur = jobs;
while (cur != NULL) {
ret = service_job(cur, &fdset);
- if (ret == 1) {
+ if (ret == DONE) {
close(cur->client);
- pthread_join(cur->worker, &retptr);
jobs = rm_job(jobs, cur);
}
cur = cur->next;
}
-
+
+ if (FD_ISSET(socket, &fdset)) {
+ struct sockaddr_un cliaddr;
+ size_t cliaddrlen = sizeof(cliaddr);
+ int csock = Accept(socket, (struct sockaddr *) &cliaddr,
+ &cliaddrlen);
+ handle_request(csock);
+ }
}
+ /* Notify all running workers that we've to wrap up */
cur = jobs;
- while (cur != NULL) {
- close(cur->client);
- ret = service_job(cur, &fdset);
- /* TODO: Force the worker to quit as well */
+ while (cur != NULL) {
+ if (cur->state == RUNNING)
+ pthread_kill(cur->worker, SIGINT);
+
+ rm_dmreq(&(cur->request));
jobs = rm_job(jobs, cur);
- cur = jobs;
+ cur = cur->next;
}
}
Modified: soc2013/ambarisha/head/usr.bin/dms/dms.h
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.h Mon Aug 19 20:31:57 2013 (r256163)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.h Mon Aug 19 20:39:15 2013 (r256164)
@@ -3,7 +3,7 @@
#include <sys/types.h>
-typedef enum {RUNNING=0, DONE=1} state_t;
+typedef enum {RUNNING=0, DONE, DUPLICATE} state_t;
struct dmjob {
int ofd;
@@ -13,9 +13,10 @@
int sigalrm;
int siginfo;
int siginfo_en;
+ unsigned timeout;
+ pthread_t worker;
struct dmreq *request;
struct url *url;
- pthread_t *worker;
struct dmjob *next;
struct dmjob *prev;
Modified: soc2013/ambarisha/head/usr.bin/dms/utils.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/utils.c Mon Aug 19 20:31:57 2013 (r256163)
+++ soc2013/ambarisha/head/usr.bin/dms/utils.c Mon Aug 19 20:39:15 2013 (r256164)
@@ -183,30 +183,29 @@
struct dmmsg *
recv_msg(int sock)
{
- printf("in recv_msg\n");
int bufsize = 0;
int err;
- struct dmmsg *msg;
+ struct dmmsg *msg = (struct dmmsg *) Malloc(sizeof(struct dmmsg));
err = Read(sock, &bufsize, sizeof(bufsize));
if (err == 0) {
/* set dms_error */
#if DEBUG
fprintf(stderr, "recv_msg: remote end closed connection\n");
#endif
+ free(msg);
return (NULL);
}
- printf("bufsize = %d\n", bufsize);
bufsize -= sizeof(bufsize);
- printf("sock = %d\n", sock);
err = Read(sock, &(msg->op), sizeof(msg->op));
if (err == 0) {
/* set dms_error */
#if DEBUG
fprintf(stderr, "recv_msg: remote end closed connection\n");
#endif
+ free(msg);
return (NULL);
}
bufsize -= sizeof(msg->op);
@@ -222,6 +221,7 @@
#if DEBUG
fprintf(stderr, "recv_msg: remote end closed connection\n");
#endif
+ free(msg);
return (NULL);
}
@@ -231,6 +231,7 @@
void
free_msg(struct dmmsg **msg)
{
+
free((*msg)->buf);
free(*msg);
*msg = NULL;
Modified: soc2013/ambarisha/head/usr.bin/dms/worker.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/worker.c Mon Aug 19 20:31:57 2013 (r256163)
+++ soc2013/ambarisha/head/usr.bin/dms/worker.c Mon Aug 19 20:39:15 2013 (r256164)
@@ -6,12 +6,17 @@
#include <string.h>
#include <errno.h>
#include <signal.h>
+#include <fcntl.h>
+#include <unistd.h>
#include "dms.h"
#include "dm.h"
+
extern struct dmjob *jobs;
+#define TMP_EXT ".tmp"
+
static int
authenticate(struct url *url)
{
@@ -53,6 +58,12 @@
free_msg(&rcvmsg);
}
+static int
+compare_jobs(struct dmjob *j1, struct dmjob *j2)
+{
+ return strcmp(j1->request->URL, j2->request->URL);
+}
+
static void
stat_send(int csock, struct xferstat *xs, int force)
{
@@ -143,29 +154,14 @@
}
static int
-fetch(struct dmjob *dmjob)
+mk_url(struct dmjob *dmjob, char *flags)
{
- struct url_stat us;
- struct stat sb, nsb;
- struct xferstat xs;
- FILE *f, *of;
- size_t size, readcnt, wr;
- off_t count;
- char flags[8];
- const char *slash;
- char *tmppath;
- int r;
- unsigned timeout;
- char *ptr;
- char *buf;
struct dmreq *dmreq = dmjob->request;
+ struct stat sb;
+ int r;
- f = of = NULL;
- tmppath = NULL;
-
- timeout = 0;
- *flags = 0;
- count = 0;
+ if (dmjob->url != NULL)
+ return 0;
/* set verbosity level */
if (dmreq->v_level > 1)
@@ -174,7 +170,6 @@
fetchDebug = 1;
/* parse URL */
- dmjob->url = NULL;
if (*dmreq->URL == '\0') {
warnx("empty URL");
goto failure;
@@ -212,7 +207,7 @@
strcat(flags, "d");
if (dmreq->flags & U_FLAG)
strcat(flags, "l");
- timeout = dmreq->T_secs ? dmreq->T_secs : dmreq->ftp_timeout;
+ dmjob->timeout = dmreq->T_secs ? dmreq->T_secs : dmreq->ftp_timeout;
}
/* HTTP specific flags */
@@ -222,7 +217,7 @@
strcat(flags, "d");
if ((dmreq->flags & A_FLAG))
strcat(flags, "A");
- timeout = dmreq->T_secs ? dmreq->T_secs : dmreq->http_timeout;
+ dmjob->timeout = dmreq->T_secs ? dmreq->T_secs : dmreq->http_timeout;
if (dmreq->flags & i_FLAG) {
if (stat(dmreq->i_filename, &sb)) {
warn("%s: stat()", dmreq->i_filename);
@@ -234,27 +229,58 @@
}
/* set the protocol timeout. */
- fetchTimeout = timeout;
+ fetchTimeout = dmjob->timeout;
+ goto success;
- /* just print size */
- if (dmreq->flags & s_FLAG) {
- // if (timeout)
- // alarm(timeout);
- r = fetchStat(dmjob->url, &us, flags);
- if (timeout)
- alarm(0);
- if (dmjob->sigalrm || dmjob->sigint)
- goto signal;
- if (r == -1) {
- warnx("%s", fetchLastErrString);
- goto failure;
- }
- if (us.size == -1)
- printf("Unknown\n");
- else
- printf("%jd\n", (intmax_t)us.size);
- goto success;
- }
+signal:
+ /* report that we were timedout/interrupted */
+failure:
+ free(dmjob->url->doc);
+ free(dmjob->url);
+ dmjob->url = NULL;
+success:
+ return (r);
+}
+
+static int
+fetch(struct dmjob *dmjob, FILE *f, struct url_stat us)
+{
+ struct stat sb, nsb;
+ struct xferstat xs;
+ FILE *of;
+ size_t size, readcnt, wr;
+ off_t count;
+ char flags[8];
+ const char *slash;
+ char *tmppath;
+ int r;
+ char *ptr;
+ char *buf;
+ struct dmreq *dmreq = dmjob->request;
+
+ of = NULL;
+ tmppath = NULL;
+
+ dmjob->timeout = 0;
+ *flags = 0;
+ count = 0;
+
+ r = mk_url(dmjob, flags);
+
+ /* Initialize signal flags */
+ dmjob->sigint = 0;
+ dmjob->sigalrm = 0;
+ dmjob->siginfo = 0;
+
+ /* Set timeout */
+ if (strcmp(dmjob->url->scheme, SCHEME_FTP) == 0)
+ dmjob->timeout = dmreq->T_secs ? dmreq->T_secs : dmreq->ftp_timeout;
+ else if (strcmp(dmjob->url->scheme, SCHEME_HTTP) == 0 ||
+ strcmp(dmjob->url->scheme, SCHEME_HTTPS) == 0)
+ dmjob->timeout = dmreq->T_secs ? dmreq->T_secs : dmreq->http_timeout;
+
+ /* set the protocol timeout. */
+ fetchTimeout = dmjob->timeout;
/*
* If the -r flag was specified, we have to compare the local
@@ -289,24 +315,12 @@
}
/* start the transfer */
- if (timeout)
- alarm(timeout);
- f = fetchXGet(dmjob->url, &us, flags);
- if (timeout)
- alarm(0);
+ if (dmjob->timeout)
+ alarm(dmjob->timeout);
+
if (dmjob->sigalrm || dmjob->sigint)
goto signal;
- if (f == NULL) {
- warnx("%s: %s", dmreq->URL, fetchLastErrString);
- if ((dmreq->flags & i_FLAG) && strcmp(dmjob->url->scheme, SCHEME_HTTP) == 0
- && fetchLastErrCode == FETCH_OK
- && strcmp(fetchLastErrString, "Not Modified") == 0) {
- /* HTTP Not Modified Response, return OK. */
- r = 0;
- goto done;
- } else
- goto failure;
- }
+
if (dmjob->sigint)
goto signal;
@@ -422,14 +436,11 @@
* from scratch if we want the whole file
*/
dmjob->url->offset = 0;
- if ((f = fetchXGet(dmjob->url, &us, flags)) == NULL) {
- warnx("%s: %s", dmreq->URL, fetchLastErrString);
- goto failure;
- }
if (dmjob->sigint)
goto signal;
}
+
/* construct a temp file name */
if (sb.st_size != -1 && S_ISREG(sb.st_mode)) {
if ((slash = strrchr(dmreq->path, '/')) == NULL)
@@ -448,6 +459,7 @@
chmod(tmppath, sb.st_mode & ALLPERMS);
}
}
+
if (of == NULL)
of = fdopen(dmjob->ofd, "w");
if (of == NULL) {
@@ -485,10 +497,12 @@
if ((readcnt = fread(buf, 1, size, f)) < size) {
if (ferror(f) && errno == EINTR && !dmjob->sigint)
clearerr(f);
- else if (readcnt == 0)
+ else if (readcnt == 0) {
break;
+ }
}
+
stat_update(&xs, count += readcnt, dmjob);
for (ptr = buf; readcnt > 0; ptr += wr, readcnt -= wr)
if ((wr = fwrite(ptr, 1, readcnt, of)) < readcnt) {
@@ -500,6 +514,7 @@
if (readcnt != 0)
break;
}
+
if (!dmjob->sigalrm)
dmjob->sigalrm = ferror(f) && errno == ETIMEDOUT;
dmjob->siginfo_en = 0;
@@ -589,9 +604,89 @@
fetchFreeURL(dmjob->url);
if (tmppath != NULL)
free(tmppath);
+
return (r);
}
+FILE *
+dmXGet(struct dmjob *dmjob, struct url_stat *us)
+{
+ char flags[8];
+ int ret;
+ struct dmjob tmpjob;
+ struct dmreq tmpreq;
+ struct dmreq *dmreq = dmjob->request;
+
+ /* populate tmpjob */
+
+ /* TODO : Modify stat_* to udpate jobs of progress,
+ * right now we just put the msgs on stderr
+ * */
+ tmpjob.client = STDERR_FILENO;
+
+ tmpjob.request = &tmpreq;
+ tmpreq.v_level = dmreq->v_level;
+ tmpreq.ftp_timeout = dmjob->request->ftp_timeout;
+ tmpreq.http_timeout = dmjob->request->http_timeout;
+ tmpreq.B_size = dmjob->request->B_size;
+ tmpreq.S_size = dmjob->request->S_size;
+ tmpreq.T_secs = dmjob->request->T_secs;
+ tmpreq.flags = dmjob->request->flags;
+ tmpreq.family = dmjob->request->family;
+
+ tmpreq.i_filename = (char *) Malloc(strlen(dmreq->i_filename));
+ strcpy(tmpreq.i_filename, dmreq->i_filename);
+
+ tmpreq.URL = (char *) Malloc(strlen(dmreq->URL));
+ strcpy(tmpreq.URL, dmreq->URL);
+
+ tmpjob.url = NULL;
+ ret = mk_url(&tmpjob, flags);
+ if (ret <= 0) {
+
+ }
+
+ /* special case : -s flag
+ if (tmpreq.flags & s_FLAG) {
+ if (dmjob->timeout)
+ alarm(dmjob->timeout);
+ r = fetchStat(dmjob->url, us, flags);
+ if (dmjob->timeout)
+ alarm(0);
+ if (dmjob->sigalrm || dmjob->sigint)
+ goto signal;
+ if (r == -1) {
+ warnx("%s", fetchLastErrString);
+ goto failure;
+ }
+ if (us->size == -1)
+ printf("Unknown\n");
+ else
+ printf("%jd\n", (intmax_t)us->size);
+ goto success;
+ }
+ */
+ tmpreq.path = (char *) Malloc(strlen(dmreq->path) + strlen(TMP_EXT));
+ strcpy(tmpreq.path, dmreq->path);
+ strcat(tmpreq.path, TMP_EXT);
+
+ tmpjob.ofd = open(tmpreq.path, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR);
+
+ FILE *f = fetchXGet(tmpjob.url, us, flags);
+ fetch(&tmpjob, f, *us);
+ fclose(f);
+
+ f = fopen(tmpreq.path, "r");
+
+ free(tmpjob.url->doc);
+ free(tmpjob.url);
+ free(tmpreq.i_filename);
+ free(tmpreq.URL);
+ free(tmpreq.path);
+
+ return f;
+}
+
static void
send_report(int sock, struct dmrep report, char op)
{
@@ -621,6 +716,9 @@
free(buf);
}
+/* TODO: This handler isn't registered as SIGUSR1 interrupts the download
+ * Figure out a proper way to handle this
+ * */
void
sig_handler(int sig)
{
@@ -630,7 +728,7 @@
pthread_t tid = pthread_self();
if (sig == SIGUSR1) {
while (tmp != NULL) {
- if (pthread_equal(tid, *(tmp->worker)) != 0)
+ if (pthread_equal(tid, tmp->worker) != 0)
break;
tmp = tmp->next;
}
@@ -641,8 +739,9 @@
tmp->sigint = 1;
else if (*clisig == SIGINFO)
tmp->siginfo = 1;
- else if (*clisig == SIGALRM)
- tmp->siginfo = 1;
+ else if (*clisig == SIGALRM) {
+ tmp->sigalrm = 1;
+ }
}
}
@@ -650,13 +749,61 @@
run_worker(struct dmjob *dmjob)
{
struct dmrep report;
+ struct dmjob *tmp;
+ struct url_stat us;
+ int err;
+ FILE *f;
+ char *tmppath;
+ char flags[8];
+
+ /* check if this is a duplicate */
+ mk_url(dmjob, flags);
+ tmp = jobs;
+ while (tmp != NULL) {
+ if (tmp != dmjob && compare_jobs(tmp, dmjob) == 0) {
+ dmjob->state = DUPLICATE;
+ dmjob->worker = tmp->worker;
+ return NULL;
+ }
+ tmp = tmp->next;
+ }
+
+ /* fetch the remote file into a local tmp file */
+ f = dmXGet(dmjob, &us);
+
+ /* Serve any outstanding requests from the local tmp file */
+ tmp = jobs;
+ while (tmp != NULL) {
+ if (compare_jobs(tmp, dmjob) != 0) {
+ tmp = tmp->next;
+ continue;
+ }
- int err = fetch(dmjob);
- report.status = err;
- report.errcode = fetchLastErrCode;
- report.errstr = fetchLastErrString;
- send_report(dmjob->client, report, DMRESP);
- dmjob->state = RUNNING;
+ if (f == NULL) {
+ err = -1;
+ } else {
+ fseek(f, 0, SEEK_SET);
+ err = fetch(dmjob, f, us);
+ }
+
+ report.status = err;
+ report.errcode = fetchLastErrCode;
+ report.errstr = fetchLastErrString;
+ send_report(dmjob->client, report, DMRESP);
+
+ tmp->state = DONE;
+ tmp = tmp->next;
+ }
+ /* remove the local tmp file */
+ if (f != NULL) {
+ tmppath = (char *) Malloc(strlen(dmjob->request->path) + strlen(TMP_EXT));
+ strcpy(tmppath, dmjob->request->path);
+ strcat(tmppath, TMP_EXT);
- return NULL;
+ remove(tmppath);
+ free(tmppath);
+ }
+
+ /* TODO : What is this? Yew!! */
+ sleep(10);
}
More information about the svn-soc-all
mailing list