[rpc] validate security-related parameters earlier

This patch removes the verification of a few security-related RPC flags
from the connection negotiation time (i.e. DoServerNegotiation()) since
that's already done in one of the group flag validators.

Apparently, this way it's more optimal:
  * misconfiguration are spotted earlier: if this type of issue
    is detected, the outcome is visible and actionable outright since
    the server simply refuses to start, reporting on the issue
  * there isn't a way to have a Kudu server running with such an issue
  * some amount of CPU resources is spared since a server no longer
    performs the same verification over and over again when establishing
    a new RPC connection, but performs it once during the start-up time

Change-Id: Ie16537c440041c9ee74276da35f2599592ef7e04
Reviewed-on: http://gerrit.cloudera.org:8080/20961
Tested-by: Alexey Serbin <alexey@apache.org>
Reviewed-by: Abhishek Chennaka <achennaka@cloudera.com>
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 0e7ca24..b40359f 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -115,6 +115,7 @@
 ADD_KUDU_TEST(registration-test RESOURCE_LOCK "master-web-port")
 ADD_KUDU_TEST(same_tablet_concurrent_writes-itest)
 ADD_KUDU_TEST(security-faults-itest)
+ADD_KUDU_TEST(security-flags-itest)
 ADD_KUDU_TEST(security-itest)
 ADD_KUDU_TEST(security-master-auth-itest)
 ADD_KUDU_TEST(security-unknown-tsk-itest PROCESSORS 4)
diff --git a/src/kudu/integration-tests/security-flags-itest.cc b/src/kudu/integration-tests/security-flags-itest.cc
new file mode 100644
index 0000000..8147621
--- /dev/null
+++ b/src/kudu/integration-tests/security-flags-itest.cc
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/util/flags.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using gflags::SetCommandLineOption;
+using kudu::cluster::ExternalMiniCluster;
+using kudu::cluster::ExternalMiniClusterOptions;
+
+namespace kudu {
+namespace security {
+
+class SecurityFlagsTest : public KuduTest {
+};
+
+TEST_F(SecurityFlagsTest, CheckRpcAuthnFlagsGroupValidator) {
+  // Do not rely on default values for the flags in question but explicitly
+  // set them to the required values instead to verify the functionality
+  // of the corresponding group flag validator.
+  ASSERT_NE("", SetCommandLineOption("unlock_experimental_flags", "true"));
+  ASSERT_NE("", SetCommandLineOption("rpc_authentication", "required"));
+  ASSERT_NE("", SetCommandLineOption("keytab_file", ""));
+  ASSERT_NE("", SetCommandLineOption("rpc_certificate_file", ""));
+  ASSERT_DEATH({ ValidateFlags(); },
+               ".*RPC authentication \\(--rpc_authentication\\) may not be "
+               "required unless Kerberos \\(--keytab_file) or external PKI "
+               "\\(--rpc_certificate_file et al\\) are configured"
+               ".*Detected inconsistency in command-line flags; exiting");
+}
+
+TEST_F(SecurityFlagsTest, RpcAuthnFlagsMasterStartup) {
+  ExternalMiniClusterOptions opts;
+
+  // Explicitly disable Kerberos authentication.
+  opts.enable_kerberos = false;
+
+  // Trying to start only master: it should detect an inconsistency in its
+  // command-line flags.
+  opts.num_masters = 1;
+  opts.num_tablet_servers = 0;
+
+  // Explicitly require RPC authentication for the master.
+  opts.extra_master_flags = { "--rpc_authentication=required" };
+
+  ExternalMiniCluster cluster(std::move(opts));
+  const auto s = cluster.Start();
+
+  // Master should not be running.
+  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+  const auto& errmsg = s.ToString();
+  // Master should detect an inconsistency in the command-line flags.
+  ASSERT_STR_CONTAINS(errmsg, "Unable to start Master");
+  ASSERT_STR_CONTAINS(errmsg, "process exited with non-zero status 1");
+}
+
+TEST_F(SecurityFlagsTest, RpcAuthnFlagsTabletServerStartup) {
+  ExternalMiniClusterOptions opts;
+
+  // Explicitly disable Kerberos authentication.
+  opts.enable_kerberos = false;
+
+  // The cluster has one master and just one tablet server. To simplify this
+  // test scenario which relies on the ExternalMiniCluster's functionality,
+  // allow the single master to start, but the tablet server should not be able
+  // to start since it should detect an inconsistency in the command-line flags.
+  opts.num_masters = 1;
+  opts.num_tablet_servers = 1;
+
+  // Explicitly require RPC authentication for the tablet server.
+  // The master doesn't have this extra flag, so it should be able
+  // to start up just fine.
+  opts.extra_tserver_flags = { "--rpc_authentication=required" };
+
+  cluster::ExternalMiniCluster cluster(std::move(opts));
+  const auto s = cluster.Start();
+
+  // Master should be able to start up and be running.
+  ASSERT_TRUE(cluster.master(0)->IsProcessAlive());
+  // However, the tablet server shouldn't be running.
+  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+  const auto& errmsg = s.ToString();
+  ASSERT_STR_CONTAINS(errmsg, "failed to start tablet server");
+  ASSERT_STR_CONTAINS(errmsg, "process exited with non-zero status 1");
+}
+
+} // namespace security
+} // namespace kudu
diff --git a/src/kudu/rpc/negotiation.cc b/src/kudu/rpc/negotiation.cc
index 1796d81..b7d7ed3 100644
--- a/src/kudu/rpc/negotiation.cc
+++ b/src/kudu/rpc/negotiation.cc
@@ -257,13 +257,10 @@
                                   bool encrypt_loopback,
                                   const MonoTime& deadline) {
   auto* messenger = conn->reactor_thread()->reactor()->messenger();
-  if (authentication == RpcAuthentication::REQUIRED &&
-      messenger->keytab_file().empty() &&
-      !messenger->tls_context().is_external_cert()) {
-    return Status::InvalidArgument("RPC authentication (--rpc_authentication) may not be "
-                                   "required unless Kerberos (--keytab_file) or external PKI "
-                                   "(--rpc_certificate_file et al) are configured");
-  }
+  DCHECK(authentication != RpcAuthentication::REQUIRED ||
+      !messenger->keytab_file().empty() ||
+      messenger->tls_context().is_external_cert())
+      << "misconfiguration: check ValidateRpcAuthnFlags() for details";
 
   if (FLAGS_rpc_negotiation_inject_delay_ms > 0) {
     LOG(WARNING) << "Injecting " << FLAGS_rpc_negotiation_inject_delay_ms