// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef IMPALA_RUNTIME_RUNTIME_FILTER_BANK_H
#define IMPALA_RUNTIME_RUNTIME_FILTER_BANK_H

#include "codegen/impala-ir.h"
#include "common/object-pool.h"
#include "gen-cpp/data_stream_service.pb.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/mem-pool.h"
#include "runtime/types.h"
#include "util/runtime-profile.h"

#include <boost/scoped_ptr.hpp>
#include <boost/thread/lock_guard.hpp>
#include <boost/unordered_map.hpp>

#include <condition_variable>
#include <vector>

// Forward declarations of the KRPC classes that this header refers to only through
// pointers, so the Kudu RPC headers do not have to be included here.
namespace kudu {
namespace rpc {
class RpcController;
class RpcContext;
} // namespace rpc
} // namespace kudu

namespace impala {

// Forward declarations (alphabetical). This header uses these types only by pointer,
// reference or smart pointer, so their full definitions are not required here.
class BloomFilter;
class MemTracker;
class MinMaxFilter;
class RuntimeFilter;
class RuntimeState;
class TQueryCtx;
class TRuntimeFilterDesc;

/// RuntimeFilters are produced and consumed by plan nodes at run time to propagate
/// predicates across the plan tree dynamically. Each fragment instance manages its
/// filters with a RuntimeFilterBank which provides low-synchronization access to filter
/// objects and data structures.
///
/// A RuntimeFilterBank manages both production and consumption of filters. In the case
/// where a given filter is both consumed and produced by the same fragment, the
/// RuntimeFilterBank treats each filter independently.
///
/// All filters must be registered with the filter bank via RegisterFilter(). Local plan
/// fragments update the filters by calling UpdateFilterFromLocal() (which may only be
/// called once per filter ID per filter bank), with either a bloom filter or a min-max
/// filter, depending on the filter's type. The 'bloom_filter' or 'min_max_filter' that is
/// passed into UpdateFilterFromLocal() must have been allocated by
/// AllocateScratchBloomFilter() or AllocateScratchMinMaxFilter(), respectively; this
/// allows RuntimeFilterBank to manage all memory associated with filters.
///
/// Filters are aggregated at the coordinator, and then made available to consumers after
/// PublishGlobalFilter() has been called.
///
/// After PublishGlobalFilter() has been called (and again, it may only be called once per
/// filter_id), the RuntimeFilter object associated with filter_id will have a valid
/// bloom_filter or min_max_filter, and may be used for filter evaluation. This
/// operation occurs without synchronization, and neither the thread that calls
/// PublishGlobalFilter() nor the thread that may call RuntimeFilter::Eval() needs to
/// coordinate in any way.
class RuntimeFilterBank {
 public:
  RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state,
      long total_filter_mem_required);

  /// Initialize 'buffer_pool_client_' and claim the initial reservation. The client is
  /// automatically cleaned up in Close(). Should not be called if the client is already
  /// open.
  ///
  /// Must return the initial reservation to QueryState::initial_reservations(), which is
  /// done automatically in Close() as long as the initial reservation is not released
  /// before Close().
  Status ClaimBufferReservation() WARN_UNUSED_RESULT;

  /// Registers a filter that will either be produced (is_producer == false) or consumed
  /// (is_producer == true) by fragments that share this RuntimeState. The filter
  /// bloom_filter itself is unallocated until the first call to PublishGlobalFilter().
  RuntimeFilter* RegisterFilter(const TRuntimeFilterDesc& filter_desc, bool is_producer);

  /// Updates a filter's 'bloom_filter' or 'min_max_filter' which has been produced by
  /// some operator in the local fragment instance. At most one of 'bloom_filter' and
  /// 'min_max_filter' may be non-NULL, depending on the filter's type. They may both be
  /// NULL, representing a filter that allows all rows to pass.
  void UpdateFilterFromLocal(
      int32_t filter_id, BloomFilter* bloom_filter, MinMaxFilter* min_max_filter);

  /// Makes a bloom_filter (aggregated globally from all producer fragments) available for
  /// consumption by operators that wish to use it for filtering.
  void PublishGlobalFilter(
      const PublishFilterParamsPB& params, kudu::rpc::RpcContext* context);

  /// Returns true if, according to the observed NDV in 'observed_ndv', a filter of size
  /// 'filter_size' would have an expected false-positive rate which would exceed
  /// FLAGS_max_filter_error_rate.
  bool FpRateTooHigh(int64_t filter_size, int64_t observed_ndv);

