// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "runtime/runtime-filter-bank.h"

#include <boost/algorithm/string/join.hpp>

#include "gen-cpp/ImpalaInternalService_types.h"
#include "gutil/strings/substitute.h"
#include "runtime/client-cache.h"
#include "runtime/exec-env.h"
#include "runtime/backend-client.h"
#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/initial-reservations.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-state.h"
#include "runtime/runtime-filter.inline.h"
#include "runtime/runtime-state.h"
#include "service/impala-server.h"
#include "util/bit-util.h"
#include "util/bloom-filter.h"
#include "util/min-max-filter.h"
#include "util/pretty-printer.h"

#include "common/names.h"

using namespace impala;
using namespace boost;
using namespace strings;

// Defines the flag 'max_filter_error_rate' with a default of 0.75. FpRateTooHigh()
// compares a Bloom filter's estimated false-positive probability against this value.
DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The maximum probability of false "
    "positives in a runtime filter before it is disabled.");

// Namespace-scope definitions of the class's static size constants. No initializer is
// given here, so the values come from the declarations inside the class (which are not
// part of this file).
const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;

/// Builds a filter bank bound to 'state'.
///
/// A dedicated "Runtime Filter Bank" MemTracker is created as a child of the instance
/// MemTracker of 'state', and 'total_filter_mem_required' is remembered as the amount of
/// reservation that ClaimBufferReservation() claims and Close() hands back.
/// 'query_ctx' is accepted but not read by this constructor.
RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state,
    long total_filter_mem_required)
  : state_(state),
    filter_mem_tracker_(
        new MemTracker(-1, "Runtime Filter Bank", state->instance_mem_tracker(), false)),
    closed_(false),
    total_bloom_filter_mem_required_(total_filter_mem_required) {
  // Counter that accumulates the buffer pool bytes handed to Bloom filters.
  auto* profile = state->runtime_profile();
  bloom_memory_allocated_ = profile->AddCounter("BloomFilterBytes", TUnit::BYTES);
}

/// Registers this bank's buffer pool client and claims the Bloom filter reservation.
///
/// The client must not be registered yet. Returns the error from RegisterClient() if
/// registration fails; otherwise claims 'total_bloom_filter_mem_required_' bytes from
/// the query's initial reservations and returns OK.
Status RuntimeFilterBank::ClaimBufferReservation() {
  DCHECK(!buffer_pool_client_.is_registered());

  const string client_name = Substitute(
      "RuntimeFilterBank (Fragment Id: $0)", PrintId(state_->fragment_instance_id()));
  auto* buffer_pool = ExecEnv::GetInstance()->buffer_pool();
  RETURN_IF_ERROR(buffer_pool->RegisterClient(client_name,
      state_->query_state()->file_group(), state_->instance_buffer_reservation(),
      filter_mem_tracker_.get(), total_bloom_filter_mem_required_,
      state_->runtime_profile(), &buffer_pool_client_));

  VLOG_FILE << client_name << " claiming reservation "
            << total_bloom_filter_mem_required_;
  auto* initial_reservations = state_->query_state()->initial_reservations();
  initial_reservations->Claim(&buffer_pool_client_, total_bloom_filter_mem_required_);
  return Status::OK();
}

/// Registers the filter described by 'filter_desc' and returns the RuntimeFilter object
/// tracking it.
///
/// With 'is_producer' set, a new filter is always created and stored in the produced
/// map (registering the same producer twice trips a DCHECK). Otherwise the filter is
/// looked up in the consumed map: an existing entry is returned as-is, a missing one is
/// created first. All map accesses happen under 'runtime_filter_lock_'.
RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc,
    bool is_producer) {
  const auto filter_id = filter_desc.filter_id;
  lock_guard<mutex> guard(runtime_filter_lock_);

  if (is_producer) {
    DCHECK(produced_filters_.find(filter_id) == produced_filters_.end());
    RuntimeFilter* producer =
        obj_pool_.Add(new RuntimeFilter(filter_desc, filter_desc.filter_size_bytes));
    produced_filters_[filter_id] = producer;
    return producer;
  }

  RuntimeFilterMap::iterator existing = consumed_filters_.find(filter_id);
  if (existing != consumed_filters_.end()) {
    // Another target node already registered this filter in this bank, which is only
    // expected when the filter has more than one target.
    DCHECK_GT(filter_desc.targets.size(), 1);
    VLOG_QUERY << "re-registered consumer filter " << filter_id;
    return existing->second;
  }

  RuntimeFilter* consumer =
      obj_pool_.Add(new RuntimeFilter(filter_desc, filter_desc.filter_size_bytes));
  // If the bank was cancelled before this registration, hand out a filter that is
  // already cancelled so callers need no special handling for that case.
  if (cancelled_) consumer->Cancel();
  consumed_filters_[filter_id] = consumer;
  VLOG(2) << "registered consumer filter " << filter_id;
  return consumer;
}

