socsvn commit: r256170 - soc2013/ambarisha/head/usr.bin/dms

ambarisha at FreeBSD.org ambarisha at FreeBSD.org
Mon Aug 19 20:51:29 UTC 2013


Author: ambarisha
Date: Mon Aug 19 20:51:29 2013
New Revision: 256170
URL: http://svnweb.FreeBSD.org/socsvn/?view=rev&rev=256170

Log:
  Added mirror profiling
  
  DMS samples the download speeds from mirrors and picks
  the mirror which has the highest average speed over the
  past week. It picks a mirror which hasn't been tried before
  with preference.
  

Added:
  soc2013/ambarisha/head/usr.bin/dms/mirror.c
Modified:
  soc2013/ambarisha/head/usr.bin/dms/Makefile
  soc2013/ambarisha/head/usr.bin/dms/dms.c
  soc2013/ambarisha/head/usr.bin/dms/dms.h
  soc2013/ambarisha/head/usr.bin/dms/worker.c

Modified: soc2013/ambarisha/head/usr.bin/dms/Makefile
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/Makefile	Mon Aug 19 20:48:15 2013	(r256169)
+++ soc2013/ambarisha/head/usr.bin/dms/Makefile	Mon Aug 19 20:51:29 2013	(r256170)
@@ -1,7 +1,7 @@
 # $FreeBSD$
 .include <bsd.own.mk>
 
-SRCS=		utils.c dms.c worker.c
+SRCS=		mirror.c utils.c dms.c worker.c
 PROG=		dms
 CSTD?=		c99
 .if ${MK_OPENSSL} != "no"

Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.c	Mon Aug 19 20:48:15 2013	(r256169)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.c	Mon Aug 19 20:51:29 2013	(r256170)
@@ -18,8 +18,12 @@
 static char	dm_errstr[512];
 
 int	 	 	 stop;
+
 struct dmjob		*jobs;
-pthread_mutex_t	 job_queue_mutex;
+pthread_mutex_t	 	 job_queue_mutex;
+
+extern struct dmmirr		*mirrors;
+extern pthread_mutex_t		 mirror_list_mutex;
 
 void *run_worker(struct dmjob *job);
 
@@ -114,6 +118,8 @@
 static struct dmjob *
 mk_dmjob(struct dmreq *dmreq, int client)
 {
+	int ret;
+	struct dmmirr *cur;
 	struct dmjob *dmjob = (struct dmjob *) malloc(sizeof(struct dmjob));
 	if (dmjob == NULL) {
 		fprintf(stderr, "mk_dmjob: malloc: insufficient memory\n");
@@ -128,6 +134,7 @@
 		return NULL;
 	}
 
+	dmjob->mirror = get_mirror();
 	dmjob->client = client;
 	dmjob->sigint = 0;
 	dmjob->sigalrm = 0;
@@ -187,6 +194,7 @@
 		break;
 	}
 
+	
 	memcpy(&(dmreq->flags), rcvbuf + i, sizeof(dmreq->flags));
 	i += sizeof(dmreq->flags);
 
@@ -292,7 +300,6 @@
 		if (ret == -1) {
 			fprintf(stderr, "handle_request: Couldn't release "
 					"job queue lock\n");
-
 			goto error;
 		}
 		/* Job queue lock released */
@@ -322,11 +329,12 @@
 void
 sigint_handler(int sig)
 {
+	save_mirrors();
 	stop = 1;
 	exit(1); // Temporary
 }
 
-static state_t
+static int
 service_job(struct dmjob *job, fd_set *fdset)
 {
 	int ret = 0;
@@ -334,19 +342,23 @@
 		/* TODO: Worker can't handle this signal yet */
 		//pthread_kill(job->worker, SIGUSR1);
 	}
