/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.NameNodeProxiesClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryInvocationHandler;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.RetryCache.CacheEntry;
import org.apache.hadoop.util.LightWeightCache;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestRetryCacheWithHA {
private static final Log LOG = LogFactory.getLog(TestRetryCacheWithHA.class);
private static final int BlockSize = 1024;
private static final short DataNodes = 3;
private static final int CHECKTIMES = 10;
private static final int ResponseSize = 3;
private MiniDFSCluster cluster;
private DistributedFileSystem dfs;
private final Configuration conf = new HdfsConfiguration();
/**
* A dummy invocation handler extending RetryInvocationHandler. A static
* boolean flag controls whether the method invocation succeeds: the real
* call still executes, but a fake exception can be thrown afterwards to
* simulate the response being lost on the way back to the client.
*/
private static class DummyRetryInvocationHandler extends
RetryInvocationHandler<ClientProtocol> {
static final AtomicBoolean block = new AtomicBoolean(false);
DummyRetryInvocationHandler(
FailoverProxyProvider<ClientProtocol> proxyProvider,
RetryPolicy retryPolicy) {
super(proxyProvider, retryPolicy);
}
@Override
protected Object invokeMethod(Method method, Object[] args)
throws Throwable {
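// Execute the real call first so the active NN processes it and records
// it in its retry cache; then, if blocking is enabled, throw a fake
// network exception so the client believes the response was lost.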
Object result = super.invokeMethod(method, args);
if (block.get()) {
throw new UnknownHostException("Fake Exception");
} else {
return result;
}
}
}
@Before
public void setup() throws Exception {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BlockSize);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, ResponseSize);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, ResponseSize);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(DataNodes).build();
cluster.waitActive();
cluster.transitionToActive(0);
// set up the client-side failover configuration
HATestUtil.setFailoverConfigurations(cluster, conf);
dfs = (DistributedFileSystem) HATestUtil.configureFailoverFs(cluster, conf);
}
@After
public void cleanup() throws Exception {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
/**
* 1. Run a set of operations
* 2. Trigger the NN failover
* 3. Check the retry cache on the original standby NN
*/
@Test (timeout=60000)
public void testRetryCacheOnStandbyNN() throws Exception {
// 1. run operations
DFSTestUtil.runOperations(cluster, dfs, conf, BlockSize, 0);
// check retry cache in NN1
FSNamesystem fsn0 = cluster.getNamesystem(0);
LightWeightCache<CacheEntry, CacheEntry> cacheSet =
(LightWeightCache<CacheEntry, CacheEntry>) fsn0.getRetryCache().getCacheSet();
assertEquals("Retry cache size is wrong", 26, cacheSet.size());
Map<CacheEntry, CacheEntry> oldEntries =
new HashMap<CacheEntry, CacheEntry>();
Iterator<CacheEntry> iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();
oldEntries.put(entry, entry);
}
// 2. Failover the current standby to active.
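// Roll and tail the edit log first so NN1 replays the retry cache
// entries from NN0 before it becomes active.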
cluster.getNameNode(0).getRpcServer().rollEditLog();
cluster.getNameNode(1).getNamesystem().getEditLogTailer().doTailEdits();
cluster.shutdownNameNode(0);
cluster.transitionToActive(1);
// 3. check the retry cache on the new active NN
FSNamesystem fsn1 = cluster.getNamesystem(1);
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) fsn1
.getRetryCache().getCacheSet();
assertEquals("Retry cache size is wrong", 26, cacheSet.size());
iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();
assertTrue(oldEntries.containsKey(entry));
}
}
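/**
 * Create a DFSClient whose ClientProtocol proxy goes through
 * DummyRetryInvocationHandler, so a test can let a call execute on the
 * active NN while simulating the loss of its response.
 */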
private DFSClient genClientWithDummyHandler() throws IOException {
URI nnUri = dfs.getUri();
FailoverProxyProvider<ClientProtocol> failoverProxyProvider =
NameNodeProxiesClient.createFailoverProxyProvider(conf,
nnUri, ClientProtocol.class, true, null);
InvocationHandler dummyHandler = new DummyRetryInvocationHandler(
failoverProxyProvider, RetryPolicies
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
Integer.MAX_VALUE,
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT,
HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT));
ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance(
failoverProxyProvider.getInterface().getClassLoader(),
new Class[] { ClientProtocol.class }, dummyHandler);
DFSClient client = new DFSClient(null, proxy, conf, null);
return client;
}
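/**
 * An operation that should take effect at most once even when retried.
 * testClientRetryWithFailover drives the lifecycle: prepare() sets up the
 * precondition, invoke() issues the RPC (whose response may be dropped),
 * checkNamenodeBeforeReturn() polls until the effect is visible on the
 * active NN, and getResult() returns the value seen by the client.
 */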
abstract class AtMostOnceOp {
private final String name;
final DFSClient client;
int expectedUpdateCount = 0;
AtMostOnceOp(String name, DFSClient client) {
this.name = name;
this.client = client;
}
abstract void prepare() throws Exception;
abstract void invoke() throws Exception;
abstract boolean checkNamenodeBeforeReturn() throws Exception;
abstract Object getResult();
int getExpectedCacheUpdateCount() {
return expectedUpdateCount;
}
}
/** createSnapshot operation */
class CreateSnapshotOp extends AtMostOnceOp {
private String snapshotPath;
private final String dir;
private final String snapshotName;
CreateSnapshotOp(DFSClient client, String dir, String snapshotName) {
super("createSnapshot", client);
this.dir = dir;
this.snapshotName = snapshotName;
}
@Override
void prepare() throws Exception {
final Path dirPath = new Path(dir);
if (!dfs.exists(dirPath)) {
dfs.mkdirs(dirPath);
dfs.allowSnapshot(dirPath);
}
}
@Override
void invoke() throws Exception {
this.snapshotPath = client.createSnapshot(dir, snapshotName);
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
final Path sPath = SnapshotTestHelper.getSnapshotRoot(new Path(dir),
snapshotName);
boolean snapshotCreated = dfs.exists(sPath);
for (int i = 0; i < CHECKTIMES && !snapshotCreated; i++) {
Thread.sleep(1000);
snapshotCreated = dfs.exists(sPath);
}
return snapshotCreated;
}
@Override
Object getResult() {
return snapshotPath;
}
}
/** deleteSnapshot */
class DeleteSnapshotOp extends AtMostOnceOp {
private final String dir;
private final String snapshotName;
DeleteSnapshotOp(DFSClient client, String dir, String snapshotName) {
super("deleteSnapshot", client);
this.dir = dir;
this.snapshotName = snapshotName;
}
@Override
void prepare() throws Exception {
final Path dirPath = new Path(dir);
if (!dfs.exists(dirPath)) {
dfs.mkdirs(dirPath);
}
Path sPath = SnapshotTestHelper.getSnapshotRoot(dirPath, snapshotName);
if (!dfs.exists(sPath)) {
dfs.allowSnapshot(dirPath);
dfs.createSnapshot(dirPath, snapshotName);
}
}
@Override
void invoke() throws Exception {
client.deleteSnapshot(dir, snapshotName);
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
final Path sPath = SnapshotTestHelper.getSnapshotRoot(new Path(dir),
snapshotName);
boolean snapshotNotDeleted = dfs.exists(sPath);
for (int i = 0; i < CHECKTIMES && snapshotNotDeleted; i++) {
Thread.sleep(1000);
snapshotNotDeleted = dfs.exists(sPath);
}
return !snapshotNotDeleted;
}
@Override
Object getResult() {
return null;
}
}
/** renameSnapshot */
class RenameSnapshotOp extends AtMostOnceOp {
private final String dir;
private final String oldName;
private final String newName;
RenameSnapshotOp(DFSClient client, String dir, String oldName,
String newName) {
super("renameSnapshot", client);
this.dir = dir;
this.oldName = oldName;
this.newName = newName;
}
@Override
void prepare() throws Exception {
final Path dirPath = new Path(dir);
if (!dfs.exists(dirPath)) {
dfs.mkdirs(dirPath);
}
Path sPath = SnapshotTestHelper.getSnapshotRoot(dirPath, oldName);
if (!dfs.exists(sPath)) {
dfs.allowSnapshot(dirPath);
dfs.createSnapshot(dirPath, oldName);
}
}
@Override
void invoke() throws Exception {
client.renameSnapshot(dir, oldName, newName);
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
final Path sPath = SnapshotTestHelper.getSnapshotRoot(new Path(dir),
newName);
boolean snapshotRenamed = dfs.exists(sPath);
for (int i = 0; i < CHECKTIMES && !snapshotRenamed; i++) {
Thread.sleep(1000);
snapshotRenamed = dfs.exists(sPath);
}
return snapshotRenamed;
}
@Override
Object getResult() {
return null;
}
}
/** create file operation (without OverWrite) */
class CreateOp extends AtMostOnceOp {
private final String fileName;
private HdfsFileStatus status;
CreateOp(DFSClient client, String fileName) {
super("create", client);
this.fileName = fileName;
}
@Override
void prepare() throws Exception {
final Path filePath = new Path(fileName);
if (dfs.exists(filePath)) {
dfs.delete(filePath, true);
}
final Path fileParent = filePath.getParent();
if (!dfs.exists(fileParent)) {
dfs.mkdirs(fileParent);
}
}
@Override
void invoke() throws Exception {
EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.CREATE);
this.status = client.getNamenode().create(fileName,
FsPermission.getFileDefault(), client.getClientName(),
new EnumSetWritable<CreateFlag>(createFlag), false, DataNodes,
BlockSize,
new CryptoProtocolVersion[] {CryptoProtocolVersion.ENCRYPTION_ZONES});
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
final Path filePath = new Path(fileName);
boolean fileCreated = dfs.exists(filePath);
for (int i = 0; i < CHECKTIMES && !fileCreated; i++) {
Thread.sleep(1000);
fileCreated = dfs.exists(filePath);
}
return fileCreated;
}
@Override
Object getResult() {
return status;
}
}
/** append operation */
class AppendOp extends AtMostOnceOp {
private final String fileName;
private LastBlockWithStatus lbk;
AppendOp(DFSClient client, String fileName) {
super("append", client);
this.fileName = fileName;
}
@Override
void prepare() throws Exception {
final Path filePath = new Path(fileName);
if (!dfs.exists(filePath)) {
DFSTestUtil.createFile(dfs, filePath, BlockSize / 2, DataNodes, 0);
}
}
@Override
void invoke() throws Exception {
lbk = client.getNamenode().append(fileName, client.getClientName(),
new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
}
// check if the inode of the file is under construction
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
INodeFile fileNode = cluster.getNameNode(0).getNamesystem()
.getFSDirectory().getINode4Write(fileName).asFile();
boolean fileIsUC = fileNode.isUnderConstruction();
for (int i = 0; i < CHECKTIMES && !fileIsUC; i++) {
Thread.sleep(1000);
fileNode = cluster.getNameNode(0).getNamesystem().getFSDirectory()
.getINode4Write(fileName).asFile();
fileIsUC = fileNode.isUnderConstruction();
}
return fileIsUC;
}
@Override
Object getResult() {
return lbk;
}
}
/** rename */
class RenameOp extends AtMostOnceOp {
private final String oldName;
private final String newName;
private boolean renamed;
RenameOp(DFSClient client, String oldName, String newName) {
super("rename", client);
this.oldName = oldName;
this.newName = newName;
}
@Override
void prepare() throws Exception {
final Path filePath = new Path(oldName);
if (!dfs.exists(filePath)) {
DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0);
}
}
@SuppressWarnings("deprecation")
@Override
void invoke() throws Exception {
this.renamed = client.rename(oldName, newName);
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
Path targetPath = new Path(newName);
boolean renamed = dfs.exists(targetPath);
for (int i = 0; i < CHECKTIMES && !renamed; i++) {
Thread.sleep(1000);
renamed = dfs.exists(targetPath);
}
return renamed;
}
@Override
Object getResult() {
return renamed;
}
}
/** rename2 */
class Rename2Op extends AtMostOnceOp {
private final String oldName;
private final String newName;
Rename2Op(DFSClient client, String oldName, String newName) {
super("rename2", client);
this.oldName = oldName;
this.newName = newName;
}
@Override
void prepare() throws Exception {
final Path filePath = new Path(oldName);
if (!dfs.exists(filePath)) {
DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0);
}
}
@Override
void invoke() throws Exception {
client.rename(oldName, newName, Rename.OVERWRITE);
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
Path targetPath = new Path(newName);
boolean renamed = dfs.exists(targetPath);
for (int i = 0; i < CHECKTIMES && !renamed; i++) {
Thread.sleep(1000);
renamed = dfs.exists(targetPath);
}
return renamed;
}
@Override
Object getResult() {
return null;
}
}
/** concat */
class ConcatOp extends AtMostOnceOp {
private final String target;
private final String[] srcs;
private final Path[] srcPaths;
ConcatOp(DFSClient client, Path target, int numSrc) {
super("concat", client);
this.target = target.toString();
this.srcs = new String[numSrc];
this.srcPaths = new Path[numSrc];
Path parent = target.getParent();
for (int i = 0; i < numSrc; i++) {
srcPaths[i] = new Path(parent, "srcfile" + i);
srcs[i] = srcPaths[i].toString();
}
}
@Override
void prepare() throws Exception {
final Path targetPath = new Path(target);
DFSTestUtil.createFile(dfs, targetPath, BlockSize, DataNodes, 0);
for (int i = 0; i < srcPaths.length; i++) {
DFSTestUtil.createFile(dfs, srcPaths[i], BlockSize, DataNodes, 0);
}
assertEquals(BlockSize, dfs.getFileStatus(targetPath).getLen());
}
@Override
void invoke() throws Exception {
client.concat(target, srcs);
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
Path targetPath = new Path(target);
boolean done = dfs.getFileStatus(targetPath).getLen() == BlockSize
* (srcs.length + 1);
for (int i = 0; i < CHECKTIMES && !done; i++) {
Thread.sleep(1000);
done = dfs.getFileStatus(targetPath).getLen() == BlockSize
* (srcs.length + 1);
}
return done;
}
@Override
Object getResult() {
return null;
}
}
/** delete */
class DeleteOp extends AtMostOnceOp {
private final String target;
private boolean deleted;
DeleteOp(DFSClient client, String target) {
super("delete", client);
this.target = target;
}
@Override
void prepare() throws Exception {
Path p = new Path(target);
if (!dfs.exists(p)) {
expectedUpdateCount++;
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
}
}
@Override
void invoke() throws Exception {
expectedUpdateCount++;
deleted = client.delete(target, true);
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
Path targetPath = new Path(target);
boolean del = !dfs.exists(targetPath);
for (int i = 0; i < CHECKTIMES && !del; i++) {
Thread.sleep(1000);
del = !dfs.exists(targetPath);
}
return del;
}
@Override
Object getResult() {
return deleted;
}
}
/** createSymlink */
class CreateSymlinkOp extends AtMostOnceOp {
private final String target;
private final String link;
public CreateSymlinkOp(DFSClient client, String target, String link) {
super("createSymlink", client);
this.target = target;
this.link = link;
}
@Override
void prepare() throws Exception {
Path p = new Path(target);
if (!dfs.exists(p)) {
expectedUpdateCount++;
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
}
}
@Override
void invoke() throws Exception {
expectedUpdateCount++;
client.createSymlink(target, link, false);
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
Path linkPath = new Path(link);
FileStatus linkStatus = null;
for (int i = 0; i < CHECKTIMES && linkStatus == null; i++) {
try {
linkStatus = dfs.getFileLinkStatus(linkPath);
} catch (FileNotFoundException fnf) {
// Ignoring, this can be legitimate.
Thread.sleep(1000);
}
}
return linkStatus != null;
}
@Override
Object getResult() {
return null;
}
}
/** updatePipeline */
class UpdatePipelineOp extends AtMostOnceOp {
private final String file;
private ExtendedBlock oldBlock;
private ExtendedBlock newBlock;
private DatanodeInfo[] nodes;
private FSDataOutputStream out;
public UpdatePipelineOp(DFSClient client, String file) {
super("updatePipeline", client);
this.file = file;
}
@Override
void prepare() throws Exception {
final Path filePath = new Path(file);
DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0);
// append to the file and leave the last block under construction
out = this.client.append(file, BlockSize, EnumSet.of(CreateFlag.APPEND),
null, null);
byte[] appendContent = new byte[100];
new Random().nextBytes(appendContent);
out.write(appendContent);
((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
LocatedBlocks blks = dfs.getClient()
.getLocatedBlocks(file, BlockSize + 1);
assertEquals(1, blks.getLocatedBlocks().size());
nodes = blks.get(0).getLocations();
oldBlock = blks.get(0).getBlock();
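// ask the NN for a new generation stamp to use in the pipeline update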
LocatedBlock newLbk = client.getNamenode().updateBlockForPipeline(
oldBlock, client.getClientName());
newBlock = new ExtendedBlock(oldBlock.getBlockPoolId(),
oldBlock.getBlockId(), oldBlock.getNumBytes(),
newLbk.getBlock().getGenerationStamp());
}
@Override
void invoke() throws Exception {
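// shrink the pipeline from three datanodes to two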
DatanodeInfo[] newNodes = new DatanodeInfo[2];
newNodes[0] = nodes[0];
newNodes[1] = nodes[1];
final DatanodeManager dm = cluster.getNamesystem(0).getBlockManager()
.getDatanodeManager();
final String storageID1 = dm.getDatanode(newNodes[0]).getStorageInfos()[0]
.getStorageID();
final String storageID2 = dm.getDatanode(newNodes[1]).getStorageInfos()[0]
.getStorageID();
String[] storageIDs = {storageID1, storageID2};
client.getNamenode().updatePipeline(client.getClientName(), oldBlock,
newBlock, newNodes, storageIDs);
// close() can fail if out.close() tries to commit the block after the
// NameNode has already received block-received notifications from the
// DataNodes. Since the DataNodes and the output stream still carry the
// old genstamp, those replicas are marked corrupt after HDFS-5723 when
// the RECEIVED notifications reach the NameNode first, and close() would
// fail; abort the stream instead.
DFSTestUtil.abortStream((DFSOutputStream) out.getWrappedStream());
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
INodeFile fileNode = cluster.getNamesystem(0).getFSDirectory()
.getINode4Write(file).asFile();
BlockInfo blkUC = (fileNode.getBlocks())[1];
int datanodeNum = blkUC.getUnderConstructionFeature()
.getExpectedStorageLocations().length;
for (int i = 0; i < CHECKTIMES && datanodeNum != 2; i++) {
Thread.sleep(1000);
datanodeNum = blkUC.getUnderConstructionFeature()
.getExpectedStorageLocations().length;
}
return datanodeNum == 2;
}
@Override
Object getResult() {
return null;
}
}
/** addCacheDirective */
class AddCacheDirectiveInfoOp extends AtMostOnceOp {
private final CacheDirectiveInfo directive;
private Long result;
AddCacheDirectiveInfoOp(DFSClient client,
CacheDirectiveInfo directive) {
super("addCacheDirective", client);
this.directive = directive;
}
@Override
void prepare() throws Exception {
expectedUpdateCount++;
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
}
@Override
void invoke() throws Exception {
expectedUpdateCount++;
result = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<CacheDirectiveEntry> iter =
dfs.listCacheDirectives(
new CacheDirectiveInfo.Builder().
setPool(directive.getPool()).
setPath(directive.getPath()).
build());
if (iter.hasNext()) {
return true;
}
Thread.sleep(1000);
}
return false;
}
@Override
Object getResult() {
return result;
}
}
/** modifyCacheDirective */
class ModifyCacheDirectiveInfoOp extends AtMostOnceOp {
private final CacheDirectiveInfo directive;
private final short newReplication;
private long id;
ModifyCacheDirectiveInfoOp(DFSClient client,
CacheDirectiveInfo directive, short newReplication) {
super("modifyCacheDirective", client);
this.directive = directive;
this.newReplication = newReplication;
}
@Override
void prepare() throws Exception {
expectedUpdateCount++;
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
expectedUpdateCount++;
id = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
}
@Override
void invoke() throws Exception {
expectedUpdateCount++;
client.modifyCacheDirective(
new CacheDirectiveInfo.Builder().
setId(id).
setReplication(newReplication).
build(), EnumSet.of(CacheFlag.FORCE));
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<CacheDirectiveEntry> iter =
dfs.listCacheDirectives(
new CacheDirectiveInfo.Builder().
setPool(directive.getPool()).
setPath(directive.getPath()).
build());
while (iter.hasNext()) {
CacheDirectiveInfo result = iter.next().getInfo();
if ((result.getId() == id) &&
(result.getReplication().shortValue() == newReplication)) {
return true;
}
}
Thread.sleep(1000);
}
return false;
}
@Override
Object getResult() {
return null;
}
}
/** removeCacheDirective */
class RemoveCacheDirectiveInfoOp extends AtMostOnceOp {
private final CacheDirectiveInfo directive;
private long id;
RemoveCacheDirectiveInfoOp(DFSClient client, String pool,
String path) {
super("removeCacheDirective", client);
this.directive = new CacheDirectiveInfo.Builder().
setPool(pool).
setPath(new Path(path)).
build();
}
@Override
void prepare() throws Exception {
expectedUpdateCount++;
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
expectedUpdateCount++;
id = dfs.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
}
@Override
void invoke() throws Exception {
expectedUpdateCount++;
client.removeCacheDirective(id);
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<CacheDirectiveEntry> iter =
dfs.listCacheDirectives(
new CacheDirectiveInfo.Builder().
setPool(directive.getPool()).
setPath(directive.getPath()).
build());
if (!iter.hasNext()) {
return true;
}
Thread.sleep(1000);
}
return false;
}
@Override
Object getResult() {
return null;
}
}
/** addCachePool */
class AddCachePoolOp extends AtMostOnceOp {
private final String pool;
AddCachePoolOp(DFSClient client, String pool) {
super("addCachePool", client);
this.pool = pool;
}
@Override
void prepare() throws Exception {
}
@Override
void invoke() throws Exception {
expectedUpdateCount++;
client.addCachePool(new CachePoolInfo(pool));
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
if (iter.hasNext()) {
return true;
}
Thread.sleep(1000);
}
return false;
}
@Override
Object getResult() {
return null;
}
}
/** modifyCachePool */
class ModifyCachePoolOp extends AtMostOnceOp {
final String pool;
ModifyCachePoolOp(DFSClient client, String pool) {
super("modifyCachePool", client);
this.pool = pool;
}
@Override
void prepare() throws Exception {
expectedUpdateCount++;
client.addCachePool(new CachePoolInfo(pool).setLimit(10L));
}
@Override
void invoke() throws Exception {
expectedUpdateCount++;
client.modifyCachePool(new CachePoolInfo(pool).setLimit(99L));
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
if (iter.hasNext() && iter.next().getInfo().getLimit() == 99) {
return true;
}
Thread.sleep(1000);
}
return false;
}
@Override
Object getResult() {
return null;
}
}
/** removeCachePool */
class RemoveCachePoolOp extends AtMostOnceOp {
private final String pool;
RemoveCachePoolOp(DFSClient client, String pool) {
super("removeCachePool", client);
this.pool = pool;
}
@Override
void prepare() throws Exception {
expectedUpdateCount++;
client.addCachePool(new CachePoolInfo(pool));
}
@Override
void invoke() throws Exception {
expectedUpdateCount++;
client.removeCachePool(pool);
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
if (!iter.hasNext()) {
return true;
}
Thread.sleep(1000);
}
return false;
}
@Override
Object getResult() {
return null;
}
}
/** setXAttr */
class SetXAttrOp extends AtMostOnceOp {
private final String src;
SetXAttrOp(DFSClient client, String src) {
super("setXAttr", client);
this.src = src;
}
@Override
void prepare() throws Exception {
Path p = new Path(src);
if (!dfs.exists(p)) {
expectedUpdateCount++;
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
}
}
@Override
void invoke() throws Exception {
expectedUpdateCount++;
client.setXAttr(src, "user.key", "value".getBytes(),
EnumSet.of(XAttrSetFlag.CREATE));
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
Map<String, byte[]> iter = dfs.getXAttrs(new Path(src));
Set<String> keySet = iter.keySet();
if (keySet.contains("user.key")) {
return true;
}
Thread.sleep(1000);
}
return false;
}
@Override
Object getResult() {
return null;
}
}
/** removeXAttr */
class RemoveXAttrOp extends AtMostOnceOp {
private final String src;
RemoveXAttrOp(DFSClient client, String src) {
super("removeXAttr", client);
this.src = src;
}
@Override
void prepare() throws Exception {
Path p = new Path(src);
if (!dfs.exists(p)) {
expectedUpdateCount++;
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
expectedUpdateCount++;
client.setXAttr(src, "user.key", "value".getBytes(),
EnumSet.of(XAttrSetFlag.CREATE));
}
}
@Override
void invoke() throws Exception {
expectedUpdateCount++;
client.removeXAttr(src, "user.key");
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
Map<String, byte[]> iter = dfs.getXAttrs(new Path(src));
Set<String> keySet = iter.keySet();
if (!keySet.contains("user.key")) {
return true;
}
Thread.sleep(1000);
}
return false;
}
@Override
Object getResult() {
return null;
}
}
@Test (timeout=60000)
public void testCreateSnapshot() throws Exception {
final DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new CreateSnapshotOp(client, "/test", "s1");
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testDeleteSnapshot() throws Exception {
final DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new DeleteSnapshotOp(client, "/test", "s1");
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testRenameSnapshot() throws Exception {
final DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new RenameSnapshotOp(client, "/test", "s1", "s2");
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testCreate() throws Exception {
final DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new CreateOp(client, "/testfile");
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testAppend() throws Exception {
final DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new AppendOp(client, "/testfile");
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testRename() throws Exception {
final DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new RenameOp(client, "/file1", "/file2");
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testRename2() throws Exception {
final DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new Rename2Op(client, "/file1", "/file2");
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testConcat() throws Exception {
final DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new ConcatOp(client, new Path("/test/file"), 5);
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testDelete() throws Exception {
final DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new DeleteOp(client, "/testfile");
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testCreateSymlink() throws Exception {
final DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new CreateSymlinkOp(client, "/testfile", "/testlink");
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testUpdatePipeline() throws Exception {
final DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new UpdatePipelineOp(client, "/testfile");
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testAddCacheDirectiveInfo() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new AddCacheDirectiveInfoOp(client,
new CacheDirectiveInfo.Builder().
setPool("pool").
setPath(new Path("/path")).
build());
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testModifyCacheDirectiveInfo() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new ModifyCacheDirectiveInfoOp(client,
new CacheDirectiveInfo.Builder().
setPool("pool").
setPath(new Path("/path")).
setReplication((short)1).build(),
(short)555);
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testRemoveCacheDescriptor() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new RemoveCacheDirectiveInfoOp(client, "pool",
"/path");
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testAddCachePool() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new AddCachePoolOp(client, "pool");
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testModifyCachePool() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new ModifyCachePoolOp(client, "pool");
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testRemoveCachePool() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new RemoveCachePoolOp(client, "pool");
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testSetXAttr() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new SetXAttrOp(client, "/setxattr");
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testRemoveXAttr() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new RemoveXAttrOp(client, "/removexattr");
testClientRetryWithFailover(op);
}
/**
* When an NN failover happens, if the client did not receive the response
* and sends a retry request to the other NN, the same response should be
* received based on the retry cache.
*/
public void testClientRetryWithFailover(final AtMostOnceOp op)
throws Exception {
final Map<String, Object> results = new HashMap<String, Object>();
op.prepare();
// set DummyRetryInvocationHandler#block to true
DummyRetryInvocationHandler.block.set(true);
new Thread() {
@Override
public void run() {
try {
op.invoke();
Object result = op.getResult();
LOG.info("Operation " + op.name + " finished");
synchronized (TestRetryCacheWithHA.this) {
results.put(op.name, result == null ? "SUCCESS" : result);
TestRetryCacheWithHA.this.notifyAll();
}
} catch (Exception e) {
LOG.info("Got Exception while calling " + op.name, e);
} finally {
IOUtils.cleanup(null, op.client);
}
}
}.start();
// make sure the client's call has actually been handled by the active NN
assertTrue("After waiting the operation " + op.name
+ " still has not taken effect on NN yet",
op.checkNamenodeBeforeReturn());
// force the failover
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
// disable the block in DummyHandler
LOG.info("Setting block to false");
DummyRetryInvocationHandler.block.set(false);
synchronized (this) {
while (!results.containsKey(op.name)) {
this.wait();
}
LOG.info("Got the result of " + op.name + ": "
+ results.get(op.name));
}
// Waiting for failover.
while (cluster.getNamesystem(1).isInStandbyState()) {
Thread.sleep(10);
}
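// The retried call should have hit the retry cache on one of the NNs.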
long hitNN0 = cluster.getNamesystem(0).getRetryCache().getMetricsForTests()
.getCacheHit();
long hitNN1 = cluster.getNamesystem(1).getRetryCache().getMetricsForTests()
.getCacheHit();
assertTrue("CacheHit: " + hitNN0 + ", " + hitNN1,
hitNN0 + hitNN1 > 0);
long updatedNN0 = cluster.getNamesystem(0).getRetryCache()
.getMetricsForTests().getCacheUpdated();
long updatedNN1 = cluster.getNamesystem(1).getRetryCache()
.getMetricsForTests().getCacheUpdated();
// CacheUpdated metrics on NN0 should be > 0 since the op was processed on NN0
assertTrue("CacheUpdated on NN0: " + updatedNN0, updatedNN0 > 0);
// CacheUpdated metrics on NN1 should be > 0 since NN1 applied the edit log
assertTrue("CacheUpdated on NN1: " + updatedNN1, updatedNN1 > 0);
long expectedUpdateCount = op.getExpectedCacheUpdateCount();
if (expectedUpdateCount > 0) {
assertEquals("CacheUpdated on NN0: " + updatedNN0, expectedUpdateCount,
updatedNN0);
assertEquals("CacheUpdated on NN0: " + updatedNN1, expectedUpdateCount,
updatedNN1);
}
}
/**
* Add a list of cache pools, list cache pools,
* switch active NN, and list cache pools again.
*/
@Test (timeout=60000)
public void testListCachePools() throws Exception {
final int poolCount = 7;
HashSet<String> poolNames = new HashSet<String>(poolCount);
for (int i=0; i<poolCount; i++) {
String poolName = "testListCachePools-" + i;
dfs.addCachePool(new CachePoolInfo(poolName));
poolNames.add(poolName);
}
listCachePools(poolNames, 0);
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
cluster.waitActive(1);
listCachePools(poolNames, 1);
}
/**
* Add a list of cache directives, list cache directives,
* switch active NN, and list cache directives again.
*/
@Test (timeout=60000)
public void testListCacheDirectives() throws Exception {
final int poolCount = 7;
HashSet<String> poolNames = new HashSet<String>(poolCount);
Path path = new Path("/p");
for (int i=0; i<poolCount; i++) {
String poolName = "testListCacheDirectives-" + i;
CacheDirectiveInfo directiveInfo =
new CacheDirectiveInfo.Builder().setPool(poolName).setPath(path).build();
dfs.addCachePool(new CachePoolInfo(poolName));
dfs.addCacheDirective(directiveInfo, EnumSet.of(CacheFlag.FORCE));
poolNames.add(poolName);
}
listCacheDirectives(poolNames, 0);
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
cluster.waitActive(1);
listCacheDirectives(poolNames, 1);
}
@SuppressWarnings("unchecked")
private void listCachePools(
HashSet<String> poolNames, int active) throws Exception {
HashSet<String> tmpNames = (HashSet<String>)poolNames.clone();
RemoteIterator<CachePoolEntry> pools = dfs.listCachePools();
int poolCount = poolNames.size();
for (int i=0; i<poolCount; i++) {
CachePoolEntry pool = pools.next();
String poolName = pool.getInfo().getPoolName();
assertTrue("The pool name should be one of the expected names",
tmpNames.remove(poolName));
if (i % 2 == 0) {
int standby = active;
active = (standby == 0) ? 1 : 0;
cluster.transitionToStandby(standby);
cluster.transitionToActive(active);
cluster.waitActive(active);
}
}
assertTrue("All pools must be found", tmpNames.isEmpty());
}
@SuppressWarnings("unchecked")
private void listCacheDirectives(
HashSet<String> poolNames, int active) throws Exception {
HashSet<String> tmpNames = (HashSet<String>)poolNames.clone();
RemoteIterator<CacheDirectiveEntry> directives = dfs.listCacheDirectives(null);
int poolCount = poolNames.size();
for (int i=0; i<poolCount; i++) {
CacheDirectiveEntry directive = directives.next();
String poolName = directive.getInfo().getPool();
assertTrue("The pool name should be one of the expected names",
tmpNames.remove(poolName));
if (i % 2 == 0) {
int standby = active;
active = (standby == 0) ? 1 : 0;
cluster.transitionToStandby(standby);
cluster.transitionToActive(active);
cluster.waitActive(active);
}
}
assertTrue("All pools must be found", tmpNames.isEmpty());
}
}