blob: 7857c1ca2fee1f65d5754a4c54fadf7fe628a26c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.test;
import static org.apache.hadoop.security.ssl.SSLFactory.SSL_CLIENT_CONF_KEY;
import static org.junit.Assert.assertEquals;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.math.BigInteger;
import java.net.InetAddress;
import java.security.KeyPair;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.x500.X500Principal;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.mapreduce.examples.TestOrderedWordCount;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.bouncycastle.asn1.x509.GeneralName;
import org.bouncycastle.asn1.x509.GeneralNames;
import org.bouncycastle.asn1.x509.X509Extensions;
import org.bouncycastle.x509.X509V3CertificateGenerator;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* Test to verify secure-shuffle (SSL mode) in Tez
*/
@RunWith(Parameterized.class)
public class TestSecureShuffle {
// HDFS mini cluster; built in setupDFSCluster() and shut down in shutdownDFSCluster().
private static MiniDFSCluster miniDFSCluster;
// Tez mini cluster; created and started in setupTezCluster(), stopped in shutdownTezCluster().
private static MiniTezCluster miniTezCluster;
// Shared configuration; re-created in setupDFSCluster() and further modified in setupTezCluster().
private static Configuration conf = new Configuration();
// File system handle obtained from miniDFSCluster in setupDFSCluster().
private static FileSystem fs;
// Path of the word count input file written by createSampleFile().
private static Path inputLoc = new Path("/tmp/sample.txt");
// Directory used as the MiniDFSCluster base dir and as the parent of the keystores directory.
private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+ TestSecureShuffle.class.getName() + "-tmpDir";
// Directory handed to setupSSLConfig() as the place to write the generated keystores.
private static File keysStoresDir = new File(TEST_ROOT_DIR, "keystores");
// Test parameter: value written to MRConfig.SHUFFLE_SSL_ENABLED_KEY; when true, keystores are set up too.
private boolean enableSSLInCluster; //To set ssl config in cluster
// Test parameter: value the word count run must return while the tez-ssl setting is true.
private int resultWithTezSSL; //expected result with tez ssl setting
// Test parameter: value the word count run must return while the tez-ssl setting is false.
private int resultWithoutTezSSL; //expected result without tez ssl setting
// Test parameter: value written to TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP.
private boolean asyncHttp;
/**
 * Stores the values of one parameter set supplied by {@link #getParameters()}.
 *
 * @param sslInCluster whether the shuffle SSL flag is enabled in the cluster configuration
 * @param resultWithTezSSL value the job run must return while the tez-ssl setting is true
 * @param resultWithoutTezSSL value the job run must return while the tez-ssl setting is false
 * @param asyncHttp value used for the async http shuffle setting
 */
public TestSecureShuffle(boolean sslInCluster, int resultWithTezSSL, int resultWithoutTezSSL,
    boolean asyncHttp) {
  this.asyncHttp = asyncHttp;
  this.resultWithoutTezSSL = resultWithoutTezSSL;
  this.resultWithTezSSL = resultWithTezSSL;
  this.enableSSLInCluster = sslInCluster;
}
/**
 * Supplies the parameter sets for the parameterized runner. Each row is
 * { sslInCluster, resultWithTezSSL, resultWithoutTezSSL, asyncHttp }.
 *
 * @return the parameter rows, in the order they are listed below
 */
@Parameterized.Parameters(name = "test[sslInCluster:{0}, resultWithTezSSL:{1}, "
    + "resultWithoutTezSSL:{2}, asyncHttp:{3}]")
public static Collection<Object[]> getParameters() {
  Object[][] rows = {
      // ssl enabled in cluster: succeed with tez-ssl enabled, fail with tez-ssl disabled
      { true, 0, 1, false },
      // same expectation, with asyncHttp
      { true, 0, 1, true },
      // negative case with asyncHttp
      { false, 1, 0, true },
      // negative case: ssl disabled in cluster, fail with tez-ssl enabled,
      // succeed with tez-ssl disabled
      { false, 1, 0, false },
  };
  Collection<Object[]> cases = new ArrayList<Object[]>();
  for (Object[] row : rows) {
    cases.add(row);
  }
  return cases;
}
/**
 * Builds a fresh configuration, starts a single-datanode MiniDFSCluster on top of it and
 * points the configuration's default file system at that cluster.
 *
 * @throws Exception if the mini cluster cannot be built
 */
@BeforeClass
public static void setupDFSCluster() throws Exception {
  EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);