-	return (job->state);
+
+	return ret;
 }
 
 static void
 run_event_loop(int socket)
 {
 	int i, ret, maxfd = socket;
-	state_t state;
 	struct dmjob *cur;
 	void *retptr;
+	fd_set fdset;
+
 	jobs = NULL;
 	job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
-	fd_set fdset;
+	mirrors = NULL;
+	mirror_list_mutex = PTHREAD_MUTEX_INITIALIZER;
+	load_mirrors();
 
 	signal(SIGINT, sigint_handler);
 	while (!stop) {
@@ -398,8 +410,8 @@
 
 		cur = jobs;
 		while (cur != NULL) {
-			state = service_job(cur, &fdset);
-			if (state == DONE) {
+			ret = service_job(cur, &fdset);
+			if (ret > 0) {
 				close(cur->client);
 				jobs = rm_job(jobs, cur);
 				rm_dmjob(&cur);

Modified: soc2013/ambarisha/head/usr.bin/dms/dms.h
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.h	Mon Aug 19 20:48:15 2013	(r256169)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.h	Mon Aug 19 20:51:29 2013	(r256170)
@@ -3,20 +3,29 @@
 
 #include <sys/types.h>
 
-typedef enum {RUNNING=0, DONE, DUPLICATE} state_t;
+#define MAX_LISTEN_QUEUE	5
+#define MINBUFSIZE		4096
+#define MAX_SAMPLES		256
 
 struct dmjob {
 	int		 ofd;
 	int	 	 client;
-	state_t	 	 state;
 	int		 sigint;
 	int	 	 sigalrm;
 	int	 	 siginfo;
 	int	 	 siginfo_en;
 	unsigned	 timeout;
+
+	enum {
+		RUNNING = 0,
+		DONE,
+		DUPLICATE
+	} state;
+
 	pthread_t	 worker;
 	struct dmreq 	*request;
 	struct url	*url;
+	struct dmmirr	*mirror;
 
 	struct dmjob 	*next;
 	struct dmjob	*prev;
@@ -28,9 +37,24 @@
 	char 	*errstr;
 };
 
-#define DEBUG			1
+struct dmmirr {
+	char		name[512];
+	int		index;
+
+	enum {
+		NOT_TRIED = 0,
+		ACTIVE,
+		FAILED
+	} remark;
+
+	struct timeval	timestamps[MAX_SAMPLES];
+	double		samples[MAX_SAMPLES];
+	int		nconns;
 
-#define MAX_LISTEN_QUEUE	5
-#define MINBUFSIZE		4096
+	struct dmmirr 	*next;
+	struct dmmirr	*prev;
+};
+
+#define DEBUG			1
 
 #endif

Added: soc2013/ambarisha/head/usr.bin/dms/mirror.c
==============================================================================
--- /dev/null	00:00:00 1970	(empty, because file is newly added)
+++ soc2013/ambarisha/head/usr.bin/dms/mirror.c	Mon Aug 19 20:51:29 2013	(r256170)
@@ -0,0 +1,310 @@
+#include <errno.h>
+#include <sys/time.h>
+#include "dm.h"
+#include "dms.h"
+
+#define 	MAX_SAMPLES	256
+#define		MAX_CONNS	5
+#define		MIRRORS_FILE	"mirrors.list"
+
+struct dmmirr		*mirrors;
+pthread_mutex_t	 	 mirror_list_mutex;
+
+static const char *MIRROR_LIST[] = {
+	"ftp.freebsd.org"
+};
+
+static struct dmmirr *
+add_mirror(struct dmmirr *head, struct dmmirr *new)
+{ 
+	new->prev = NULL;
+	new->next = NULL;
+
+	if (head == NULL)
+		return new;
+
+	head->prev = new;
+	new->next = head;	
+}
+
+static struct dmmirr *
+rm_mirror(struct dmmirr *head, struct dmmirr *mirror)
+{
+	if (mirror->next != NULL) 
+		mirror->next->prev = mirror->prev;
+
+	if (mirror->prev != NULL)
+		mirror->prev->next = mirror->next;
+	
+	if (mirror == head) 
+		return mirror->next;
+
+	return head;
+}
+
+static double
+get_speed(struct xferstat *xs)
+{
+	double delta = (xs->last.tv_sec + (xs->last.tv_usec / 1.e6))
+			- (xs->last2.tv_sec + (xs->last2.tv_usec / 1.e6));
+	if (delta == 0.0) 
+		return -1.0;
+	return (xs->rcvd - xs->lastrcvd) / delta;
+}
+
+static struct dmmirr *
+read_mirror(FILE *f)
+{
+	int i;
+	struct dmmirr *mirror;
+	char buf[512], rem[64];
+
+	mirror = (struct dmmirr *) malloc(sizeof(struct dmmirr));
+	if (mirror == NULL) {
+		fprintf(stderr, "read_mirror: Insufficient memory\n");
+		return NULL;
+	}
+
+	if (fgets(buf, 512, f) == NULL) {
+		free(mirror);
+		return NULL;
+	}
+	sscanf(buf, "%s\n", mirror->name);
+
+	if (fgets(buf, 64, f) == NULL) {
+		fprintf(stderr, "WARNING: read_mirror: mirrors.list file corrupted\n");
+		free(mirror);
+		return NULL;
+	}
+	sscanf(buf, "%s\n", rem);
+
+	if (strcmp(rem, "NOT_TRIED") == 0) {
+		mirror->remark = NOT_TRIED;
+	} else if (strcmp(rem, "FAILED") == 0) {
+		mirror->remark = FAILED;
+	} else {
+		fprintf(stderr, "WARNING: Unknown mirror state in mirrors.list\n");
+	}
+
+	if (fgets(buf, 64, f) == NULL) {
+		fprintf(stderr, "WARNING: read_mirror: mirrors.list file corrupted\n");
+		free(mirror);
+		return NULL;
+	}
+	sscanf(buf, "%d\n", &mirror->index);
+
+	for(i = 0; i < MAX_SAMPLES; i++) {
+		fscanf(f, "%ld\t%f\n", &(mirror->timestamps[i].tv_sec),
+					&(mirror->samples[i]));
+		/* TODO: What if fscanf fails? */
+	}
+
+	return mirror;
+}
+
+static void
+write_mirror(struct dmmirr *mirror, FILE *f)
+{
+	int i;
+	
+	fputs(mirror->name, f);
+	fputc('\n', f);
+	
+	switch(mirror->remark) {
+	case NOT_TRIED:
+		fputs("NOT_TRIED\n", f);
+		break;
+	case FAILED:
+		fputs("FAILED\n", f);
+		break;
+	}
+
+	for(i = 0; i < MAX_SAMPLES; i++) {
+		fprintf(f, "%ld\t%f\n", mirror->timestamps[i].tv_sec,
+					mirror->samples[i]);
+	}
+	
+	return;
+}
+
+static int
+init_mirrors_file(void)
+{
+	int i, j;
+	FILE *f = fopen(MIRRORS_FILE, "w");
+	if (f == NULL)
+		return -1;
+	
+	for(i = 0; i < sizeof(MIRROR_LIST) / sizeof(MIRROR_LIST[0]); i++) {
+		fwrite(MIRROR_LIST[i], strlen(MIRROR_LIST[i]), 1, f);
+		fprintf(f, "\nNOT_TRIED\n");
+		for (j = 0; j < MAX_SAMPLES; j++)
+			fprintf(f, "0\t0\n");
+	}
+
+	fclose(f);
+}
+
+int
+load_mirrors(void)
+{
+	int ret;
+	struct dmmirr *mirror;
+
+	FILE *f = fopen(MIRRORS_FILE, "r");
+	if (f == NULL && errno == ENOENT) {
+		init_mirrors_file();
+		f = fopen(MIRRORS_FILE, "r");
+	} else if (f == NULL) {
+		fprintf(stderr, "load_mirrors: fopen(%s) failed\n",
+				MIRRORS_FILE);
+		return -1;
+	}
+
+	/* Profile list lock */
+	ret = pthread_mutex_lock(&mirror_list_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "get_mirror: Attempt to acquire"
+				" profile list mutex failed\n");
+		return -1;
+	}
+
+	mirror = read_mirror(f);
+	while(mirror != NULL) {
+		mirrors = add_mirror(mirrors, mirror);
+		mirror = read_mirror(f);
+	}
+
+	ret = pthread_mutex_unlock(&mirror_list_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "get_mirror: Couldn't release "
+				"profile list lock\n");
+		return -1;
+	}
+	/* Profile list lock released */
+
+	fclose(f);
+	return 0;
+}
+
+int
+save_mirrors(void)
+{
+	int ret;
+	struct dmmirr *mirror = mirrors;
+
+	FILE *f = fopen(MIRRORS_FILE, "w");
+
+	/* Profile list lock */
+	ret = pthread_mutex_lock(&mirror_list_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "get_mirror: Attempt to acquire"
+				" profile list mutex failed\n");
+		return -1;
+	}
+
+	while(mirror != NULL) {
+		write_mirror(mirror, f);	
+		mirrors = rm_mirror(mirrors, mirror);
+	}
+
+	ret = pthread_mutex_unlock(&mirror_list_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "get_mirror: Couldn't release "
+				"profile list lock\n");
+		return -1;
+	}
+	/* Profile list lock released */
+
+	fclose(f);
+	return 0;
+}
+
+void
+update_mirror(struct dmmirr *dmmirr, struct xferstat *xs)
+{
+	struct timeval tv;
+	double speed;
+
+	gettimeofday(&tv, NULL);
+	if (tv.tv_sec - dmmirr->timestamps[dmmirr->index].tv_sec < 60)
+		return;
+
+	speed = get_speed(xs);
+
+	/* TODO: This assumes that workers and sites have 1-1 correspondence */
+	dmmirr->index = (dmmirr->index + 1) % MAX_SAMPLES;
+	dmmirr->timestamps[dmmirr->index] = tv;
+	dmmirr->samples[dmmirr->index] = speed;
+	dmmirr->remark = ACTIVE;
+}
+
+struct dmmirr *
+get_mirror(void)
+{
+	struct dmmirr *cur, *tmp;
+	double tmpmax = -1.0;
+	int cnt, ret, i;
+	struct timeval now;
+	long week_sec;
+	double average;
+
+	week_sec = 7 * 24 * 60 * 60;
+
+	/* Profile list lock */
+	ret = pthread_mutex_lock(&mirror_list_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "get_mirror: Attempt to acquire"
+				" profile list mutex failed\n");
+		return NULL;
+	}
+
+	cur = mirrors;
+	tmp = NULL;
+	tmpmax = -1.0;
+	while (cur != NULL) {
+		if (cur->remark == NOT_TRIED) {
+			tmp = cur;
+			goto success;
+		}
+
+		if (cur->remark == FAILED)
+			goto next;
+		if (cur->nconns > MAX_CONNS)
+			goto next;
+
+
+		i = cur->index;
+		cnt = 0;
+		average = 0.0;
+		do {
+			gettimeofday(&now, NULL);
+			if (cur->timestamps[i].tv_sec <  now.tv_sec - week_sec)
+				break;
+			average = (average * cnt + cur->samples[i]) / (cnt + 1);
+			cnt++;
+
+			i = (i - 1) % MAX_SAMPLES;
+		} while (i != cur->index);
+
+		if (average > tmpmax) {
+			tmpmax = average;
+			tmp = cur;
+		}
+next:
+		cur = cur->next;
+	}
+
+	/* TODO: If we couldn't pick up a mirror? */
+
+success:
+	ret = pthread_mutex_unlock(&mirror_list_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "get_mirror: Couldn't release "
+				"profile list lock\n");
+		return NULL;
+	}
+	/* Profile list lock released */
+	
+	return tmp;
+}

