blob: 74d8f1f007050f85e30beb73cdb4d7f9da377899 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/resource.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#ifdef __MACH__ // OS X does not have clock_gettime
#include <mach/clock.h>
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif
#include "config.h"
#include "hdfs/hdfs.h"
/* Bytes per chunk written when (re)creating the test file; VECSUM_LENGTH
 * must be a multiple of this. */
#define VECSUM_CHUNK_SIZE (8 * 1024 * 1024)
/* Bytes requested per hadoopReadZero() call on the zero-copy path. */
#define ZCR_READ_CHUNK_SIZE (8 * 1024 * 1024)
/* Bytes requested per hdfsReadFully() call on the plain libhdfs path. */
#define NORMAL_READ_CHUNK_SIZE (8 * 1024 * 1024)
/* Doubles consumed per iteration of the unrolled loop in the SSE vecsum();
 * every chunk size above is checked against it at startup. */
#define DOUBLES_PER_LOOP_ITER 16
/* Express a timespec as a floating-point count of seconds. */
static double timespec_to_double(const struct timespec *ts)
{
    const double nanos_per_sec = 1000000000.0;

    return (double)ts->tv_sec + (double)ts->tv_nsec / nanos_per_sec;
}
/* Monotonic timestamps bracketing one timed benchmark run. */
struct stopwatch {
    /* filled in by stopwatch_create() and stopwatch_stop() respectively */
    struct timespec start, stop;
};
#ifdef __MACH__
/*
 * OS X has no clock_gettime(CLOCK_MONOTONIC): emulate it with
 * mach_absolute_time(), converted to nanoseconds through the mach timebase
 * and measured relative to the first call. Always returns 0.
 */
static int clock_gettime_mono(struct timespec * ts) {
    static mach_timebase_info_data_t timebase;
    static uint64_t origin = 0;
    uint64_t nanos;

    if (origin == 0) {
        mach_timebase_info(&timebase);
        origin = mach_absolute_time();
    }
    nanos = (mach_absolute_time() - origin) * timebase.numer / timebase.denom;
    ts->tv_sec = nanos / 1000000000ULL;
    ts->tv_nsec = nanos % 1000000000ULL;
    return 0;
}
#else
/* Read CLOCK_MONOTONIC into *ts; returns clock_gettime()'s result
 * (nonzero on failure, with errno describing the error). */
static int clock_gettime_mono(struct timespec * ts) {
    int rc = clock_gettime(CLOCK_MONOTONIC, ts);

    return rc;
}
#endif
/*
 * Allocate a stopwatch and record the current monotonic time as its start.
 * Returns NULL (after printing a message to stderr) on failure. A non-NULL
 * result is released by stopwatch_stop().
 */
static struct stopwatch *stopwatch_create(void)
{
    struct stopwatch *sw = calloc(1, sizeof(*sw));

    if (sw == NULL) {
        fprintf(stderr, "failed to allocate memory for stopwatch\n");
        return NULL;
    }
    if (clock_gettime_mono(&sw->start) != 0) {
        int saved_errno = errno;

        fprintf(stderr, "clock_gettime(CLOCK_MONOTONIC) failed with "
                "error %d (%s)\n", saved_errno, strerror(saved_errno));
        free(sw);
        return NULL;
    }
    return sw;
}
/*
 * Record the stop time and print the elapsed seconds and the throughput
 * for bytes_read bytes (in units of 1024^3 bytes per second). If the clock
 * cannot be read, an error is printed instead. The stopwatch is freed in
 * every case.
 */
static void stopwatch_stop(struct stopwatch *watch,
        long long bytes_read)
{
    double seconds, gb_per_sec;

    if (clock_gettime_mono(&watch->stop) == 0) {
        seconds = timespec_to_double(&watch->stop) -
            timespec_to_double(&watch->start);
        gb_per_sec = (bytes_read / seconds) / (1024 * 1024 * 1024);
        printf("stopwatch: took %.5g seconds to read %lld bytes, "
               "for %.5g GB/s\n", seconds, bytes_read, gb_per_sec);
        printf("stopwatch: %.5g seconds\n", seconds);
    } else {
        int saved_errno = errno;

        fprintf(stderr, "clock_gettime(CLOCK_MONOTONIC) failed with "
                "error %d (%s)\n", saved_errno, strerror(saved_errno));
    }
    free(watch);
}
/* Which read path the benchmark exercises, selected via VECSUM_TYPE. */
enum vecsum_type {
    VECSUM_LOCAL = 0,   /* sum an mmap of a local file */
    VECSUM_LIBHDFS = 1, /* read through hdfsRead() */
    VECSUM_ZCR = 2,     /* zero-copy reads via hadoopReadZero() */
};