  Configuration dfsConf = new Configuration();
  conf = dfsConf;
  dfsConf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH, false);
  dfsConf.setBoolean("fs.hdfs.impl.disable.cache", true);
  dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);

  MiniDFSCluster.Builder dfsBuilder =
      new MiniDFSCluster.Builder(dfsConf).numDataNodes(1).format(true);
  miniDFSCluster = dfsBuilder.build();
  fs = miniDFSCluster.getFileSystem();

  String defaultFsUri = fs.getUri().toString();
  dfsConf.set("fs.defaultFS", defaultFsUri);
  dfsConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
}
/**
 * Shuts the MiniDFSCluster down, if one was created.
 */
@AfterClass
public static void shutdownDFSCluster() {
  if (miniDFSCluster == null) {
    // nothing was started, nothing to shut down
    return;
  }
  miniDFSCluster.shutdown();
}
/**
 * Applies the current parameter set to the shared configuration, then creates and starts a
 * MiniTezCluster with it and writes the sample input file.
 *
 * @throws Exception if keystore setup, cluster start-up or input file creation fails
 */
@Before
public void setupTezCluster() throws Exception {
  final boolean sslEnabled = enableSSLInCluster;
  if (sslEnabled) {
    // turn on SSL debug output before the keystores are generated
    System.setProperty("javax.net.debug", "all");
    setupKeyStores();
  }
  conf.setBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, sslEnabled);

  // 3 seconds should be good enough on a local machine
  final int shuffleTimeoutMillis = 3 * 1000;
  conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, shuffleTimeoutMillis);
  conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, shuffleTimeoutMillis);

  // low limits so that failures are detected quickly
  conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 2);
  conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
  conf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500);

  conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, asyncHttp);
  conf.addResource(conf.get(SSL_CLIENT_CONF_KEY, "ssl-client.xml"));

  String sslSuffix;
  if (sslEnabled) {
    sslSuffix = "withssl";
  } else {
    sslSuffix = "withoutssl";
  }
  String clusterName = TestSecureShuffle.class.getName() + "-" + sslSuffix;

  miniTezCluster = new MiniTezCluster(clusterName, 1, 1, 1);
  miniTezCluster.init(conf);
  miniTezCluster.start();

  createSampleFile(inputLoc);
}
/**
 * Stops the MiniTezCluster, if one was created.
 *
 * @throws IOException declared for the caller; see MiniTezCluster for actual failure modes
 */
@After
public void shutdownTezCluster() throws IOException {
  if (miniTezCluster == null) {
    return;
  }
  miniTezCluster.stop();
}
/**
 * Runs TestOrderedWordCount on the sample input with a copy of the mini cluster's
 * configuration and asserts the value returned by the run.
 *
 * @param expectedResult value the run is expected to return
 * @throws Exception if the run itself throws
 */
private void baseTest(int expectedResult) throws Exception {
  // time-stamped output path so repeated runs do not collide
  Path outputLoc = new Path("/tmp/outPath_" + System.currentTimeMillis());

  TestOrderedWordCount wordCount = new TestOrderedWordCount();
  Configuration jobConf = new Configuration(miniTezCluster.getConfig());
  wordCount.setConf(jobConf);

  String[] jobArgs = { "-DUSE_MR_CONFIGS=false", inputLoc.toString(), outputLoc.toString() };
  int actualResult = wordCount.run(jobArgs);
  assertEquals(expectedResult, actualResult);
}
/**
 * Verify whether shuffle works in mini cluster: the job is run once with the tez-ssl setting
 * enabled and once with it disabled, and each run is checked against the result expected
 * for the current parameter set.
 *
 * @throws Exception if a job run throws
 */
@Test(timeout = 500000)
public void testSecureShuffle() throws Exception {
  runWithTezSslSetting(true, this.resultWithTezSSL);
  runWithTezSslSetting(false, this.resultWithoutTezSSL);
}

/**
 * Sets the tez-ssl flag on the mini cluster's configuration and runs the job once.
 *
 * @param tezSslEnabled value for TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL
 * @param expectedResult value the job run is expected to return
 * @throws Exception if the job run throws
 */
