GIRAPH-1251

closes #150
diff --git a/checkstyle.xml b/checkstyle.xml
index f820d74..5462769 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -227,7 +227,7 @@
     </module>
       <!-- Over time, we will revised this down -->
     <module name="MethodLength">
-      <property name="max" value="210"/>
+      <property name="max" value="220"/>
     </module>
     <module name="ParameterNumber">
       <property name="max" value="8"/>
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 229ac1d..7f0d3f5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -241,6 +241,8 @@
   private final GiraphHadoopCounter networkRequestsResentForChannelFailure;
   /** How many network requests were resent because connection failed */
   private final GiraphHadoopCounter networkRequestsResentForConnectionFailure;
+  /** Netty SSL Handler class */
+  private final NettySSLHandler nettySSLHandler;
 
   /**
    * Keeps track of the number of reconnect failures. Once this exceeds the
@@ -325,6 +327,12 @@
       executionGroup = null;
     }
 
+    if (conf.sslAuthenticate()) {
+      nettySSLHandler = new NettySSLHandler(true, conf);
+    } else {
+      nettySSLHandler = null;
+    }
+
     workerGroup = new NioEventLoopGroup(maxPoolSize,
         ThreadUtils.createThreadFactory(
             "netty-client-worker-%d", exceptionHandler));
@@ -395,11 +403,19 @@
                   new SaslClientHandler(conf), handlerToUseExecutionGroup,
                   executionGroup, ch);
               PipelineUtils.addLastWithExecutorCheck("response-handler",
-                  new ResponseClientHandler(NettyClient.this, conf),
+                  new ResponseClientHandler(
+                    NettyClient.this, conf, exceptionHandler),
                   handlerToUseExecutionGroup, executionGroup, ch);
             } else {
               LOG.info("Using Netty without authentication.");
 /*end[HADOOP_NON_SECURE]*/
