| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #ifndef IMPALA_EXEC_KUDU_SCANNER_H_ |
| #define IMPALA_EXEC_KUDU_SCANNER_H_ |
| |
| #include <boost/scoped_ptr.hpp> |
| #include <kudu/client/client.h> |
| |
| #include "common/object-pool.h" |
| #include "exec/kudu-scan-node-base.h" |
| #include "runtime/descriptors.h" |
| |
| namespace impala { |
| |
| class MemPool; |
| class RowBatch; |
| class RuntimeState; |
| class Tuple; |
| |
| /// Wraps a Kudu client scanner to fetch row batches from Kudu. The Kudu client scanner |
| /// is created from a scan token in OpenNextScanToken(), which then provides rows fetched |
| /// by GetNext() until it reaches eos, and the caller may open another scan token. |
| class KuduScanner { |
| public: |
| KuduScanner(KuduScanNodeBase* scan_node, RuntimeState* state); |
| |
| /// Prepares this scanner for execution. |
| /// Does not actually open a kudu::client::KuduScanner. |
| Status Open(); |
| |
| /// Opens a new kudu::client::KuduScanner using 'scan_token'. If there are no rows to |
| /// scan (eg. because there is a runtime filter that rejects all rows) 'eos' will |
| /// be set to true, otherwise if the return status is OK it will be false. |
| Status OpenNextScanToken(const std::string& scan_token, bool* eos); |
| |
| /// Fetches the next batch from the current kudu::client::KuduScanner. |
| Status GetNext(RowBatch* row_batch, bool* eos); |
| |
| /// Sends a "Ping" to the Kudu TabletServer servicing the current scan, if there is |
| /// one. This serves the purpose of making the TabletServer keep the server side |
| /// scanner alive if the batch queue is full and no batches can be queued. If there are |
| /// any errors, they are ignored here, since we assume that we will just fail the next |
| /// time we try to read a batch. |
| void KeepKuduScannerAlive(); |
| |
| /// Closes this scanner. |
| void Close(); |
| |
| private: |
| /// Handles count(*) queries, writing only the NumRows from the Kudu batch. |
| /// The optimization is possible only in simpler cases e.g. when there are no conjucts. |
| /// Check ScanNode.java#canApplyCountStarOptimization for full detail. |
| Status GetNextWithCountStarOptimization(RowBatch* row_batch, bool* eos); |
| |
| /// Handles the case where the projection is empty (e.g. count(*)). |
| /// Does this by adding sets of rows to 'row_batch' instead of adding one-by-one. |
| /// If in the rare case where there is any conjunct, evaluate them once for each row |
| /// and add a row to the row batch only when the conjuncts evaluate to true. |
| Status HandleEmptyProjection(RowBatch* row_batch); |
| |
| /// Decodes rows previously fetched from kudu, now in 'cur_rows_' into a RowBatch. |
| /// - 'batch' is the batch that will point to the new tuples. |
| /// - *tuple_mem should be the location to output tuples. |
| /// Returns OK when one of the following conditions occur: |
| /// - cur_kudu_batch_ is fully consumed |
| /// - batch is full |
| /// - scan_node_ limit has been reached |
| Status DecodeRowsIntoRowBatch(RowBatch* batch, Tuple** tuple_mem); |
| |
| /// Fetches the next batch of rows from the current kudu::client::KuduScanner. |
| Status GetNextScannerBatch(); |
| |
| /// Closes the current kudu::client::KuduScanner. |
| void CloseCurrentClientScanner(); |
| |
| inline Tuple* next_tuple(Tuple* t) const { |
| uint8_t* mem = reinterpret_cast<uint8_t*>(t); |
| return reinterpret_cast<Tuple*>(mem + scan_node_->tuple_desc()->byte_size()); |
| } |
| |
| /// Builds the error string by adding the PlanNode id and KuduTable to the message. |
| string BuildErrorString(const char* msg); |
| |
| KuduScanNodeBase* scan_node_; |
| RuntimeState* state_; |
| |
| /// For objects which have the same life time as the scanner. |
| ObjectPool obj_pool_; |
| |
| /// MemPool used for expr-managed allocations in expression evaluators in this scanner. |
| /// Need to be local to each scanner as MemPool is not thread safe. |
| boost::scoped_ptr<MemPool> expr_perm_pool_; |
| |
| /// MemPool used for allocations by expression evaluators in this scanner that hold |
| /// results of expression evaluation. Need to be local to each scanner as MemPool is |
| /// not thread safe. |
| boost::scoped_ptr<MemPool> expr_results_pool_; |
| |
| /// The kudu::client::KuduScanner for the current scan token. A new KuduScanner is |
| /// created for each scan token using KuduScanToken::DeserializeIntoScanner(). |
| boost::scoped_ptr<kudu::client::KuduScanner> scanner_; |
| |
| /// The current batch of retrieved rows. |
| kudu::client::KuduScanBatch cur_kudu_batch_; |
| |
| /// The number of rows already read from cur_kudu_batch_. |
| int cur_kudu_batch_num_read_; |
| |
| /// The last time a keepalive request or successful RPC was sent. |
| int64_t last_alive_time_micros_; |
| |
| /// The scanner's cloned copy of the conjuncts to apply. |
| vector<ScalarExprEvaluator*> conjunct_evals_; |
| |
| /// Timestamp slots in the tuple descriptor of the scan node. Used to convert Kudu |
| /// UNIXTIME_MICRO values inline. |
| vector<const SlotDescriptor*> timestamp_slots_; |
| }; |
| |
| } /// namespace impala |
| |
| #endif /// IMPALA_EXEC_KUDU_SCANNER_H_ |