private void runWithTezSslSetting(boolean tezSslEnabled, int expectedResult) throws Exception {
  miniTezCluster.getConfig().setBoolean(
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL, tezSslEnabled);
  baseTest(expectedResult);
}
/**
 * Create sample file for wordcount program.
 *
 * Writes ten lines, each consisting of "Hello World" immediately followed by
 * "Some other line", encoded as UTF-8, and then marks the file for deletion on exit.
 *
 * @param inputLoc path of the file to create in the test file system
 * @throws IOException if the file cannot be created or written
 */
private static void createSampleFile(Path inputLoc) throws IOException {
  // try-with-resources closes the writer (and the stream beneath it) even when a write fails;
  // the charset is explicit so the content does not depend on the platform default.
  try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
      fs.create(inputLoc), java.nio.charset.StandardCharsets.UTF_8))) {
    for (int i = 0; i < 10; i++) {
      writer.write("Hello World");
      writer.write("Some other line");
      writer.newLine();
    }
  }
  // Registered only after creation: FileSystem#deleteOnExit is documented to require an
  // existing path, so calling it before the first create would not register anything.
  fs.deleteOnExit(inputLoc);
}
/**
 * Create relevant keystores for test cluster.
 *
 * Ensures the keystore directory exists, then delegates to
 * {@link #setupSSLConfig(String, String, Configuration, boolean, boolean, String)} with
 * client certificates and a trust store enabled and no excluded ciphers.
 *
 * @throws Exception if the keystore directory cannot be created or the SSL setup fails
 */
private static void setupKeyStores() throws Exception {
  // mkdirs() returns false both on failure and when the directory is already there,
  // so only treat it as an error if the directory is really missing afterwards.
  if (!keysStoresDir.mkdirs() && !keysStoresDir.isDirectory()) {
    throw new IOException(
        "Could not create keystore directory " + keysStoresDir.getAbsolutePath());
  }
  String sslConfsDir = KeyStoreTestUtil.getClasspathDir(TestSecureShuffle.class);
  setupSSLConfig(keysStoresDir.getAbsolutePath(), sslConfsDir, conf, true, true, "");
}
/**
 * A copy of hadoop's KeyStoreTestUtil.setupSSLConfig, needed so that the server certificate
 * carries the actual hostname in its CN instead of "localhost". While upgrading the async http
 * client in TEZ-4237, it turned out that netty doesn't support custom hostname verifiers anymore
 * (as discussed in https://github.com/AsyncHttpClient/async-http-client/issues/928), so one
 * cannot be set for an async http connection. Rather than hacking in an ALLOW_ALL verifier
 * (which cannot be propagated to netty), a valid certificate with the actual hostname is
 * generated here. The two differences from the hadoop version are therefore the use of
 * InetAddress.getLocalHost().getHostName() and of the local generateCertificate, which fixes
 * another issue.
 *
 * @param keystoresDir directory the keystore and truststore files are written to
 * @param sslConfDir directory the client and server ssl configuration files are saved to
 * @param config configuration that receives the SSLFactory settings
 * @param useClientCert whether a client certificate and keystore are generated; also stored
 *        as the "require client cert" setting
 * @param trustStore whether a truststore containing the generated certificates is created
 * @param excludeCiphers passed through to the client and server ssl configurations
 * @throws Exception if key, certificate, keystore or configuration creation fails
 */
