| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/client/client-test-util.h" |
| |
| #include <algorithm> |
| #include <ostream> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include <glog/logging.h> |
| |
| #include "kudu/client/scan_batch.h" |
| #include "kudu/client/write_op.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/util/status.h" |
| |
| using std::string; |
| using std::vector; |
| |
| namespace kudu { |
| namespace client { |
| |
| void LogSessionErrorsAndDie(const sp::shared_ptr<KuduSession>& session, |
| const Status& s) { |
| CHECK(!s.ok()); |
| vector<KuduError*> errors; |
| ElementDeleter d(&errors); |
| bool overflow; |
| session->GetPendingErrors(&errors, &overflow); |
| CHECK(!overflow); |
| |
| // Log only the first 10 errors. |
| LOG(INFO) << errors.size() << " failed ops. First 10 errors follow"; |
| int i = 0; |
| for (const KuduError* e : errors) { |
| if (i == 10) { |
| break; |
| } |
| LOG(INFO) << "Op " << e->failed_op().ToString() |
| << " had status " << e->status().ToString(); |
| i++; |
| } |
| CHECK_OK(s); // will fail |
| } |
| |
| Status ScanTableToStrings(KuduTable* table, |
| vector<string>* row_strings, |
| ScannedRowsOrder rows_order) { |
| row_strings->clear(); |
| KuduScanner scanner(table); |
| // TODO(dralves) Change this to READ_AT_SNAPSHOT, fault tolerant scan and get rid |
| // of the retry code below. |
| RETURN_NOT_OK(scanner.SetSelection(KuduClient::LEADER_ONLY)); |
| RETURN_NOT_OK(scanner.SetTimeoutMillis(15000)); |
| RETURN_NOT_OK(ScanToStrings(&scanner, row_strings)); |
| if (rows_order == ScannedRowsOrder::kSorted) { |
| std::sort(row_strings->begin(), row_strings->end()); |
| } |
| return Status::OK(); |
| } |
| |
| int64_t CountTableRows(KuduTable* table) { |
| vector<string> rows; |
| CHECK_OK(client::ScanTableToStrings(table, &rows)); |
| return rows.size(); |
| } |
| |
| Status CountRowsWithRetries(KuduScanner* scanner, size_t* row_count) { |
| if (!scanner) { |
| return Status::InvalidArgument("null scanner"); |
| } |
| // KUDU-1656: there might be timeouts, so re-try the operations |
| // to avoid flakiness. |
| Status row_count_status; |
| size_t actual_row_count = 0; |
| row_count_status = scanner->Open(); |
| for (size_t i = 0; i < 3; ++i) { |
| if (!row_count_status.ok()) { |
| if (row_count_status.IsTimedOut()) { |
| // Start the row count over again. |
| continue; |
| } |
| RETURN_NOT_OK(row_count_status); |
| } |
| size_t count = 0; |
| while (scanner->HasMoreRows()) { |
| KuduScanBatch batch; |
| row_count_status = scanner->NextBatch(&batch); |
| if (!row_count_status.ok()) { |
| if (row_count_status.IsTimedOut()) { |
| // Break the NextBatch() cycle and start row count over again. |
| break; |
| } |
| RETURN_NOT_OK(row_count_status); |
| } |
| count += batch.NumRows(); |
| } |
| if (row_count_status.ok()) { |
| // Success: stop the retry cycle. |
| actual_row_count = count; |
| break; |
| } |
| } |
| RETURN_NOT_OK(row_count_status); |
| if (row_count) { |
| *row_count = actual_row_count; |
| } |
| return Status::OK(); |
| } |
| |
| Status ScanToStrings(KuduScanner* scanner, vector<string>* row_strings) { |
| RETURN_NOT_OK(scanner->Open()); |
| KuduScanBatch batch; |
| while (scanner->HasMoreRows()) { |
| RETURN_NOT_OK(scanner->NextBatch(&batch)); |
| for (const KuduScanBatch::RowPtr row : batch) { |
| row_strings->push_back(row.ToString()); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| } // namespace client |
| } // namespace kudu |