[java] fixed bug in the connection negotiation code

This patch fixes a typo in the connection negotiation code in the Java
client.  Prior to this fix, channel binding information was not verified
during connection negotiation because the peer certificate was not set.

In addition, I modified the error handing code in Negotiator.java to
abort connection negotiation upon receiving SSLPeerUnverifiedException
due to the absence of the channel binding information or the presence
of the invalid one.

I also added a test to verify that Kudu Java client doesn't connect
to a Kudu server which doesn't provide valid channel binding information
during the connection negotiation phase.

Kudos to Andy Singer for pointing to the bug.

Change-Id: I7bfd428128e224f03901a6cd7b33283495a28d54
Reviewed-on: http://gerrit.cloudera.org:8080/14713
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <adar@cloudera.com>
Reviewed-by: Todd Lipcon <todd@apache.org>
(cherry picked from commit a0e896475c139d308e3b6e32110e97168b9562c6)
Reviewed-on: http://gerrit.cloudera.org:8080/14724
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
index 5da0a8c..d02d27f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
@@ -30,6 +30,7 @@
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLPeerUnverifiedException;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -451,6 +452,11 @@
       // SSLException if we've already attempted to close, otherwise log the error.
       error = new RecoverableException(Status.NetworkError(
           String.format("%s disconnected from peer", getLogPrefix())));
+    } else if (e instanceof SSLPeerUnverifiedException) {
+      String m = String.format("unable to verify identity of peer %s: %s",
+          serverInfo, e.getMessage());
+      error = new NonRecoverableException(Status.NetworkError(m), e);
+      LOG.error(m, e);
     } else {
       // If the connection was explicitly disconnected via a call to disconnect(), we should
       // have either gotten a ClosedChannelException or an SSLException.
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index 9168031..17a7442 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -601,6 +601,9 @@
       throw new SSLPeerUnverifiedException("no peer cert found");
     }
 
+    // The first element of the array is the peer's own certificate.
+    peerCert = certs[0];
+
     // Don't wrap the TLS socket if we are using TLS for authentication only.
     boolean isAuthOnly = serverFeatures.contains(RpcFeatureFlag.TLS_AUTHENTICATION_ONLY) &&
         isLoopbackConnection(chan);
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
index 3c4de25..37489a8 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
@@ -30,6 +30,7 @@
 
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableList;
 import com.stumbleupon.async.Deferred;
 import org.hamcrest.CoreMatchers;
 import org.junit.After;
@@ -61,8 +62,19 @@
   private enum Option {
     LONG_LEADER_ELECTION,
     SHORT_TOKENS_AND_TICKETS,
-    START_TSERVERS
-  };
+    START_TSERVERS,
+  }
+
+  static private class KeyValueMessage {
+    final String key;
+    final String val;
+    final String msg;
+    KeyValueMessage(String k, String v, String m) {
+      key = k;
+      val = v;
+      msg = m;
+    }
+  }
 
   private void startCluster(Set<Option> opts) throws IOException {
     MiniKuduClusterBuilder mcb = new MiniKuduClusterBuilder();
@@ -435,4 +447,46 @@
     Assert.assertThat(cla.getAppendedText(), CoreMatchers.containsString(
         "Using caller-provided subject with Kerberos principal test-admin@KRBTEST.COM."));
   }
+
+  /**
+   * Test that if a Kudu server (in this case master) doesn't provide valid
+   * connection binding information, Java client fails to connect to the server.
+   */
+  @Test(timeout=60000)
+  public void testNegotiationChannelBindings() throws Exception {
+    startCluster(ImmutableSet.of(Option.START_TSERVERS));
+    // Test precondition: all is well with masters -- the client is able
+    // to connect to the cluster and create a table.
+    client.createTable("TestSecurity-channel-bindings-0",
+        getBasicSchema(), getBasicCreateTableOptions());
+
+    List<KeyValueMessage> variants = ImmutableList.of(
+        new KeyValueMessage("rpc_inject_invalid_channel_bindings_ratio", "1.0",
+            "invalid channel bindings provided by remote peer"),
+        new KeyValueMessage("rpc_send_channel_bindings", "false",
+            "no channel bindings provided by remote peer"));
+
+    // Make all masters sending invalid channel binding info during connection
+    // negotiation.
+    for (KeyValueMessage kvm : variants) {
+      for (HostAndPort hp : miniCluster.getMasterServers()) {
+        miniCluster.setMasterFlag(hp, kvm.key, kvm.val);
+      }
+
+      // Now, a client should not be able to connect to any master: negotiation
+      // fails because client cannot authenticate the servers since it fails
+      // to verify the connection binding.
+      try {
+        KuduClient c = new KuduClient.KuduClientBuilder(
+            miniCluster.getMasterAddressesAsString()).build();
+        c.createTable("TestSecurity-channel-bindings-1",
+            getBasicSchema(), getBasicCreateTableOptions());
+        Assert.fail("client should not be able to connect to any master");
+      } catch (NonRecoverableException e) {
+        Assert.assertThat(e.getMessage(), CoreMatchers.containsString(
+            "unable to verify identity of peer"));
+        Assert.assertThat(e.getMessage(), CoreMatchers.containsString(kvm.msg));
+      }
+    }
+  }
 }
diff --git a/src/kudu/rpc/server_negotiation.cc b/src/kudu/rpc/server_negotiation.cc
index 04e08b1..15c47d7 100644
--- a/src/kudu/rpc/server_negotiation.cc
+++ b/src/kudu/rpc/server_negotiation.cc
@@ -80,6 +80,18 @@
 TAG_FLAG(rpc_inject_invalid_authn_token_ratio, runtime);
 TAG_FLAG(rpc_inject_invalid_authn_token_ratio, unsafe);
 
+DEFINE_double(rpc_inject_invalid_channel_bindings_ratio, 0,
+            "The ratio of injection of invalid channel bindings during "
+            "connection negotiation. This is a test-only flag.");
+TAG_FLAG(rpc_inject_invalid_channel_bindings_ratio, runtime);
+TAG_FLAG(rpc_inject_invalid_channel_bindings_ratio, unsafe);
+
+DEFINE_bool(rpc_send_channel_bindings, true,
+            "Whether to send channel bindings in NegotiatePB response as "
+            "prescribed by RFC 5929. This is a test-only flag.");
+TAG_FLAG(rpc_send_channel_bindings, runtime);
+TAG_FLAG(rpc_send_channel_bindings, unsafe);
+
 DECLARE_bool(rpc_encrypt_loopback_connections);
 
 DEFINE_string(trusted_subnets,
@@ -866,7 +878,7 @@
     RETURN_NOT_OK(security::GenerateNonce(nonce_.get_ptr()));
     response.set_nonce(*nonce_);
 
-    if (tls_negotiated_) {
+    if (tls_negotiated_ && PREDICT_TRUE(FLAGS_rpc_send_channel_bindings)) {
       // Send the channel bindings to the client.
       security::Cert cert;
       RETURN_NOT_OK(tls_handshake_.GetLocalCert(&cert));
@@ -874,6 +886,12 @@
       string plaintext_channel_bindings;
       RETURN_NOT_OK(cert.GetServerEndPointChannelBindings(&plaintext_channel_bindings));
 
+      if (kudu::fault_injection::MaybeTrue(
+          FLAGS_rpc_inject_invalid_channel_bindings_ratio)) {
+        DCHECK_GT(plaintext_channel_bindings.size(), 0);
+        plaintext_channel_bindings[0] += 1;
+      }
+
       Slice ciphertext;
       RETURN_NOT_OK(SaslEncode(sasl_conn_.get(),
                                plaintext_channel_bindings,