/* Accepted VECSUM_TYPE spellings, for use in error messages. */
#define VECSUM_TYPE_VALID_VALUES "libhdfs, zcr, or local"

/*
 * Map a VECSUM_TYPE string (compared case-insensitively) to its
 * enum vecsum_type value, or return -1 if it names no known type.
 */
int parse_vecsum_type(const char *str)
{
    static const struct {
        const char *name;
        enum vecsum_type ty;
    } known[] = {
        { "local", VECSUM_LOCAL },
        { "libhdfs", VECSUM_LIBHDFS },
        { "zcr", VECSUM_ZCR },
    };
    size_t idx;

    for (idx = 0; idx < sizeof(known) / sizeof(known[0]); idx++) {
        if (strcasecmp(str, known[idx].name) == 0)
            return known[idx].ty;
    }
    return -1;
}
/* Benchmark settings, filled in from the environment by options_create(). */
struct options {
    const char *path;          /* VECSUM_PATH: file to read */
    long long length;          /* VECSUM_LENGTH: file length in bytes */
    int passes;                /* VECSUM_PASSES: number of read passes */
    enum vecsum_type ty;       /* VECSUM_TYPE: which read path to use */
    const char *rpc_address;   /* VECSUM_RPC_ADDRESS ("default" if unset) */
};
/*
 * Parse all of str as a base-10 long long into *out.
 * Returns 0 on success, or EINVAL (leaving *out unchanged) if str is empty,
 * has trailing characters, or is out of range for long long.
 */
static int options_parse_ll(const char *str, long long *out)
{
    char *end = NULL;
    long long val;

    errno = 0;
    val = strtoll(str, &end, 10);
    if (errno != 0 || end == str || *end != '\0')
        return EINVAL;
    *out = val;
    return 0;
}

/*
 * Build the benchmark options from environment variables:
 *   VECSUM_PATH        required; path of the file to read
 *   VECSUM_LENGTH      optional (default 2147483648); file length in bytes,
 *                      a positive multiple of VECSUM_CHUNK_SIZE
 *   VECSUM_PASSES      required; number of passes, 1..INT_MAX
 *   VECSUM_TYPE        required; one of VECSUM_TYPE_VALID_VALUES
 *   VECSUM_RPC_ADDRESS optional (default "default"); HDFS namenode
 * Returns a heap-allocated struct (release with options_free()), or NULL
 * after printing a diagnostic to stderr.
 */
static struct options *options_create(void)
{
    struct options *opts = NULL;
    const char *pass_str;
    const char *ty_str;
    const char *length_str;
    long long passes = 0;
    int ty;

    opts = calloc(1, sizeof(struct options));
    if (!opts) {
        fprintf(stderr, "failed to calloc options\n");
        goto error;
    }
    opts->path = getenv("VECSUM_PATH");
    if (!opts->path) {
        fprintf(stderr, "You must set the VECSUM_PATH environment "
                "variable to the path of the file to read.\n");
        goto error;
    }
    length_str = getenv("VECSUM_LENGTH");
    if (!length_str) {
        length_str = "2147483648";
    }
    /* Require the whole string to be a positive integer: a negative
     * multiple of VECSUM_CHUNK_SIZE would otherwise pass the divisibility
     * check below. */
    if (options_parse_ll(length_str, &opts->length) || opts->length <= 0) {
        fprintf(stderr, "Can't parse VECSUM_LENGTH of '%s'.\n",
                length_str);
        goto error;
    }
    if (opts->length % VECSUM_CHUNK_SIZE) {
        fprintf(stderr, "VECSUM_LENGTH must be a multiple of '%lld'. The "
                "currently specified length of '%lld' is not.\n",
                (long long)VECSUM_CHUNK_SIZE, (long long)opts->length);
        goto error;
    }
    pass_str = getenv("VECSUM_PASSES");
    if (!pass_str) {
        fprintf(stderr, "You must set the VECSUM_PASSES environment "
                "variable to the number of passes to make.\n");
        goto error;
    }
    /* Parse as long long so values beyond INT_MAX are rejected rather than
     * truncated when stored in the int field. */
    if (options_parse_ll(pass_str, &passes) || passes <= 0 ||
            passes > INT_MAX) {
        fprintf(stderr, "Invalid value for the VECSUM_PASSES "
                "environment variable. You must set this to a "
                "number greater than 0.\n");
        goto error;
    }
    opts->passes = (int)passes;
    ty_str = getenv("VECSUM_TYPE");
    if (!ty_str) {
        fprintf(stderr, "You must set the VECSUM_TYPE environment "
                "variable to " VECSUM_TYPE_VALID_VALUES "\n");
        goto error;
    }
    ty = parse_vecsum_type(ty_str);
    if (ty < 0) {
        fprintf(stderr, "Invalid VECSUM_TYPE environment variable. "
                "Valid values are " VECSUM_TYPE_VALID_VALUES "\n");
        goto error;
    }
    opts->ty = ty;
    opts->rpc_address = getenv("VECSUM_RPC_ADDRESS");
    if (!opts->rpc_address) {
        opts->rpc_address = "default";
    }
    return opts;
error:
    free(opts);
    return NULL;
}
/*
 * Allocate a VECSUM_CHUNK_SIZE-byte buffer filled with the doubles
 * 0.0, 0.5, 1.0, 1.5, ... and hand it back through *chunk.
 * Returns 0 on success (the caller must free *chunk), or ENOMEM if the
 * allocation fails, in which case *chunk is left untouched.
 */
