HDFS-1813. Federation: Authentication using BlockToken in RPC to datanode fails. Contributed by jitendra.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1090419 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 813da7f..923576f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -255,6 +255,9 @@
for transferring RBW/Finalized with acknowledgement and without using RPC.
(szetszwo)
+ HDFS-1813. Federation: Authentication using BlockToken in RPC to datanode
+ fails. (jitendra)
+
IMPROVEMENTS
HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 04c33b0..937e631 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1372,6 +1372,9 @@
registerMXBean();
initDataXceiver(conf);
startInfoServer(conf);
+
+ // BlockPoolTokenSecretManager is required to create ipc server.
+ this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
initIpcServer(conf);
myMetrics = new DataNodeMetrics(conf, getMachineName());
@@ -2028,10 +2031,6 @@
dataXceiverServer.start();
ipcServer.start();
startPlugins(conf);
-
- // BlockPoolTokenSecretManager is created here, but it shouldn't be
- // used until it is initialized in register().
- this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
}
/**
@@ -2561,7 +2560,7 @@
LOG.debug("Got: " + id.toString());
}
blockPoolTokenSecretManager.checkAccess(id, null, block,
- BlockTokenSecretManager.AccessMode.WRITE);
+ BlockTokenSecretManager.AccessMode.READ);
}
}
}
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 843df54..983e937 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -911,18 +911,17 @@
if (LOG.isDebugEnabled()) {
LOG.debug("last = " + last);
}
-
- if(isBlockTokenEnabled && needBlockToken) {
+
+ LocatedBlock lastBlock = last.isComplete() ? blockManager
+ .getBlockLocation(last, n - last.getNumBytes()) : blockManager
+ .getBlockLocation(last, n);
+
+ if (isBlockTokenEnabled && needBlockToken) {
setBlockTokens(locatedblocks);
+ setBlockToken(lastBlock);
}
-
- if (last.isComplete()) {
- return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
- blockManager.getBlockLocation(last, n-last.getNumBytes()), true);
- } else {
- return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
- blockManager.getBlockLocation(last, n), false);
- }
+ return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
+ lastBlock, last.isComplete());
}
} finally {
readUnlock();
@@ -938,13 +937,16 @@
/** Generate block tokens for the blocks to be returned. */
private void setBlockTokens(List<LocatedBlock> locatedBlocks) throws IOException {
for(LocatedBlock l : locatedBlocks) {
- Token<BlockTokenIdentifier> token =
- blockTokenSecretManager.generateToken(l.getBlock(),
- EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
-
- l.setBlockToken(token);
+ setBlockToken(l);
}
}
+
+ /** Generate block token for a LocatedBlock. */
+ private void setBlockToken(LocatedBlock l) throws IOException {
+ Token<BlockTokenIdentifier> token = blockTokenSecretManager.generateToken(l
+ .getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
+ l.setBlockToken(token);
+ }
/**
* Moves all the blocks from srcs and appends them to trg
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
index f834d9c..45fd648 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
@@ -29,9 +29,16 @@
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.io.TestWritable;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtocolSignature;
@@ -46,6 +53,7 @@
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.log4j.Level;
+import org.junit.Assert;
import org.junit.Test;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
@@ -259,4 +267,43 @@
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid));
}
}
+
+ /**
+ * This test writes a file and gets the block locations without closing
+ * the file, and tests the block token in the last block. Block token is
+ * verified by ensuring it is of correct kind.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testBlockTokenInLastLocatedBlock() throws IOException,
+ InterruptedException {
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+ conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numNameNodes(1)
+ .numDataNodes(1).build();
+ cluster.waitActive();
+
+ try {
+ FileSystem fs = cluster.getFileSystem();
+ String fileName = "/testBlockTokenInLastLocatedBlock";
+ Path filePath = new Path(fileName);
+ FSDataOutputStream out = fs.create(filePath, (short) 1);
+ out.write(new byte[1000]);
+ LocatedBlocks locatedBlocks = cluster.getNameNode().getBlockLocations(
+ fileName, 0, 1000);
+ while (locatedBlocks.getLastLocatedBlock() == null) {
+ Thread.sleep(100);
+ locatedBlocks = cluster.getNameNode().getBlockLocations(fileName, 0,
+ 1000);
+ }
+ Token<BlockTokenIdentifier> token = locatedBlocks.getLastLocatedBlock()
+ .getBlockToken();
+ Assert.assertEquals(BlockTokenIdentifier.KIND_NAME, token.getKind());
+ out.close();
+ } finally {
+ cluster.shutdown();
+ }
+ }
}