// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "runtime/coordinator-backend-state.h"

#include "common/names.h"

using namespace impala;

// Hidden startup flags that tune when BackendResourceState hands finished backends back
// for release. 'batched_release_decay_factor' seeds and scales the 'Batched Release'
// threshold; 'release_backend_states_delay_ms' is the 'Timed Release' timeout, which the
// BackendResourceState constructor converts to nanoseconds.
DEFINE_uint32_hidden(batched_release_decay_factor, 2,
    "The exponential decay factor for the 'Batched Release' of backends. The default of "
    "2 ensures that the number of times resources are released is bounded by O(log2(n)). "
    "Setting this to another value, such as 10, would bound the number of times "
    "resources are released by O(log10(n)).");
DEFINE_uint64_hidden(release_backend_states_delay_ms, 1000,
    "The timeout for the 'Timed Release' of backends. If more than this many "
    "milliseconds has passed since the last time any Backends were released, then "
    "release all pending Backends. Set to 1000 milliseconds by default.");

// NOTE(review): both flags are already defined just above in this file, so these
// declarations look redundant -- confirm before removing.
DECLARE_uint32(batched_release_decay_factor);
DECLARE_uint64(release_backend_states_delay_ms);

// Sets up resource tracking for 'backend_states'. The in-use count and the total backend
// count both start at backend_states.size(), the 'Timed Release' delay is taken from
// --release_backend_states_delay_ms (converted to nanoseconds) and the 'Batched Release'
// decay value starts at --batched_release_decay_factor.
Coordinator::BackendResourceState::BackendResourceState(
    const vector<BackendState*>& backend_states, const QuerySchedule& schedule)
  : num_in_use_(backend_states.size()),
    backend_states_(backend_states),
    num_backends_(backend_states.size()),
    schedule_(schedule),
    release_backend_states_delay_ns_(FLAGS_release_backend_states_delay_ms * 1000000),
    batched_release_decay_value_(FLAGS_batched_release_decay_factor) {
  // The decay value is used as a divisor in MarkBackendFinished(), so it must be
  // positive.
  DCHECK_GT(batched_release_decay_value_, 0)
      << "Invalid value for --batched_release_decay_factor, must greater than 0";

  // Every BackendState is tracked from the start and begins life as IN_USE.
  for (auto tracked_backend : backend_states_) {
    backend_resource_states_[tracked_backend] = ResourceState::IN_USE;
  }

  // The 'Timed Release' heuristic measures time since the last release; begin counting
  // now.
  released_timer_.Start();
}

// Performs only DCHECK sanity checks: by the time this object is destroyed,
// CloseAndGetUnreleasedBackends() must have set closed_, and the number of backends
// reported through BackendsReleased() must equal the number this object was constructed
// with.
Coordinator::BackendResourceState::~BackendResourceState() {
  // Assert that all BackendStates have been released. We use num_backends_ instead of
  // backend_states_.size() so that backend_states_ does not need to be alive when the
  // destructor runs.
  DCHECK(closed_);
  DCHECK_EQ(num_released_, num_backends_);
}