static int test_file_chunk_setup(double **chunk)
{
    const size_t num_doubles = VECSUM_CHUNK_SIZE / sizeof(double);
    double *c;
    double val = 0.0;
    size_t i;

    c = malloc(VECSUM_CHUNK_SIZE);
    if (!c) {
        fprintf(stderr, "test_file_chunk_setup: failed to malloc "
                "a buffer of size '%lld'\n",
                (long long) VECSUM_CHUNK_SIZE);
        return ENOMEM;
    }
    /* size_t index avoids a signed/unsigned comparison with num_doubles. */
    for (i = 0; i < num_doubles; i++) {
        c[i] = val;
        val += 0.5;
    }
    *chunk = c;
    return 0;
}
/* Release a struct returned by options_create(); NULL is accepted. */
static void options_free(struct options *opts)
{
    void *mem = opts;

    free(mem);
}
/* State for the local (mmap) benchmark path. */
struct local_data {
    int fd;             /* descriptor of the opened file, or -1 */
    double *mmap;       /* read-only mapping of the file, or MAP_FAILED */
    long long length;   /* number of bytes mapped */
};
/*
 * Rewrite the file behind cdata->fd so that it holds exactly opts->length
 * bytes of test data: repeated copies of the chunk from
 * test_file_chunk_setup(), written from the descriptor's current offset
 * (the start of the file when called right after open()). The file is then
 * truncated to opts->length so any stale tail from a larger file is dropped.
 * Returns 0 on success or an errno-style code on failure.
 */
static int local_data_create_file(struct local_data *cdata,
        const struct options *opts)
{
    int ret = EIO;
    int dup_fd = -1;
    FILE *fp = NULL;
    double *chunk = NULL;
    long long offset = 0;

    /* Write through a duplicate so that fclose() leaves cdata->fd open. */
    dup_fd = dup(cdata->fd);
    if (dup_fd < 0) {
        ret = errno;
        fprintf(stderr, "local_data_create_file: dup failed: %s (%d)\n",
                strerror(ret), ret);
        goto done;
    }
    fp = fdopen(dup_fd, "w");
    if (!fp) {
        ret = errno;
        fprintf(stderr, "local_data_create_file: fdopen failed: %s (%d)\n",
                strerror(ret), ret);
        goto done;
    }
    /* fp now owns dup_fd: fclose(fp) closes it, so it must not be closed
     * separately (that would be a double close and lose buffered data). */
    dup_fd = -1;
    ret = test_file_chunk_setup(&chunk);
    if (ret)
        goto done;
    while (offset < opts->length) {
        if (fwrite(chunk, VECSUM_CHUNK_SIZE, 1, fp) != 1) {
            fprintf(stderr, "local_data_create_file: failed to write to "
                    "the local file '%s' at offset %lld\n",
                    opts->path, offset);
            ret = EIO;
            goto done;
        }
        offset += VECSUM_CHUNK_SIZE;
    }
    /* fclose() flushes buffered data; check it so a failed final write is
     * reported instead of silently lost. */
    if (fclose(fp)) {
        ret = errno ? errno : EIO;
        fp = NULL;
        fprintf(stderr, "local_data_create_file: failed to flush and close "
                "the local file '%s': %s (%d)\n", opts->path,
                strerror(ret), ret);
        goto done;
    }
    fp = NULL;
    /* Without this, a file longer than opts->length keeps its old size and
     * would be rewritten again on every run. */
    if (ftruncate(cdata->fd, (off_t)opts->length)) {
        ret = errno;
        fprintf(stderr, "local_data_create_file: ftruncate(%s, %lld) "
                "failed: %s (%d)\n", opts->path, opts->length,
                strerror(ret), ret);
        goto done;
    }
    fprintf(stderr, "local_data_create_file: successfully re-wrote %s as "
            "a file of length %lld\n", opts->path, opts->length);
    ret = 0;
done:
    if (fp) {
        fclose(fp);
    }
    if (dup_fd >= 0) {
        close(dup_fd);
    }
    free(chunk);
    return ret;
}
/*
 * Open (creating if necessary) the local file opts->path for read/write.
 * If its size differs from opts->length, rewrite it with
 * local_data_create_file(). Then map opts->length bytes of it read-only
 * (MAP_PRIVATE). Returns the new state, or NULL after printing a diagnostic
 * to stderr.
 */