+
+              if (conf.sslAuthenticate()) {
+                PipelineUtils.addLastWithExecutorCheck("sslHandler",
+                  nettySSLHandler.getSslHandler(ch.alloc()),
+                  handlerToUseExecutionGroup, executionGroup, ch);
+              }
+
               PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
                 new FlushConsolidationHandler(FlushConsolidationHandler
                     .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
@@ -429,8 +445,11 @@
               PipelineUtils.addLastWithExecutorCheck("request-encoder",
                     new RequestEncoder(conf), handlerToUseExecutionGroup,
                   executionGroup, ch);
+              // ResponseClientHandler is the last handler in channel pipeline
+              // It handles the SSL Exception in a special way
               PipelineUtils.addLastWithExecutorCheck("response-handler",
-                  new ResponseClientHandler(NettyClient.this, conf),
+                  new ResponseClientHandler(
+                    NettyClient.this, conf, exceptionHandler),
                   handlerToUseExecutionGroup, executionGroup, ch);
 
 /*if_not[HADOOP_NON_SECURE]*/
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettySSLHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettySSLHandler.java
new file mode 100644
index 0000000..624f412
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettySSLHandler.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.log4j.Logger;
+
+import javax.net.ssl.SSLException;
+
+/**
+ * Class to handle all SSL related logic. This class creates the SSL Context,
+ * sets up SSL handler and the interface for custom SSL event handling. When
+ * SSL_ENCRYPT is set to true, an object of this class is created and the
+ * getSslHandler function is called to get the SSL handler and add it to the
+ * Netty channel pipeline. This class also adds an event listener function to
+ * on handshake complete event and calls the custom
+ * {@link SSLEventHandler} handleOnSslHandshakeComplete function, if defined.
+ */
+public class NettySSLHandler
+{
+  /** Class Logger */
+  private static final Logger LOG = Logger.getLogger(NettySSLHandler.class);
+  /** Client or Server */
+  private boolean isClient;
+  /** Giraph Configuration */
+  private ImmutableClassesGiraphConfiguration conf;
+  /** SSL Event Handler interface */
+  private SSLEventHandler sslEventHandler;
+  /** SSL Context object */
+  private SslContext sslContext;
+
+  /**
+   * Constructor
+   *
+   * This is called before the Netty Channel is setup. This initializes the
+   * SslContext object, which will be used by the SSL Handler. Since this
+   * class is instantiated once for every Netty client and server, creating
+   * the SslContext ensures that we create it only once and re-use it to
+   * create the SSL handlers (getSslHandler) instead of creating the
+   * SslContext for every call to getSslHandler.
+   *
+   * @param isClient Client/server for which the ssl handler
+   *                 needs to be created
+   * @param conf configuration object
+   */
+  public NettySSLHandler(
+    boolean isClient,
+    ImmutableClassesGiraphConfiguration conf) {
+    this.isClient = isClient;
+    this.conf = conf;
+    this.sslEventHandler = conf.createSSLEventHandler();
+    try {
+      this.sslContext = new SSLConfig.Builder(
+        this.isClient, this.conf, SSLConfig.VerifyMode.VERIFY_REQ_CLIENT_CERT)
+        .build()
+        .buildSslContext();
+    } catch (SSLException e) {
+      LOG.error("Failed to build SSLConfig object " + e.getCause());
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /**
+   * Build the Client or server SSL Context, create new SSL handler,
+   * add a listener function to onSslHandshakeComplete and return
+   *
+   * @param allocator ByteBufAllocator of the channel
+   *
+   * @return The SSL Handler object
+   */
+  public SslHandler getSslHandler(ByteBufAllocator allocator)
+  {
+    SslHandler handler = this.sslContext.newHandler(allocator);
+    handler.handshakeFuture().addListener(
+        f -> onSslHandshakeComplete(f, handler));
+    return handler;
+  }
+
+  /**
+   * Build the Client or server SSL Context, create new SSL handler,
+   * add a listener function to onSslHandshakeComplete and return
+   *
+   * @param future Future object to be notified once handshake completes
+   * @param sslHandler SslHandler object
+   *
+   * @throws Exception
+   */
+  private void onSslHandshakeComplete(
+      Future<? super Channel> future,
+        SslHandler sslHandler) throws Exception
+  {
+    // If no custom SSL Event Handler is defined, return
+    if (sslEventHandler == null) {
+      return;
+    }
+    try {
+      sslEventHandler.handleOnSslHandshakeComplete(
+        future, sslHandler, isClient);
+    // CHECKSTYLE: stop IllegalCatch
+    } catch (Exception e) {
+    // CHECKSTYLE: resume IllegalCatch
+      // If there is any exception from onSslHandshakeComplete
+      // Cast it to SSLException and propagate it down the channel
+      LOG.error("Error in handleOnSslHandshakeComplete: " + e.getMessage());
+      Channel ch = (Channel) future.getNow();
+      ch.pipeline().fireExceptionCaught(new SSLException(e));
+    }
+    LOG.debug("onSslHandshakeComplete succeeded");
+  }
+}
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
index 5cb1502..223c710 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
@@ -127,7 +127,8 @@
   private final String handlerToUseExecutionGroup;
   /** Handles all uncaught exceptions in netty threads */
   private final Thread.UncaughtExceptionHandler exceptionHandler;
-
+  /** Netty SSL Handler class */
+  private final NettySSLHandler nettySSLHandler;
 
   /**
    * Constructor for creating the server
@@ -192,6 +193,12 @@
     } else {
       executionGroup = null;
     }
+
+    if (conf.sslAuthenticate()) {
+      nettySSLHandler = new NettySSLHandler(false, conf);
+    } else {
+      nettySSLHandler = null;
+    }
   }
 
 /*if_not[HADOOP_NON_SECURE]*/
@@ -311,6 +318,13 @@
         } else {
           LOG.info("start: Using Netty without authentication.");
 /*end[HADOOP_NON_SECURE]*/
+
+          if (conf.sslAuthenticate()) {
+            PipelineUtils.addLastWithExecutorCheck("sslHandler",
+              nettySSLHandler.getSslHandler(ch.alloc()),
+              handlerToUseExecutionGroup, executionGroup, ch);
+          }
+
           // Store all connected channels in order to ensure that we can close
           // them on stop(), or else stop() may hang waiting for the
           // connections to close on their own
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/SSLConfig.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/SSLConfig.java
new file mode 100644
index 0000000..1ad20b9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/SSLConfig.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.netty.handler.ssl.ClientAuth;
+import org.apache.giraph.conf.StrConfOption;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+
+import javax.net.ssl.SSLException;
+
+/**
+ * Helper class to build Client and server SSL Context.
+ */
+public class SSLConfig
+{
+  /** Certificate Authority path for SSL config */
+  public static final StrConfOption CA_PATH =
+      new StrConfOption("giraph.sslConfig.caPath", null,
+        "Certificate Authority path for SSL config");
+
+  /** Client certificate path for SSL config */
+  public static final StrConfOption CLIENT_PATH =
+      new StrConfOption(
+        "giraph.sslConfig.clientPath", null,
+        "Client certificate path for SSL config");
+
+  /** Server certificate path for SSL config */
+  public static final StrConfOption SERVER_PATH =
+      new StrConfOption("giraph.sslConfig.serverPath", null,
+        "Server certificate path for SSL config");
+
+  /** Env variable name for Certificate Authority path of SSL config */
+  public static final StrConfOption CA_PATH_ENV =
+    new StrConfOption("giraph.sslConfig.caPathEnv", null,
+      "Env variable name for Certificate Authority path of SSL config");
+
+  /** Client certificate path for SSL config */
+  public static final StrConfOption CLIENT_PATH_ENV =
+    new StrConfOption(
+      "giraph.sslConfig.clientPathEnv", null,
+      "Env variable name for Client certificate Path of SSL config");
+
+  /** Server certificate path for SSL config */
+  public static final StrConfOption SERVER_PATH_ENV =
+    new StrConfOption("giraph.sslConfig.serverPathEnv", null,
+      "Env variable name for Server certificate Path of SSL config");
+
+  /**
+   * Enum for the verification mode during SS authentication
+   */
+  public enum VerifyMode
+  {
+    /**
+     * For server side - request a Client certificate and verify the
+     * certificate if it is sent.  Does not fail if the Client does not
+     * present a certificate.
+     * For Client side - validates the server certificate or fails.
+     */
+    VERIFY,
+    /**
+     * For server side - same as VERIFY but will fail if no certificate
+     * is sent.
+     * For Client side - same as VERIFY.
+     */
+    VERIFY_REQ_CLIENT_CERT,
+    /**
+     * Request Client certificate
+     */
+    REQ_CLIENT_CERT,
+    /** No verification is done for both server and Client side */
+    NO_VERIFY
+  }
+
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(SSLConfig.class);
+
+  /** Client or server creating the config */
+  private boolean isClient;
+  /** Verification mode as per the enum defined above */
+  private VerifyMode verifyMode;
+  /** Certificate authority File */
+  private File caFile;
+  /** Certificate File */
+  private File certFile;
+  /** Key File */
+  private File keyFile;
+
+  /**
+   * Constructor to set the file paths based on verification mode
+   *
+   * @param isClient Client or server
+   * @param verifyMode verify mode as described in the enum above
+   * @param caPath certificate authority file path
+   * @param certPath certificate file path
+   * @param keyPath key file path
+   */
+  public SSLConfig(
+    boolean isClient, VerifyMode verifyMode, String caPath,
+    String certPath, String keyPath)
+  {
+    this.isClient = isClient;
+    this.verifyMode = verifyMode;
+    try {
+      if (verifyMode != VerifyMode.NO_VERIFY) {
+        checkNotNull(caPath, "CA file path should not be null");
+        caFile = new File(caPath);
+        checkArgument(caFile.exists(), "CA file %s doesn't exist", caPath);
+      }
+      if (!isClient || verifyMode == VerifyMode.VERIFY_REQ_CLIENT_CERT ||
+          verifyMode == VerifyMode.REQ_CLIENT_CERT) {
+        checkNotNull(certPath, "certificate file path should not be null");
+        certFile = new File(certPath);
+        checkArgument(certFile.exists(),
+          "cert file %s doesn't exist", certPath);
+
+        checkNotNull(keyPath, "key path should not be null");
+        keyFile = new File(keyPath);
+        checkArgument(keyFile.exists(), "key file %s doesn't exist", keyPath);
+      }
+    } catch (NullPointerException | IllegalArgumentException e) {
+      LOG.error("Failed to load required SSL files. Exception: " +
+        e.getMessage());
+      LOG.error("Failure happened when using SSLConfig: isClient = " +
+        String.valueOf(isClient) + " with verifyMode = " +
+        String.valueOf(verifyMode) + " and paths ca:" + caPath + ", cert:" +
+        certPath + ", key:" + keyPath, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Wrapper Function to build Client or server SSL context
+   *
+   * @throws SSLException
+   * @return Built SslContext
+   */
+  SslContext buildSslContext()
+      throws SSLException
+  {
+    if (isClient) {
+      return buildClientSslContext();
+    } else {
+      return buildServerSslContext();
+    }
+  }
+
+  /**
+   * Function to build Client SSL context
+   *
+   * @throws SSLException
+   * @return Built SslContext
+   */
+  private SslContext buildClientSslContext()
+      throws SSLException
+  {
+    SslContextBuilder builder = SslContextBuilder.forClient();
+    if (verifyMode == VerifyMode.NO_VERIFY) {
+      builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
+    } else {
+      builder.trustManager(caFile);
+    }
+    if (verifyMode == VerifyMode.VERIFY_REQ_CLIENT_CERT) {
+      builder.keyManager(certFile, keyFile);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Function to build Server SSL context
+   *
+   * @throws SSLException
+   * @return Built SslContext
+   */
+  private SslContext buildServerSslContext()
+      throws SSLException
+  {
+    SslContextBuilder builder = SslContextBuilder.forServer(certFile, keyFile);
+    if (verifyMode == VerifyMode.NO_VERIFY) {
+      builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
+    } else {
+      builder.trustManager(caFile);
+    }
+    if (verifyMode == VerifyMode.VERIFY) {
+      builder.clientAuth(ClientAuth.OPTIONAL);
+    } else if (verifyMode == VerifyMode.VERIFY_REQ_CLIENT_CERT) {
+      builder.clientAuth(ClientAuth.REQUIRE);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Static Builder class and wrapper around SSLConfig
+   * Checks and assigns the certificate and key paths - first
+   * from a Env variables and then from the provided files.
+   */
+  public static class Builder
+  {
+    /** Client or server creating the config */
+    private boolean isClient;
+    /** Verification mode as per the enum defined above */
+    private VerifyMode verifyMode;
+    /** Certificate authority File */
+    private String caPath;
+    /** Key File */
+    private String keyPath;
+    /** Certificate File */
+    private String certPath;
+    /** Giraph configuration */
+    private ImmutableClassesGiraphConfiguration conf;
+
+    /**
+     * Constructor
+     *
+     * @param isClient Client or server
+     * @param conf Giraph configuration
+     * @param verifyMode Verify Mode
+     */
+    public Builder(
+      boolean isClient,
+      ImmutableClassesGiraphConfiguration conf,
+      VerifyMode verifyMode)
+    {
+      this.isClient = isClient;
+      this.conf = conf;
+      this.verifyMode = verifyMode;
+      assignCAPath();
+      assignCertAndKeyPath();
+    }
+
+    /**
+     * Assign certificate authority path from
+     * env variable or from ca path provided
+     */
+    private void assignCAPath()
+    {
+      this.caPath = CA_PATH.get(conf);
+
+      if (CA_PATH_ENV.get(conf) != null) {
+        String envVarTlsCAPath = System.getenv(CA_PATH_ENV.get(conf));
+        if (envVarTlsCAPath != null) {
+          this.caPath = envVarTlsCAPath;
+        }
+      }
+    }
+
+    /**
+     * Assign certificate and key paths from
+     * env variable or from paths provided
+     */
+    private void assignCertAndKeyPath()
+    {
+
+      if (CLIENT_PATH_ENV.get(conf) != null &&
+        SERVER_PATH_ENV.get(conf) != null) {
+        String envVarTlsClientCertPath =
+          System.getenv(CLIENT_PATH_ENV.get(conf));
+        String envVarTlsClientKeyPath =
+          System.getenv(SERVER_PATH_ENV.get(conf));
+        if (envVarTlsClientCertPath != null && envVarTlsClientKeyPath != null) {
+          // If expected env variables are present, check if file exists
+          File certFile = new File(envVarTlsClientCertPath);
+          File keyFile = new File(envVarTlsClientKeyPath);
+          if (certFile.exists() && keyFile.exists()) {
+            // set paths and return if both files exist
+            this.certPath = envVarTlsClientCertPath;
+            this.keyPath = envVarTlsClientKeyPath;
+            return;
+          }
+        }
+      }
+
+      // Now we know that we are a Client, without valid env variables
+      // We shall try to read off of the default Client path
+      if (CLIENT_PATH.get(conf) != null &&
+        (new File(CLIENT_PATH.get(conf))).exists()) {
+        LOG.error("Falling back to CLIENT_PATH (" + CLIENT_PATH.get(conf) +
+          ") since env var path is not valid/env var not present");
+        this.keyPath = CLIENT_PATH.get(conf);
+        this.certPath = CLIENT_PATH.get(conf);
+        return;
+      }
+
+      // Looks like default Client also does not exist
+      // Time to use the server cert and see if we have any luck
+      LOG.error("EnvVar and CLIENT_PATH (" + CLIENT_PATH.get(conf) +
+        ") both do not exist/invalid, trying SERVER_PATH(" +
+        SERVER_PATH.get(conf) + ")");
+      this.keyPath = SERVER_PATH.get(conf);
+      this.certPath = SERVER_PATH.get(conf);
+    }
+
+    /**
+     * Build function which calls the main SSLConfig class
+     *
+     * @return SSLConfig object
+     */
+    public SSLConfig build()
+    {
+      return new SSLConfig(
+        isClient, verifyMode, this.caPath, this.certPath, this.keyPath);
+    }
+  }
+}
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/SSLEventHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/SSLEventHandler.java
new file mode 100644
index 0000000..9435790
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/SSLEventHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+import io.netty.channel.Channel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+
+/**
+ * An SSLEventHandler is used to handle events during the SSL
+ * handshake process and override the default behavior.
+ */
+public interface SSLEventHandler
+  extends ImmutableClassesGiraphConfigurable
+{
+  /**
+   * Handle the event 'onSslHandshakeComplete' after TLS handshake
+   * The Channel future can be used to modify the channel,
+   * fire exceptions and so on.
+   *
+   * @param future Channel future
+   * @param isClient whether it is the client or server involved
+   * @param sslHandler the SslHandler
+   */
+  void handleOnSslHandshakeComplete(
+    Future<? super Channel> future,
+    SslHandler sslHandler,
+    boolean isClient) throws Exception;
+}
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
index 5c6f035..6b66a66 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
@@ -27,6 +27,8 @@
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.ReferenceCountUtil;
 
+import javax.net.ssl.SSLException;
+
 import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED;
 
 /**
@@ -42,15 +44,22 @@
   private final boolean dropFirstResponse;
   /** Netty client that does the actual I/O and keeps track of open requests */
   private final NettyClient nettyClient;
+  /** Handler for uncaught exceptions */
+  private final Thread.UncaughtExceptionHandler exceptionHandler;
 
   /**
    * Constructor.
    * @param nettyClient Client that does the actual I/O
    * @param conf Configuration
+   * @param exceptionHandler Handles uncaught exceptions
    */
-  public ResponseClientHandler(NettyClient nettyClient, Configuration conf) {
+  public ResponseClientHandler(
+    NettyClient nettyClient,
+    Configuration conf,
+    Thread.UncaughtExceptionHandler exceptionHandler) {
     this.nettyClient = nettyClient;
     dropFirstResponse = NETTY_SIMULATE_FIRST_RESPONSE_FAILED.get(conf);
+    this.exceptionHandler = exceptionHandler;
   }
 
   @Override
@@ -106,9 +115,14 @@
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
     throws Exception {
-    LOG.warn("exceptionCaught: Channel channelId=" +
+    // If SSLException, fail the client
+    if (cause instanceof SSLException) {
+      exceptionHandler.uncaughtException(Thread.currentThread(), cause);
+    } else {
+      LOG.warn("exceptionCaught: Channel channelId=" +
         ctx.channel().hashCode() + " failed with remote address " +
         ctx.channel().remoteAddress(), cause);
+    }
   }
 }
 
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 20d40cf..2e7a41c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -957,6 +957,15 @@
   }
 
   /**
+   * Use authentication? (if supported)
+   *
+   * @return True if should authenticate, false otherwise
+   */
+  public boolean sslAuthenticate() {
+    return SSL_ENCRYPT.get(this);
+  }
+
+  /**
    * Set the number of compute threads
    *
    * @param numComputeThreads Number of compute threads to use
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 2236092..61beb9a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -26,6 +26,7 @@
 import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
 import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.netty.SSLEventHandler;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.DefaultCreateSourceVertexCallback;
 import org.apache.giraph.edge.CreateSourceVertexCallback;
@@ -1073,6 +1074,15 @@
           "Whether to use SASL with DIGEST and Hadoop Job Tokens to " +
           "authenticate and authorize Netty BSP Clients to Servers.");
 
+  /**
+   * Whether to use SSL to authenticate and authorize "
+   * Netty BSP Clients to Servers.
+   */
+  BooleanConfOption SSL_ENCRYPT =
+    new BooleanConfOption("giraph.sslEncrypt", false,
+        "Whether to use SSL to authenticate and authorize " +
+            "Netty BSP Clients to Servers.");
+
   /** Use unsafe serialization? */
   BooleanConfOption USE_UNSAFE_SERIALIZATION =
       new BooleanConfOption("giraph.useUnsafeSerialization", true,
@@ -1335,5 +1345,11 @@
   BooleanConfOption FAIL_ON_EMPTY_INPUT = new BooleanConfOption(
       "giraph.failOnEmptyInput", true,
       "Whether to fail the job or just warn when input is empty");
+
+  /** SSLEventHandler class - optional */
+  ClassConfOption<SSLEventHandler> SSL_EVENT_HANDLER_CLASS =
+    ClassConfOption.create("giraph.sslEventHandler",
+      null, SSLEventHandler.class,
+      "SSLEventHandler class - optional");
 }
 // CHECKSTYLE: resume InterfaceIsTypeCheck
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index d244d20..f9b6761 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -28,6 +28,7 @@
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
+import org.apache.giraph.comm.netty.SSLEventHandler;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.edge.EdgeStoreFactory;
@@ -544,6 +545,28 @@
   }
 
   /**
+   * Get the user's subclassed {@link SSLEventHandler}.
+   *
+   * @return User's SSL Event Handler class
+   */
+  public Class<? extends SSLEventHandler> getSSLEventHandlerClass() {
+    return SSL_EVENT_HANDLER_CLASS.get(this);
+  }
+
+  /**
+   * Create a user SSLEventHandler class
+   *
+   * @return Instantiated user SSL Event Handler class
+   */
+  public SSLEventHandler createSSLEventHandler() {
+    if (getSSLEventHandlerClass() != null) {
+      return ReflectionUtils.newInstance(getSSLEventHandlerClass(), this);
+    } else {
+      return null;
+    }
+  }
+
+  /**
    * Get the user's subclassed
    * {@link org.apache.giraph.graph.VertexValueCombiner} class.
    *