Modified: soc2013/ambarisha/head/usr.bin/dms/worker.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/worker.c	Mon Aug 19 20:48:15 2013	(r256169)
+++ soc2013/ambarisha/head/usr.bin/dms/worker.c	Mon Aug 19 20:51:29 2013	(r256170)
@@ -180,6 +180,9 @@
 	xs->offset = offset;
 	xs->rcvd = offset;
 	xs->lastrcvd = offset;
+	
+	update_mirror(dmjob->mirror , xs);
+
 	if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0)
 		stat_send(xs, 1);
 	else if (dmjob->request->v_level > 0)
@@ -190,6 +193,7 @@
 stat_end(struct xferstat *xs, struct dmjob *dmjob)
 {
 	gettimeofday(&xs->last, NULL);
+	update_mirror(dmjob->mirror , xs);
 	if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0) {
 		stat_send(xs, 2);
 		putc('\n', stderr);
@@ -203,10 +207,20 @@
 stat_update(struct xferstat *xs, off_t rcvd, struct dmjob *dmjob)
 {
 	xs->rcvd = rcvd;
+	update_mirror(dmjob->mirror , xs);
 	if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0)
 		stat_send(xs, 0);
 }
 
+static void
+select_mirror(struct dmjob *dmjob)
+{
+	dmjob->mirror = get_mirror();
+	strcpy(dmjob->url->host, dmjob->mirror->name);
+	strcpy(dmjob->request->URL, dmjob->mirror->name);
+	strcat(dmjob->request->URL, dmjob->url->doc);
+}
+
 static int
 mk_url(struct dmjob *dmjob, char *flags)
 {
@@ -228,12 +242,15 @@
 		fprintf(stderr, "warning: mk_url: URL empty\n");
 		goto failure;
 	}
+
 	if ((dmjob->url = fetchParseURL(dmreq->URL)) == NULL) {
 		warnx("%s: parse error", dmreq->URL);
 		goto failure;
 	}
 
-	/* if no scheme was specified, take a guess */
+	/* Replace host name with the mirror name */
+	select_mirror(dmjob);
+
 	if (!*(dmjob->url->scheme)) {
 		if (!*(dmjob->url->host))
 			strcpy(dmjob->url->scheme, SCHEME_FILE);
@@ -732,6 +749,7 @@
 		goto success;
 	}
 	*/
+
 	tmpreq.path = (char *) malloc(strlen(dmreq->path) + strlen(TMP_EXT));
 	if (tmpreq.path == NULL) {
 		fprintf(stderr, "dmXGet: Insufficient memory\n");


More information about the svn-soc-all mailing list