/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeoutException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Random;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
/**
* This class tests if EnhancedByteBufferAccess works correctly.
*/
public class TestEnhancedByteBufferAccess {
// Logger for this test class; handed to IOUtils.cleanup when closing streams.
private static final Log LOG =
LogFactory.getLog(TestEnhancedByteBufferAccess.class.getName());
// Socket directory assigned once in init(); initZeroCopyTest() builds the
// domain socket path underneath it.
static TemporarySocketDirectory sockDir;
/**
 * One-time class setup: instantiates the shared
 * {@link TemporarySocketDirectory} and disables bind path validation on
 * {@link DomainSocket}.
 */
@BeforeClass
public static void init() {
  TestEnhancedByteBufferAccess.sockDir = new TemporarySocketDirectory();
  DomainSocket.disableBindPathValidation();
}
/**
 * Copies the remaining bytes of a buffer into a new array.
 *
 * The copy is made through a duplicate of the buffer, so the position and
 * limit of {@code buf} itself are left exactly as the caller passed them.
 * (Calling {@code flip()} after the read would rewind to position 0, which
 * is only correct for buffers that started at position 0.)
 *
 * @param buf the buffer whose bytes between position and limit are copied
 * @return a new array holding {@code buf.remaining()} bytes
 */
private static byte[] byteBufferToArray(ByteBuffer buf) {
  final byte[] resultArray = new byte[buf.remaining()];
  // The duplicate shares content but has its own position/limit.
  buf.duplicate().get(resultArray);
  return resultArray;
}
/**
 * Builds the configuration used by the zero-copy tests.
 *
 * The calling test is skipped (via JUnit assumptions) unless native IO is
 * available and the OS is a Unix. The returned configuration enables
 * short-circuit reads with checksums skipped, uses a 4096-byte block size,
 * sets the client mmap cache size to 3 with a 100 ms timeout, and points
 * the domain socket path at a file under {@code sockDir}.
 *
 * @return the freshly populated configuration
 */
public static HdfsConfiguration initZeroCopyTest() {
  Assume.assumeTrue(NativeIO.isAvailable());
  Assume.assumeTrue(SystemUtils.IS_OS_UNIX);

  final HdfsConfiguration zeroCopyConf = new HdfsConfiguration();
  zeroCopyConf.setBoolean(
      DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
  zeroCopyConf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
  zeroCopyConf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3);
  zeroCopyConf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100);

  final File socketFile =
      new File(sockDir.getDir(), "TestRequestMmapAccess._PORT.sock");
  zeroCopyConf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
      socketFile.getAbsolutePath());

  zeroCopyConf.setBoolean(
      DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
  return zeroCopyConf;
}
/**
 * Performs a single 4096-byte zero-copy read from the start of a file and
 * checks the returned bytes and the stream's read statistics.
 */
