blob: 618788cd203e36b92405cca8806da262092725de [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <boost/thread/mutex.hpp>
#include "runtime/raw-value.h"
#include "runtime/runtime-filter-bank.h"
#include "util/bloom-filter.h"
#include "util/condition-variable.h"
#include "util/time.h"
namespace impala {
class BloomFilter;
/// RuntimeFilters represent set-membership predicates that are computed during query
/// execution (rather than during planning). They can then be sent to other operators to
/// reduce their output. For example, a RuntimeFilter might compute a predicate
/// corresponding to set membership, where the members of that set can only be computed at
/// runtime (for example, the distinct values of the build side of a hash table). Other
/// plan nodes can use that predicate by testing for membership of that set to filter rows
/// early on in the plan tree (e.g. the scan that feeds the probe side of that join node
/// could eliminate rows from consideration for join matching).
/// A RuntimeFilter may compute its set-membership predicate as a bloom filters or a
/// min-max filter, depending on its filter description.
class RuntimeFilter {
RuntimeFilter(const TRuntimeFilterDesc& filter, int64_t filter_size)
: bloom_filter_(nullptr), min_max_filter_(nullptr), filter_desc_(filter),
registration_time_(MonotonicMillis()), arrival_time_(0L),
filter_size_(filter_size) {
DCHECK(filter_desc_.type == TRuntimeFilterType::MIN_MAX || filter_size_ > 0);
/// Returns true if SetFilter() has been called.
bool HasFilter() const { return has_filter_.Load(); }
const TRuntimeFilterDesc& filter_desc() const { return filter_desc_; }
int32_t id() const { return filter_desc().filter_id; }
int64_t filter_size() const { return filter_size_; }
ColumnType type() const {
return ColumnType::FromThrift(filter_desc().src_expr.nodes[0].type);
bool is_bloom_filter() const { return filter_desc().type == TRuntimeFilterType::BLOOM; }
bool is_min_max_filter() const {
return filter_desc().type == TRuntimeFilterType::MIN_MAX;
MinMaxFilter* get_min_max() const { return min_max_filter_.Load(); }
/// Sets the internal filter bloom_filter to 'bloom_filter' or 'min_max_filter'
/// depending on the type of this RuntimeFilter. Can only legally be called
/// once per filter. Does not acquire the memory associated with 'bloom_filter'.
void SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter);
/// Signal that no filter should be arriving, waking up any threads blocked in
/// WaitForArrival().
void Cancel();
/// Returns false iff 'bloom_filter_' has been set via SetBloomFilter() and hash[val] is
/// not in that 'bloom_filter_'. Otherwise returns true. Is safe to call concurrently
/// with SetBloomFilter(). 'val' is a value derived from evaluating a tuple row against
/// the expression of the owning filter context. 'col_type' is the value's type.
/// Inlined in IR so that the constant 'col_type' can be propagated.
bool IR_ALWAYS_INLINE Eval(void* val, const ColumnType& col_type) const noexcept;
/// Returns the amount of time in milliseconds elapsed between the registration of the
/// filter and its arrival. If the filter has not yet arrived, it returns the time
/// elapsed since registration.
int32_t arrival_delay_ms() const {
if (arrival_time_.Load() == 0L) return MonotonicMillis() - registration_time_;
return arrival_time_.Load() - registration_time_;
/// Periodically (every 20ms) checks to see if the global filter has arrived. Waits for
/// a maximum of timeout_ms before returning. Returns true if the filter has arrived,
/// false otherwise.
bool WaitForArrival(int32_t timeout_ms) const;
/// Returns true if the filter returns true/false for all elements, i.e. Eval(v) returns
/// true/false for all v.
inline bool AlwaysTrue() const;
inline bool AlwaysFalse() const;
/// Frequency with which to check for filter arrival in WaitForArrival()
static const int SLEEP_PERIOD_MS;
/// Class name in LLVM IR.
static const char* LLVM_CLASS_NAME;
/// Membership bloom_filter. May be NULL even after arrival_time_ is set, meaning that
/// it does not filter any rows, either because it was not created
/// (filter_desc_.bloom_filter is false), there was not enough memory, or the false
/// positive rate was determined to be too high.
AtomicPtr<BloomFilter> bloom_filter_;
/// May be NULL even after arrival_time_ is set if filter_desc_.min_max_filter is false.
AtomicPtr<MinMaxFilter> min_max_filter_;
/// Reference to the filter's thrift descriptor in the thrift Plan tree.
const TRuntimeFilterDesc& filter_desc_;
/// Time in ms (from MonotonicMillis()), that the filter was registered.
const int64_t registration_time_;
/// Time, in ms (from MonotonicMillis()), that the global filter arrived, or the
/// filter was cancelled. Set in SetFilter() or Cancel().
AtomicInt64 arrival_time_;
/// Only set after arrival_time_, if SetFilter() was called.
AtomicBool has_filter_{false};
/// The size of the Bloom filter, in bytes.
const int64_t filter_size_;
/// Lock to protect 'arrival_cv_'
mutable boost::mutex arrival_mutex_;
/// Signalled when a filter arrives or the filter is cancelled. Paired with
/// 'arrival_mutex_'
mutable ConditionVariable arrival_cv_;