static struct local_data *local_data_create(const struct options *opts)
{
    struct local_data *cdata;
    struct stat st_buf;
    int err;

    cdata = malloc(sizeof(*cdata));
    if (cdata == NULL) {
        fprintf(stderr, "Failed to allocate local test data.\n");
        return NULL;
    }
    cdata->mmap = MAP_FAILED;
    cdata->length = opts->length;
    cdata->fd = open(opts->path, O_RDWR | O_CREAT, 0777);
    if (cdata->fd < 0) {
        err = errno;
        fprintf(stderr, "local_data_create: failed to open %s "
                "for read/write: error %d (%s)\n", opts->path, err, strerror(err));
        goto fail;
    }
    if (fstat(cdata->fd, &st_buf) != 0) {
        err = errno;
        fprintf(stderr, "local_data_create: fstat(%s) failed: "
                "error %d (%s)\n", opts->path, err, strerror(err));
        goto fail;
    }
    if (st_buf.st_size != opts->length) {
        fprintf(stderr, "local_data_create: current size of %s is %lld, but "
                "we want %lld. Re-writing the file.\n",
                opts->path, (long long)st_buf.st_size,
                (long long)opts->length);
        if (local_data_create_file(cdata, opts) != 0)
            goto fail;
    }
    cdata->mmap = mmap(NULL, cdata->length, PROT_READ,
            MAP_PRIVATE, cdata->fd, 0);
    if (cdata->mmap == MAP_FAILED) {
        err = errno;
        fprintf(stderr, "local_data_create: mmap(%s) failed: "
                "error %d (%s)\n", opts->path, err, strerror(err));
        goto fail;
    }
    return cdata;

fail:
    if (cdata->fd >= 0)
        close(cdata->fd);
    free(cdata);
    return NULL;
}
/*
 * Release everything owned by a local_data from local_data_create(): the
 * mapping, the file descriptor, and the heap-allocated struct itself.
 * NULL is accepted.
 */
static void local_data_free(struct local_data *cdata)
{
    if (!cdata)
        return;
    if (cdata->mmap != MAP_FAILED)
        munmap(cdata->mmap, cdata->length);
    if (cdata->fd >= 0)
        close(cdata->fd);
    /* local_data_create() malloc()s the struct; it must be freed here too. */
    free(cdata);
}
/* State for the libhdfs and zero-copy benchmark paths. */
struct libhdfs_data {
    hdfsFS fs;          /* connection from hdfsBuilderConnect(), or NULL */
    hdfsFile file;      /* open file handle, or NULL */
    long long length;   /* expected file length in bytes */
    double *buf;        /* read buffer set up by vecsum_libhdfs(), or NULL */
};
/*
 * Tear down a libhdfs_data. When a filesystem connection exists, free the
 * read buffer, close the file (if open) and disconnect. The struct itself
 * is always freed.
 */
static void libhdfs_data_free(struct libhdfs_data *ldata)
{
    if (!ldata->fs) {
        free(ldata);
        return;
    }
    free(ldata->buf);
    if (ldata->file)
        hdfsCloseFile(ldata->fs, ldata->file);
    hdfsDisconnect(ldata->fs);
    free(ldata);
}
static int libhdfs_data_create_file(struct libhdfs_data *ldata,
const struct options *opts)
{
int ret;
double *chunk = NULL;
long long offset = 0;
ldata->file = hdfsOpenFile(ldata->fs, opts->path, O_WRONLY, 0, 1, 0);
if (!ldata->file) {
ret = errno;
fprintf(stderr, "libhdfs_data_create_file: hdfsOpenFile(%s, "
"O_WRONLY) failed: error %d (%s)\n", opts->path, ret,
strerror(ret));
goto done;
}
ret = test_file_chunk_setup(&chunk);
if (ret)
goto done;
while (offset < opts->length) {
ret = hdfsWrite(ldata->fs, ldata->file, chunk, VECSUM_CHUNK_SIZE);
if (ret < 0) {
ret = errno;
fprintf(stderr, "libhdfs_data_create_file: got error %d (%s) at "
"offset %lld of %s\n", ret, strerror(ret),
offset, opts->path);
goto done;
} else if (ret < VECSUM_CHUNK_SIZE) {
fprintf(stderr, "libhdfs_data_create_file: got short write "
"of %d at offset %lld of %s\n", ret, offset, opts->path);
goto done;
}
offset += VECSUM_CHUNK_SIZE;
}
ret = 0;
done:
free(chunk);
if (ldata->file) {
if (hdfsCloseFile(ldata->fs, ldata->file)) {
fprintf(stderr, "libhdfs_data_create_file: hdfsCloseFile error.");
ret = EIO;
}
ldata->file = NULL;
}
return ret;
}
/*
 * Connect to HDFS at opts->rpc_address, with
 * dfs.client.read.shortcircuit.skip.checksum set to "true". Recreate
 * opts->path if it is missing or not opts->length bytes long, then open it
 * for reading.
 * Returns the new state (release with libhdfs_data_free()), or NULL after
 * printing a diagnostic to stderr.
 */
