/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

@Category({ MiscTests.class, MediumTests.class })
public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutput.class);

private static final Logger LOG = LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutput.class);

private static DistributedFileSystem FS;
private static EventLoopGroup EVENT_LOOP_GROUP;
private static Class<? extends Channel> CHANNEL_CLASS;
private static final int READ_TIMEOUT_MS = 2000;

@Rule
public TestName name = new TestName();

@BeforeClass
public static void setUp() throws Exception {
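// use a short read timeout so testHeartbeat can exercise the heartbeat path quickly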
UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
startMiniDFSCluster(3);
FS = CLUSTER.getFileSystem();
EVENT_LOOP_GROUP = new NioEventLoopGroup();
CHANNEL_CLASS = NioSocketChannel.class;
}

@AfterClass
public static void tearDown() throws IOException, InterruptedException {
if (EVENT_LOOP_GROUP != null) {
EVENT_LOOP_GROUP.shutdownGracefully().sync();
}
shutdownMiniDFSCluster();
}
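
/**
 * Writes ten 10-byte random chunks with two pipelined flushes per chunk, checks the acked
 * length reported by each flush future, then reads the file back and verifies its contents.
 */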
static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
throws IOException, InterruptedException, ExecutionException {
List<CompletableFuture<Long>> futures = new ArrayList<>();
byte[] b = new byte[10];
Random rand = new Random(12345);
// test pipelined flushes: issue two flushes per write and wait on all the futures later
for (int i = 0; i < 10; i++) {
rand.nextBytes(b);
out.write(b);
futures.add(out.flush(false));
futures.add(out.flush(false));
}
for (int i = 0; i < 10; i++) {
assertEquals((i + 1) * b.length, futures.get(2 * i).join().longValue());
assertEquals((i + 1) * b.length, futures.get(2 * i + 1).join().longValue());
}
out.close();
assertEquals(b.length * 10, fs.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];
rand.setSeed(12345);
try (FSDataInputStream in = fs.open(f)) {
for (int i = 0; i < 10; i++) {
in.readFully(actual);
rand.nextBytes(b);
assertArrayEquals(b, actual);
}
assertEquals(-1, in.read());
}
}
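
/**
 * Basic happy path: create a fan-out output with replication 3, then write and verify.
 */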
@Test
public void test() throws IOException, InterruptedException, ExecutionException {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
writeAndVerify(FS, f, out);
}
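
/**
 * Restarts a datanode to break the pipeline mid-write, expects the next flush to fail, and
 * verifies that recoverAndClose keeps the data that was acked before the failure.
 */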
@Test
public void testRecover() throws IOException, InterruptedException, ExecutionException {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b);
out.write(b, 0, b.length);
out.flush(false).get();
// restart one datanode, which breaks one of the connections in the pipeline
CLUSTER.restartDataNode(0);
out.write(b, 0, b.length);
try {
out.flush(false).get();
fail("flush should fail");
} catch (ExecutionException e) {
// we restarted one datanode, so the flush should fail
LOG.info("expected exception caught", e);
}
out.recoverAndClose(null);
assertEquals(b.length, FS.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];
try (FSDataInputStream in = FS.open(f)) {
in.readFully(actual);
}
assertArrayEquals(b, actual);
}
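
/**
 * Sleeps for twice the DFS client read timeout before writing; heartbeats should keep the
 * datanode connections alive in the meantime.
 */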
@Test
public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
Thread.sleep(READ_TIMEOUT_MS * 2);
// the connections to the datanodes should still be alive
writeAndVerify(FS, f, out);
}

/**
 * Creating the output should fail when the parent directory does not exist, instead of
 * creating the parent implicitly. This is important for fencing when recovering from a
 * region server crash.
 */
@Test
public void testCreateParentFailed() throws IOException {
Path f = new Path("/" + name.getMethodName() + "/test");
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
try {
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
fail("should fail with parent does not exist");
} catch (RemoteException e) {
LOG.info("expected exception caught", e);
assertThat(e.unwrapRemoteException(), instanceOf(FileNotFoundException.class));
}
}
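
/**
 * Stops one datanode before creating the output; the create should retry, exclude the dead
 * datanode, and end up with a two-datanode pipeline.
 */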
@Test
public void testConnectToDatanodeFailed()
throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
InvocationTargetException, InterruptedException, NoSuchFieldException {
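// reflectively expose DataXceiverServer#getNumPeers so the datanode's xceiver count can be
// inspected; the assertions below only rely on the pipeline length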
Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
xceiverServerDaemonField.setAccessible(true);
Class<?> xceiverServerClass =
Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
numPeersMethod.setAccessible(true);
// take one datanode down to break the connection
DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
Path f = new Path("/test");
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) {
// the dead dn should be excluded on retry, so we end up with only 2 DNs in the pipeline
assertEquals(2, output.getPipeline().length);
} finally {
CLUSTER.restartDataNode(dnProp);
}
}
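
/**
 * Writes a single 50 MB chunk into a 1 GB block to exercise a large buffered write, then
 * verifies the flushed length and the file contents.
 */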
@Test
public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS);
byte[] b = new byte[50 * 1024 * 1024];
ThreadLocalRandom.current().nextBytes(b);
out.write(b);
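// issue a flush without waiting, then confirm the acked length via a second pipelined flush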
out.flush(false);
assertEquals(b.length, out.flush(false).get().longValue());
out.close();
assertEquals(b.length, FS.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];
try (FSDataInputStream in = FS.open(f)) {
in.readFully(actual);
}
assertArrayEquals(b, actual);
}
}