blob: 92ec67ea58edacf9365a775806e20e1fb6418f35 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.gateway.shell;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.AuthCache;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.client.BasicAuthCache;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.ssl.SSLContexts;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Hadoop implements Closeable {
private static final String GATEWAY_CLIENT_TRUST_DEFAULT_PASS = "changeit";
private static final String KNOX_CLIENT_TRUSTSTORE_PASS = "KNOX_CLIENT_TRUSTSTORE_PASS";
private static final String GATEWAY_CLIENT_TRUST = "gateway-client-trust.jks";
private static final String KNOX_CLIENT_TRUSTSTORE_FILENAME = "KNOX_CLIENT_TRUSTSTORE_FILENAME";
private static final String KNOX_CLIENT_TRUSTSTORE_DIR = "KNOX_CLIENT_TRUSTSTORE_DIR";
String base;
HttpHost host;
CloseableHttpClient client;
BasicHttpContext context;
ExecutorService executor;
Map<String, String> headers = new HashMap<>();
public Map<String, String> getHeaders() {
return headers;
}
public void setHeaders(Map<String, String> headers) {
this.headers = headers;
}
public static Hadoop login( String url, Map<String,String> headers ) throws URISyntaxException {
Hadoop instance = new Hadoop(ClientContext.with(url));
instance.setHeaders(headers);
return instance;
}
public static Hadoop login( String url, String username, String password ) throws URISyntaxException {
return new Hadoop(ClientContext.with(username, password, url));
}
public static Hadoop loginInsecure(String url, String username, String password) throws URISyntaxException {
return new Hadoop(ClientContext.with(username, password, url)
.connection().secure(false).end());
}
public Hadoop( ClientContext clientContext) throws HadoopException, URISyntaxException {
this.executor = Executors.newCachedThreadPool();
this.base = clientContext.url();
try {
client = createClient(clientContext);
} catch (GeneralSecurityException e) {
throw new HadoopException("Failed to create HTTP client.", e);
}
}
private CloseableHttpClient createClient(ClientContext clientContext) throws GeneralSecurityException {
// SSL
HostnameVerifier hostnameVerifier = NoopHostnameVerifier.INSTANCE;
TrustStrategy trustStrategy = null;
if (clientContext.connection().secure()) {
hostnameVerifier = SSLConnectionSocketFactory.getDefaultHostnameVerifier();
} else {
trustStrategy = TrustSelfSignedStrategy.INSTANCE;
System.out.println("**************** WARNING ******************\n"
+ "This is an insecure client instance and may\n"
+ "leave the interactions subject to a man in\n"
+ "the middle attack. Please use the login()\n"
+ "method instead of loginInsecure() for any\n"
+ "sensitive or production usecases.\n"
+ "*******************************************");
}
KeyStore trustStore = getTrustStore();
SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(trustStore, trustStrategy).build();
Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.getSocketFactory())
.register("https", new SSLConnectionSocketFactory(sslContext, hostnameVerifier)).build();
// Pool
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(registry);
connectionManager.setMaxTotal(clientContext.pool().maxTotal());
connectionManager.setDefaultMaxPerRoute(clientContext.pool().defaultMaxPerRoute());
ConnectionConfig connectionConfig = ConnectionConfig.custom()
.setBufferSize(clientContext.connection().bufferSize())
.build();
connectionManager.setDefaultConnectionConfig(connectionConfig);
SocketConfig socketConfig = SocketConfig.custom()
.setSoKeepAlive(clientContext.socket().keepalive())
.setSoLinger(clientContext.socket().linger())
.setSoReuseAddress(clientContext.socket().reuseAddress())
.setSoTimeout(clientContext.socket().timeout())
.setTcpNoDelay(clientContext.socket().tcpNoDelay())
.build();
connectionManager.setDefaultSocketConfig(socketConfig);
// Auth
URI uri = URI.create(clientContext.url());
host = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
CredentialsProvider credentialsProvider = null;
if (clientContext.username() != null && clientContext.password() != null) {
credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
new AuthScope(host.getHostName(), host.getPort()),
new UsernamePasswordCredentials(clientContext.username(), clientContext.password()));
AuthCache authCache = new BasicAuthCache();
BasicScheme authScheme = new BasicScheme();
authCache.put(host, authScheme);
context = new BasicHttpContext();
context.setAttribute(org.apache.http.client.protocol.HttpClientContext.AUTH_CACHE, authCache);
}
return HttpClients.custom()
.setConnectionManager(connectionManager)
.setDefaultCredentialsProvider(credentialsProvider)
.build();
}
private static KeyStore getTrustStore() throws GeneralSecurityException {
KeyStore ks = null;
String truststoreDir = System.getenv(KNOX_CLIENT_TRUSTSTORE_DIR);
if (truststoreDir == null) {
truststoreDir = System.getProperty("user.home");
}
String truststoreFileName = System.getenv(KNOX_CLIENT_TRUSTSTORE_FILENAME);
if (truststoreFileName == null) {
truststoreFileName = GATEWAY_CLIENT_TRUST;
}
String truststorePass = System.getenv(KNOX_CLIENT_TRUSTSTORE_PASS);
if (truststorePass == null) {
truststorePass = GATEWAY_CLIENT_TRUST_DEFAULT_PASS;
}
InputStream is = null;
try {
ks = KeyStore.getInstance("JKS");
File file = new File(truststoreDir, truststoreFileName);
if (!file.exists()) {
String truststore = System.getProperty("javax.net.ssl.trustStore");
if (truststore == null) {
truststoreDir = System.getProperty("java.home");
truststore = truststoreDir + File.separator + "lib" + File.separator
+ "security" + File.separator + "cacerts";
truststorePass = System.getProperty("javax.net.ssl.trustStorePassword", "changeit");
}
file = new File(truststore);
}
if (file.exists()) {
is = new FileInputStream(file);
ks.load(is, truststorePass.toCharArray());
}
else {
throw new HadoopException("Unable to find a truststore for secure login."
+ "Please import the gateway-identity certificate into the JVM"
+ " truststore or set the truststore location ENV variables.");
}
} catch (KeyStoreException e) {
throw new HadoopException("Unable to create keystore of expected type.", e);
} catch (FileNotFoundException e) {
throw new HadoopException("Unable to read truststore."
+ " Please import the gateway-identity certificate into the JVM"
+ " truststore or set the truststore location ENV variables.", e);
} catch (NoSuchAlgorithmException e) {
throw new HadoopException("Unable to load the truststore."
+ " Please import the gateway-identity certificate into the JVM"
+ " truststore or set the truststore location ENV variables.", e);
} catch (CertificateException e) {
throw new HadoopException("Certificate cannot be found in the truststore."
+ " Please import the gateway-identity certificate into the JVM"
+ " truststore or set the truststore location ENV variables.", e);
} catch (IOException e) {
throw new HadoopException("Unable to load truststore."
+ " May be related to password setting or truststore format.", e);
} finally {
IOUtils.closeQuietly(is);
}
return ks;
}
public String base() {
return base;
}
public CloseableHttpResponse executeNow(HttpRequest request ) throws IOException {
CloseableHttpResponse response = client.execute( host, request, context );
if( response.getStatusLine().getStatusCode() < 400 ) {
return response;
} else {
throw new ErrorResponse( response );
}
}
public <T> Future<T> executeLater( Callable<T> callable ) {
return executor.submit( callable );
}
public void waitFor( Future<?>... futures ) throws ExecutionException, InterruptedException {
if( futures != null ) {
for( Future future : futures ) {
future.get();
}
}
}
public void waitFor( long timeout, TimeUnit units, Future<?>... futures ) throws ExecutionException, TimeoutException, InterruptedException {
if( futures != null ) {
timeout = TimeUnit.MILLISECONDS.convert( timeout, units );
long start;
for( Future future : futures ) {
start = System.currentTimeMillis();
future.get( timeout, TimeUnit.MILLISECONDS );
timeout -= ( System.currentTimeMillis() - start );
}
}
}
public void shutdown() throws InterruptedException {
executor.shutdownNow();
}
public boolean shutdown( long timeout, TimeUnit unit ) throws InterruptedException {
executor.shutdown();
return executor.awaitTermination( timeout, unit );
}
@Override
public void close() throws IOException {
try {
shutdown();
} catch (InterruptedException e) {
throw new HadoopException("Can not shutdown underlying resources", e);
} finally {
client.close();
}
}
}