static struct libhdfs_data *libhdfs_data_create(const struct options *opts)
{
    struct libhdfs_data *ldata = NULL;
    struct hdfsBuilder *builder = NULL;
    hdfsFileInfo *pinfo = NULL;
    int need_create = 0;

    ldata = calloc(1, sizeof(struct libhdfs_data));
    if (!ldata) {
        fprintf(stderr, "Failed to allocate libhdfs test data.\n");
        goto error;
    }
    builder = hdfsNewBuilder();
    if (!builder) {
        fprintf(stderr, "Failed to create builder.\n");
        goto error;
    }
    hdfsBuilderSetNameNode(builder, opts->rpc_address);
    hdfsBuilderConfSetStr(builder,
            "dfs.client.read.shortcircuit.skip.checksum", "true");
    ldata->fs = hdfsBuilderConnect(builder);
    if (!ldata->fs) {
        fprintf(stderr, "Could not connect to namenode '%s'!\n",
                opts->rpc_address);
        goto error;
    }
    pinfo = hdfsGetPathInfo(ldata->fs, opts->path);
    if (!pinfo) {
        int err = errno;
        fprintf(stderr, "hdfsGetPathInfo(%s) failed: error %d (%s). "
                "Attempting to re-create file.\n",
                opts->path, err, strerror(err));
        need_create = 1;
    } else if (pinfo->mSize != opts->length) {
        fprintf(stderr, "hdfsGetPathInfo(%s) failed: length was %lld, "
                "but we want length %lld. Attempting to re-create file.\n",
                opts->path, (long long)pinfo->mSize, (long long)opts->length);
        need_create = 1;
    }
    /* The size has been checked; release the info on every path, including
     * success, so it is not leaked. */
    if (pinfo) {
        hdfsFreeFileInfo(pinfo, 1);
        pinfo = NULL;
    }
    if (need_create && libhdfs_data_create_file(ldata, opts))
        goto error;
    ldata->file = hdfsOpenFile(ldata->fs, opts->path, O_RDONLY, 0, 0, 0);
    if (!ldata->file) {
        int err = errno;
        fprintf(stderr, "hdfsOpenFile(%s) failed: error %d (%s)\n",
                opts->path, err, strerror(err));
        goto error;
    }
    ldata->length = opts->length;
    return ldata;
error:
    if (ldata)
        libhdfs_data_free(ldata);
    return NULL;
}
/*
 * Verify that byte_size bytes hold a whole number of doubles, and that this
 * count is a multiple of DOUBLES_PER_LOOP_ITER (the unrolled SSE vecsum()
 * loop consumes that many doubles per iteration). str names the checked
 * constant in error messages. Returns 0 if both hold, EINVAL otherwise.
 */
static int check_byte_size(int byte_size, const char *const str)
{
    size_t n_doubles;

    if (byte_size % sizeof(double) != 0) {
        fprintf(stderr, "%s is not a multiple "
                "of sizeof(double)\n", str);
        return EINVAL;
    }
    n_doubles = byte_size / sizeof(double);
    if (n_doubles % DOUBLES_PER_LOOP_ITER != 0) {
        fprintf(stderr, "The number of doubles contained in "
                "%s is not a multiple of DOUBLES_PER_LOOP_ITER\n",
                str);
        return EINVAL;
    }
    return 0;
}
/*
 * vecsum(buf, num_doubles): return the sum of the first num_doubles doubles
 * in buf.
 *
 * Two implementations are selected at compile time. Their addition orders
 * (and therefore floating-point rounding) differ, so do not reorder the adds
 * below if results must stay reproducible.
 */
#ifdef HAVE_INTEL_SSE_INTRINSICS
#include <emmintrin.h>
/*
 * SSE2 version: eight independent two-lane accumulators, consuming
 * DOUBLES_PER_LOOP_ITER (16) doubles per iteration. num_doubles must be a
 * multiple of 16, otherwise the last iteration reads past the end of buf.
 * main() checks the chunk sizes with check_byte_size().
 * _mm_load_pd is an aligned load, so buf must be 16-byte aligned.
 * NOTE(review): alignment of the buffers returned by hadoopRzBufferGet()
 * is assumed, not checked here; confirm.
 */
