blob: 806c6260fd934025465b5c64b7d650f391448030 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/coordinator.h"
#include "common/names.h"
#include "common/object-pool.h"
#include "runtime/coordinator-backend-state.h"
#include "testutil/gtest-util.h"
#include "util/network-util.h"
#include "util/runtime-profile.h"
DECLARE_uint64(release_backend_states_delay_ms);
namespace impala {
class CoordinatorBackendStateTest : public testing::Test {
protected:
// Amount of time to wait, in milliseconds, to trigger the 'Timed Release' condition.
const int64_t timeout_release_ms_ = 1.5 * FLAGS_release_backend_states_delay_ms;
// Pool for objects to be destroyed during test teardown.
ObjectPool pool_;
RuntimeProfile* profile_ = nullptr;
virtual void SetUp() { profile_ = RuntimeProfile::Create(&pool_, "pool1"); }
/// Utility function to create a dummy QuerySchedule.
void MakeQuerySchedule(QuerySchedule** query_schedule) {
TUniqueId* query_id = pool_.Add(new TUniqueId());
TQueryExecRequest* request = pool_.Add(new TQueryExecRequest());
TQueryOptions* query_options = pool_.Add(new TQueryOptions());
*query_schedule =
pool_.Add(new QuerySchedule(*query_id, *request, *query_options, profile_));
}
/// Utility function to create a specified number of dummy BackendStates and add them
/// to the supplied vector. 'coordinator_backend' points to the Backend running the
/// Coordinator. The 'coordinator_backend' is added to the supplied vector as well.
void MakeBackendStates(int num_states, QuerySchedule* query_schedule,
Coordinator::BackendState** coordinator_backend,
std::vector<Coordinator::BackendState*>* backend_states) {
TQueryCtx* query_ctx = pool_.Add(new TQueryCtx());
PerBackendExecParams* per_backend_exec_params = pool_.Add(new PerBackendExecParams());
query_schedule->set_per_backend_exec_params(*per_backend_exec_params);
for (int i = 0; i < num_states; ++i) {
TNetworkAddress addr = MakeNetworkAddress(strings::Substitute("host-$0", i), 25000);
BackendExecParams* backend_exec_params = pool_.Add(new BackendExecParams());
per_backend_exec_params->emplace(addr, *backend_exec_params);
Coordinator::BackendState* backend_state = pool_.Add(new Coordinator::BackendState(
*query_schedule, *query_ctx, 0, TRuntimeFilterMode::OFF, *backend_exec_params));
backend_states->push_back(backend_state);
// Mark the first BackendState as the Coordinator Backend.
if (i == 0) {
backend_exec_params->is_coord_backend = true;
*coordinator_backend = backend_state;
}
}
}
virtual void TearDown() { pool_.Clear(); }
};
/// Validate the state machine of BackendResourceState by testing that the following
/// conditions hold:
/// * All BackendStates are initially marked as IN_USE.
/// * Releasing all Backends transitions all Backends to the RELEASED state.
TEST_F(CoordinatorBackendStateTest, StateMachine) {
// Initialize a BackendResourceState with three BackendStates.
int num_backends = 3;
QuerySchedule* query_schedule = nullptr;
MakeQuerySchedule(&query_schedule);
std::vector<Coordinator::BackendState*> backend_states;
Coordinator::BackendState* coordinator_backend = nullptr;
MakeBackendStates(num_backends, query_schedule, &coordinator_backend, &backend_states);
Coordinator::BackendResourceState* backend_resource_state =
pool_.Add(new Coordinator::BackendResourceState(backend_states, *query_schedule));
// Assert that all BackendStates are initially in the IN_USE state.
ASSERT_EQ(backend_resource_state->backend_resource_states_.size(), num_backends);
for (auto backend_state : backend_resource_state->backend_resource_states_) {
ASSERT_EQ(
backend_state.second, Coordinator::BackendResourceState::ResourceState::IN_USE);
}
// Assert that releasing an empty vector of BackendStates does not transition any
// BackendState to the RELEASED state.
std::vector<Coordinator::BackendState*> empty_backend_states;
backend_resource_state->BackendsReleased(empty_backend_states);
ASSERT_TRUE(empty_backend_states.empty());
ASSERT_EQ(backend_resource_state->backend_resource_states_.size(), num_backends);
for (auto backend_state : backend_resource_state->backend_resource_states_) {
ASSERT_EQ(
backend_state.second, Coordinator::BackendResourceState::ResourceState::IN_USE);
}
// Assert that releasing all BackendStates transitions all BackendStates from
// IN_USE to RELEASED.
backend_resource_state->BackendsReleased(backend_states);
std::vector<Coordinator::BackendState*> unreleased_backend_states =
backend_resource_state->CloseAndGetUnreleasedBackends();
ASSERT_EQ(unreleased_backend_states.size(), 0);
}
/// Validate the 'Coordinator Only' heuristic.
TEST_F(CoordinatorBackendStateTest, CoordinatorOnly) {
// Initialize a BackendResourceState with eight BackendStates.
int num_backends = 8;
QuerySchedule* query_schedule = nullptr;
MakeQuerySchedule(&query_schedule);
std::vector<Coordinator::BackendState*> backend_states;
Coordinator::BackendState* coordinator_backend = nullptr;
MakeBackendStates(num_backends, query_schedule, &coordinator_backend, &backend_states);
Coordinator::BackendResourceState* backend_resource_state =
pool_.Add(new Coordinator::BackendResourceState(backend_states, *query_schedule));
// Create a vector of non-Coordinator Backends.
std::vector<Coordinator::BackendState*> non_coord_backend_states;
for (auto backend_state : backend_states) {
if (!backend_state->exec_params()->is_coord_backend) {
non_coord_backend_states.push_back(backend_state);
}
}
ASSERT_EQ(non_coord_backend_states.size(), num_backends - 1);
// Release all but the last non-Coordinator Backend. Each call to MarkBackendFinished
// should not cause any PENDING Backends to transition to RELEASABLE because the
// 'Timeout Release' heuristic should not have be triggered.
std::vector<Coordinator::BackendState*> releasable_backend_states;
for (int i = 0; i < non_coord_backend_states.size() - 1; ++i) {
backend_resource_state->MarkBackendFinished(
non_coord_backend_states[i], &releasable_backend_states);
ASSERT_EQ(releasable_backend_states.size(), 0);
}
// Release the last non-Coordinator Backend, this should trigger all PENDING Bbackends
// to transition to the RELEASABLE state.
backend_resource_state->MarkBackendFinished(
non_coord_backend_states[non_coord_backend_states.size() - 1],
&releasable_backend_states);
// Assert that all non-Coordinator Backends are RELEASABLE and then mark them as
// RELEASED.
ASSERT_EQ(releasable_backend_states.size(), non_coord_backend_states.size());
backend_resource_state->BackendsReleased(releasable_backend_states);
// Release the remaining BackendStates.
std::vector<Coordinator::BackendState*> unreleased_backend_states =
backend_resource_state->CloseAndGetUnreleasedBackends();
ASSERT_EQ(unreleased_backend_states.size(), 1);
backend_resource_state->BackendsReleased(unreleased_backend_states);
}
/// Validate the 'Timed Release' heuristic.
TEST_F(CoordinatorBackendStateTest, TimedRelease) {
// Initialize a BackendResourceState with eight BackendStates.
int num_backends = 8;
QuerySchedule* query_schedule = nullptr;
MakeQuerySchedule(&query_schedule);
std::vector<Coordinator::BackendState*> backend_states;
Coordinator::BackendState* coordinator_backend = nullptr;
MakeBackendStates(num_backends, query_schedule, &coordinator_backend, &backend_states);
Coordinator::BackendResourceState* backend_resource_state =
pool_.Add(new Coordinator::BackendResourceState(backend_states, *query_schedule));
// Sleep until the 'Timed Release' timeout is hit.
SleepForMs(timeout_release_ms_);
// Mark half of the BackendStates as finished. Marking all Backends up to
// num_backends / 2 - 1 should result in 0 releasable Backends, and marking the
// num_backends / 2 Backend should result in num_backends / 2 releasable Backends.
std::vector<Coordinator::BackendState*> releasable_backend_states;
for (int i = 0; i < num_backends / 2 - 1; ++i) {
backend_resource_state->MarkBackendFinished(
backend_states.at(i), &releasable_backend_states);
ASSERT_EQ(releasable_backend_states.size(), 0);
}
backend_resource_state->MarkBackendFinished(
backend_states.at(num_backends / 2 - 1), &releasable_backend_states);
// Assert that half of the BackendStates transitioned to RELEASABLE.
ASSERT_EQ(releasable_backend_states.size(), num_backends / 2);
// Release half of the BackendStates.
backend_resource_state->BackendsReleased(releasable_backend_states);
// Mark the remaining half of the BackendStates as finished and assert that no Backends
// transition to RELEASABLE (the 'Timed Release' timeout should not be hit so no
// backends should be released).
releasable_backend_states.clear();
for (int i = num_backends / 2; i < num_backends / 2 + num_backends / 4; ++i) {
backend_resource_state->MarkBackendFinished(
backend_states.at(i), &releasable_backend_states);
ASSERT_EQ(releasable_backend_states.size(), 0);
}
// Release the remaining BackendStates.
std::vector<Coordinator::BackendState*> unreleased_backend_states =
backend_resource_state->CloseAndGetUnreleasedBackends();
ASSERT_EQ(unreleased_backend_states.size(), num_backends / 2);
backend_resource_state->BackendsReleased(unreleased_backend_states);
}
/// Validate the 'Batched Release' heuristic.
TEST_F(CoordinatorBackendStateTest, BatchedRelease) {
// Initialize a BackendResouceState with 128 BackendStates.
int num_backends = 128;
QuerySchedule* query_schedule = nullptr;
MakeQuerySchedule(&query_schedule);
std::vector<Coordinator::BackendState*> backend_states;
Coordinator::BackendState* coordinator_backend = nullptr;
MakeBackendStates(num_backends, query_schedule, &coordinator_backend, &backend_states);
Coordinator::BackendResourceState* backend_resource_state =
pool_.Add(new Coordinator::BackendResourceState(backend_states, *query_schedule));
// Sleep until the 'Timed Release' timeout is hit.
SleepForMs(timeout_release_ms_);
// Mark (num_backends / 2 - 1) of the BackendStates as finished. Marking all Backends
// up to num_backends / 2 - 1 should result in 0 releasable Backends, and marking the
// num_backends / 2 backend should result in num_backends / 2 releasable Backends.
std::vector<Coordinator::BackendState*> releasable_backend_states;
for (int i = 0; i < num_backends / 2 - 1; ++i) {
backend_resource_state->MarkBackendFinished(
backend_states.at(i), &releasable_backend_states);
ASSERT_EQ(releasable_backend_states.size(), 0);
}
backend_resource_state->MarkBackendFinished(
backend_states.at(num_backends / 2 - 1), &releasable_backend_states);
// Assert that (num_backends / 2) of the BackendStates transitioned to RELEASABLE.
ASSERT_EQ(releasable_backend_states.size(), num_backends / 2);
// Release half of the BackendStates.
backend_resource_state->BackendsReleased(releasable_backend_states);
// Sleep until the 'Timed Release' timeout is hit.
SleepForMs(timeout_release_ms_);
// Mark (num_backends / 4 - 1) of the BackendStates as finished.
releasable_backend_states.clear();
for (int i = num_backends / 2; i < num_backends / 2 + num_backends / 4 - 1; ++i) {
backend_resource_state->MarkBackendFinished(
backend_states.at(i), &releasable_backend_states);
ASSERT_EQ(releasable_backend_states.size(), 0);
}
backend_resource_state->MarkBackendFinished(
backend_states.at(num_backends / 2 + num_backends / 4 - 1),
&releasable_backend_states);
// Assert that (num_backends / 4) of the BackendStates transitioned to RELEASABLE.
ASSERT_EQ(releasable_backend_states.size(), num_backends / 4);
// Release a fourth of the BackendStates.
backend_resource_state->BackendsReleased(releasable_backend_states);
// Release the remaining BackendStates
std::vector<Coordinator::BackendState*> unreleased_backend_states =
backend_resource_state->CloseAndGetUnreleasedBackends();
ASSERT_EQ(unreleased_backend_states.size(),
num_backends - num_backends / 2 - num_backends / 4);
backend_resource_state->BackendsReleased(unreleased_backend_states);
}
} // namespace impala