GIRAPH-1251
closes #150
diff --git a/checkstyle.xml b/checkstyle.xml
index f820d74..5462769 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -227,7 +227,7 @@
</module>
<!-- Over time, we will revised this down -->
<module name="MethodLength">
- <property name="max" value="210"/>
+ <property name="max" value="220"/>
</module>
<module name="ParameterNumber">
<property name="max" value="8"/>
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 229ac1d..7f0d3f5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -241,6 +241,8 @@
private final GiraphHadoopCounter networkRequestsResentForChannelFailure;
/** How many network requests were resent because connection failed */
private final GiraphHadoopCounter networkRequestsResentForConnectionFailure;
+ /** Netty SSL Handler class */
+ private final NettySSLHandler nettySSLHandler;
/**
* Keeps track of the number of reconnect failures. Once this exceeds the
@@ -325,6 +327,12 @@
executionGroup = null;
}
+ if (conf.sslAuthenticate()) {
+ nettySSLHandler = new NettySSLHandler(true, conf);
+ } else {
+ nettySSLHandler = null;
+ }
+
workerGroup = new NioEventLoopGroup(maxPoolSize,
ThreadUtils.createThreadFactory(
"netty-client-worker-%d", exceptionHandler));
@@ -395,11 +403,19 @@
new SaslClientHandler(conf), handlerToUseExecutionGroup,
executionGroup, ch);
PipelineUtils.addLastWithExecutorCheck("response-handler",
- new ResponseClientHandler(NettyClient.this, conf),
+ new ResponseClientHandler(
+ NettyClient.this, conf, exceptionHandler),
handlerToUseExecutionGroup, executionGroup, ch);
} else {
LOG.info("Using Netty without authentication.");
/*end[HADOOP_NON_SECURE]*/
+
+ if (conf.sslAuthenticate()) {
+ PipelineUtils.addLastWithExecutorCheck("sslHandler",
+ nettySSLHandler.getSslHandler(ch.alloc()),
+ handlerToUseExecutionGroup, executionGroup, ch);
+ }
+
PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
new FlushConsolidationHandler(FlushConsolidationHandler
.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
@@ -429,8 +445,11 @@
PipelineUtils.addLastWithExecutorCheck("request-encoder",
new RequestEncoder(conf), handlerToUseExecutionGroup,
executionGroup, ch);
+ // ResponseClientHandler is the last handler in channel pipeline
+ // It handles the SSL Exception in a special way
PipelineUtils.addLastWithExecutorCheck("response-handler",
- new ResponseClientHandler(NettyClient.this, conf),
+ new ResponseClientHandler(
+ NettyClient.this, conf, exceptionHandler),
handlerToUseExecutionGroup, executionGroup, ch);
/*if_not[HADOOP_NON_SECURE]*/
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettySSLHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettySSLHandler.java
new file mode 100644
index 0000000..624f412
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettySSLHandler.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.log4j.Logger;
+
+import javax.net.ssl.SSLException;
+
+/**
+ * Class to handle all SSL related logic. This class creates the SSL Context,
+ * sets up SSL handler and the interface for custom SSL event handling. When
+ * SSL_ENCRYPT is set to true, an object of this class is created and the
+ * getSslHandler function is called to get the SSL handler and add it to the
+ * Netty channel pipeline. This class also adds an event listener function to
+ * on handshake complete event and calls the custom
+ * {@link SSLEventHandler} handleOnSslHandshakeComplete function, if defined.
+ */
+public class NettySSLHandler
+{
+ /** Class Logger */
+ private static final Logger LOG = Logger.getLogger(NettySSLHandler.class);
+ /** Client or Server */
+ private boolean isClient;
+ /** Giraph Configuration */
+ private ImmutableClassesGiraphConfiguration conf;
+ /** SSL Event Handler interface */
+ private SSLEventHandler sslEventHandler;
+ /** SSL Context object */
+ private SslContext sslContext;
+
+ /**
+ * Constructor
+ *
+ * This is called before the Netty Channel is setup. This initializes the
+ * SslContext object, which will be used by the SSL Handler. Since this
+ * class is instantiated once for every Netty client and server, creating
+ * the SslContext ensures that we create it only once and re-use it to
+ * create the SSL handlers (getSslHandler) instead of creating the
+ * SslContext for every call to getSslHandler.
+ *
+ * @param isClient Client/server for which the ssl handler
+ * needs to be created
+ * @param conf configuration object
+ */
+ public NettySSLHandler(
+ boolean isClient,
+ ImmutableClassesGiraphConfiguration conf) {
+ this.isClient = isClient;
+ this.conf = conf;
+ this.sslEventHandler = conf.createSSLEventHandler();
+ try {
+ this.sslContext = new SSLConfig.Builder(
+ this.isClient, this.conf, SSLConfig.VerifyMode.VERIFY_REQ_CLIENT_CERT)
+ .build()
+ .buildSslContext();
+ } catch (SSLException e) {
+ LOG.error("Failed to build SSLConfig object " + e.getCause());
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+ * Build the Client or server SSL Context, create new SSL handler,
+ * add a listener function to onSslHandshakeComplete and return
+ *
+ * @param allocator ByteBufAllocator of the channel
+ *
+ * @return The SSL Handler object
+ */
+ public SslHandler getSslHandler(ByteBufAllocator allocator)
+ {
+ SslHandler handler = this.sslContext.newHandler(allocator);
+ handler.handshakeFuture().addListener(
+ f -> onSslHandshakeComplete(f, handler));
+ return handler;
+ }
+
+ /**
+ * Build the Client or server SSL Context, create new SSL handler,
+ * add a listener function to onSslHandshakeComplete and return
+ *
+ * @param future Future object to be notified once handshake completes
+ * @param sslHandler SslHandler object
+ *
+ * @throws Exception
+ */
+ private void onSslHandshakeComplete(
+ Future<? super Channel> future,
+ SslHandler sslHandler) throws Exception
+ {
+ // If no custom SSL Event Handler is defined, return
+ if (sslEventHandler == null) {
+ return;
+ }
+ try {
+ sslEventHandler.handleOnSslHandshakeComplete(
+ future, sslHandler, isClient);
+ // CHECKSTYLE: stop IllegalCatch
+ } catch (Exception e) {
+ // CHECKSTYLE: resume IllegalCatch
+ // If there is any exception from onSslHandshakeComplete
+ // Cast it to SSLException and propagate it down the channel
+ LOG.error("Error in handleOnSslHandshakeComplete: " + e.getMessage());
+ Channel ch = (Channel) future.getNow();
+ ch.pipeline().fireExceptionCaught(new SSLException(e));
+ }
+ LOG.debug("onSslHandshakeComplete succeeded");
+ }
+}
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
index 5cb1502..223c710 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
@@ -127,7 +127,8 @@
private final String handlerToUseExecutionGroup;
/** Handles all uncaught exceptions in netty threads */
private final Thread.UncaughtExceptionHandler exceptionHandler;
-
+ /** Netty SSL Handler class */
+ private final NettySSLHandler nettySSLHandler;
/**
* Constructor for creating the server
@@ -192,6 +193,12 @@
} else {
executionGroup = null;
}
+
+ if (conf.sslAuthenticate()) {
+ nettySSLHandler = new NettySSLHandler(false, conf);
+ } else {
+ nettySSLHandler = null;
+ }
}
/*if_not[HADOOP_NON_SECURE]*/
@@ -311,6 +318,13 @@
} else {
LOG.info("start: Using Netty without authentication.");
/*end[HADOOP_NON_SECURE]*/
+
+ if (conf.sslAuthenticate()) {
+ PipelineUtils.addLastWithExecutorCheck("sslHandler",
+ nettySSLHandler.getSslHandler(ch.alloc()),
+ handlerToUseExecutionGroup, executionGroup, ch);
+ }
+
// Store all connected channels in order to ensure that we can close
// them on stop(), or else stop() may hang waiting for the
// connections to close on their own
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/SSLConfig.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/SSLConfig.java
new file mode 100644
index 0000000..1ad20b9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/SSLConfig.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.netty.handler.ssl.ClientAuth;
+import org.apache.giraph.conf.StrConfOption;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+
+import javax.net.ssl.SSLException;
+
+/**
+ * Helper class to build Client and server SSL Context.
+ */
+public class SSLConfig
+{
+ /** Certificate Authority path for SSL config */
+ public static final StrConfOption CA_PATH =
+ new StrConfOption("giraph.sslConfig.caPath", null,
+ "Certificate Authority path for SSL config");
+
+ /** Client certificate path for SSL config */
+ public static final StrConfOption CLIENT_PATH =
+ new StrConfOption(
+ "giraph.sslConfig.clientPath", null,
+ "Client certificate path for SSL config");
+
+ /** Server certificate path for SSL config */
+ public static final StrConfOption SERVER_PATH =
+ new StrConfOption("giraph.sslConfig.serverPath", null,
+ "Server certificate path for SSL config");
+
+ /** Env variable name for Certificate Authority path of SSL config */
+ public static final StrConfOption CA_PATH_ENV =
+ new StrConfOption("giraph.sslConfig.caPathEnv", null,
+ "Env variable name for Certificate Authority path of SSL config");
+
+ /** Client certificate path for SSL config */
+ public static final StrConfOption CLIENT_PATH_ENV =
+ new StrConfOption(
+ "giraph.sslConfig.clientPathEnv", null,
+ "Env variable name for Client certificate Path of SSL config");
+
+ /** Server certificate path for SSL config */
+ public static final StrConfOption SERVER_PATH_ENV =
+ new StrConfOption("giraph.sslConfig.serverPathEnv", null,
+ "Env variable name for Server certificate Path of SSL config");
+
+ /**
+ * Enum for the verification mode during SS authentication
+ */
+ public enum VerifyMode
+ {
+ /**
+ * For server side - request a Client certificate and verify the
+ * certificate if it is sent. Does not fail if the Client does not
+ * present a certificate.
+ * For Client side - validates the server certificate or fails.
+ */
+ VERIFY,
+ /**
+ * For server side - same as VERIFY but will fail if no certificate
+ * is sent.
+ * For Client side - same as VERIFY.
+ */
+ VERIFY_REQ_CLIENT_CERT,
+ /**
+ * Request Client certificate
+ */
+ REQ_CLIENT_CERT,
+ /** No verification is done for both server and Client side */
+ NO_VERIFY
+ }
+
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(SSLConfig.class);
+
+ /** Client or server creating the config */
+ private boolean isClient;
+ /** Verification mode as per the enum defined above */
+ private VerifyMode verifyMode;
+ /** Certificate authority File */
+ private File caFile;
+ /** Certificate File */
+ private File certFile;
+ /** Key File */
+ private File keyFile;
+
+ /**
+ * Constructor to set the file paths based on verification mode
+ *
+ * @param isClient Client or server
+ * @param verifyMode verify mode as described in the enum above
+ * @param caPath certificate authority file path
+ * @param certPath certificate file path
+ * @param keyPath key file path
+ */
+ public SSLConfig(
+ boolean isClient, VerifyMode verifyMode, String caPath,
+ String certPath, String keyPath)
+ {
+ this.isClient = isClient;
+ this.verifyMode = verifyMode;
+ try {
+ if (verifyMode != VerifyMode.NO_VERIFY) {
+ checkNotNull(caPath, "CA file path should not be null");
+ caFile = new File(caPath);
+ checkArgument(caFile.exists(), "CA file %s doesn't exist", caPath);
+ }
+ if (!isClient || verifyMode == VerifyMode.VERIFY_REQ_CLIENT_CERT ||
+ verifyMode == VerifyMode.REQ_CLIENT_CERT) {
+ checkNotNull(certPath, "certificate file path should not be null");
+ certFile = new File(certPath);
+ checkArgument(certFile.exists(),
+ "cert file %s doesn't exist", certPath);
+
+ checkNotNull(keyPath, "key path should not be null");
+ keyFile = new File(keyPath);
+ checkArgument(keyFile.exists(), "key file %s doesn't exist", keyPath);
+ }
+ } catch (NullPointerException | IllegalArgumentException e) {
+ LOG.error("Failed to load required SSL files. Exception: " +
+ e.getMessage());
+ LOG.error("Failure happened when using SSLConfig: isClient = " +
+ String.valueOf(isClient) + " with verifyMode = " +
+ String.valueOf(verifyMode) + " and paths ca:" + caPath + ", cert:" +
+ certPath + ", key:" + keyPath, e);
+ throw e;
+ }
+ }
+
+ /**
+ * Wrapper Function to build Client or server SSL context
+ *
+ * @throws SSLException
+ * @return Built SslContext
+ */
+ SslContext buildSslContext()
+ throws SSLException
+ {
+ if (isClient) {
+ return buildClientSslContext();
+ } else {
+ return buildServerSslContext();
+ }
+ }
+
+ /**
+ * Function to build Client SSL context
+ *
+ * @throws SSLException
+ * @return Built SslContext
+ */
+ private SslContext buildClientSslContext()
+ throws SSLException
+ {
+ SslContextBuilder builder = SslContextBuilder.forClient();
+ if (verifyMode == VerifyMode.NO_VERIFY) {
+ builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
+ } else {
+ builder.trustManager(caFile);
+ }
+ if (verifyMode == VerifyMode.VERIFY_REQ_CLIENT_CERT) {
+ builder.keyManager(certFile, keyFile);
+ }
+ return builder.build();
+ }
+
+ /**
+ * Function to build Server SSL context
+ *
+ * @throws SSLException
+ * @return Built SslContext
+ */
+ private SslContext buildServerSslContext()
+ throws SSLException
+ {
+ SslContextBuilder builder = SslContextBuilder.forServer(certFile, keyFile);
+ if (verifyMode == VerifyMode.NO_VERIFY) {
+ builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
+ } else {
+ builder.trustManager(caFile);
+ }
+ if (verifyMode == VerifyMode.VERIFY) {
+ builder.clientAuth(ClientAuth.OPTIONAL);
+ } else if (verifyMode == VerifyMode.VERIFY_REQ_CLIENT_CERT) {
+ builder.clientAuth(ClientAuth.REQUIRE);
+ }
+ return builder.build();
+ }
+
+ /**
+ * Static Builder class and wrapper around SSLConfig
+ * Checks and assigns the certificate and key paths - first
+ * from a Env variables and then from the provided files.
+ */
+ public static class Builder
+ {
+ /** Client or server creating the config */
+ private boolean isClient;
+ /** Verification mode as per the enum defined above */
+ private VerifyMode verifyMode;
+ /** Certificate authority File */
+ private String caPath;
+ /** Key File */
+ private String keyPath;
+ /** Certificate File */
+ private String certPath;
+ /** Giraph configuration */
+ private ImmutableClassesGiraphConfiguration conf;
+
+ /**
+ * Constructor
+ *
+ * @param isClient Client or server
+ * @param conf Giraph configuration
+ * @param verifyMode Verify Mode
+ */
+ public Builder(
+ boolean isClient,
+ ImmutableClassesGiraphConfiguration conf,
+ VerifyMode verifyMode)
+ {
+ this.isClient = isClient;
+ this.conf = conf;
+ this.verifyMode = verifyMode;
+ assignCAPath();
+ assignCertAndKeyPath();
+ }
+
+ /**
+ * Assign certificate authority path from
+ * env variable or from ca path provided
+ */
+ private void assignCAPath()
+ {
+ this.caPath = CA_PATH.get(conf);
+
+ if (CA_PATH_ENV.get(conf) != null) {
+ String envVarTlsCAPath = System.getenv(CA_PATH_ENV.get(conf));
+ if (envVarTlsCAPath != null) {
+ this.caPath = envVarTlsCAPath;
+ }
+ }
+ }
+
+ /**
+ * Assign certificate and key paths from
+ * env variable or from paths provided
+ */
+ private void assignCertAndKeyPath()
+ {
+
+ if (CLIENT_PATH_ENV.get(conf) != null &&
+ SERVER_PATH_ENV.get(conf) != null) {
+ String envVarTlsClientCertPath =
+ System.getenv(CLIENT_PATH_ENV.get(conf));
+ String envVarTlsClientKeyPath =
+ System.getenv(SERVER_PATH_ENV.get(conf));
+ if (envVarTlsClientCertPath != null && envVarTlsClientKeyPath != null) {
+ // If expected env variables are present, check if file exists
+ File certFile = new File(envVarTlsClientCertPath);
+ File keyFile = new File(envVarTlsClientKeyPath);
+ if (certFile.exists() && keyFile.exists()) {
+ // set paths and return if both files exist
+ this.certPath = envVarTlsClientCertPath;
+ this.keyPath = envVarTlsClientKeyPath;
+ return;
+ }
+ }
+ }
+
+ // Now we know that we are a Client, without valid env variables
+ // We shall try to read off of the default Client path
+ if (CLIENT_PATH.get(conf) != null &&
+ (new File(CLIENT_PATH.get(conf))).exists()) {
+ LOG.error("Falling back to CLIENT_PATH (" + CLIENT_PATH.get(conf) +
+ ") since env var path is not valid/env var not present");
+ this.keyPath = CLIENT_PATH.get(conf);
+ this.certPath = CLIENT_PATH.get(conf);
+ return;
+ }
+
+ // Looks like default Client also does not exist
+ // Time to use the server cert and see if we have any luck
+ LOG.error("EnvVar and CLIENT_PATH (" + CLIENT_PATH.get(conf) +
+ ") both do not exist/invalid, trying SERVER_PATH(" +
+ SERVER_PATH.get(conf) + ")");
+ this.keyPath = SERVER_PATH.get(conf);
+ this.certPath = SERVER_PATH.get(conf);
+ }
+
+ /**
+ * Build function which calls the main SSLConfig class
+ *
+ * @return SSLConfig object
+ */
+ public SSLConfig build()
+ {
+ return new SSLConfig(
+ isClient, verifyMode, this.caPath, this.certPath, this.keyPath);
+ }
+ }
+}
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/SSLEventHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/SSLEventHandler.java
new file mode 100644
index 0000000..9435790
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/SSLEventHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+import io.netty.channel.Channel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+
+/**
+ * An SSLEventHandler is used to handle events during the SSL
+ * handshake process and override the default behavior.
+ */
+public interface SSLEventHandler
+ extends ImmutableClassesGiraphConfigurable
+{
+ /**
+ * Handle the event 'onSslHandshakeComplete' after TLS handshake
+ * The Channel future can be used to modify the channel,
+ * fire exceptions and so on.
+ *
+ * @param future Channel future
+ * @param isClient whether it is the client or server involved
+ * @param sslHandler the SslHandler
+ */
+ void handleOnSslHandshakeComplete(
+ Future<? super Channel> future,
+ SslHandler sslHandler,
+ boolean isClient) throws Exception;
+}
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
index 5c6f035..6b66a66 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
@@ -27,6 +27,8 @@
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
+import javax.net.ssl.SSLException;
+
import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED;
/**
@@ -42,15 +44,22 @@
private final boolean dropFirstResponse;
/** Netty client that does the actual I/O and keeps track of open requests */
private final NettyClient nettyClient;
+ /** Handler for uncaught exceptions */
+ private final Thread.UncaughtExceptionHandler exceptionHandler;
/**
* Constructor.
* @param nettyClient Client that does the actual I/O
* @param conf Configuration
+ * @param exceptionHandler Handles uncaught exceptions
*/
- public ResponseClientHandler(NettyClient nettyClient, Configuration conf) {
+ public ResponseClientHandler(
+ NettyClient nettyClient,
+ Configuration conf,
+ Thread.UncaughtExceptionHandler exceptionHandler) {
this.nettyClient = nettyClient;
dropFirstResponse = NETTY_SIMULATE_FIRST_RESPONSE_FAILED.get(conf);
+ this.exceptionHandler = exceptionHandler;
}
@Override
@@ -106,9 +115,14 @@
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- LOG.warn("exceptionCaught: Channel channelId=" +
+ // If SSLException, fail the client
+ if (cause instanceof SSLException) {
+ exceptionHandler.uncaughtException(Thread.currentThread(), cause);
+ } else {
+ LOG.warn("exceptionCaught: Channel channelId=" +
ctx.channel().hashCode() + " failed with remote address " +
ctx.channel().remoteAddress(), cause);
+ }
}
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 20d40cf..2e7a41c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -957,6 +957,15 @@
}
/**
+ * Use authentication? (if supported)
+ *
+ * @return True if should authenticate, false otherwise
+ */
+ public boolean sslAuthenticate() {
+ return SSL_ENCRYPT.get(this);
+ }
+
+ /**
* Set the number of compute threads
*
* @param numComputeThreads Number of compute threads to use
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 2236092..61beb9a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -26,6 +26,7 @@
import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.netty.SSLEventHandler;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.edge.DefaultCreateSourceVertexCallback;
import org.apache.giraph.edge.CreateSourceVertexCallback;
@@ -1073,6 +1074,15 @@
"Whether to use SASL with DIGEST and Hadoop Job Tokens to " +
"authenticate and authorize Netty BSP Clients to Servers.");
+ /**
+ * Whether to use SSL to authenticate and authorize "
+ * Netty BSP Clients to Servers.
+ */
+ BooleanConfOption SSL_ENCRYPT =
+ new BooleanConfOption("giraph.sslEncrypt", false,
+ "Whether to use SSL to authenticate and authorize " +
+ "Netty BSP Clients to Servers.");
+
/** Use unsafe serialization? */
BooleanConfOption USE_UNSAFE_SERIALIZATION =
new BooleanConfOption("giraph.useUnsafeSerialization", true,
@@ -1335,5 +1345,11 @@
BooleanConfOption FAIL_ON_EMPTY_INPUT = new BooleanConfOption(
"giraph.failOnEmptyInput", true,
"Whether to fail the job or just warn when input is empty");
+
+ /** SSLEventHandler class - optional */
+ ClassConfOption<SSLEventHandler> SSL_EVENT_HANDLER_CLASS =
+ ClassConfOption.create("giraph.sslEventHandler",
+ null, SSLEventHandler.class,
+ "SSLEventHandler class - optional");
}
// CHECKSTYLE: resume InterfaceIsTypeCheck
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index d244d20..f9b6761 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -28,6 +28,7 @@
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
+import org.apache.giraph.comm.netty.SSLEventHandler;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.edge.EdgeStoreFactory;
@@ -544,6 +545,28 @@
}
/**
+ * Get the user's subclassed {@link SSLEventHandler}.
+ *
+ * @return User's SSL Event Handler class
+ */
+ public Class<? extends SSLEventHandler> getSSLEventHandlerClass() {
+ return SSL_EVENT_HANDLER_CLASS.get(this);
+ }
+
+ /**
+ * Create a user SSLEventHandler class
+ *
+ * @return Instantiated user SSL Event Handler class
+ */
+ public SSLEventHandler createSSLEventHandler() {
+ if (getSSLEventHandlerClass() != null) {
+ return ReflectionUtils.newInstance(getSSLEventHandlerClass(), this);
+ } else {
+ return null;
+ }
+ }
+
+ /**
* Get the user's subclassed
* {@link org.apache.giraph.graph.VertexValueCombiner} class.
*