static double vecsum(const double *buf, int num_doubles)
{
int i;
double hi, lo;
__m128d x0, x1, x2, x3, x4, x5, x6, x7;
__m128d sum0 = _mm_set_pd(0.0,0.0);
__m128d sum1 = _mm_set_pd(0.0,0.0);
__m128d sum2 = _mm_set_pd(0.0,0.0);
__m128d sum3 = _mm_set_pd(0.0,0.0);
__m128d sum4 = _mm_set_pd(0.0,0.0);
__m128d sum5 = _mm_set_pd(0.0,0.0);
__m128d sum6 = _mm_set_pd(0.0,0.0);
__m128d sum7 = _mm_set_pd(0.0,0.0);
/* Each accumulator sumN collects doubles at offsets 2N and 2N+1 of every
 * 16-double group; separate accumulators avoid a serial dependency chain. */
for (i = 0; i < num_doubles; i+=DOUBLES_PER_LOOP_ITER) {
x0 = _mm_load_pd(buf + i + 0);
x1 = _mm_load_pd(buf + i + 2);
x2 = _mm_load_pd(buf + i + 4);
x3 = _mm_load_pd(buf + i + 6);
x4 = _mm_load_pd(buf + i + 8);
x5 = _mm_load_pd(buf + i + 10);
x6 = _mm_load_pd(buf + i + 12);
x7 = _mm_load_pd(buf + i + 14);
sum0 = _mm_add_pd(sum0, x0);
sum1 = _mm_add_pd(sum1, x1);
sum2 = _mm_add_pd(sum2, x2);
sum3 = _mm_add_pd(sum3, x3);
sum4 = _mm_add_pd(sum4, x4);
sum5 = _mm_add_pd(sum5, x5);
sum6 = _mm_add_pd(sum6, x6);
sum7 = _mm_add_pd(sum7, x7);
}
/* Fold the eight accumulators pairwise into one vector... */
x0 = _mm_add_pd(sum0, sum1);
x1 = _mm_add_pd(sum2, sum3);
x2 = _mm_add_pd(sum4, sum5);
x3 = _mm_add_pd(sum6, sum7);
x4 = _mm_add_pd(x0, x1);
x5 = _mm_add_pd(x2, x3);
x6 = _mm_add_pd(x4, x5);
/* ...then add its high and low lanes. */
_mm_storeh_pd(&hi, x6);
_mm_storel_pd(&lo, x6);
return hi + lo;
}
#else
/* Portable fallback: plain left-to-right summation; any num_doubles >= 0. */
static double vecsum(const double *buf, int num_doubles)
{
int i;
double sum = 0.0;
for (i = 0; i < num_doubles; i++) {
sum += buf[i];
}
return sum;
}
#endif
/*
 * One zero-copy pass. Repeatedly hadoopReadZero() ZCR_READ_CHUNK_SIZE bytes
 * from ldata->file and sum each buffer with vecsum(), until a buffer yields
 * a NULL data pointer. Then print the pass's sum. A buffer shorter than
 * ZCR_READ_CHUNK_SIZE is treated as an error. Every buffer obtained is
 * released with hadoopRzBufferFree(). opts is unused.
 * Returns 0 on success, EINVAL on a partial read, or the errno left by a
 * failed hadoopReadZero().
 */
static int vecsum_zcr_loop(int pass, struct libhdfs_data *ldata,
        struct hadoopRzOptions *zopts,
        const struct options *opts)
{
    double total = 0.0;
    struct hadoopRzBuffer *rz;

    (void)opts;
    for (;;) {
        const double *data;
        int32_t nbytes;

        rz = hadoopReadZero(ldata->file, zopts, ZCR_READ_CHUNK_SIZE);
        if (rz == NULL) {
            int err = errno;

            fprintf(stderr, "hadoopReadZero failed with error "
                    "code %d (%s)\n", err, strerror(err));
            return err;
        }
        data = hadoopRzBufferGet(rz);
        if (data == NULL)
            break;  /* no more data: the pass is complete */
        nbytes = hadoopRzBufferLength(rz);
        if (nbytes < ZCR_READ_CHUNK_SIZE) {
            fprintf(stderr, "hadoopReadZero got a partial read "
                    "of length %d\n", nbytes);
            hadoopRzBufferFree(ldata->file, rz);
            return EINVAL;
        }
        total += vecsum(data, ZCR_READ_CHUNK_SIZE / sizeof(double));
        hadoopRzBufferFree(ldata->file, rz);
    }
    printf("finished zcr pass %d. sum = %g\n", pass, total);
    /* Release the final buffer, the one that carried no data. */
    hadoopRzBufferFree(ldata->file, rz);
    return 0;
}
/*
 * Zero-copy benchmark: run opts->passes passes of vecsum_zcr_loop() over
 * ldata->file, seeking back to offset 0 after each pass. The read options
 * skip checksums and set a NULL ByteBufferPool.
 * Returns 0 on success or an errno-style code on failure.
 */
