blob: 0cadc832f7f076aaec6adeae516f065a4513c442 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests that StoragePolicySatisfier is able to work with HA enabled.
*/
public class TestStoragePolicySatisfierWithHA {
private MiniDFSCluster cluster = null;
private static final Logger LOG =
LoggerFactory.getLogger(TestStoragePolicySatisfierWithHA.class);
private final Configuration config = new HdfsConfiguration();
private static final int DEFAULT_BLOCK_SIZE = 1024;
private DistributedFileSystem dfs = null;
private StorageType[][] allDiskTypes =
new StorageType[][]{{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK}};
private int numOfDatanodes = 3;
private int storagesPerDatanode = 2;
private long capacity = 2 * 256 * 1024 * 1024;
private int nnIndex = 0;
private void createCluster() throws IOException {
config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
config.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
// Reduced refresh cycle to update latest datanodes.
config.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
1000);
startCluster(config, allDiskTypes, numOfDatanodes, storagesPerDatanode,
capacity);
dfs = cluster.getFileSystem(nnIndex);
}
private void startCluster(final Configuration conf,
StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
long nodeCapacity) throws IOException {
long[][] capacities = new long[numberOfDatanodes][storagesPerDn];
for (int i = 0; i < numberOfDatanodes; i++) {
for (int j = 0; j < storagesPerDn; j++) {
capacities[i][j] = nodeCapacity;
}
}
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn)
.storageTypes(storageTypes).storageCapacities(capacities).build();
cluster.waitActive();
cluster.transitionToActive(0);
}
/**
* Tests to verify that SPS should run/stop automatically when NN state
* changes between Standby and Active.
*/
@Test(timeout = 90000)
public void testWhenNNHAStateChanges() throws IOException {
try {
createCluster();
boolean running;
dfs = cluster.getFileSystem(1);
try {
dfs.getClient().isInternalSatisfierRunning();
Assert.fail("Call this function to Standby NN should "
+ "raise an exception.");
} catch (RemoteException e) {
IOException cause = e.unwrapRemoteException();
if (!(cause instanceof StandbyException)) {
Assert.fail("Unexpected exception happened " + e);
}
}
cluster.transitionToActive(0);
dfs = cluster.getFileSystem(0);
running = dfs.getClient().isInternalSatisfierRunning();
Assert.assertTrue("StoragePolicySatisfier should be active "
+ "when NN transits from Standby to Active mode.", running);
// NN transits from Active to Standby
cluster.transitionToStandby(0);
try {
dfs.getClient().isInternalSatisfierRunning();
Assert.fail("NN in Standby again, call this function should "
+ "raise an exception.");
} catch (RemoteException e) {
IOException cause = e.unwrapRemoteException();
if (!(cause instanceof StandbyException)) {
Assert.fail("Unexpected exception happened " + e);
}
}
try {
cluster.getNameNode(0).reconfigurePropertyImpl(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.EXTERNAL.toString());
Assert.fail("It's not allowed to enable or disable"
+ " StoragePolicySatisfier on Standby NameNode");
} catch (ReconfigurationException e) {
GenericTestUtils.assertExceptionContains("Could not change property "
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY
+ " from 'INTERNAL' to 'EXTERNAL'", e);
GenericTestUtils.assertExceptionContains(
"Enabling or disabling storage policy satisfier service on "
+ "standby NameNode is not allowed", e.getCause());
}
} finally {
cluster.shutdown();
}
}
/**
* Test to verify that during namenode switch over will add
* DNA_DROP_SPS_WORK_COMMAND to all the datanodes. Later, this will ensure to
* drop all the SPS queues at datanode.
*/
@Test(timeout = 90000)
public void testNamenodeSwitchoverShouldDropSPSWork() throws Exception {
try {
createCluster();
FSNamesystem fsn = cluster.getNamesystem(0);
ArrayList<DataNode> dataNodes = cluster.getDataNodes();
List<DatanodeDescriptor> listOfDns = new ArrayList<>();
for (DataNode dn : dataNodes) {
DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn,
dn.getDatanodeId());
listOfDns.add(dnd);
}
cluster.shutdownDataNodes();
cluster.transitionToStandby(0);
LOG.info("**Transition to Active**");
cluster.transitionToActive(1);
// Verify that Standby-to-Active transition should set drop SPS flag to
// true. This will ensure that DNA_DROP_SPS_WORK_COMMAND will be
// propagated to datanode during heartbeat response.
int retries = 20;
boolean dropSPSWork = false;
while (retries > 0) {
for (DatanodeDescriptor dnd : listOfDns) {
dropSPSWork = dnd.shouldDropSPSWork();
if (!dropSPSWork) {
retries--;
Thread.sleep(250);
break;
}
}
if (dropSPSWork) {
break;
}
}
Assert.assertTrue("Didn't drop SPS work", dropSPSWork);
} finally {
cluster.shutdown();
}
}
/**
* Test to verify that SPS work will be dropped once the datanode is marked as
* expired. Internally 'dropSPSWork' flag is set as true while expiration and
* at the time of reconnection, will send DNA_DROP_SPS_WORK_COMMAND to that
* datanode.
*/
@Test(timeout = 90000)
public void testDeadDatanode() throws Exception {
int heartbeatExpireInterval = 2 * 2000;
config.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
3000);
config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000L);
createCluster();
DataNode dn = cluster.getDataNodes().get(0);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
FSNamesystem fsn = cluster.getNamesystem(0);
DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn,
dn.getDatanodeId());
boolean isDead = false;
int retries = 20;
while (retries > 0) {
isDead = dnd.getLastUpdateMonotonic() < (monotonicNow()
- heartbeatExpireInterval);
if (isDead) {
break;
}
retries--;
Thread.sleep(250);
}
Assert.assertTrue("Datanode is alive", isDead);
// Disable datanode heartbeat, so that the datanode will get expired after
// the recheck interval and become dead.
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
// Verify that datanode expiration will set drop SPS flag to
// true. This will ensure that DNA_DROP_SPS_WORK_COMMAND will be
// propagated to datanode during reconnection.
boolean dropSPSWork = false;
retries = 50;
while (retries > 0) {
dropSPSWork = dnd.shouldDropSPSWork();
if (dropSPSWork) {
break;
}
retries--;
Thread.sleep(100);
}
Assert.assertTrue("Didn't drop SPS work", dropSPSWork);
}
}