/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.TestFsck;
import org.apache.hadoop.hdfs.tools.GetGroups;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test main functionality of ObserverNode.
 */
public class TestObserverNode {
  public static final Logger LOG =
      LoggerFactory.getLogger(TestObserverNode.class.getName());

  private static Configuration conf;
  private static MiniQJMHACluster qjmhaCluster;
  private static MiniDFSCluster dfsCluster;
  private static DistributedFileSystem dfs;

  private final Path testPath = new Path("/TestObserverNode");

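  /**
   * Start a QJM-based HA mini cluster with three NameNodes: an active,
   * a standby, and one observer.
   */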
  @BeforeClass
  public static void startUpCluster() throws Exception {
    conf = new Configuration();
    conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
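    // One observer, no DataNodes, and fast edit log tailing (assumed
    // parameter order for HATestUtil#setUpObserverCluster).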
    qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 0, true);
    dfsCluster = qjmhaCluster.getDfsCluster();
  }

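  /**
   * Enable observer reads before each test; individual tests may disable
   * them again via {@link #setObserverRead(boolean)}.
   */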
  @Before
  public void setUp() throws Exception {
    setObserverRead(true);
  }

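  /**
   * Delete the test path and verify that every test leaves the cluster in
   * its initial configuration: NN0 active, NN1 standby, NN2 observer.
   */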
  @After
  public void cleanUp() throws IOException {
    dfs.delete(testPath, true);
    assertEquals("NN[0] should be active", HAServiceState.ACTIVE,
        getServiceState(dfsCluster.getNameNode(0)));
    assertEquals("NN[1] should be standby", HAServiceState.STANDBY,
        getServiceState(dfsCluster.getNameNode(1)));
    assertEquals("NN[2] should be observer", HAServiceState.OBSERVER,
        getServiceState(dfsCluster.getNameNode(2)));
  }

  @AfterClass
  public static void shutDownCluster() throws IOException {
    if (qjmhaCluster != null) {
      qjmhaCluster.shutdown();
    }
  }

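  /**
   * An active NameNode cannot be transitioned directly to observer.
   */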
  @Test
  public void testNoActiveToObserver() throws Exception {
    try {
      dfsCluster.transitionToObserver(0);
    } catch (ServiceFailedException e) {
      return;
    }
    fail("active cannot be transitioned to observer");
  }

  /**
   * Test that non-ClientProtocol proxies such as
   * {@link org.apache.hadoop.tools.GetUserMappingsProtocol} still work
   * when run in an environment with observers.
   */
  @Test
  public void testGetGroups() throws Exception {
    GetGroups getGroups = new GetGroups(conf);
    assertEquals(0, getGroups.run(new String[0]));
  }

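  /**
   * An observer NameNode cannot be transitioned directly to active.
   */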
  @Test
  public void testNoObserverToActive() throws Exception {
    try {
      dfsCluster.transitionToActive(2);
    } catch (ServiceFailedException e) {
      return;
    }
    fail("observer cannot be transitioned to active");
  }

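  /**
   * Writes should go to the active NameNode, while reads are served by the
   * observer once it has caught up with the edit log.
   */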
  @Test
  public void testSimpleRead() throws Exception {
    Path testPath2 = new Path(testPath, "test2");
    dfs.mkdir(testPath, FsPermission.getDefault());
    assertSentTo(0);
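    // Once the observer has tailed the rolled edits, the read is served
    // by the observer rather than the active.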
    dfsCluster.rollEditLogAndTail(0);
    dfs.getFileStatus(testPath);
    assertSentTo(2);

    dfs.mkdir(testPath2, FsPermission.getDefault());
    assertSentTo(0);
  }

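  /**
   * With observer reads disabled, both reads and writes should follow the
   * active NameNode across a failover from NN0 to NN1.
   */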
  @Test
  public void testFailover() throws Exception {
    Path testPath2 = new Path(testPath, "test2");
    setObserverRead(false);

    dfs.mkdir(testPath, FsPermission.getDefault());
    assertSentTo(0);
    dfs.getFileStatus(testPath);
    assertSentTo(0);

    dfsCluster.transitionToStandby(0);
    dfsCluster.transitionToActive(1);
    dfsCluster.waitActive(1);

    dfs.mkdir(testPath2, FsPermission.getDefault());
    assertSentTo(1);
    dfs.getFileStatus(testPath);
    assertSentTo(1);

    dfsCluster.transitionToStandby(1);
    dfsCluster.transitionToActive(0);
    dfsCluster.waitActive(0);
  }

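  /**
   * Reads should keep landing on the observer while the active role moves
   * from NN0 to NN1 and back again.
   */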
  @Test
  public void testDoubleFailover() throws Exception {
    Path testPath2 = new Path(testPath, "test2");
    Path testPath3 = new Path(testPath, "test3");
    dfs.mkdir(testPath, FsPermission.getDefault());
    assertSentTo(0);

    dfsCluster.rollEditLogAndTail(0);
    dfs.getFileStatus(testPath);
    assertSentTo(2);
    dfs.mkdir(testPath2, FsPermission.getDefault());
    assertSentTo(0);

    dfsCluster.transitionToStandby(0);
    dfsCluster.transitionToActive(1);
    dfsCluster.waitActive(1);

    dfsCluster.rollEditLogAndTail(1);
    dfs.getFileStatus(testPath2);
    assertSentTo(2);
    dfs.mkdir(testPath3, FsPermission.getDefault());
    assertSentTo(1);

    dfsCluster.transitionToStandby(1);
    dfsCluster.transitionToActive(0);
    dfsCluster.waitActive(0);

    dfsCluster.rollEditLogAndTail(0);
    dfs.getFileStatus(testPath3);
    assertSentTo(2);
    dfs.delete(testPath3, false);
    assertSentTo(0);
  }

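  /**
   * When the observer is shut down, reads fall back to the active; once the
   * observer is restarted and transitioned back, reads return to it.
   */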
  @Test
  public void testObserverShutdown() throws Exception {
    dfs.mkdir(testPath, FsPermission.getDefault());
    dfsCluster.rollEditLogAndTail(0);
    dfs.getFileStatus(testPath);
    assertSentTo(2);

    // Shut down the observer - requests should go to active
    dfsCluster.shutdownNameNode(2);
    dfs.getFileStatus(testPath);
    assertSentTo(0);

    // Start the observer again - requests should go to observer
    dfsCluster.restartNameNode(2);
    dfsCluster.transitionToObserver(2);

    // The first request goes to the active because the client's view of the
    // NameNode states has not refreshed yet; the second will properly go to
    // the observer.
    dfs.getFileStatus(testPath);
    dfs.getFileStatus(testPath);
    assertSentTo(2);
  }

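  /**
   * Combine failover with an observer shutdown: reads fall back to the new
   * active (NN1) while the observer is down, and return to the observer
   * after it is restarted and transitioned back.
   */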
  @Test
  public void testObserverFailOverAndShutdown() throws Exception {
    dfs.mkdir(testPath, FsPermission.getDefault());
    dfsCluster.rollEditLogAndTail(0);
    dfs.getFileStatus(testPath);
    assertSentTo(2);

    dfsCluster.transitionToStandby(0);
    dfsCluster.transitionToActive(1);
    dfsCluster.waitActive(1);

    // Shut down the observer - requests should go to active
    dfsCluster.shutdownNameNode(2);
    dfs.getFileStatus(testPath);
    assertSentTo(1);

    // Start the observer again - requests should go to observer
    dfsCluster.restartNameNode(2);
    dfs.getFileStatus(testPath);
    assertSentTo(1);

    dfsCluster.transitionToObserver(2);
    dfs.getFileStatus(testPath);
    // The first request goes to the active because the client's view of the
    // NameNode states has not refreshed yet; the second will properly go to
    // the observer.
    dfs.getFileStatus(testPath);
    assertSentTo(2);

    dfsCluster.transitionToStandby(1);
    dfsCluster.transitionToActive(0);
    dfsCluster.waitActive(0);
  }

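  /**
   * Wipe the observer's name directories and verify that it can be
   * re-bootstrapped via BootstrapStandby.
   */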
  @Test
  public void testBootstrap() throws Exception {
    for (URI u : dfsCluster.getNameDirs(2)) {
      File dir = new File(u.getPath());
      assertTrue(FileUtil.fullyDelete(dir));
    }
    int rc = BootstrapStandby.run(
        new String[]{"-nonInteractive"},
        dfsCluster.getConfiguration(2)
    );
    assertEquals(0, rc);
  }

  /**
   * Test the case where Observer should throw RetriableException, just like
   * active NN, for certain open() calls where block locations are not
   * available. See HDFS-13898 for details.
   */
  @Test
  public void testObserverNodeSafeModeWithBlockLocations() throws Exception {
    // Create a new file - the request should go to active.
    dfs.create(testPath, (short)1).close();
    assertSentTo(0);

    dfsCluster.rollEditLogAndTail(0);
    dfs.open(testPath).close();
    assertSentTo(2);

    // Set observer to safe mode.
    dfsCluster.getFileSystem(2).setSafeMode(SafeModeAction.SAFEMODE_ENTER);

    // Mock block manager for observer to generate some fake blocks which
    // will trigger the (retriable) safe mode exception.
    BlockManager bmSpy =
        NameNodeAdapter.spyOnBlockManager(dfsCluster.getNameNode(2));
    Answer<LocatedBlocks> ans = new Answer<LocatedBlocks>() {
      @Override
      public LocatedBlocks answer(InvocationOnMock invocationOnMock)
          throws Throwable {
        ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
        LocatedBlock fakeBlock = new LocatedBlock(b, DatanodeInfo.EMPTY_ARRAY);
        List<LocatedBlock> fakeBlocks = new ArrayList<>();
        fakeBlocks.add(fakeBlock);
        return new LocatedBlocks(0, false, fakeBlocks, null, true, null);
      }
    };
    doAnswer(ans).when(bmSpy).createLocatedBlocks((BlockInfoContiguous[]) any(),
        anyLong(), anyBoolean(), anyLong(), anyLong(), anyBoolean(),
        anyBoolean(), (FileEncryptionInfo) any());

    // Open the file again - it should throw retriable exception and then
    // failover to active.
    dfs.open(testPath).close();
    assertSentTo(0);
    Mockito.reset(bmSpy);

    // Remove safe mode on observer, request should still go to it.
    dfsCluster.getFileSystem(2).setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
    dfs.open(testPath).close();
    assertSentTo(2);
  }

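  /**
   * Test that a read which gets block locations with no DataNodes from the
   * observer is retried against the active NameNode.
   */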
  @Test
  public void testObserverNodeBlockMissingRetry() throws Exception {
    setObserverRead(true);

    dfs.create(testPath, (short)1).close();
    assertSentTo(0);

    dfsCluster.rollEditLogAndTail(0);

    // Mock block manager for observer to generate some fake blocks which
    // will trigger the block missing exception.
    BlockManager bmSpy = NameNodeAdapter
        .spyOnBlockManager(dfsCluster.getNameNode(2));
    final DatanodeInfo[] empty = {};
    Answer<LocatedBlocks> ans = new Answer<LocatedBlocks>() {
      @Override
      public LocatedBlocks answer(InvocationOnMock invocationOnMock)
          throws Throwable {
        List<LocatedBlock> fakeBlocks = new ArrayList<>();
        // Remove the datanode info for the only block so it will throw
        // BlockMissingException and retry.
        ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
        LocatedBlock fakeBlock = new LocatedBlock(b, empty);
        fakeBlocks.add(fakeBlock);
        return new LocatedBlocks(0, false, fakeBlocks, null, true, null);
      }
    };
    doAnswer(ans).when(bmSpy).createLocatedBlocks((BlockInfoContiguous[]) any(),
        anyLong(), anyBoolean(), anyLong(), anyLong(), anyBoolean(),
        anyBoolean(), (FileEncryptionInfo) any());

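    // The open() should hit the empty block locations on the observer and
    // be retried against the active NameNode.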
    dfs.open(testPath);
    assertSentTo(0);
    Mockito.reset(bmSpy);
  }

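  /**
   * Verify that fsck runs successfully and reports a healthy file system
   * when observer reads are enabled.
   */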
  @Test
  public void testFsckWithObserver() throws Exception {
    setObserverRead(true);
    dfs.create(testPath, (short)1).close();
    assertSentTo(0);
    final String result = TestFsck.runFsck(conf, 0, true, "/");
    LOG.info("result=" + result);
    assertTrue(result.contains("Status: HEALTHY"));
  }

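  /**
   * Assert that the last request from {@code dfs} was handled by the
   * NameNode with the given index.
   */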
  private void assertSentTo(int nnIdx) throws IOException {
    assertTrue("Request was not sent to the expected namenode " + nnIdx,
        HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
  }

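  /**
   * Reconfigure the client file system so that reads are (or are not)
   * directed to the observer through ObserverReadProxyProvider.
   */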
  private static void setObserverRead(boolean flag) throws Exception {
    dfs = HATestUtil.configureObserverReadFs(
        dfsCluster, conf, ObserverReadProxyProvider.class, flag);
  }
}