static int vecsum_zcr(struct libhdfs_data *ldata,
        const struct options *opts)
{
    int ret, pass;
    struct hadoopRzOptions *zopts = NULL;

    zopts = hadoopRzOptionsAlloc();
    if (!zopts) {
        fprintf(stderr, "hadoopRzOptionsAlloc failed.\n");
        ret = ENOMEM;
        goto done;
    }
    if (hadoopRzOptionsSetSkipChecksum(zopts, 1)) {
        ret = errno ? errno : EIO;
        /* perror() appends ": <strerror(errno)>" itself. */
        perror("hadoopRzOptionsSetSkipChecksum failed");
        goto done;
    }
    if (hadoopRzOptionsSetByteBufferPool(zopts, NULL)) {
        ret = errno ? errno : EIO;
        perror("hadoopRzOptionsSetByteBufferPool failed");
        goto done;
    }
    for (pass = 0; pass < opts->passes; ++pass) {
        ret = vecsum_zcr_loop(pass, ldata, zopts, opts);
        if (ret) {
            fprintf(stderr, "vecsum_zcr_loop pass %d failed "
                    "with error %d\n", pass, ret);
            goto done;
        }
        /* If the rewind fails, later passes would read nothing and report
         * a bogus sum of 0, so treat it as an error. */
        if (hdfsSeek(ldata->fs, ldata->file, 0)) {
            ret = errno ? errno : EIO;
            fprintf(stderr, "vecsum_zcr: hdfsSeek to offset 0 failed after "
                    "pass %d: error %d (%s)\n", pass, ret, strerror(ret));
            goto done;
        }
    }
    ret = 0;
done:
    if (zopts)
        hadoopRzOptionsFree(zopts);
    return ret;
}
/*
 * Read up to length bytes from f into buffer, calling hdfsRead()
 * repeatedly until length bytes have arrived or a read returns 0. Reads
 * that fail with EINTR are retried.
 * Returns the number of bytes read, which is less than length only if a
 * read returned 0. On any other read error it returns -1, leaving errno as
 * hdfsRead() set it.
 */