// Marks 'backend_state' as finished and decides whether any backends should now have
// their resources released.
//
// If this object has not been closed and 'backend_state' is currently IN_USE, it is moved
// to PENDING. If a release is then triggered (see below), every PENDING BackendState is
// moved to RELEASABLE and appended to 'releasable_backend_states'; otherwise the output
// vector is left untouched. Calls made after CloseAndGetUnreleasedBackends(), or for a
// BackendState that is no longer IN_USE, are no-ops.
//
// 'backend_state' must be one of the BackendStates this object was constructed with
// (the map lookup uses at()). 'releasable_backend_states' must be non-null.
void Coordinator::BackendResourceState::MarkBackendFinished(
    BackendState* backend_state, vector<BackendState*>* releasable_backend_states) {
  lock_guard<SpinLock> lock(lock_);
  if (closed_) return;

  auto& finished_state = backend_resource_states_.at(backend_state);
  if (finished_state != ResourceState::IN_USE) return;

  // Transition the BackendState to PENDING and update any related counters.
  finished_state = ResourceState::PENDING;
  ++num_pending_;
  --num_in_use_;

  // If the coordinator backend has not been released, but all others have, then the only
  // running Backend must be the coordinator. The Coordinator fragment should buffer
  // enough rows to allow all other fragments to be released (if result spooling is
  // enabled this is especially true, but even without spooling many queries have a
  // coordinator fragment that buffers multiple RowBatches). If the client does not
  // fetch all rows immediately, then the Coordinator Backend will be long lived
  // compared to the rest of the Backends.
  bool is_coordinator_the_last_unreleased_backend =
      !released_coordinator_ && num_in_use_ == 1;

  // True if the 'Timed Release' heuristic should be triggered.
  bool release_backends_timeout_expired =
      released_timer_.ElapsedTime() > release_backend_states_delay_ns_;

  // True if the 'Batched Release' heuristic should be triggered. The threshold never
  // drops below one pending backend.
  bool unreleased_backend_threshold_reached = num_pending_
      >= std::max(floor(num_backends_ / batched_release_decay_value_), 1.0);

  // If no Backends are running or if only the Coordinator Backend is running or if both
  // the 'Timed Release' and 'Batched Release' heuristic are true, then transition all
  // PENDING BackendStates to RELEASABLE and update any state necessary for the
  // heuristics. The 'num_in_use_ == 0' case matters when the coordinator backend has
  // already been released: without it the last finished backends would stay PENDING
  // unless both heuristics happened to fire.
  if (num_in_use_ == 0 || is_coordinator_the_last_unreleased_backend
      || (release_backends_timeout_expired && unreleased_backend_threshold_reached)) {
    released_timer_.Reset();
    batched_release_decay_value_ *= FLAGS_batched_release_decay_factor;
    // Iterate by reference so each entry is updated in place without a second lookup.
    for (auto& tracked : backend_resource_states_) {
      if (tracked.second == ResourceState::PENDING) {
        releasable_backend_states->push_back(tracked.first);
        tracked.second = ResourceState::RELEASABLE;
        --num_pending_;
      }
    }
    // Every PENDING BackendState was just handed out, so none may remain.
    DCHECK_EQ(num_pending_, 0);
  }
}

// Records that the resources of every BackendState in 'released_backend_states' have
// been released. Each one is moved to RELEASED; passing a BackendState that is already
// RELEASED trips a DCHECK. Each BackendState must be one this object tracks (the map
// lookup uses at()).
void Coordinator::BackendResourceState::BackendsReleased(
    const vector<BackendState*>& released_backend_states) {
  lock_guard<SpinLock> lock(lock_);
  for (auto released : released_backend_states) {
    auto& resource_state = backend_resource_states_.at(released);
    DCHECK_NE(resource_state, ResourceState::RELEASED);

    // A BackendState released directly from IN_USE stops counting as in use.
    if (resource_state == ResourceState::IN_USE) {
      --num_in_use_;
    }
    resource_state = ResourceState::RELEASED;

    // Remember when the Backend running the Coordinator has been released;
    // MarkBackendFinished() consults this flag.
    if (released->exec_params()->is_coord_backend) {
      released_coordinator_ = true;
    }
  }
  // The released count grows by the size of the input list once all entries have been
  // processed.
  num_released_ += released_backend_states.size();
}

// Closes this object and returns every BackendState that is still IN_USE or PENDING.
// Once closed, MarkBackendFinished() no longer changes any state. The states of the
// returned BackendStates are not modified here.
vector<Coordinator::BackendState*>
Coordinator::BackendResourceState::CloseAndGetUnreleasedBackends() {
  lock_guard<SpinLock> lock(lock_);
  vector<BackendState*> still_held;
  for (const auto& tracked : backend_resource_states_) {
    const bool is_unreleased = tracked.second == ResourceState::IN_USE
        || tracked.second == ResourceState::PENDING;
    if (is_unreleased) still_held.push_back(tracked.first);
  }
  closed_ = true;
  return still_held;
}
