| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/kudu-scan-node.h" |
| |
| #include <thrift/protocol/TDebugProtocol.h> |
| |
| #include "exec/exec-node-util.h" |
| #include "exec/kudu-scanner.h" |
| #include "exec/kudu-util.h" |
| #include "exprs/scalar-expr.h" |
| #include "gutil/gscoped_ptr.h" |
| #include "runtime/blocking-row-batch-queue.h" |
| #include "runtime/fragment-instance-state.h" |
| #include "runtime/mem-pool.h" |
| #include "runtime/query-state.h" |
| #include "runtime/row-batch.h" |
| #include "runtime/runtime-state.h" |
| #include "runtime/scanner-mem-limiter.h" |
| #include "runtime/thread-resource-mgr.h" |
| #include "runtime/tuple-row.h" |
| #include "util/runtime-profile-counters.h" |
| |
| #include "common/names.h" |
| |
| DEFINE_int32(kudu_max_row_batches, 0, "The maximum size of the row batch queue, " |
| " for Kudu scanners."); |
| |
| // Empirically derived estimate for the Kudu scan's memory consumption per column of |
| // data materialized. |
| DEFINE_int64_hidden(kudu_scanner_thread_estimated_bytes_per_column, 384L * 1024L, |
| "Estimated bytes of memory per materialized column consumed by Kudu scanner thread."); |
| |
| // Empirically derived estimate for the maximum consumption of Kudu scan, based on |
| // experiments with 250-column table with num_scanner_threads=1, where I wasn't able |
| // to coax the scan to use more than 25MB of memory. |
| DEFINE_int64_hidden(kudu_scanner_thread_max_estimated_bytes, 32L * 1024L * 1024L, |
| "Estimated maximum bytes of memory consumed by Kudu scanner thread for high column " |
| "counts."); |
| |
| namespace impala { |
| |
| KuduScanNode::KuduScanNode(ObjectPool* pool, const ScanPlanNode& pnode, |
| const DescriptorTbl& descs) |
| : KuduScanNodeBase(pool, pnode, descs), |
| done_(false), |
| thread_avail_cb_id_(-1) { |
| DCHECK(KuduIsAvailable()); |
| } |
| |
| KuduScanNode::~KuduScanNode() { |
| DCHECK(is_closed()); |
| } |
| |
| Status KuduScanNode::Prepare(RuntimeState* state) { |
| RETURN_IF_ERROR(KuduScanNodeBase::Prepare(state)); |
| thread_state_.Prepare(this, EstimateScannerThreadMemConsumption()); |
| return Status::OK(); |
| } |
| |
| Status KuduScanNode::Open(RuntimeState* state) { |
| SCOPED_TIMER(runtime_profile_->total_time_counter()); |
| ScopedOpenEventAdder ea(this); |
| RETURN_IF_ERROR(KuduScanNodeBase::Open(state)); |
| thread_state_.Open(this, FLAGS_kudu_max_row_batches); |
| |
| if (filter_ctxs_.size() > 0) WaitForRuntimeFilters(); |
| |
| thread_avail_cb_id_ = state->resource_pool()->AddThreadAvailableCb( |
| bind<void>(mem_fn(&KuduScanNode::ThreadAvailableCb), this, _1)); |
| ThreadAvailableCb(state->resource_pool()); |
| return Status::OK(); |
| } |
| |
| Status KuduScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) { |
| SCOPED_TIMER(runtime_profile_->total_time_counter()); |
| ScopedGetNextEventAdder ea(this, eos); |
| RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state)); |
| RETURN_IF_CANCELLED(state); |
| RETURN_IF_ERROR(QueryMaintenance(state)); |
| |
| // If there are no scan tokens, nothing is ever placed in the materialized |
| // row batch, so exit early for this case. |
| if (NumScanTokens() == 0 || ReachedLimitShared()) { |
| *eos = true; |
| return Status::OK(); |
| } |
| |
| *eos = false; |
| unique_ptr<RowBatch> materialized_batch = thread_state_.batch_queue()->GetBatch(); |
| if (materialized_batch != NULL) { |
| row_batch->AcquireState(materialized_batch.get()); |
| if (CheckLimitAndTruncateRowBatchIfNeededShared(row_batch, eos)) { |
| SetDone(); |
| } |
| COUNTER_SET(rows_returned_counter_, rows_returned_shared()); |
| materialized_batch.reset(); |
| } else { |
| *eos = true; |
| } |
| |
| unique_lock<mutex> l(lock_); |
| return status_; |
| } |
| |
| void KuduScanNode::Close(RuntimeState* state) { |
| if (is_closed()) return; |
| SCOPED_TIMER(runtime_profile_->total_time_counter()); |
| if (thread_avail_cb_id_ != -1) { |
| state->resource_pool()->RemoveThreadAvailableCb(thread_avail_cb_id_); |
| } |
| |
| SetDone(); |
| |
| thread_state_.Close(this); |
| KuduScanNodeBase::Close(state); |
| } |
| |
| int64_t KuduScanNode::EstimateScannerThreadMemConsumption() { |
| int64_t num_cols = max<int64_t>(1, tuple_desc()->slots().size()); |
| return min(FLAGS_kudu_scanner_thread_max_estimated_bytes, |
| FLAGS_kudu_scanner_thread_estimated_bytes_per_column * num_cols); |
| } |
| |
| void KuduScanNode::ThreadAvailableCb(ThreadResourcePool* pool) { |
| ScannerMemLimiter* mem_limiter = runtime_state_->query_state()->scanner_mem_limiter(); |
| while (true) { |
| unique_lock<mutex> lock(lock_); |
| // All done or all tokens are assigned. |
| if (done_ || !HasScanToken()) break; |
| bool first_thread = thread_state_.GetNumActive() == 0; |
| |
| // * Don't start up a ScannerThread if the row batch queue is full since |
| // we are not scanner bound. |
| // * Don't start up a thread if there is not enough memory available for the |
| // estimated memory consumption (include reservation and non-reserved memory). |
| if (!first_thread) { |
| if (thread_state_.batch_queue()->IsFull()) break; |
| if (!mem_limiter->ClaimMemoryForScannerThread( |
| this, EstimateScannerThreadMemConsumption())) { |
| COUNTER_ADD(thread_state_.scanner_thread_mem_unavailable_counter(), 1); |
| break; |
| } |
| } |
| |
| // Check if we can get a token. We need at least one thread to run. |
| if (first_thread) { |
| pool->AcquireThreadToken(); |
| } else if (thread_state_.GetNumActive() >= thread_state_.max_num_scanner_threads() |
| || !pool->TryAcquireThreadToken()) { |
| mem_limiter->ReleaseMemoryForScannerThread( |
| this, EstimateScannerThreadMemConsumption()); |
| break; |
| } |
| |
| string name = Substitute( |
| "kudu-scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)", |
| PrintId(runtime_state_->fragment_instance_id()), id(), |
| thread_state_.GetNumStarted()); |
| |
| // Reserve the first token so no other thread picks it up. |
| const string* token = GetNextScanToken(); |
| auto fn = [this, first_thread, token, name]() { |
| this->RunScannerThread(first_thread, name, token); |
| }; |
| std::unique_ptr<Thread> t; |
| Status status = |
| Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true); |
| if (!status.ok()) { |
| // Release the token and skip running callbacks to find a replacement. Skipping |
| // serves two purposes. First, it prevents a mutual recursion between this function |
| // and ReleaseThreadToken()->InvokeCallbacks(). Second, Thread::Create() failed and |
| // is likely to continue failing for future callbacks. |
| pool->ReleaseThreadToken(first_thread, true); |
| if (!first_thread) { |
| mem_limiter->ReleaseMemoryForScannerThread( |
| this, EstimateScannerThreadMemConsumption()); |
| } |
| |
| // Abort the query. This is still holding the lock_, so done_ is known to be |
| // false and status_ must be ok. |
| DCHECK(status_.ok()); |
| status_ = status; |
| SetDoneInternal(); |
| break; |
| } |
| // Thread successfully started |
| thread_state_.AddThread(move(t)); |
| } |
| } |
| |
| Status KuduScanNode::ProcessScanToken(KuduScanner* scanner, const string& scan_token) { |
| bool eos; |
| RETURN_IF_ERROR(scanner->OpenNextScanToken(scan_token, &eos)); |
| if (eos) return Status::OK(); |
| while (!eos && !done_) { |
| unique_ptr<RowBatch> row_batch = std::make_unique<RowBatch>(row_desc(), |
| runtime_state_->batch_size(), mem_tracker()); |
| RETURN_IF_ERROR(scanner->GetNext(row_batch.get(), &eos)); |
| while (!done_) { |
| scanner->KeepKuduScannerAlive(); |
| if (thread_state_.EnqueueBatchWithTimeout(&row_batch, 1000000)) { |
| break; |
| } |
| // Make sure that we still own the RowBatch if BlockingPutWithTimeout() timed out. |
| DCHECK(row_batch != nullptr); |
| } |
| } |
| if (eos) scan_ranges_complete_counter_->Add(1); |
| return Status::OK(); |
| } |
| |
| void KuduScanNode::RunScannerThread( |
| bool first_thread, const string& name, const string* initial_token) { |
| DCHECK(initial_token != nullptr); |
| SCOPED_THREAD_COUNTER_MEASUREMENT(thread_state_.thread_counters()); |
| SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics()); |
| KuduScanner scanner(this, runtime_state_); |
| |
| const string* scan_token = initial_token; |
| Status status = scanner.Open(); |
| if (status.ok()) { |
| // Here, even though a read of 'done_' may conflict with a write to it, |
| // ProcessScanToken() will return early, as will GetNextScanToken(). |
| while (!done_ && scan_token != nullptr) { |
| status = ProcessScanToken(&scanner, *scan_token); |
| if (!status.ok()) break; |
| |
| // Check if we have enough thread tokens to keep using this optional thread. This |
| // check is racy: multiple threads may notice that the optional tokens are exceeded |
| // and shut themselves down. If we shut down too many and there are more optional |
| // tokens, ThreadAvailableCb() will be invoked again. |
| if (!first_thread && runtime_state_->resource_pool()->optional_exceeded()) break; |
| |
| unique_lock<mutex> l(lock_); |
| if (!done_) { |
| scan_token = GetNextScanToken(); |
| } else { |
| scan_token = nullptr; |
| } |
| } |
| } |
| scanner.Close(); |
| |
| { |
| unique_lock<mutex> l(lock_); |
| if (!status.ok() && status_.ok()) { |
| status_ = status; |
| SetDoneInternal(); |
| } |
| if (thread_state_.DecrementNumActive()) SetDoneInternal(); |
| } |
| |
| // lock_ is released before calling ThreadResourceMgr::ReleaseThreadToken() which |
| // invokes ThreadAvailableCb() which attempts to take the same lock. |
| VLOG_RPC << "Thread done: " << name; |
| if (!first_thread) { |
| ScannerMemLimiter* mem_limiter = runtime_state_->query_state()->scanner_mem_limiter(); |
| mem_limiter->ReleaseMemoryForScannerThread( |
| this, EstimateScannerThreadMemConsumption()); |
| } |
| runtime_state_->resource_pool()->ReleaseThreadToken(first_thread); |
| } |
| |
| void KuduScanNode::SetDoneInternal() { |
| if (done_) return; |
| done_ = true; |
| thread_state_.Shutdown(); |
| } |
| |
| void KuduScanNode::SetDone() { |
| unique_lock<mutex> l(lock_); |
| SetDoneInternal(); |
| } |
| |
| } // namespace impala |