tSize hdfsReadFully(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
{
    uint8_t *buf = buffer;
    tSize ret, nread = 0;

    while (length > 0) {
        ret = hdfsRead(fs, f, buf, length);
        if (ret < 0) {
            if (errno == EINTR) {
                /* Retry. ret is -1 here and must not reach the counters
                 * below, or buf would move backwards and length would
                 * grow. */
                continue;
            }
            return -1;
        }
        if (ret == 0) {
            break;
        }
        nread += ret;
        length -= ret;
        buf += ret;
    }
    return nread;
}
/*
 * One libhdfs pass: read ldata->file sequentially in NORMAL_READ_CHUNK_SIZE
 * pieces into ldata->buf and sum each piece with vecsum(), until a read
 * returns 0 bytes. Then print the pass's sum. A short piece is treated as
 * an error. opts is unused.
 * Returns 0 on success, EINVAL on a partial read, or the errno from a
 * failed read.
 */
static int vecsum_normal_loop(int pass, const struct libhdfs_data *ldata,
        const struct options *opts)
{
    double total = 0.0;
    int nbytes;

    (void)opts;
    for (;;) {
        nbytes = hdfsReadFully(ldata->fs, ldata->file, ldata->buf,
                NORMAL_READ_CHUNK_SIZE);
        if (nbytes == 0)
            break;  /* EOF */
        if (nbytes < 0) {
            int err = errno;

            fprintf(stderr, "hdfsRead failed with error %d (%s)\n",
                    err, strerror(err));
            return err;
        }
        if (nbytes < NORMAL_READ_CHUNK_SIZE) {
            fprintf(stderr, "hdfsRead got a partial read of "
                    "length %d\n", nbytes);
            return EINVAL;
        }
        total += vecsum(ldata->buf,
                NORMAL_READ_CHUNK_SIZE / sizeof(double));
    }
    printf("finished normal pass %d. sum = %g\n", pass, total);
    return 0;
}
/*
 * Plain libhdfs benchmark: allocate the NORMAL_READ_CHUNK_SIZE read buffer
 * into ldata->buf (released later by libhdfs_data_free()), then run
 * opts->passes passes of vecsum_normal_loop(), seeking back to offset 0
 * after each pass.
 * Returns 0 on success or an errno-style code on failure.
 */
static int vecsum_libhdfs(struct libhdfs_data *ldata,
        const struct options *opts)
{
    int pass;

    ldata->buf = malloc(NORMAL_READ_CHUNK_SIZE);
    if (!ldata->buf) {
        fprintf(stderr, "failed to malloc buffer of size %d\n",
                NORMAL_READ_CHUNK_SIZE);
        return ENOMEM;
    }
    for (pass = 0; pass < opts->passes; ++pass) {
        int ret = vecsum_normal_loop(pass, ldata, opts);
        if (ret) {
            fprintf(stderr, "vecsum_normal_loop pass %d failed "
                    "with error %d\n", pass, ret);
            return ret;
        }
        /* If the rewind fails, later passes would hit EOF immediately and
         * report a bogus sum of 0, so treat it as an error. */
        if (hdfsSeek(ldata->fs, ldata->file, 0)) {
            ret = errno ? errno : EIO;
            fprintf(stderr, "vecsum_libhdfs: hdfsSeek to offset 0 failed "
                    "after pass %d: error %d (%s)\n", pass, ret,
                    strerror(ret));
            return ret;
        }
    }
    return 0;
}
static void vecsum_local(struct local_data *cdata, const struct options *opts)
{
int pass;
for (pass = 0; pass < opts->passes; pass++) {
double sum = vecsum(cdata->mmap, cdata->length / sizeof(double));
printf("finished vecsum_local pass %d. sum = %g\n", pass, sum);
}
}
/*
 * Bytes read per pass, for the throughput report. For VECSUM_LOCAL this is
 * the current size of opts->path (from stat()); otherwise it is
 * ldata->length. Returns -EIO (after printing a diagnostic) if stat()
 * fails.
 */
static long long vecsum_length(const struct options *opts,
        const struct libhdfs_data *ldata)
{
    struct stat st_buf = { 0 };
    int err;

    if (opts->ty != VECSUM_LOCAL)
        return ldata->length;
    if (stat(opts->path, &st_buf) == 0)
        return st_buf.st_size;
    err = errno;
    fprintf(stderr, "vecsum_length: stat(%s) failed: "
            "error %d (%s)\n", opts->path, err, strerror(err));
    return -EIO;
}
/*
 * vecsum is a microbenchmark which measures the speed of various ways of
 * reading from HDFS. It creates a file containing floating-point 'doubles'
 * (unless one of the expected length already exists), and computes the sum
 * of all the doubles several times. When built with SSE intrinsics support,
 * SSE2 SIMD instructions are used for the summation.
 *
 * All configuration comes from environment variables read by
 * options_create(). Returns 0 on success and nonzero on failure.
 */
int main(void)
{
    int ret = 1;
    struct options *opts = NULL;
    struct local_data *cdata = NULL;
    struct libhdfs_data *ldata = NULL;
    struct stopwatch *watch = NULL;

    /* The unrolled summation loop needs every read size to hold a multiple
     * of DOUBLES_PER_LOOP_ITER doubles. */
    if (check_byte_size(VECSUM_CHUNK_SIZE, "VECSUM_CHUNK_SIZE") ||
            check_byte_size(ZCR_READ_CHUNK_SIZE,
                "ZCR_READ_CHUNK_SIZE") ||
            check_byte_size(NORMAL_READ_CHUNK_SIZE,
                "NORMAL_READ_CHUNK_SIZE")) {
        goto done;
    }
    opts = options_create();
    if (!opts)
        goto done;
    if (opts->ty == VECSUM_LOCAL) {
        cdata = local_data_create(opts);
        if (!cdata)
            goto done;
    } else {
        ldata = libhdfs_data_create(opts);
        if (!ldata)
            goto done;
    }
    /* Timing starts only after setup, so file (re)creation is not measured. */
    watch = stopwatch_create();
    if (!watch)
        goto done;
    switch (opts->ty) {
    case VECSUM_LOCAL:
        vecsum_local(cdata, opts);
        ret = 0;
        break;
    case VECSUM_LIBHDFS:
        ret = vecsum_libhdfs(ldata, opts);
        break;
    case VECSUM_ZCR:
        ret = vecsum_zcr(ldata, opts);
        break;
    default:
        fprintf(stderr, "vecsum: unknown vecsum type %d\n", (int)opts->ty);
        ret = EINVAL;
        break;
    }
    if (ret) {
        fprintf(stderr, "vecsum failed with error %d\n", ret);
        goto done;
    }
    ret = 0;
done:
    fprintf(stderr, "cleaning up...\n");
    if (watch) {
        long long length = -1;

        if (ret == 0)
            length = vecsum_length(opts, ldata);
        if (length >= 0) {
            /* Prints the timing report and frees the stopwatch. */
            stopwatch_stop(watch, length * opts->passes);
        } else {
            /* Nothing to report; still release the stopwatch. */
            free(watch);
        }
        watch = NULL;
    }
    if (cdata)
        local_data_free(cdata);
    if (ldata)
        libhdfs_data_free(ldata);
    if (opts)
        options_free(opts);
    return ret;
}
// vim: ts=4:sw=4:tw=79:et