| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| |
| #ifndef IMPALA_RUNTIME_RUNTIME_FILTER_H |
| #define IMPALA_RUNTIME_RUNTIME_FILTER_H |
| |
| #include <boost/thread/mutex.hpp> |
| |
| #include "runtime/raw-value.h" |
| #include "runtime/runtime-filter-bank.h" |
| #include "util/bloom-filter.h" |
| #include "util/condition-variable.h" |
| #include "util/time.h" |
| |
| namespace impala { |
| |
| class BloomFilter; |
| |
| /// RuntimeFilters represent set-membership predicates that are computed during query |
| /// execution (rather than during planning). They can then be sent to other operators to |
| /// reduce their output. For example, a RuntimeFilter might compute a predicate |
| /// corresponding to set membership, where the members of that set can only be computed at |
| /// runtime (for example, the distinct values of the build side of a hash table). Other |
| /// plan nodes can use that predicate by testing for membership of that set to filter rows |
| /// early on in the plan tree (e.g. the scan that feeds the probe side of that join node |
| /// could eliminate rows from consideration for join matching). |
| /// |
| /// A RuntimeFilter may compute its set-membership predicate as a bloom filters or a |
| /// min-max filter, depending on its filter description. |
| class RuntimeFilter { |
| public: |
| RuntimeFilter(const TRuntimeFilterDesc& filter, int64_t filter_size) |
| : bloom_filter_(nullptr), min_max_filter_(nullptr), filter_desc_(filter), |
| registration_time_(MonotonicMillis()), arrival_time_(0L), |
| filter_size_(filter_size) { |
| DCHECK(filter_desc_.type == TRuntimeFilterType::MIN_MAX || filter_size_ > 0); |
| } |
| |
| /// Returns true if SetFilter() has been called. |
| bool HasFilter() const { return has_filter_.Load(); } |
| |
| const TRuntimeFilterDesc& filter_desc() const { return filter_desc_; } |
| int32_t id() const { return filter_desc().filter_id; } |
| int64_t filter_size() const { return filter_size_; } |
| ColumnType type() const { |
| return ColumnType::FromThrift(filter_desc().src_expr.nodes[0].type); |
| } |
| bool is_bloom_filter() const { return filter_desc().type == TRuntimeFilterType::BLOOM; } |
| bool is_min_max_filter() const { |
| return filter_desc().type == TRuntimeFilterType::MIN_MAX; |
| } |
| |
| MinMaxFilter* get_min_max() const { return min_max_filter_.Load(); } |
| |
| /// Sets the internal filter bloom_filter to 'bloom_filter'. Can only legally be called |
| /// once per filter. Does not acquire the memory associated with 'bloom_filter'. |
| void SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter); |
| |
| /// Signal that no filter should be arriving, waking up any threads blocked in |
| /// WaitForArrival(). |
| void Cancel(); |
| |
| /// Returns false iff 'bloom_filter_' has been set via SetBloomFilter() and hash[val] is |
| /// not in that 'bloom_filter_'. Otherwise returns true. Is safe to call concurrently |
| /// with SetBloomFilter(). 'val' is a value derived from evaluating a tuple row against |
| /// the expression of the owning filter context. 'col_type' is the value's type. |
| /// Inlined in IR so that the constant 'col_type' can be propagated. |
| bool IR_ALWAYS_INLINE Eval(void* val, const ColumnType& col_type) const noexcept; |
| |
| /// Returns the amount of time in milliseconds elapsed between the registration of the |
| /// filter and its arrival. If the filter has not yet arrived, it returns the time |
| /// elapsed since registration. |
| int32_t arrival_delay_ms() const { |
| if (arrival_time_.Load() == 0L) return MonotonicMillis() - registration_time_; |
| return arrival_time_.Load() - registration_time_; |
| } |
| |
| /// Periodically (every 20ms) checks to see if the global filter has arrived. Waits for |
| /// a maximum of timeout_ms before returning. Returns true if the filter has arrived, |
| /// false otherwise. |
| bool WaitForArrival(int32_t timeout_ms) const; |
| |
| /// Returns true if the filter returns true/false for all elements, i.e. Eval(v) returns |
| /// true/false for all v. |
| inline bool AlwaysTrue() const; |
| inline bool AlwaysFalse() const; |
| |
| /// Frequency with which to check for filter arrival in WaitForArrival() |
| static const int SLEEP_PERIOD_MS; |
| |
| /// Class name in LLVM IR. |
| static const char* LLVM_CLASS_NAME; |
| |
| private: |
| /// Membership bloom_filter. May be NULL even after arrival_time_ is set, meaning that |
| /// it does not filter any rows, either because it was not created |
| /// (filter_desc_.bloom_filter is false), there was not enough memory, or the false |
| /// positive rate was determined to be too high. |
| AtomicPtr<BloomFilter> bloom_filter_; |
| |
| /// May be NULL even after arrival_time_ is set if filter_desc_.min_max_filter is false. |
| AtomicPtr<MinMaxFilter> min_max_filter_; |
| |
| /// Reference to the filter's thrift descriptor in the thrift Plan tree. |
| const TRuntimeFilterDesc& filter_desc_; |
| |
| /// Time in ms (from MonotonicMillis()), that the filter was registered. |
| const int64_t registration_time_; |
| |
| /// Time, in ms (from MonotonicMillis()), that the global filter arrived, or the |
| /// filter was cancelled. Set in SetFilter() or Cancel(). |
| AtomicInt64 arrival_time_; |
| |
| /// Only set after arrival_time_, if SetFilter() was called. |
| AtomicBool has_filter_{false}; |
| |
| /// The size of the Bloom filter, in bytes. |
| const int64_t filter_size_; |
| |
| /// Lock to protect 'arrival_cv_' |
| mutable boost::mutex arrival_mutex_; |
| |
| /// Signalled when a filter arrives or the filter is cancelled. Paired with |
| /// 'arrival_mutex_' |
| mutable ConditionVariable arrival_cv_; |
| }; |
| |
| } |
| |
| #endif |