blob: ace7c3bcdef024adf308d42b2a924a5871d16b2d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.qjournal.TestSecureNNWithQJM;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.security.PrivilegedExceptionAction;
import java.util.Properties;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* Class for Kerberized test cases for {@link DFSInotifyEventInputStream}.
*/
public class TestDFSInotifyEventInputStreamKerberized {
private static final Logger LOG =
LoggerFactory.getLogger(TestDFSInotifyEventInputStreamKerberized.class);
private File baseDir;
private String keystoresDir;
private String sslConfDir;
private MiniKdc kdc;
private Configuration baseConf;
private Configuration conf;
private MiniQJMHACluster cluster;
private File generalHDFSKeytabFile;
private File nnKeytabFile;
@Rule
public Timeout timeout = new Timeout(180000);
@Test
public void testWithKerberizedCluster() throws Exception {
conf = new HdfsConfiguration(baseConf);
// make sure relogin can happen after tgt expiration.
conf.setInt(HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN, 3);
// make sure the rpc connection is not reused
conf.setInt(IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY, 100);
conf.setInt(IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 2000);
Client.setConnectTimeout(conf, 2000);
// force the remote journal to be the only edits dir, so we can test
// EditLogFileInputStream$URLLog behavior.
cluster = new MiniQJMHACluster.Builder(conf).setForceRemoteEditsOnly(true)
.build();
cluster.getDfsCluster().waitActive();
cluster.getDfsCluster().transitionToActive(0);
final UserGroupInformation ugi = UserGroupInformation
.loginUserFromKeytabAndReturnUGI("hdfs",
generalHDFSKeytabFile.getAbsolutePath());
UserGroupInformation.setShouldRenewImmediatelyForTests(true);
ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
LOG.info("Current user is: " + UserGroupInformation.getCurrentUser()
+ " login user is:" + UserGroupInformation.getLoginUser());
Configuration clientConf =
new Configuration(cluster.getDfsCluster().getConfiguration(0));
try (DistributedFileSystem clientFs =
(DistributedFileSystem) FileSystem.get(clientConf)) {
clientFs.mkdirs(new Path("/test"));
LOG.info("mkdir /test success");
final DFSInotifyEventInputStream eis =
clientFs.getInotifyEventStream();
// verify we can poll
EventBatch batch;
while ((batch = eis.poll()) != null) {
LOG.info("txid: " + batch.getTxid());
}
assertNull("poll should not return anything", eis.poll());
Thread.sleep(6000);
LOG.info("Slept 6 seconds to make sure the TGT has expired.");
UserGroupInformation.getCurrentUser()
.checkTGTAndReloginFromKeytab();
clientFs.mkdirs(new Path("/test1"));
LOG.info("mkdir /test1 success");
// verify we can poll after a tgt expiration interval
batch = eis.poll();
assertNotNull("poll should return something", batch);
assertEquals(1, batch.getEvents().length);
assertNull("poll should not return anything", eis.poll());
return null;
}
}
});
}
@Before
public void initKerberizedCluster() throws Exception {
baseDir = new File(System.getProperty("test.build.dir", "target/test-dir"),
TestDFSInotifyEventInputStreamKerberized.class.getSimpleName());
FileUtil.fullyDelete(baseDir);
assertTrue(baseDir.mkdirs());
final Properties kdcConf = MiniKdc.createConf();
kdcConf.setProperty(MiniKdc.MAX_TICKET_LIFETIME, "5");
kdcConf.setProperty(MiniKdc.MIN_TICKET_LIFETIME, "5");
kdc = new MiniKdc(kdcConf, baseDir);
kdc.start();
baseConf = new HdfsConfiguration();
SecurityUtil.setAuthenticationMethod(
UserGroupInformation.AuthenticationMethod.KERBEROS, baseConf);
UserGroupInformation.setConfiguration(baseConf);
assertTrue("Expected configuration to enable security",
UserGroupInformation.isSecurityEnabled());
final String userName = "hdfs";
nnKeytabFile = new File(baseDir, userName + ".keytab");
final String keytab = nnKeytabFile.getAbsolutePath();
// Windows will not reverse name lookup "127.0.0.1" to "localhost".
final String krbInstance = Path.WINDOWS ? "127.0.0.1" : "localhost";
kdc.createPrincipal(nnKeytabFile, userName + "/" + krbInstance,
"HTTP/" + krbInstance);
generalHDFSKeytabFile = new File(baseDir, "hdfs_general.keytab");
kdc.createPrincipal(generalHDFSKeytabFile, "hdfs");
assertTrue(generalHDFSKeytabFile.exists());
final String hdfsPrincipal =
userName + "/" + krbInstance + "@" + kdc.getRealm();
final String spnegoPrincipal = "HTTP/" + krbInstance + "@" + kdc.getRealm();
baseConf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
baseConf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);
baseConf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
baseConf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab);
baseConf
.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);
baseConf.set(DFS_JOURNALNODE_KEYTAB_FILE_KEY, keytab);
baseConf.set(DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
baseConf.set(DFS_JOURNALNODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
spnegoPrincipal);
baseConf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
baseConf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
baseConf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
baseConf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
baseConf.set(DFS_JOURNALNODE_HTTPS_ADDRESS_KEY, "localhost:0");
baseConf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
keystoresDir = baseDir.getAbsolutePath();
sslConfDir = KeyStoreTestUtil.getClasspathDir(TestSecureNNWithQJM.class);
KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, baseConf, false);
baseConf.set(DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY,
KeyStoreTestUtil.getClientSSLConfigFileName());
baseConf.set(DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
KeyStoreTestUtil.getServerSSLConfigFileName());
}
@After
public void shutdownCluster() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
if (kdc != null) {
kdc.stop();
}
FileUtil.fullyDelete(baseDir);
KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir);
}
}