@Test
public void testZeroCopyReads() throws Exception {
  final HdfsConfiguration conf = initZeroCopyTest();
  final Path testPath = new Path("/a");
  final int fileLength = 12345;
  final int readLength = 4096;
  MiniDFSCluster cluster = null;
  FileSystem fs = null;
  FSDataInputStream fsIn = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    DFSTestUtil.createFile(fs, testPath, fileLength, (short) 1, 7567L);
    try {
      DFSTestUtil.waitReplication(fs, testPath, (short) 1);
    } catch (InterruptedException e) {
      Assert.fail("unexpected InterruptedException during " +
          "waitReplication: " + e);
    } catch (TimeoutException e) {
      Assert.fail("unexpected TimeoutException during " +
          "waitReplication: " + e);
    }

    // Capture the file contents with an ordinary read for later comparison.
    fsIn = fs.open(testPath);
    final byte[] expected = new byte[fileLength];
    IOUtils.readFully(fsIn, expected, 0, fileLength);
    fsIn.close();

    // Re-open and read through the zero-copy API (no fallback pool).
    fsIn = fs.open(testPath);
    final ByteBuffer zeroCopy = fsIn.read(null, readLength,
        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    Assert.assertEquals(readLength, zeroCopy.remaining());

    final HdfsDataInputStream dfsIn = (HdfsDataInputStream) fsIn;
    Assert.assertEquals(readLength,
        dfsIn.getReadStatistics().getTotalBytesRead());
    Assert.assertEquals(readLength,
        dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());

    final byte[] expectedSlice =
        Arrays.copyOfRange(expected, 0, readLength);
    Assert.assertArrayEquals(expectedSlice, byteBufferToArray(zeroCopy));
    fsIn.releaseBuffer(zeroCopy);
  } finally {
    if (fsIn != null) {
      fsIn.close();
    }
    if (fs != null) {
      fs.close();
    }
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Requests more bytes than remain in the current block and checks that
 * each zero-copy read returns only 4096 bytes (the configured block size).
 */
@Test
public void testShortZeroCopyReads() throws Exception {
  final HdfsConfiguration conf = initZeroCopyTest();
  final Path testPath = new Path("/a");
  final int fileLength = 12345;
  final int blockLength = 4096;
  MiniDFSCluster cluster = null;
  FileSystem fs = null;
  FSDataInputStream fsIn = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    DFSTestUtil.createFile(fs, testPath, fileLength, (short) 1, 7567L);
    try {
      DFSTestUtil.waitReplication(fs, testPath, (short) 1);
    } catch (InterruptedException e) {
      Assert.fail("unexpected InterruptedException during " +
          "waitReplication: " + e);
    } catch (TimeoutException e) {
      Assert.fail("unexpected TimeoutException during " +
          "waitReplication: " + e);
    }

    // Capture the file contents with an ordinary read for later comparison.
    fsIn = fs.open(testPath);
    final byte[] expected = new byte[fileLength];
    IOUtils.readFully(fsIn, expected, 0, fileLength);
    fsIn.close();

    fsIn = fs.open(testPath);
    final HdfsDataInputStream dfsIn = (HdfsDataInputStream) fsIn;

    // Ask for 8192 bytes; the block size limits the result to 4096.
    ByteBuffer chunk =
        dfsIn.read(null, 8192, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    Assert.assertEquals(blockLength, chunk.remaining());
    Assert.assertEquals(blockLength,
        dfsIn.getReadStatistics().getTotalBytesRead());
    Assert.assertEquals(blockLength,
        dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
    final byte[] firstExpected =
        Arrays.copyOfRange(expected, 0, blockLength);
    Assert.assertArrayEquals(firstExpected, byteBufferToArray(chunk));
    dfsIn.releaseBuffer(chunk);

    // Ask for 4097 bytes; again only 4096 come back, from the next block.
    chunk = dfsIn.read(null, 4097, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    Assert.assertEquals(blockLength, chunk.remaining());
    final byte[] secondExpected =
        Arrays.copyOfRange(expected, blockLength, 8192);
    Assert.assertArrayEquals(secondExpected, byteBufferToArray(chunk));
    dfsIn.releaseBuffer(chunk);
  } finally {
    if (fsIn != null) {
      fsIn.close();
    }
    if (fs != null) {
      fs.close();
    }
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Checks that a read which cannot be served zero-copy and has no fallback
 * buffer pool throws UnsupportedOperationException, and that a following
 * zero-copy read with SKIP_CHECKSUMS still succeeds.
 */
@Test
public void testZeroCopyReadsNoFallback() throws Exception {
  HdfsConfiguration conf = initZeroCopyTest();
  MiniDFSCluster cluster = null;
  final Path TEST_PATH = new Path("/a");
  FSDataInputStream fsIn = null;
  final int TEST_FILE_LENGTH = 12345;
  FileSystem fs = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    DFSTestUtil.createFile(fs, TEST_PATH,
        TEST_FILE_LENGTH, (short)1, 7567L);
    try {
      DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
    } catch (InterruptedException e) {
      Assert.fail("unexpected InterruptedException during " +
          "waitReplication: " + e);
    } catch (TimeoutException e) {
      Assert.fail("unexpected TimeoutException during " +
          "waitReplication: " + e);
    }
    // Capture the file contents with an ordinary read for later comparison.
    fsIn = fs.open(TEST_PATH);
    byte original[] = new byte[TEST_FILE_LENGTH];
    IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
    fsIn.close();
    fsIn = fs.open(TEST_PATH);
    HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
    try {
      // No read options and no buffer pool: the read is expected to fail.
      dfsIn.read(null, 4097, EnumSet.noneOf(ReadOption.class));
      Assert.fail("expected UnsupportedOperationException");
    } catch (UnsupportedOperationException e) {
      // expected
    }
    ByteBuffer result =
        dfsIn.read(null, 4096, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    Assert.assertEquals(4096, result.remaining());
    Assert.assertEquals(4096,
        dfsIn.getReadStatistics().getTotalBytesRead());
    Assert.assertEquals(4096,
        dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
    Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
        byteBufferToArray(result));
    // Hand the buffer back before closing the stream, as the other
    // zero-copy tests do, so it is not left outstanding.
    dfsIn.releaseBuffer(result);
  } finally {
    if (fsIn != null) fsIn.close();
    if (fs != null) fs.close();
    if (cluster != null) cluster.shutdown();
  }
}
/**
 * Mmap visitor that only tallies how many mmaps it has been shown.
 */
private static class CountingVisitor
    implements ClientMmapManager.ClientMmapVisitor {
  /** Number of accept() calls since construction or the last reset(). */
  int count;

  @Override
  public void accept(ClientMmap mmap) {
    count += 1;
  }

  /** Sets the tally back to zero. */
  public void reset() {
    this.count = 0;
  }
}
/**
 * Exercises the client mmap cache: counts the mmaps and evictable mmaps
 * reported by the ClientMmapManager while zero-copy buffers are held, and
 * checks that both counts drop to zero after the buffers are released.
 */
@Test
public void testZeroCopyMmapCache() throws Exception {
  HdfsConfiguration conf = initZeroCopyTest();
  MiniDFSCluster cluster = null;
  final Path TEST_PATH = new Path("/a");
  final int TEST_FILE_LENGTH = 16385;
  final int RANDOM_SEED = 23453;
  FSDataInputStream fsIn = null;
  ByteBuffer results[] = { null, null, null, null, null };
  DistributedFileSystem fs = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    DFSTestUtil.createFile(fs, TEST_PATH,
        TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
    try {
      DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
    } catch (InterruptedException e) {
      Assert.fail("unexpected InterruptedException during " +
          "waitReplication: " + e);
    } catch (TimeoutException e) {
      Assert.fail("unexpected TimeoutException during " +
          "waitReplication: " + e);
    }
    fsIn = fs.open(TEST_PATH);
    byte original[] = new byte[TEST_FILE_LENGTH];
    IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
    fsIn.close();
    fsIn = fs.open(TEST_PATH);
    final ClientMmapManager mmapManager = fs.getClient().getMmapManager();
    final CountingVisitor countingVisitor = new CountingVisitor();
    // Nothing has been read zero-copy yet, so both views start empty.
    mmapManager.visitMmaps(countingVisitor);
    Assert.assertEquals(0, countingVisitor.count);
    mmapManager.visitEvictable(countingVisitor);
    Assert.assertEquals(0, countingVisitor.count);
    // Two reads of the same 4096-byte range are expected to share one mmap.
    results[0] = fsIn.read(null, 4096,
        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    fsIn.seek(0);
    results[1] = fsIn.read(null, 4096,
        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    mmapManager.visitMmaps(countingVisitor);
    Assert.assertEquals(1, countingVisitor.count);
    countingVisitor.reset();
    mmapManager.visitEvictable(countingVisitor);
    Assert.assertEquals(0, countingVisitor.count);
    countingVisitor.reset();
    // The mmaps should be of the first block of the file.
    final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
    mmapManager.visitMmaps(new ClientMmapManager.ClientMmapVisitor() {
      @Override
      public void accept(ClientMmap mmap) {
        Assert.assertEquals(firstBlock, mmap.getBlock());
      }
    });
    // Read more blocks.
    results[2] = fsIn.read(null, 4096,
        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    results[3] = fsIn.read(null, 4096,
        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    try {
      // Presumably fails because the mmap cache size is configured as 3
      // and none of the held mmaps can be evicted.
      results[4] = fsIn.read(null, 4096,
          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.fail("expected UnsupportedOperationException");
    } catch (UnsupportedOperationException e) {
      // expected
    }
    // we should have 3 mmaps, 0 evictable
    mmapManager.visitMmaps(countingVisitor);
    Assert.assertEquals(3, countingVisitor.count);
    countingVisitor.reset();
    mmapManager.visitEvictable(countingVisitor);
    Assert.assertEquals(0, countingVisitor.count);
    // After we close the cursors, the mmaps should be evictable for
    // a brief period of time. Then, they should be closed (we're
    // using a very quick timeout)
    for (ByteBuffer buffer : results) {
      if (buffer != null) {
        fsIn.releaseBuffer(buffer);
      }
    }
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        countingVisitor.reset();
        try {
          mmapManager.visitEvictable(countingVisitor);
        } catch (InterruptedException e) {
          // Keep the interrupt visible to the polling caller instead of
          // silently clearing it.
          LOG.warn("interrupted while visiting evictable mmaps", e);
          Thread.currentThread().interrupt();
          return false;
        }
        return (0 == countingVisitor.count);
      }
    }, 10, 10000);
    countingVisitor.reset();
    mmapManager.visitMmaps(countingVisitor);
    Assert.assertEquals(0, countingVisitor.count);
  } finally {
    if (fsIn != null) fsIn.close();
    if (fs != null) fs.close();
    if (cluster != null) cluster.shutdown();
  }
}
/**
 * Runs the shared fallback-read checks against a stream opened from HDFS.
 * HDFS streams support the ByteBufferReadable interface.
 */
@Test
public void testHdfsFallbackReads() throws Exception {
  final HdfsConfiguration conf = initZeroCopyTest();
  final Path testPath = new Path("/a");
  final int fileLength = 16385;
  final int seed = 23453;
  MiniDFSCluster cluster = null;
  DistributedFileSystem fs = null;
  FSDataInputStream fsIn = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    DFSTestUtil.createFile(fs, testPath, fileLength, (short) 1, seed);
    try {
      DFSTestUtil.waitReplication(fs, testPath, (short) 1);
    } catch (InterruptedException e) {
      Assert.fail("unexpected InterruptedException during " +
          "waitReplication: " + e);
    } catch (TimeoutException e) {
      Assert.fail("unexpected TimeoutException during " +
          "waitReplication: " + e);
    }

    // Capture the file contents with an ordinary read for later comparison.
    fsIn = fs.open(testPath);
    final byte[] expected = new byte[fileLength];
    IOUtils.readFully(fsIn, expected, 0, fileLength);
    fsIn.close();

    // Re-open so the fallback checks start from offset zero.
    fsIn = fs.open(testPath);
    testFallbackImpl(fsIn, expected);
  } finally {
    if (fsIn != null) {
      fsIn.close();
    }
    if (fs != null) {
      fs.close();
    }
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Buffer pool that always allocates a fresh buffer and rejects any request
 * whose direct/heap kind differs from the kind fixed at construction.
 */
private static class RestrictedAllocatingByteBufferPool
    implements ByteBufferPool {
  /** The only buffer kind this pool will hand out. */
  private final boolean direct;

  RestrictedAllocatingByteBufferPool(boolean direct) {
    this.direct = direct;
  }

  @Override
  public ByteBuffer getBuffer(boolean direct, int length) {
    Preconditions.checkArgument(direct == this.direct);
    if (direct) {
      return ByteBuffer.allocateDirect(length);
    }
    return ByteBuffer.allocate(length);
  }

  @Override
  public void putBuffer(ByteBuffer buffer) {
    // Nothing is pooled, so there is nothing to take back.
  }
}
/**
 * Drives {@link ByteBufferUtil#fallbackRead} over a stream and checks each
 * returned buffer against the expected file contents.
 *
 * Reads 10 bytes, then 5000 bytes, then requests far more than remains and
 * expects exactly the rest of the data, and finally expects null at end of
 * stream. The expected sizes are derived from {@code original.length}
 * rather than hard-coded, so the helper works for any sufficiently long
 * input.
 *
 * @param stream   stream positioned at the start of the data
 * @param original the full expected contents of the stream
 */
private static void testFallbackImpl(InputStream stream,
    byte original[]) throws Exception {
  final int firstLength = 10;
  final int secondLength = 5000;
  final int oversizedRequest = 9999999;
  final int consumed = firstLength + secondLength;
  final int remainder = original.length - consumed;
  Assert.assertTrue("test data must leave between 1 and " +
      oversizedRequest + " bytes after the first " + consumed,
      remainder > 0 && remainder <= oversizedRequest);

  // The pool only allows the buffer kind matching the stream's capability.
  RestrictedAllocatingByteBufferPool bufferPool =
      new RestrictedAllocatingByteBufferPool(
          stream instanceof ByteBufferReadable);

  ByteBuffer result =
      ByteBufferUtil.fallbackRead(stream, bufferPool, firstLength);
  Assert.assertEquals(firstLength, result.remaining());
  Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, firstLength),
      byteBufferToArray(result));

  result = ByteBufferUtil.fallbackRead(stream, bufferPool, secondLength);
  Assert.assertEquals(secondLength, result.remaining());
  Assert.assertArrayEquals(
      Arrays.copyOfRange(original, firstLength, consumed),
      byteBufferToArray(result));

  // Request more than remains; only the rest of the data should come back.
  result = ByteBufferUtil.fallbackRead(stream, bufferPool, oversizedRequest);
  Assert.assertEquals(remainder, result.remaining());
  Assert.assertArrayEquals(
      Arrays.copyOfRange(original, consumed, original.length),
      byteBufferToArray(result));

  // At end of stream the fallback read reports null.
  result = ByteBufferUtil.fallbackRead(stream, bufferPool, firstLength);
  Assert.assertNull(result);
}
/**
 * Exercises the {@link ByteBufferUtil#fallbackRead} function directly,
 * using a stream opened from a MiniDFSCluster file.
 */
@Test
public void testFallbackRead() throws Exception {
  final HdfsConfiguration conf = initZeroCopyTest();
  final Path testPath = new Path("/a");
  final int fileLength = 16385;
  final int seed = 23453;
  MiniDFSCluster cluster = null;
  DistributedFileSystem fs = null;
  FSDataInputStream fsIn = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    DFSTestUtil.createFile(fs, testPath, fileLength, (short) 1, seed);
    try {
      DFSTestUtil.waitReplication(fs, testPath, (short) 1);
    } catch (InterruptedException e) {
      Assert.fail("unexpected InterruptedException during " +
          "waitReplication: " + e);
    } catch (TimeoutException e) {
      Assert.fail("unexpected TimeoutException during " +
          "waitReplication: " + e);
    }

    // Capture the file contents with an ordinary read for later comparison.
    fsIn = fs.open(testPath);
    final byte[] expected = new byte[fileLength];
    IOUtils.readFully(fsIn, expected, 0, fileLength);
    fsIn.close();

    // Re-open so the fallback checks start from offset zero.
    fsIn = fs.open(testPath);
    testFallbackImpl(fsIn, expected);
  } finally {
    if (fsIn != null) {
      fsIn.close();
    }
    if (fs != null) {
      fs.close();
    }
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Test fallback reads on a stream which does not support the
 * ByteBufferReadable interface.
 *
 * Writes seeded random bytes to a local file, reads them back through a
 * plain FileInputStream with the shared fallback checks, and deletes the
 * file afterwards.
 */
@Test
public void testIndirectFallbackReads() throws Exception {
  final File TEST_DIR = new File(
      System.getProperty("test.build.data","build/test/data"));
  // FileOutputStream does not create missing parent directories, so make
  // sure the test directory exists before writing into it.
  Assert.assertTrue("could not create test directory " + TEST_DIR,
      TEST_DIR.isDirectory() || TEST_DIR.mkdirs());
  final String TEST_PATH = TEST_DIR + File.separator +
      "indirectFallbackTestFile";
  final int TEST_FILE_LENGTH = 16385;
  final int RANDOM_SEED = 23453;
  FileOutputStream fos = null;
  FileInputStream fis = null;
  try {
    fos = new FileOutputStream(TEST_PATH);
    Random random = new Random(RANDOM_SEED);
    byte original[] = new byte[TEST_FILE_LENGTH];
    random.nextBytes(original);
    fos.write(original);
    fos.close();
    // Already closed; null it so the cleanup below does not close it again.
    fos = null;
    fis = new FileInputStream(TEST_PATH);
    testFallbackImpl(fis, original);
  } finally {
    IOUtils.cleanup(LOG, fos, fis);
    new File(TEST_PATH).delete();
  }
}
}