| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <optional>
#include <random>
#include <string>
#include <thread>
#include <vector>
| |
| #include <gflags/gflags_declare.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/client/client-internal.h" |
| #include "kudu/client/client-test-util.h" |
| #include "kudu/client/client.h" |
| #include "kudu/client/row_result.h" |
| #include "kudu/client/schema.h" |
| #include "kudu/client/shared_ptr.h" // IWYU pragma: keep |
| #include "kudu/client/write_op.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/gutil/walltime.h" |
| #include "kudu/integration-tests/test_workload.h" |
| #include "kudu/master/master.h" |
| #include "kudu/master/mini_master.h" |
| #include "kudu/mini-cluster/internal_mini_cluster.h" |
| #include "kudu/rpc/messenger.h" |
| #include "kudu/security/crypto.h" |
| #include "kudu/security/token.pb.h" |
| #include "kudu/security/token_signer.h" |
| #include "kudu/security/token_verifier.h" |
| #include "kudu/tablet/key_value_test_schema.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/openssl_util.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| DECLARE_bool(rpc_reopen_outbound_connections); |
| DECLARE_int32(heartbeat_interval_ms); |
| |
| using std::atomic; |
| using std::string; |
| using std::thread; |
| using std::unique_ptr; |
| |
| namespace kudu { |
| |
| using client::KuduClient; |
| using client::KuduClientBuilder; |
| using client::KuduError; |
| using client::KuduInsert; |
| using client::KuduRowResult; |
| using client::KuduScanner; |
| using client::KuduSchema; |
| using client::KuduSession; |
| using client::KuduTable; |
| using client::KuduTableCreator; |
| using client::sp::shared_ptr; |
| using cluster::InternalMiniCluster; |
| using cluster::InternalMiniClusterOptions; |
| using security::DataFormat; |
| using security::PrivateKey; |
| using security::SignedTokenPB; |
| using security::TokenPB; |
| using security::TokenSigner; |
| using security::TokenSigningPrivateKeyPB; |
| using security::TokenVerifier; |
| using security::TokenVerificationResult; |
| |
| class SecurityUnknownTskTest : public KuduTest { |
| public: |
| SecurityUnknownTskTest() |
| : num_tablet_servers_(3), |
| heartbeat_interval_ms_(100), |
| schema_(KuduSchema::FromSchema(CreateKeyValueTestSchema())) { |
| |
| // Make the ts->master heartbeat interval shorter to run the test faster. |
| FLAGS_heartbeat_interval_ms = heartbeat_interval_ms_; |
| |
| // Within the scope of the same reactor thread, close an already established |
| // idle connection to the server and open a new one upon making another call |
| // to the same server. This is to force authn token verification at each RPC |
| // call: the authn token is verified by the server side during connection |
| // negotiation. This test uses the in-process InternalMiniCluster, this affects Kudu |
| // clients and the server components. In the context of this test, that's |
| // crucial only for the Kudu clients used in the tests. |
| FLAGS_rpc_reopen_outbound_connections = true; |
| } |
| |
| void SetUp() override { |
| KuduTest::SetUp(); |
| |
| InternalMiniClusterOptions opts; |
| opts.num_tablet_servers = num_tablet_servers_; |
| cluster_.reset(new InternalMiniCluster(env_, opts)); |
| ASSERT_OK(cluster_->Start()); |
| } |
| |
| void TearDown() override { |
| cluster_->Shutdown(); |
| } |
| |
| // Generate custom TSK. |
| static Status GenerateTsk(TokenSigningPrivateKeyPB* tsk, int64_t seq_num = 100) { |
| PrivateKey private_key; |
| int key_size = UseLargeKeys() ? 2048 : 512; |
| RETURN_NOT_OK(GeneratePrivateKey(key_size, &private_key)); |
| string private_key_str_der; |
| RETURN_NOT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER)); |
| tsk->set_rsa_key_der(private_key_str_der); |
| // Key sequence number should be high enough to be greater than sequence |
| // numbers of the TSKs generated by the master itself. |
| tsk->set_key_seq_num(seq_num); |
| tsk->set_expire_unix_epoch_seconds(WallTime_Now() + 3600); |
| return Status::OK(); |
| } |
| |
| // Generate authn token signed by the specified TSK. Use current client's |
| // authn token as a 'template' for the new one signed by the specified TSK. |
| Status GenerateAuthnToken(const shared_ptr<KuduClient>& client, |
| const TokenSigningPrivateKeyPB& tsk, |
| SignedTokenPB* new_signed_token) { |
| // Should be already connected to the cluster. |
| auto authn_token = client->data_->messenger_->authn_token(); |
| if (!authn_token) { |
| return Status::RuntimeError("client authn token is not set"); |
| } |
| |
| // 'Copy' the token data. The idea to is remove the signature from the signed |
| // token and sign the token with our custom TSK (see below). |
| TokenSigner* signer = cluster_->mini_master()->master()->token_signer(); |
| TokenPB token; |
| const TokenVerifier& verifier = signer->verifier(); |
| if (verifier.VerifyTokenSignature(*authn_token, &token) != |
| TokenVerificationResult::VALID) { |
| return Status::RuntimeError("current client authn token is not valid"); |
| } |
| |
| // Create an authn token, signing it with the custom TSK. |
| if (!token.SerializeToString(new_signed_token->mutable_token_data())) { |
| return Status::RuntimeError("failed to serialize token data"); |
| } |
| |
| TokenSigner forger(1, 1, 1); |
| RETURN_NOT_OK(forger.ImportKeys({ tsk })); |
| return forger.SignToken(new_signed_token); |
| } |
| |
| // Replace client's authn token with the specified one. |
| void ReplaceAuthnToken(KuduClient* client, const SignedTokenPB& token) { |
| client->data_->messenger_->set_authn_token(token); |
| } |
| |
| // Import the specified TSK into the master's TokenSigner. Once the TSK is |
| // imported, the master is able to verify authn tokens signed by the TSK. |
| // The tablet servers are able to verify corresponding authn tokens after |
| // they receive TSK from the master with tserver-->master heartbeat response. |
| Status ImportTsk(const TokenSigningPrivateKeyPB& tsk) { |
| TokenSigner* signer = cluster_->mini_master()->master()->token_signer(); |
| return signer->ImportKeys({ tsk }); |
| } |
| |
| protected: |
| const int num_tablet_servers_; |
| const int32_t heartbeat_interval_ms_; |
| const KuduSchema schema_; |
| unique_ptr<InternalMiniCluster> cluster_; |
| }; |
| |
| |
| // Tablet server sends back ERROR_UNAVAILABLE error code upon connection |
| // negotiation if it does not recognize the TSK which the client's authn token |
| // is signed with. The client should receive ServiceUnavailable status in that |
| // case and retry the operation. This test exercises some common subset of |
| // client-->master and client-->tserver RPCs. The test verifies both success |
| // and failure scenarios for the selected subset of RPCs. |
| TEST_F(SecurityUnknownTskTest, ErrorUnavailableCommonOperations) { |
| const string table_name = "security-unknown-tsk-itest"; |
| const int64_t timeout_seconds = 3; |
| |
| shared_ptr<KuduClient> client; |
| ASSERT_OK(cluster_->CreateClient( |
| &KuduClientBuilder() |
| .default_admin_operation_timeout(MonoDelta::FromSeconds(timeout_seconds)) |
| .default_rpc_timeout(MonoDelta::FromSeconds(timeout_seconds)), |
| &client)); |
| |
| // Generate our custom TSK. |
| TokenSigningPrivateKeyPB tsk; |
| ASSERT_OK(GenerateTsk(&tsk)); |
| |
| // Create new authn token, signing it with the custom TSK. |
| SignedTokenPB new_signed_token; |
| ASSERT_OK(GenerateAuthnToken(client, tsk, &new_signed_token)); |
| |
| // Create and open a table: a proper table handle is necessary for further RPC |
| // calls to the tablet server. The table should consists of multiple tablets |
| // hosted by all available tablet servers, so the insert or scan requests are |
| // sent to all avaialble tablet servers. |
| unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator()); |
| shared_ptr<KuduTable> table; |
| ASSERT_OK(table_creator->table_name(table_name) |
| .set_range_partition_columns({ "key" }) |
| .add_hash_partitions({ "key" }, num_tablet_servers_) |
| .schema(&schema_) |
| .num_replicas(1) |
| .Create()); |
| ASSERT_OK(client->OpenTable(table_name, &table)); |
| |
| shared_ptr<KuduSession> session = client->NewSession(); |
| // We want to send the write batch to the server as soon as it's applied. |
| ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC)); |
| |
| // Insert a row into the table -- this is to populate the client's metadata |
| // cache so the client wont try to do that later while trying to re-insert the |
| // same data. If not doing that, then the Apply() call for the duplicate |
| // insert would not be sent to the tablet server: instead, it would be sent to |
| // the master server to find about location of the target tablet. The idea is |
| // to cover client-->tserver RPCs by this test as well. |
| { |
| unique_ptr<KuduInsert> ins(table->NewInsert()); |
| ASSERT_OK(ins->mutable_row()->SetInt32(0, -1)); |
| ASSERT_OK(ins->mutable_row()->SetInt32(1, -1)); |
| ASSERT_OK(session->Apply(ins.release())); |
| } |
| |
| // Replace the original authn token with the specially crafted one. From this |
| // point until importing the custom TSK into the master's verifier, the master |
| // and tablet servers should respond with ERROR_UNAVAILABLE because the client |
| // is about to present authn token signed with unknown TSK. |
| ReplaceAuthnToken(client.get(), new_signed_token); |
| |
| // Try to create the table again: this time the RPC shall not pass since the |
| // authn token has been replaced (but not because the table already exists). |
| { |
| const Status s = table_creator->table_name(table_name) |
| .set_range_partition_columns({ "key" }) |
| .schema(&schema_) |
| .num_replicas(1) |
| .Create(); |
| // The client automatically retries on getting ServiceUnavailable from the |
| // master. It will retry in vain until the operation times out. |
| const string err_msg = s.ToString(); |
| ASSERT_TRUE(s.IsTimedOut()) << err_msg; |
| ASSERT_STR_CONTAINS(err_msg, "CreateTable timed out after deadline expired"); |
| } |
| |
| { |
| shared_ptr<KuduTable> table; |
| const Status s = client->OpenTable(table_name, &table); |
| const string err_msg = s.ToString(); |
| ASSERT_TRUE(s.IsTimedOut()) << err_msg; |
| ASSERT_STR_CONTAINS(err_msg, "GetTableSchema timed out after deadline expired"); |
| } |
| |
| // Try to insert the same data which has been successfully inserted prior to |
| // replacing the authn token. The idea is to exercise client-->tablet server |
| // path: the meta-cache already contains information on the corresponding |
| // tablet server and the client will try to send RPCs to the tablet server |
| // directly, avoiding calls to the master server which would happen if the |
| // meta-cache did not contain the information on the tablet location. |
| { |
| unique_ptr<KuduInsert> ins(table->NewInsert()); |
| ASSERT_OK(ins->mutable_row()->SetInt32(0, -1)); |
| ASSERT_OK(ins->mutable_row()->SetInt32(1, -1)); |
| const Status s_apply = session->Apply(ins.release()); |
| // The error returned is a generic IOError, and the details are provided |
| // by the KuduSession::GetPendingErrors() method. |
| ASSERT_TRUE(s_apply.IsIOError()) << s_apply.ToString(); |
| ASSERT_STR_CONTAINS(s_apply.ToString(), "failed to flush data"); |
| |
| std::vector<KuduError*> errors; |
| ElementDeleter cleanup(&errors); |
| |
| session->GetPendingErrors(&errors, nullptr); |
| ASSERT_EQ(1, errors.size()); |
| ASSERT_NE(nullptr, errors[0]); |
| const Status& ps = errors[0]->status(); |
| const string err_msg = ps.ToString(); |
| // The client automatically retries on getting ServiceUnavailable from the |
| // tablet server. It will retry in vain until the operation times out. |
| ASSERT_TRUE(ps.IsTimedOut()) << err_msg; |
| ASSERT_STR_CONTAINS(err_msg, "Failed to write batch of 1 ops"); |
| } |
| |
| // Try opening a scanner. This should fail, timing out on retries. |
| { |
| KuduScanner scanner(table.get()); |
| ASSERT_OK(scanner.SetSelection(KuduClient::LEADER_ONLY)); |
| ASSERT_OK(scanner.SetTimeoutMillis(1000)); |
| const Status s = scanner.Open(); |
| const string err_msg = s.ToString(); |
| ASSERT_TRUE(s.IsTimedOut()) << err_msg; |
| ASSERT_STR_CONTAINS(err_msg, "LookupRpc"); |
| } |
| |
| // In a separate thread, import our TSK into the master's TokenSigner. After |
| // importing, the TSK should propagate to the tablet servers and the client |
| // should be able to authenticate using its custom authn token. |
| thread importer( |
| [&]() { |
| SleepFor(MonoDelta::FromMilliseconds(timeout_seconds * 1000 / 5)); |
| CHECK_OK(ImportTsk(tsk)); |
| }); |
| |
| // An automatic clean-up to handle failure cases in the code below. |
| SCOPED_CLEANUP({ |
| importer.join(); |
| }); |
| |
| // The client should retry its operations until the masters and tablet servers |
| // get the necessary token verification key to verify our custom authn token. |
| for (int i = 0; i < num_tablet_servers_; ++i) { |
| unique_ptr<KuduInsert> ins(table->NewInsert()); |
| ASSERT_OK(ins->mutable_row()->SetInt32(0, i)); |
| ASSERT_OK(ins->mutable_row()->SetInt32(1, i)); |
| ASSERT_OK(session->Apply(ins.release())); |
| } |
| |
| // Run a scan to verify the number of inserted rows. |
| EXPECT_EQ(num_tablet_servers_ + 1, client::CountTableRows(table.get())); |
| } |
| |
// Replace client's authn token while running a workload which includes creating
// a table, inserting data and reading it back. With a huge number of runs,
// this gives coverage of ERROR_UNAVAILABLE handling for all RPC calls involved
// in the workload scenario.
| TEST_F(SecurityUnknownTskTest, ErrorUnavailableDuringWorkload) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| |
| static const int64_t kTimeoutMs = 20 * 1000; |
| int64_t tsk_seq_num = 100; |
| // Targeting the total runtime to be less than 3 minutes, and in most cases |
| // less than 2 minutes. Sometimes a cycle might take two and in rare cases |
| // up to three timeout intervals to complete. |
| for (int i = 0; i < 3; ++i) { |
| TestWorkload w(cluster_.get()); |
| w.set_num_tablets(num_tablet_servers_); |
| w.set_num_replicas(1); |
| w.set_num_read_threads(2); |
| w.set_num_write_threads(2); |
| w.set_write_batch_size(4096); |
| w.set_client_default_rpc_timeout_millis(kTimeoutMs); |
| w.set_read_timeout_millis(kTimeoutMs); |
| w.set_write_timeout_millis(kTimeoutMs); |
| |
| auto client = w.CreateClient(); |
| atomic<bool> importer_do_run(true); |
| thread importer( |
| [&]() { |
| // See below for the explanation. |
| MonoTime sync_point = MonoTime::Now(); |
| |
| while (importer_do_run) { |
| // Generate our custom TSK. |
| TokenSigningPrivateKeyPB tsk; |
| |
| // The master's TokenSigner might be generating TSKs in the background |
| // according to its own schedule. The master's TokenSigner increments |
| // the sequence number by 1 for every new TSK generated. To avoid TSK |
| // sequence number collisions, it's necessary to increment the sequence |
| // number for our custom TSKs more aggressively. |
| tsk_seq_num += 10; |
| CHECK_OK(GenerateTsk(&tsk, tsk_seq_num)); |
| |
| // Create new authn token, signing it with the custom TSK. |
| SignedTokenPB new_signed_token; |
| CHECK_OK(GenerateAuthnToken(client, tsk, &new_signed_token)); |
| |
| ReplaceAuthnToken(client.get(), new_signed_token); |
| // From now till the call of ImportTsk() the system is unaware of the |
| // custom TSK key and the token signed with it cannot be verified. |
| SleepFor(MonoDelta::FromMilliseconds(rand() % 5 + 5)); |
| CHECK_OK(ImportTsk(tsk)); |
| |
| // After the ImportTsk() call, the public part of the TSK needs to |
| // reach tablet servers so they could verify the custom authn token. |
| // The delay is more than the minimum required heartbeat_inteval_ms_ |
| // to allow for completion of pending operations when retrying with |
| // the 'exponential back-off' policy. In addition, some clients might |
| // be long in the retry sequence, not being able to catch up with the |
| // rate of the token replacement: they need to wake up and make the |
| // retry call when current TSK is known to the system. Due to the |
| // exponential back-off algorithm of the retry sequence, there might |
| // be some clients sleeping for up to ~5 seconds between retries. |
| // To avoid timing out in such situations, every timeout interval |
| // a 'sync point' happens, so the long-sleeping clients are able to |
| // complete their operations. The kSyncSleepIntervalMs is little over |
| // the necessary ~5 seconds to avoid test flakiness on slow VMs. |
| static const int64_t kSyncSleepIntervalMs = 7500; |
| bool is_sync_point = (sync_point + MonoDelta::FromMilliseconds( |
| kTimeoutMs - kSyncSleepIntervalMs) <= MonoTime::Now()); |
| const int64_t sleep_time_ms = is_sync_point |
| ? kSyncSleepIntervalMs : 5 * heartbeat_interval_ms_; |
| SleepFor(MonoDelta::FromMilliseconds(sleep_time_ms)); |
| if (is_sync_point) { |
| sync_point = MonoTime::Now(); |
| } |
| } |
| }); |
| |
| w.Setup(); |
| w.Start(); |
| |
| // Let the workload run for some time. |
| const int64_t halfRun = kTimeoutMs / 2; |
| SleepFor(MonoDelta::FromMilliseconds(rand() % halfRun + halfRun)); |
| |
| w.StopAndJoin(); |
| CHECK_OK(w.Cleanup()); |
| |
| importer_do_run = false; |
| importer.join(); |
| } |
| } |
| |
| } // namespace kudu |