blob: 332d5bd8726b70d98741dbfa1fb112b19abeab69 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#pragma once
#include <folly/Conv.h>
#include <folly/ExceptionWrapper.h>
#include <folly/Format.h>
#include <folly/Logging.h>
#include <folly/futures/Future.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/HHWheelTimer.h>
#include <algorithm>
#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
#include "hbase/connection/rpc-client.h"
#include "hbase/client/async-connection.h"
#include "hbase/client/hbase-rpc-controller.h"
#include "hbase/client/raw-scan-result-consumer.h"
#include "hbase/client/region-location.h"
#include "hbase/client/request-converter.h"
#include "hbase/client/response-converter.h"
#include "hbase/client/result.h"
#include "hbase/client/scan-result-cache.h"
#include "hbase/client/scan.h"
#include "hbase/exceptions/exception.h"
#include "client/Client.pb.h"
#include "HBase.pb.h"
#include "hbase/utils/bytes-util.h"
#include "hbase/utils/connection-util.h"
#include "hbase/utils/optional.h"
#include "hbase/utils/sys-util.h"
#include "hbase/utils/time-util.h"
using std::chrono::nanoseconds;
using std::chrono::milliseconds;
namespace hbase {
class AsyncScanRpcRetryingCaller;
// The resume method is allowed to be called in another thread so here we also use the
// ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back
// from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED,
// and when user calls resume method, we will change the state to RESUMED. But the resume method
// could be called in other thread, and in fact, user could just do this:
// controller.suspend().resume()
// This is strange but valid. This means the scan could be resumed before we call the prepare
// method to do the actual suspend work. So in the resume method, we will check if the state is
// INTIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare
// method, if the state is RESUMED already, we will just return an let the scan go on.
// Notice that, the public methods of this class is supposed to be called by upper layer only, and
// package private methods can only be called within the implementation of
// AsyncScanSingleRegionRpcRetryingCaller.
// TODO: Unlike the Java counter part, we do not do scan lease renewals in a background thread.
// Since there is also no async scan API exposed to the users, only ScanResultConsumer is the
// AsyncTableResultScanner which will only pause the scanner if the result cache is maxed. The
// application is expected to consume the scan results before the scanner lease timeout.
class ScanResumerImpl : public ScanResumer {
public:
explicit ScanResumerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller);
virtual ~ScanResumerImpl() = default;
/**
* Resume the scan. You are free to call it multiple time but only the first call will take
* effect.
*/
void Resume() override;
// return false if the scan has already been resumed. See the comment above for ScanResumerImpl
// for more details.
bool Prepare(std::shared_ptr<pb::ScanResponse> resp, int num_complete_rows);
private:
// INITIALIZED -> SUSPENDED -> RESUMED
// INITIALIZED -> RESUMED
ScanResumerState state_ = ScanResumerState::kInitialized;
std::mutex mutex_;
std::shared_ptr<pb::ScanResponse> resp_ = nullptr;
int64_t num_complete_rows_ = 0;
std::shared_ptr<AsyncScanRpcRetryingCaller> caller_;
};
class ScanControllerImpl : public ScanController {
public:
virtual ~ScanControllerImpl() = default;
explicit ScanControllerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller);
/**
* Suspend the scan.
* <p>
* This means we will stop fetching data in background, i.e., will not call onNext any more
* before you resume the scan.
* @return A resumer used to resume the scan later.
*/
std::shared_ptr<ScanResumer> Suspend();
/**
* Terminate the scan.
* <p>
* This is useful when you have got enough results and want to stop the scan in onNext method,
* or you want to stop the scan in onHeartbeat method because it has spent too many time.
*/
void Terminate();
// return the current state, and set the state to DESTROYED.
ScanControllerState Destroy();
std::shared_ptr<ScanResumerImpl> resumer() { return resumer_; }
private:
void PreCheck();
std::string DebugString(ScanControllerState state);
std::string DebugString(ScanResumerState state);
private:
// Make sure the methods are only called in this thread.
std::thread::id caller_thread_id_ = std::this_thread::get_id();
// INITIALIZED -> SUSPENDED -> DESTROYED
// INITIALIZED -> TERMINATED -> DESTROYED
// INITIALIZED -> DESTROYED
// If the state is incorrect we will throw IllegalStateException.
ScanControllerState state_ = ScanControllerState::kInitialized;
std::shared_ptr<ScanResumerImpl> resumer_ = nullptr;
std::shared_ptr<AsyncScanRpcRetryingCaller> caller_;
};
class AsyncScanRpcRetryingCaller : public std::enable_shared_from_this<AsyncScanRpcRetryingCaller> {
public:
AsyncScanRpcRetryingCaller(std::shared_ptr<AsyncConnection> conn,
std::shared_ptr<folly::HHWheelTimer> retry_timer,
std::shared_ptr<hbase::RpcClient> rpc_client,
std::shared_ptr<Scan> scan, int64_t scanner_id,
std::shared_ptr<ScanResultCache> results_cache,
std::shared_ptr<RawScanResultConsumer> consumer,
std::shared_ptr<RegionLocation> region_location,
nanoseconds scanner_lease_timeout_nanos, nanoseconds pause,
uint32_t max_retries, nanoseconds scan_timeout_nanos,
nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count);
folly::Future<bool> Start(std::shared_ptr<HBaseRpcController> controller,
std::shared_ptr<pb::ScanResponse> open_scan_resp,
const std::shared_ptr<CellScanner> cell_scanner);
private:
int64_t RemainingTimeNs();
void OnComplete(std::shared_ptr<HBaseRpcController> controller,
std::shared_ptr<pb::ScanResponse> resp,
const std::shared_ptr<CellScanner> cell_scanner);
void CompleteOrNext(std::shared_ptr<pb::ScanResponse> resp);
void CompleteExceptionally(bool close_scanner);
void CompleteNoMoreResults();
void CompleteWhenNoMoreResultsInRegion();
void CompleteWithNextStartRow(std::string row, bool inclusive);
void UpdateNextStartRowWhenError(const Result& result);
void CompleteWhenError(bool close_scanner);
void OnError(const folly::exception_wrapper& e);
bool NoMoreResultsForScan(const Scan& scan, const pb::RegionInfo& info);
void Next();
void Call();
void CloseScanner();
void ResetController(std::shared_ptr<HBaseRpcController> controller,
const int64_t& timeout_nanos);
private:
std::shared_ptr<AsyncConnection> conn_;
std::shared_ptr<folly::HHWheelTimer> retry_timer_;
std::shared_ptr<hbase::RpcClient> rpc_client_;
std::shared_ptr<Scan> scan_;
int64_t scanner_id_;
std::shared_ptr<ScanResultCache> results_cache_;
std::shared_ptr<RawScanResultConsumer> consumer_;
std::shared_ptr<RegionLocation> region_location_;
nanoseconds scanner_lease_timeout_nanos_;
nanoseconds pause_;
uint32_t max_retries_;
nanoseconds scan_timeout_nanos_;
nanoseconds rpc_timeout_nanos_;
uint32_t start_log_errors_count_;
std::shared_ptr<folly::Promise<bool>> promise_;
std::shared_ptr<HBaseRpcController> controller_;
optional<std::string> next_start_row_when_error_ = optional<std::string>();
bool include_next_start_row_when_error_ = true;
uint64_t start_ns_;
uint32_t tries_;
std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions_;
uint32_t max_attempts_;
int64_t next_call_seq_ = -1L;
friend class ScanResumerImpl;
friend class ScanControllerImpl;
};
} // namespace hbase