socsvn commit: r256170 - soc2013/ambarisha/head/usr.bin/dms
ambarisha at FreeBSD.org
ambarisha at FreeBSD.org
Mon Aug 19 20:51:29 UTC 2013
Author: ambarisha
Date: Mon Aug 19 20:51:29 2013
New Revision: 256170
URL: http://svnweb.FreeBSD.org/socsvn/?view=rev&rev=256170
Log:
Added mirror profiling
DMS samples the download speeds from mirrors and picks
the mirror which has the highest average speed over the
past week. It picks a mirror which hasn't been tried before
with preference.
Added:
soc2013/ambarisha/head/usr.bin/dms/mirror.c
Modified:
soc2013/ambarisha/head/usr.bin/dms/Makefile
soc2013/ambarisha/head/usr.bin/dms/dms.c
soc2013/ambarisha/head/usr.bin/dms/dms.h
soc2013/ambarisha/head/usr.bin/dms/worker.c
Modified: soc2013/ambarisha/head/usr.bin/dms/Makefile
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/Makefile Mon Aug 19 20:48:15 2013 (r256169)
+++ soc2013/ambarisha/head/usr.bin/dms/Makefile Mon Aug 19 20:51:29 2013 (r256170)
@@ -1,7 +1,7 @@
# $FreeBSD$
.include <bsd.own.mk>
-SRCS= utils.c dms.c worker.c
+SRCS= mirror.c utils.c dms.c worker.c
PROG= dms
CSTD?= c99
.if ${MK_OPENSSL} != "no"
Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.c Mon Aug 19 20:48:15 2013 (r256169)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.c Mon Aug 19 20:51:29 2013 (r256170)
@@ -18,8 +18,12 @@
static char dm_errstr[512];
int stop;
+
struct dmjob *jobs;
-pthread_mutex_t job_queue_mutex;
+pthread_mutex_t job_queue_mutex;
+
+extern struct dmmirr *mirrors;
+extern pthread_mutex_t mirror_list_mutex;
void *run_worker(struct dmjob *job);
@@ -114,6 +118,8 @@
static struct dmjob *
mk_dmjob(struct dmreq *dmreq, int client)
{
+ int ret;
+ struct dmmirr *cur;
struct dmjob *dmjob = (struct dmjob *) malloc(sizeof(struct dmjob));
if (dmjob == NULL) {
fprintf(stderr, "mk_dmjob: malloc: insufficient memory\n");
@@ -128,6 +134,7 @@
return NULL;
}
+ dmjob->mirror = get_mirror();
dmjob->client = client;
dmjob->sigint = 0;
dmjob->sigalrm = 0;
@@ -187,6 +194,7 @@
break;
}
+
memcpy(&(dmreq->flags), rcvbuf + i, sizeof(dmreq->flags));
i += sizeof(dmreq->flags);
@@ -292,7 +300,6 @@
if (ret == -1) {
fprintf(stderr, "handle_request: Couldn't release "
"job queue lock\n");
-
goto error;
}
/* Job queue lock released */
@@ -322,11 +329,12 @@
void
sigint_handler(int sig)
{
+ save_mirrors();
stop = 1;
exit(1); // Temporary
}
-static state_t
+static int
service_job(struct dmjob *job, fd_set *fdset)
{
int ret = 0;
@@ -334,19 +342,23 @@
/* TODO: Worker can't handle this signal yet */
//pthread_kill(job->worker, SIGUSR1);
}
- return (job->state);
+
+ return ret;
}
static void
run_event_loop(int socket)
{
int i, ret, maxfd = socket;
- state_t state;
struct dmjob *cur;
void *retptr;
+ fd_set fdset;
+
jobs = NULL;
job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
- fd_set fdset;
+ mirrors = NULL;
+ mirror_list_mutex = PTHREAD_MUTEX_INITIALIZER;
+ load_mirrors();
signal(SIGINT, sigint_handler);
while (!stop) {
@@ -398,8 +410,8 @@
cur = jobs;
while (cur != NULL) {
- state = service_job(cur, &fdset);
- if (state == DONE) {
+ ret = service_job(cur, &fdset);
+ if (ret > 0) {
close(cur->client);
jobs = rm_job(jobs, cur);
rm_dmjob(&cur);
Modified: soc2013/ambarisha/head/usr.bin/dms/dms.h
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.h Mon Aug 19 20:48:15 2013 (r256169)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.h Mon Aug 19 20:51:29 2013 (r256170)
@@ -3,20 +3,29 @@
#include <sys/types.h>
-typedef enum {RUNNING=0, DONE, DUPLICATE} state_t;
+#define MAX_LISTEN_QUEUE 5
+#define MINBUFSIZE 4096
+#define MAX_SAMPLES 256
struct dmjob {
int ofd;
int client;
- state_t state;
int sigint;
int sigalrm;
int siginfo;
int siginfo_en;
unsigned timeout;
+
+ enum {
+ RUNNING = 0,
+ DONE,
+ DUPLICATE
+ } state;
+
pthread_t worker;
struct dmreq *request;
struct url *url;
+ struct dmmirr *mirror;
struct dmjob *next;
struct dmjob *prev;
@@ -28,9 +37,24 @@
char *errstr;
};
-#define DEBUG 1
+struct dmmirr {
+ char name[512];
+ int index;
+
+ enum {
+ NOT_TRIED = 0,
+ ACTIVE,
+ FAILED
+ } remark;
+
+ struct timeval timestamps[MAX_SAMPLES];
+ double samples[MAX_SAMPLES];
+ int nconns;
-#define MAX_LISTEN_QUEUE 5
-#define MINBUFSIZE 4096
+ struct dmmirr *next;
+ struct dmmirr *prev;
+};
+
+#define DEBUG 1
#endif
Added: soc2013/ambarisha/head/usr.bin/dms/mirror.c
==============================================================================
--- /dev/null 00:00:00 1970 (empty, because file is newly added)
+++ soc2013/ambarisha/head/usr.bin/dms/mirror.c Mon Aug 19 20:51:29 2013 (r256170)
@@ -0,0 +1,310 @@
+#include <errno.h>
+#include <sys/time.h>
+#include "dm.h"
+#include "dms.h"
+
+#define MAX_SAMPLES 256
+#define MAX_CONNS 5
+#define MIRRORS_FILE "mirrors.list"
+
+struct dmmirr *mirrors;
+pthread_mutex_t mirror_list_mutex;
+
+static const char *MIRROR_LIST[] = {
+ "ftp.freebsd.org"
+};
+
+static struct dmmirr *
+add_mirror(struct dmmirr *head, struct dmmirr *new)
+{
+ new->prev = NULL;
+ new->next = NULL;
+
+ if (head == NULL)
+ return new;
+
+ head->prev = new;
+ new->next = head;
+}
+
+static struct dmmirr *
+rm_mirror(struct dmmirr *head, struct dmmirr *mirror)
+{
+ if (mirror->next != NULL)
+ mirror->next->prev = mirror->prev;
+
+ if (mirror->prev != NULL)
+ mirror->prev->next = mirror->next;
+
+ if (mirror == head)
+ return mirror->next;
+
+ return head;
+}
+
+static double
+get_speed(struct xferstat *xs)
+{
+ double delta = (xs->last.tv_sec + (xs->last.tv_usec / 1.e6))
+ - (xs->last2.tv_sec + (xs->last2.tv_usec / 1.e6));
+ if (delta == 0.0)
+ return -1.0;
+ return (xs->rcvd - xs->lastrcvd) / delta;
+}
+
+static struct dmmirr *
+read_mirror(FILE *f)
+{
+ int i;
+ struct dmmirr *mirror;
+ char buf[512], rem[64];
+
+ mirror = (struct dmmirr *) malloc(sizeof(struct dmmirr));
+ if (mirror == NULL) {
+ fprintf(stderr, "read_mirror: Insufficient memory\n");
+ return NULL;
+ }
+
+ if (fgets(buf, 512, f) == NULL) {
+ free(mirror);
+ return NULL;
+ }
+ sscanf(buf, "%s\n", mirror->name);
+
+ if (fgets(buf, 64, f) == NULL) {
+ fprintf(stderr, "WARNING: read_mirror: mirrors.list file corrupted\n");
+ free(mirror);
+ return NULL;
+ }
+ sscanf(buf, "%s\n", rem);
+
+ if (strcmp(rem, "NOT_TRIED") == 0) {
+ mirror->remark = NOT_TRIED;
+ } else if (strcmp(rem, "FAILED") == 0) {
+ mirror->remark = FAILED;
+ } else {
+ fprintf(stderr, "WARNING: Unknown mirror state in mirrors.list\n");
+ }
+
+ if (fgets(buf, 64, f) == NULL) {
+ fprintf(stderr, "WARNING: read_mirror: mirrors.list file corrupted\n");
+ free(mirror);
+ return NULL;
+ }
+ sscanf(buf, "%d\n", &mirror->index);
+
+ for(i = 0; i < MAX_SAMPLES; i++) {
+ fscanf(f, "%ld\t%f\n", &(mirror->timestamps[i].tv_sec),
+ &(mirror->samples[i]));
+ /* TODO: What if fscanf fails? */
+ }
+
+ return mirror;
+}
+
+static void
+write_mirror(struct dmmirr *mirror, FILE *f)
+{
+ int i;
+
+ fputs(mirror->name, f);
+ fputc('\n', f);
+
+ switch(mirror->remark) {
+ case NOT_TRIED:
+ fputs("NOT_TRIED\n", f);
+ break;
+ case FAILED:
+ fputs("FAILED\n", f);
+ break;
+ }
+
+ for(i = 0; i < MAX_SAMPLES; i++) {
+ fprintf(f, "%ld\t%f\n", mirror->timestamps[i].tv_sec,
+ mirror->samples[i]);
+ }
+
+ return;
+}
+
+static int
+init_mirrors_file(void)
+{
+ int i, j;
+ FILE *f = fopen(MIRRORS_FILE, "w");
+ if (f == NULL)
+ return -1;
+
+ for(i = 0; i < sizeof(MIRROR_LIST) / sizeof(MIRROR_LIST[0]); i++) {
+ fwrite(MIRROR_LIST[i], strlen(MIRROR_LIST[i]), 1, f);
+ fprintf(f, "\nNOT_TRIED\n");
+ for (j = 0; j < MAX_SAMPLES; j++)
+ fprintf(f, "0\t0\n");
+ }
+
+ fclose(f);
+}
+
+int
+load_mirrors(void)
+{
+ int ret;
+ struct dmmirr *mirror;
+
+ FILE *f = fopen(MIRRORS_FILE, "r");
+ if (f == NULL && errno == ENOENT) {
+ init_mirrors_file();
+ f = fopen(MIRRORS_FILE, "r");
+ } else if (f == NULL) {
+ fprintf(stderr, "load_mirrors: fopen(%s) failed\n",
+ MIRRORS_FILE);
+ return -1;
+ }
+
+ /* Profile list lock */
+ ret = pthread_mutex_lock(&mirror_list_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "get_mirror: Attempt to acquire"
+ " profile list mutex failed\n");
+ return -1;
+ }
+
+ mirror = read_mirror(f);
+ while(mirror != NULL) {
+ mirrors = add_mirror(mirrors, mirror);
+ mirror = read_mirror(f);
+ }
+
+ ret = pthread_mutex_unlock(&mirror_list_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "get_mirror: Couldn't release "
+ "profile list lock\n");
+ return -1;
+ }
+ /* Profile list lock released */
+
+ fclose(f);
+ return 0;
+}
+
+int
+save_mirrors(void)
+{
+ int ret;
+ struct dmmirr *mirror = mirrors;
+
+ FILE *f = fopen(MIRRORS_FILE, "w");
+
+ /* Profile list lock */
+ ret = pthread_mutex_lock(&mirror_list_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "get_mirror: Attempt to acquire"
+ " profile list mutex failed\n");
+ return -1;
+ }
+
+ while(mirror != NULL) {
+ write_mirror(mirror, f);
+ mirrors = rm_mirror(mirrors, mirror);
+ }
+
+ ret = pthread_mutex_unlock(&mirror_list_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "get_mirror: Couldn't release "
+ "profile list lock\n");
+ return -1;
+ }
+ /* Profile list lock released */
+
+ fclose(f);
+ return 0;
+}
+
+void
+update_mirror(struct dmmirr *dmmirr, struct xferstat *xs)
+{
+ struct timeval tv;
+ double speed;
+
+ gettimeofday(&tv, NULL);
+ if (tv.tv_sec - dmmirr->timestamps[dmmirr->index].tv_sec < 60)
+ return;
+
+ speed = get_speed(xs);
+
+ /* TODO: This assumes that workers and sites have 1-1 correspondence */
+ dmmirr->index = (dmmirr->index + 1) % MAX_SAMPLES;
+ dmmirr->timestamps[dmmirr->index] = tv;
+ dmmirr->samples[dmmirr->index] = speed;
+ dmmirr->remark = ACTIVE;
+}
+
+struct dmmirr *
+get_mirror(void)
+{
+ struct dmmirr *cur, *tmp;
+ double tmpmax = -1.0;
+ int cnt, ret, i;
+ struct timeval now;
+ long week_sec;
+ double average;
+
+ week_sec = 7 * 24 * 60 * 60;
+
+ /* Profile list lock */
+ ret = pthread_mutex_lock(&mirror_list_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "get_mirror: Attempt to acquire"
+ " profile list mutex failed\n");
+ return NULL;
+ }
+
+ cur = mirrors;
+ tmp = NULL;
+ tmpmax = -1.0;
+ while (cur != NULL) {
+ if (cur->remark == NOT_TRIED) {
+ tmp = cur;
+ goto success;
+ }
+
+ if (cur->remark == FAILED)
+ goto next;
+ if (cur->nconns > MAX_CONNS)
+ goto next;
+
+
+ i = cur->index;
+ cnt = 0;
+ average = 0.0;
+ do {
+ gettimeofday(&now, NULL);
+ if (cur->timestamps[i].tv_sec < now.tv_sec - week_sec)
+ break;
+ average = (average * cnt + cur->samples[i]) / (cnt + 1);
+ cnt++;
+
+ i = (i - 1) % MAX_SAMPLES;
+ } while (i != cur->index);
+
+ if (average > tmpmax) {
+ tmpmax = average;
+ tmp = cur;
+ }
+next:
+ cur = cur->next;
+ }
+
+ /* TODO: If we couldn't pick up a mirror? */
+
+success:
+ ret = pthread_mutex_unlock(&mirror_list_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "get_mirror: Couldn't release "
+ "profile list lock\n");
+ return NULL;
+ }
+ /* Profile list lock released */
+
+ return tmp;
+}
Modified: soc2013/ambarisha/head/usr.bin/dms/worker.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/worker.c Mon Aug 19 20:48:15 2013 (r256169)
+++ soc2013/ambarisha/head/usr.bin/dms/worker.c Mon Aug 19 20:51:29 2013 (r256170)
@@ -180,6 +180,9 @@
xs->offset = offset;
xs->rcvd = offset;
xs->lastrcvd = offset;
+
+ update_mirror(dmjob->mirror , xs);
+
if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0)
stat_send(xs, 1);
else if (dmjob->request->v_level > 0)
@@ -190,6 +193,7 @@
stat_end(struct xferstat *xs, struct dmjob *dmjob)
{
gettimeofday(&xs->last, NULL);
+ update_mirror(dmjob->mirror , xs);
if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0) {
stat_send(xs, 2);
putc('\n', stderr);
@@ -203,10 +207,20 @@
stat_update(struct xferstat *xs, off_t rcvd, struct dmjob *dmjob)
{
xs->rcvd = rcvd;
+ update_mirror(dmjob->mirror , xs);
if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0)
stat_send(xs, 0);
}
+static void
+select_mirror(struct dmjob *dmjob)
+{
+ dmjob->mirror = get_mirror();
+ strcpy(dmjob->url->host, dmjob->mirror->name);
+ strcpy(dmjob->request->URL, dmjob->mirror->name);
+ strcat(dmjob->request->URL, dmjob->url->doc);
+}
+
static int
mk_url(struct dmjob *dmjob, char *flags)
{
@@ -228,12 +242,15 @@
fprintf(stderr, "warning: mk_url: URL empty\n");
goto failure;
}
+
if ((dmjob->url = fetchParseURL(dmreq->URL)) == NULL) {
warnx("%s: parse error", dmreq->URL);
goto failure;
}
- /* if no scheme was specified, take a guess */
+ /* Replace host name with the mirror name */
+ select_mirror(dmjob);
+
if (!*(dmjob->url->scheme)) {
if (!*(dmjob->url->host))
strcpy(dmjob->url->scheme, SCHEME_FILE);
@@ -732,6 +749,7 @@
goto success;
}
*/
+
tmpreq.path = (char *) malloc(strlen(dmreq->path) + strlen(TMP_EXT));
if (tmpreq.path == NULL) {
fprintf(stderr, "dmXGet: Insufficient memory\n");
More information about the svn-soc-all
mailing list