public static void setupSSLConfig(String keystoresDir, String sslConfDir, Configuration config,
    boolean useClientCert, boolean trustStore, String excludeCiphers) throws Exception {
  final String clientStorePath = keystoresDir + "/clientKS.jks";
  final String clientStorePass = "clientP";
  final String serverStorePath = keystoresDir + "/serverKS.jks";
  final String serverStorePass = "serverP";
  final String trustStorePass = "trustP";

  File clientConfFile = new File(sslConfDir, KeyStoreTestUtil.getClientSSLConfigFileName());
  File serverConfFile = new File(sslConfDir, KeyStoreTestUtil.getServerSSLConfigFileName());

  Map<String, X509Certificate> trustedCerts = new HashMap<String, X509Certificate>();

  // optional client side key material
  if (useClientCert) {
    KeyPair clientKeys = KeyStoreTestUtil.generateKeyPair("RSA");
    X509Certificate clientCert =
        generateCertificate("CN=localhost, O=client", clientKeys, 30, "SHA1withRSA");
    KeyStoreTestUtil.createKeyStore(clientStorePath, clientStorePass, "client",
        clientKeys.getPrivate(), clientCert);
    trustedCerts.put("client", clientCert);
  }

  // server side key material, with the real hostname in the CN
  String serverDn = "CN=" + InetAddress.getLocalHost().getHostName() + ", O=server";
  KeyPair serverKeys = KeyStoreTestUtil.generateKeyPair("RSA");
  X509Certificate serverCert = generateCertificate(serverDn, serverKeys, 30, "SHA1withRSA");
  KeyStoreTestUtil.createKeyStore(serverStorePath, serverStorePass, "server",
      serverKeys.getPrivate(), serverCert);
  trustedCerts.put("server", serverCert);

  // stays null when no truststore is requested
  String trustStorePath = null;
  if (trustStore) {
    trustStorePath = keystoresDir + "/trustKS.jks";
    KeyStoreTestUtil.createTrustStore(trustStorePath, trustStorePass, trustedCerts);
  }

  Configuration clientSslConf = KeyStoreTestUtil.createClientSSLConfig(clientStorePath,
      clientStorePass, clientStorePass, trustStorePath, excludeCiphers);
  Configuration serverSslConf = KeyStoreTestUtil.createServerSSLConfig(serverStorePath,
      serverStorePass, serverStorePass, trustStorePath, excludeCiphers);
  KeyStoreTestUtil.saveConfig(clientConfFile, clientSslConf);
  KeyStoreTestUtil.saveConfig(serverConfFile, serverSslConf);

  // the verifier setting is ignored for AsyncHttpConnection, as explained in the method comment
  config.set(SSLFactory.SSL_HOSTNAME_VERIFIER_KEY, "ALLOW_ALL");
  config.set(SSLFactory.SSL_CLIENT_CONF_KEY, clientConfFile.getName());
  config.set(SSLFactory.SSL_SERVER_CONF_KEY, serverConfFile.getName());
  config.setBoolean(SSLFactory.SSL_REQUIRE_CLIENT_CERT_KEY, useClientCert);
}
/**
 * A copy of hadoop's KeyStoreTestUtil.generateCertificate that additionally sets the local
 * host's IP address as an SSL Subject Alternative Name (SAN). Without this, SSL shuffle
 * failed with the async http client. Introduced by TEZ-4342.
 *
 * The certificate uses the same DN for issuer and subject, is signed with the private key of
 * the given pair, and is valid from the moment of the call for the given number of days.
 *
 * @param dn distinguished name used as both subject and issuer
 * @param pair key pair providing the certificate's public key and the signing private key
 * @param days length of the validity period in days
 * @param algorithm signature algorithm name
 * @return the generated certificate
 * @throws Exception if the local host cannot be resolved or certificate generation fails
 */
public static X509Certificate generateCertificate(String dn, KeyPair pair, int days, String algorithm)
    throws Exception {
  Date notBefore = new Date();
  Date notAfter = new Date(notBefore.getTime() + days * 86400000L);
  BigInteger serialNumber = new BigInteger(64, new SecureRandom());

  X509V3CertificateGenerator generator = new X509V3CertificateGenerator();

  // SAN entries: the local host's IP address plus the DNS name "localhost"
  GeneralName[] alternativeNames = {
      new GeneralName(GeneralName.iPAddress, InetAddress.getLocalHost().getHostAddress()),
      new GeneralName(GeneralName.dNSName, "localhost")
  };
  generator.addExtension(X509Extensions.SubjectAlternativeName, false,
      new GeneralNames(alternativeNames));

  X500Principal principal = new X500Principal(dn);
  generator.setSerialNumber(serialNumber);
  generator.setSubjectDN(principal);
  generator.setIssuerDN(principal);
  generator.setNotBefore(notBefore);
  generator.setNotAfter(notAfter);
  generator.setPublicKey(pair.getPublic());
  generator.setSignatureAlgorithm(algorithm);

  return generator.generate(pair.getPrivate());
}
}