blob: bf529b49f6439930a3b80d6cff851901f894c8b0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "expect.h"
#include "hdfs/hdfs.h"
#include "native_mini_dfs.h"
#include "platform.h"
#include <errno.h>
#include <inttypes.h>
#include <unistd.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#define TO_STR_HELPER(X) #X
#define TO_STR(X) TO_STR_HELPER(X)
#define TEST_FILE_NAME_LENGTH 128
#define TEST_ZEROCOPY_FULL_BLOCK_SIZE 4096
#define TEST_ZEROCOPY_LAST_BLOCK_SIZE 3215
#define TEST_ZEROCOPY_NUM_BLOCKS 6
#define SMALL_READ_LEN 16
#define TEST_ZEROCOPY_FILE_LEN \
(((TEST_ZEROCOPY_NUM_BLOCKS - 1) * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + \
TEST_ZEROCOPY_LAST_BLOCK_SIZE)
#define ZC_BUF_LEN 32768
static uint8_t *getZeroCopyBlockData(int blockIdx)
{
uint8_t *buf = malloc(TEST_ZEROCOPY_FULL_BLOCK_SIZE);
int i;
if (!buf) {
fprintf(stderr, "malloc(%d) failed\n", TEST_ZEROCOPY_FULL_BLOCK_SIZE);
exit(1);
}
for (i = 0; i < TEST_ZEROCOPY_FULL_BLOCK_SIZE; i++) {
buf[i] = (uint8_t)(blockIdx + (i % 17));
}
return buf;
}
static int getZeroCopyBlockLen(int blockIdx)
{
if (blockIdx >= TEST_ZEROCOPY_NUM_BLOCKS) {
return 0;
} else if (blockIdx == (TEST_ZEROCOPY_NUM_BLOCKS - 1)) {
return TEST_ZEROCOPY_LAST_BLOCK_SIZE;
} else {
return TEST_ZEROCOPY_FULL_BLOCK_SIZE;
}
}
static int doTestZeroCopyReads(hdfsFS fs, const char *fileName)
{
hdfsFile file = NULL;
struct hadoopRzOptions *opts = NULL;
struct hadoopRzBuffer *buffer = NULL;
uint8_t *block;
file = hdfsOpenFile(fs, fileName, O_RDONLY, 0, 0, 0);
EXPECT_NONNULL(file);
opts = hadoopRzOptionsAlloc();
EXPECT_NONNULL(opts);
EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 1));
/* haven't read anything yet */
EXPECT_ZERO(expectFileStats(file, 0LL, 0LL, 0LL, 0LL));
block = getZeroCopyBlockData(0);
EXPECT_NONNULL(block);
/* first read is half of a block. */
buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2);
EXPECT_NONNULL(buffer);
EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
hadoopRzBufferLength(buffer));
EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer), block,
TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
hadoopRzBufferFree(file, buffer);
/* read the next half of the block */
buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2);
EXPECT_NONNULL(buffer);
EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
hadoopRzBufferLength(buffer));
EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer),
block + (TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2),
TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
hadoopRzBufferFree(file, buffer);
free(block);
EXPECT_ZERO(expectFileStats(file, TEST_ZEROCOPY_FULL_BLOCK_SIZE,
TEST_ZEROCOPY_FULL_BLOCK_SIZE,
TEST_ZEROCOPY_FULL_BLOCK_SIZE,
TEST_ZEROCOPY_FULL_BLOCK_SIZE));
/* Now let's read just a few bytes. */
buffer = hadoopReadZero(file, opts, SMALL_READ_LEN);
EXPECT_NONNULL(buffer);
EXPECT_INT_EQ(SMALL_READ_LEN, hadoopRzBufferLength(buffer));
block = getZeroCopyBlockData(1);
EXPECT_NONNULL(block);
EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer), SMALL_READ_LEN));
hadoopRzBufferFree(file, buffer);
EXPECT_INT64_EQ(
(int64_t)TEST_ZEROCOPY_FULL_BLOCK_SIZE + (int64_t)SMALL_READ_LEN,
hdfsTell(fs, file));
EXPECT_ZERO(expectFileStats(file,
TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
/* Clear 'skip checksums' and test that we can't do zero-copy reads any
* more. Since there is no ByteBufferPool set, we should fail with
* EPROTONOSUPPORT.
*/
EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 0));
EXPECT_NULL(hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE));
EXPECT_INT_EQ(EPROTONOSUPPORT, errno);
/* Verify that setting a NULL ByteBufferPool class works. */
EXPECT_ZERO(hadoopRzOptionsSetByteBufferPool(opts, NULL));
EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 0));
EXPECT_NULL(hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE));
EXPECT_INT_EQ(EPROTONOSUPPORT, errno);
/* Now set a ByteBufferPool and try again. It should succeed this time. */
EXPECT_ZERO(hadoopRzOptionsSetByteBufferPool(opts,
ELASTIC_BYTE_BUFFER_POOL_CLASS));
buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE);
EXPECT_NONNULL(buffer);
EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE, hadoopRzBufferLength(buffer));
EXPECT_ZERO(expectFileStats(file,
(2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
(2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
(2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
EXPECT_ZERO(memcmp(block + SMALL_READ_LEN, hadoopRzBufferGet(buffer),
TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN));
free(block);
block = getZeroCopyBlockData(2);
EXPECT_NONNULL(block);
EXPECT_ZERO(memcmp(block, (uint8_t*)hadoopRzBufferGet(buffer) +
(TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN), SMALL_READ_LEN));
hadoopRzBufferFree(file, buffer);
/* Check the result of a zero-length read. */
buffer = hadoopReadZero(file, opts, 0);
EXPECT_NONNULL(buffer);
EXPECT_NONNULL(hadoopRzBufferGet(buffer));
EXPECT_INT_EQ(0, hadoopRzBufferLength(buffer));
hadoopRzBufferFree(file, buffer);
/* Check the result of reading past EOF */
EXPECT_INT_EQ(0, hdfsSeek(fs, file, TEST_ZEROCOPY_FILE_LEN));
buffer = hadoopReadZero(file, opts, 1);
EXPECT_NONNULL(buffer);
EXPECT_NULL(hadoopRzBufferGet(buffer));
hadoopRzBufferFree(file, buffer);
/* Cleanup */
free(block);
hadoopRzOptionsFree(opts);
EXPECT_ZERO(hdfsCloseFile(fs, file));
return 0;
}
static int createZeroCopyTestFile(hdfsFS fs, char *testFileName,
size_t testFileNameLen)
{
int blockIdx, blockLen;
hdfsFile file;
uint8_t *data;
snprintf(testFileName, testFileNameLen, "/zeroCopyTestFile.%d.%d",
getpid(), rand());
file = hdfsOpenFile(fs, testFileName, O_WRONLY, 0, 1,
TEST_ZEROCOPY_FULL_BLOCK_SIZE);
EXPECT_NONNULL(file);
for (blockIdx = 0; blockIdx < TEST_ZEROCOPY_NUM_BLOCKS; blockIdx++) {
blockLen = getZeroCopyBlockLen(blockIdx);
data = getZeroCopyBlockData(blockIdx);
EXPECT_NONNULL(data);
EXPECT_INT_EQ(blockLen, hdfsWrite(fs, file, data, blockLen));
}
EXPECT_ZERO(hdfsCloseFile(fs, file));
return 0;
}
static int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl,
struct hdfsBuilder *bld) {
int ret;
tPort port;
const char *domainSocket;
hdfsBuilderSetNameNode(bld, "localhost");
port = (tPort) nmdGetNameNodePort(cl);
if (port < 0) {
fprintf(stderr, "nmdGetNameNodePort failed with error %d\n", -port);
return EIO;
}
hdfsBuilderSetNameNodePort(bld, port);
domainSocket = hdfsGetDomainSocketPath(cl);
if (domainSocket) {
ret = hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit", "true");
if (ret) {
return ret;
}
ret = hdfsBuilderConfSetStr(bld, "dfs.domain.socket.path",
domainSocket);
if (ret) {
return ret;
}
}
return 0;
}
/**
* Test that we can write a file with libhdfs and then read it back
*/
int main(void)
{
int port;
struct NativeMiniDfsConf conf = {
1, /* doFormat */
0, /* webhdfsEnabled */
0, /* namenodeHttpPort */
1, /* configureShortCircuit */
};
char testFileName[TEST_FILE_NAME_LENGTH];
hdfsFS fs;
struct NativeMiniDfsCluster* cl;
struct hdfsBuilder *bld;
cl = nmdCreate(&conf);
EXPECT_NONNULL(cl);
EXPECT_ZERO(nmdWaitClusterUp(cl));
port = nmdGetNameNodePort(cl);
if (port < 0) {
fprintf(stderr, "TEST_ERROR: test_zerocopy: "
"nmdGetNameNodePort returned error %d\n", port);
return EXIT_FAILURE;
}
bld = hdfsNewBuilder();
EXPECT_NONNULL(bld);
EXPECT_ZERO(nmdConfigureHdfsBuilder(cl, bld));
hdfsBuilderSetForceNewInstance(bld);
hdfsBuilderConfSetStr(bld, "dfs.block.size",
TO_STR(TEST_ZEROCOPY_FULL_BLOCK_SIZE));
/* ensure that we'll always get our mmaps */
hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit.skip.checksum",
"true");
fs = hdfsBuilderConnect(bld);
EXPECT_NONNULL(fs);
EXPECT_ZERO(createZeroCopyTestFile(fs, testFileName,
TEST_FILE_NAME_LENGTH));
EXPECT_ZERO(doTestZeroCopyReads(fs, testFileName));
EXPECT_ZERO(hdfsDisconnect(fs));
EXPECT_ZERO(nmdShutdown(cl));
nmdFree(cl);
fprintf(stderr, "TEST_SUCCESS\n");
return EXIT_SUCCESS;
}