namespace {

/// Sends a filter to the coordinator. Executed asynchronously in the context of
/// ExecEnv::rpc_pool().
///
/// 'address' is the coordinator to contact, 'params' the UpdateFilter request to send
/// and 'client_cache' the cache the connection is taken from. Delivery is best-effort:
/// both a failure to obtain a connection and a failure of the RPC itself are logged and
/// otherwise ignored, since failing to send a filter is not a query-wide error.
void SendFilterToCoordinator(TNetworkAddress address, TUpdateFilterParams params,
    ImpalaBackendClientCache* client_cache) {
  Status status;
  ImpalaBackendConnection coord(client_cache, address, &status);
  if (!status.ok()) {
    // Failing to send a filter is not a query-wide error - the remote fragment will
    // continue regardless.
    // TODO: Retry.
    LOG(INFO) << "Couldn't send filter to coordinator: " << status.msg().msg();
    return;
  }
  TUpdateFilterResult res;
  status = coord.DoRpc(&ImpalaBackendClient::UpdateFilter, params, &res);
  if (!status.ok()) {
    // Still best-effort, but leave a trace instead of dropping the RPC error silently.
    LOG(INFO) << "UpdateFilter RPC to coordinator failed for filter "
              << params.filter_id << ": " << status.msg().msg();
  }
}

}

void RuntimeFilterBank::UpdateFilterFromLocal(
    int32_t filter_id, BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) {
  DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
      << "Should not be calling UpdateFilterFromLocal() if filtering is disabled";
  TUpdateFilterParams params;
  // A runtime filter may have both local and remote targets.
  bool has_local_target = false;
  bool has_remote_target = false;
  TRuntimeFilterType::type type;
  {
    lock_guard<mutex> l(runtime_filter_lock_);
    RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
    DCHECK(it != produced_filters_.end()) << "Tried to update unregistered filter: "
                                          << filter_id;
    it->second->SetFilter(bloom_filter, min_max_filter);
    has_local_target = it->second->filter_desc().has_local_targets;
    has_remote_target = it->second->filter_desc().has_remote_targets;
    type = it->second->filter_desc().type;
  }

  if (has_local_target) {
    // Do a short circuit publication by pushing the same filter to the consumer side.
    RuntimeFilter* filter;
    {
      lock_guard<mutex> l(runtime_filter_lock_);
      RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id);
      if (it == consumed_filters_.end()) return;
      filter = it->second;
    }
    filter->SetFilter(bloom_filter, min_max_filter);
    state_->runtime_profile()->AddInfoString(Substitute("Filter $0 arrival", filter_id),
        PrettyPrinter::Print(filter->arrival_delay_ms(), TUnit::TIME_MS));
  }

  if (has_remote_target
      && state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
    params.__set_filter_id(filter_id);
    params.__set_query_id(state_->query_id());
    if (type == TRuntimeFilterType::BLOOM) {
      BloomFilter::ToThrift(bloom_filter, &params.bloom_filter);
      params.__isset.bloom_filter = true;
    } else {
      DCHECK(type == TRuntimeFilterType::MIN_MAX);
      min_max_filter->ToThrift(&params.min_max_filter);
      params.__isset.min_max_filter = true;
    }

    ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>(
        SendFilterToCoordinator, state_->query_ctx().coord_address, params,
        ExecEnv::GetInstance()->impalad_client_cache()));
  }
}

void RuntimeFilterBank::PublishGlobalFilter(const TPublishFilterParams& params) {
  lock_guard<mutex> l(runtime_filter_lock_);
  if (closed_) return;
  RuntimeFilterMap::iterator it = consumed_filters_.find(params.filter_id);
  DCHECK(it != consumed_filters_.end()) << "Tried to publish unregistered filter: "
                                        << params.filter_id;

  BloomFilter* bloom_filter = nullptr;
  MinMaxFilter* min_max_filter = nullptr;
  if (it->second->is_bloom_filter()) {
    DCHECK(params.__isset.bloom_filter);
    if (params.bloom_filter.always_true) {
      bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
    } else {
      int64_t required_space =
          BloomFilter::GetExpectedMemoryUsed(params.bloom_filter.log_bufferpool_space);
      DCHECK_GE(buffer_pool_client_.GetUnusedReservation(), required_space)
          << "BufferPool Client should have enough reservation to fulfill bloom filter "
             "allocation";
      bloom_filter = obj_pool_.Add(new BloomFilter(&buffer_pool_client_));
      Status status = bloom_filter->Init(params.bloom_filter);
      if (!status.ok()) {
        LOG(ERROR) << "Unable to allocate memory for bloom filter: "
                   << status.GetDetail();
        bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
      } else {
        bloom_filters_.push_back(bloom_filter);
        DCHECK_EQ(required_space, bloom_filter->GetBufferPoolSpaceUsed());
        bloom_memory_allocated_->Add(bloom_filter->GetBufferPoolSpaceUsed());
      }
    }
  } else {
    DCHECK(it->second->is_min_max_filter());
    DCHECK(params.__isset.min_max_filter);
    min_max_filter = MinMaxFilter::Create(
        params.min_max_filter, it->second->type(), &obj_pool_, filter_mem_tracker_.get());
    min_max_filters_.push_back(min_max_filter);
  }
  it->second->SetFilter(bloom_filter, min_max_filter);
  state_->runtime_profile()->AddInfoString(
      Substitute("Filter $0 arrival", params.filter_id),
      PrettyPrinter::Print(it->second->arrival_delay_ms(), TUnit::TIME_MS));
}