  /// Returns a RuntimeFilter with the given filter id. This is safe to call after all
  /// calls to RegisterFilter() have finished, and not before. Filters may be cached by
  /// clients and subsequently accessed without synchronization. Concurrent calls to
  /// PublishGlobalFilter() will update a filter's bloom filter atomically, without the
  /// need for client synchronization.
  inline const RuntimeFilter* GetRuntimeFilter(int32_t filter_id);

  /// Returns a bloom_filter that can be used by an operator to produce a local filter,
  /// which may then be used in UpdateFilterFromLocal(). The memory returned is owned by
  /// the RuntimeFilterBank and should not be deleted by the caller. The filter identified
  /// by 'filter_id' must have been previously registered as a 'producer' by
  /// RegisterFilter().
  ///
  /// If memory allocation for the filter fails, or if Close() has been called first,
  /// returns NULL.
  BloomFilter* AllocateScratchBloomFilter(int32_t filter_id);

  /// Returns a new MinMaxFilter. Handles memory the same as AllocateScratchBloomFilter().
  MinMaxFilter* AllocateScratchMinMaxFilter(int32_t filter_id, ColumnType type);

  /// Default hash seed to use when computing hashed values to insert into filters.
  static int32_t IR_ALWAYS_INLINE DefaultHashSeed() { return 1234; }

  /// Called to signal that the query is being cancelled. Wakes up any threads blocked
  /// waiting for filters to allow them to finish.
  void Cancel();

  /// Releases all memory allocated for BloomFilters.
  void Close();

  static const int64_t MIN_BLOOM_FILTER_SIZE = 4 * 1024;           // 4KB
  static const int64_t MAX_BLOOM_FILTER_SIZE = 512 * 1024 * 1024;  // 512MB

 private:
  /// Implementation of Cancel(). 'runtime_filter_lock_' must be held by caller.
  void CancelLocked();

  /// Lock protecting produced_filters_ and consumed_filters_.
  boost::mutex runtime_filter_lock_;

  /// Map from filter id to a RuntimeFilter.
  typedef boost::unordered_map<int32_t, RuntimeFilter*> RuntimeFilterMap;

  /// All filters expected to be produced by the local plan fragment instance.
  RuntimeFilterMap produced_filters_;

  /// All filters expected to be consumed by the local plan fragment instance.
  RuntimeFilterMap consumed_filters_;

  /// Lock protecting 'num_inflight_rpcs_' and it should not be taken at the same
  /// time as runtime_filter_lock_.
  SpinLock num_inflight_rpcs_lock_;
  /// Use 'num_inflight_rpcs_' to keep track of the number of current in-flight
  /// KRPC calls to prevent the memory pointed to by a BloomFilter* being
  /// deallocated in RuntimeFilterBank::Close() before all KRPC calls have
  /// been completed.
  int32_t num_inflight_rpcs_ = 0;
  std::condition_variable_any krpcs_done_cv_;

  /// Fragment instance's runtime state.
  RuntimeState* state_;

  /// Object pool to track allocated Bloom filters.
  ObjectPool obj_pool_;

  /// MemTracker to track Bloom filter memory.
  boost::scoped_ptr<MemTracker> filter_mem_tracker_;

  /// True iff Cancel() or Close() has been called. Protected by 'runtime_filter_lock_'.
  bool cancelled_ = false;

  /// True iff Close() has been called. Used to prevent races between
  /// AllocateScratchBloomFilter() and Close().
  bool closed_;

  /// Total amount of memory allocated to Bloom Filters
  RuntimeProfile::Counter* bloom_memory_allocated_;

  /// Total amount of memory required by the bloom filters as calculated by the planner.
  long total_bloom_filter_mem_required_;

  /// Contains references to all the bloom filters generated. Used in Close() to safely
  /// release all memory allocated for BloomFilters.
  vector<BloomFilter*> bloom_filters_;

  /// Contains references to all the min-max filters generated. Used in Close() to safely
  /// release all memory allocated for MinMaxFilters.
  vector<MinMaxFilter*> min_max_filters_;

  /// Buffer pool client for the filter bank. Initialized with the required reservation
  /// in ClaimBufferReservation(). Reservations are returned to the initial reservations
  /// pool in Close().
  BufferPool::ClientHandle buffer_pool_client_;

  /// This is the callback for the asynchronous rpc UpdateFilterAsync() in
  /// UpdateFilterFromLocal().
  void UpdateFilterCompleteCb(
      const kudu::rpc::RpcController* rpc_controller, const UpdateFilterResultPB* res);
};

}

#endif