/// Allocates and initialises a Bloom filter sized for the producer filter 'filter_id'.
///
/// Returns nullptr if the bank has been closed or if the filter's Init() fails.
/// On success the filter is remembered so that Close() can release it, and the
/// "BloomFilterBytes" counter is increased by the buffer pool space it uses.
/// 'filter_id' must have been registered as a producer.
BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
  lock_guard<mutex> guard(runtime_filter_lock_);
  if (closed_) return nullptr;

  RuntimeFilterMap::iterator entry = produced_filters_.find(filter_id);
  DCHECK(entry != produced_filters_.end())
      << "Filter ID " << filter_id << " not registered";

  // Work out how much buffer pool space a filter of the registered size will use.
  int64_t log_filter_size = BitUtil::Log2Ceiling64(entry->second->filter_size());
  int64_t required_space = BloomFilter::GetExpectedMemoryUsed(log_filter_size);
  DCHECK_GE(buffer_pool_client_.GetUnusedReservation(), required_space)
      << "BufferPool Client should have enough reservation to fulfill bloom filter "
         "allocation";

  BloomFilter* scratch = obj_pool_.Add(new BloomFilter(&buffer_pool_client_));
  Status init_status = scratch->Init(log_filter_size);
  if (init_status.ok()) {
    bloom_filters_.push_back(scratch);
    DCHECK_EQ(required_space, scratch->GetBufferPoolSpaceUsed());
    bloom_memory_allocated_->Add(scratch->GetBufferPoolSpaceUsed());
    return scratch;
  }
  LOG(ERROR) << "Unable to allocate memory for bloom filter: "
             << init_status.GetDetail();
  return nullptr;
}

/// Creates a min-max filter of column type 'type' for the producer filter 'filter_id'.
///
/// Returns nullptr if the bank has been closed. Otherwise the filter returned by
/// MinMaxFilter::Create() is remembered so that Close() can release it, and is returned.
/// 'filter_id' must have been registered as a producer.
MinMaxFilter* RuntimeFilterBank::AllocateScratchMinMaxFilter(
    int32_t filter_id, ColumnType type) {
  lock_guard<mutex> guard(runtime_filter_lock_);
  if (closed_) return nullptr;

  RuntimeFilterMap::iterator entry = produced_filters_.find(filter_id);
  DCHECK(entry != produced_filters_.end())
      << "Filter ID " << filter_id << " not registered";

  MinMaxFilter* scratch =
      MinMaxFilter::Create(type, &obj_pool_, filter_mem_tracker_.get());
  min_max_filters_.push_back(scratch);
  return scratch;
}

/// Returns true if the false-positive probability that BloomFilter::FalsePositiveProb()
/// reports for 'observed_ndv' and log2('filter_size') rounded up exceeds the
/// 'max_filter_error_rate' flag.
bool RuntimeFilterBank::FpRateTooHigh(int64_t filter_size, int64_t observed_ndv) {
  const auto log_filter_size = BitUtil::Log2Ceiling64(filter_size);
  const double false_positive_prob =
      BloomFilter::FalsePositiveProb(observed_ndv, log_filter_size);
  return FLAGS_max_filter_error_rate < false_positive_prob;
}

void RuntimeFilterBank::Cancel() {
  lock_guard<mutex> l(runtime_filter_lock_);
  CancelLocked();
}

void RuntimeFilterBank::CancelLocked() {
  if (cancelled_) return;
  // Cancel all filters that a thread might be waiting on.
  for (auto& entry : consumed_filters_) entry.second->Cancel();
  cancelled_ = true;
}

void RuntimeFilterBank::Close() {
  lock_guard<mutex> l(runtime_filter_lock_);
  CancelLocked();
  closed_ = true;
  for (BloomFilter* filter : bloom_filters_) filter->Close();
  for (MinMaxFilter* filter : min_max_filters_) filter->Close();
  obj_pool_.Clear();
  if (buffer_pool_client_.is_registered()) {
    VLOG_FILE << "RuntimeFilterBank (Fragment Id: "
              << PrintId(state_->fragment_instance_id())
              << ") returning reservation " << total_bloom_filter_mem_required_;
    state_->query_state()->initial_reservations()->Return(
        &buffer_pool_client_, total_bloom_filter_mem_required_);
    ExecEnv::GetInstance()->buffer_pool()->DeregisterClient(&buffer_pool_client_);
  }
  DCHECK_EQ(filter_mem_tracker_->consumption(), 0);
  filter_mem_tracker_->Close();
}
