// blob: 902e15eaa74008559d9a42525a6e68d7a94ad754 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
import org.apache.hadoop.hbase.util.Pair;
/**
* This class has the logic for handling scanners for regions with and without replicas.
* 1. A scan is attempted on the default (primary) region, or a specific region.
* 2. The scanner sends all the RPCs to the default/specific region until it is done, or, there
* is a timeout on the default/specific region (a negative timeout is rejected by the constructor).
* 3. If there is a timeout in (2) above, scanner(s) is opened on the non-default replica(s) only
* for Consistency.TIMELINE without specific replica id specified.
* 4. The results from the first successful scanner are taken, and it is stored which server
* returned the results.
* 5. The next RPCs are done on the above stored server until it is done or there is a timeout,
* in which case, the other replicas are queried (as in (3) above).
*
*/
@InterfaceAudience.Private
class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
/** Logger for this class. */
private static final Logger LOG = LoggerFactory.getLogger(ScannerCallableWithReplicas.class);
// The scanner callable that RPCs are currently directed at. Starts as the base callable given to
// the constructor, is replaced when a different replica's callable answers first, and is set to
// null by call() after the close RPC has been issued.
volatile ScannerCallable currentScannerCallable;
// Set to true when the callable that answered differs from the previously current one; reset to
// false at the start of every non-close call().
AtomicBoolean replicaSwitched = new AtomicBoolean(false);
// Connection used to look up (or read cached) region locations.
final ClusterConnection cConnection;
// Executor handed to the completion service and used to run the asynchronous scanner closes.
protected final ExecutorService pool;
// How long call() waits on the current replica before contacting the others. call() polls with
// TimeUnit.MICROSECONDS; the constructor rejects negative values.
protected final int timeBeforeReplicas;
// The scan being executed; consulted for its consistency level and replica id.
private final Scan scan;
// Retry count passed along when an ExecutionException is converted into the thrown exception.
private final int retries;
// Last Result of the most recent non-empty batch; used to position newly created replica scanners.
private Result lastResult;
// Caller shared with RetryingRPC instances unless the scan uses Consistency.TIMELINE.
private final RpcRetryingCaller<Result[]> caller;
// Table being scanned; used for region location lookups.
private final TableName tableName;
// Configuration used to instantiate RpcRetryingCallerFactory instances.
private Configuration conf;
// Timeout value passed with every submitted replica RPC and with the asynchronous close calls.
private int scannerTimeout;
// Callables that have been submitted and have not yet been adopted or scheduled for close.
private Set<ScannerCallable> outstandingCallables = new HashSet<>();
// Set once any RetryingRPC.cancel() has run; required for testing purposes only.
private boolean someRPCcancelled = false; //required for testing purposes only
// Number of region locations for the scanned row; 0 until computed by the first call().
private int regionReplication = 0;
/**
 * Creates a replica-aware wrapper around {@code baseCallable}.
 *
 * @param tableName table being scanned
 * @param cConnection connection used for region location lookups
 * @param baseCallable callable that becomes the initial current scanner callable
 * @param pool executor on which replica RPCs and scanner closes are run
 * @param timeBeforeReplicas wait on the current replica before other replicas are contacted;
 *          must not be negative
 * @param scan the scan being executed
 * @param retries retry count reported when a failure is rethrown
 * @param scannerTimeout timeout passed with each submitted replica RPC
 * @param caching accepted for signature compatibility; not used by this constructor
 * @param conf configuration used to build retrying callers
 * @param caller retrying caller reused for scans that are not Consistency.TIMELINE
 * @throws IllegalArgumentException if {@code timeBeforeReplicas} is negative
 */
public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
    ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
    int retries, int scannerTimeout, int caching, Configuration conf,
    RpcRetryingCaller<Result []> caller) {
  // Validate first so that nothing is stored for an instance that is about to be rejected.
  if (timeBeforeReplicas < 0) {
    throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
  }

  // What is scanned and through which connection.
  this.tableName = tableName;
  this.scan = scan;
  this.cConnection = cConnection;
  this.currentScannerCallable = baseCallable;

  // How the RPCs are executed.
  this.pool = pool;
  this.caller = caller;
  this.conf = conf;

  // Timing and retry settings.
  this.timeBeforeReplicas = timeBeforeReplicas;
  this.scannerTimeout = scannerTimeout;
  this.retries = retries;
}
/**
 * Asks the current scanner callable to mark itself for closing. Only logs a warning when the
 * current callable has already been cleared.
 */
public void setClose() {
  if (currentScannerCallable == null) {
    LOG.warn("Calling close on ScannerCallable reference that is already null, "
      + "which shouldn't happen.");
    return;
  }
  currentScannerCallable.setClose();
}
/**
 * Forwards the renew flag to the current scanner callable.
 *
 * @param val value handed to {@code ScannerCallable#setRenew}
 */
public void setRenew(boolean val) {
  final ScannerCallable active = currentScannerCallable;
  active.setRenew(val);
}
/**
 * Forwards the caching value to the current scanner callable.
 *
 * @param caching value handed to {@code ScannerCallable#setCaching}
 */
public void setCaching(int caching) {
  final ScannerCallable active = currentScannerCallable;
  active.setCaching(caching);
}
/**
 * @return the caching value reported by the current scanner callable
 */
public int getCaching() {
  final ScannerCallable active = currentScannerCallable;
  return active.getCaching();
}
/**
 * @return the region info reported by the current scanner callable
 */
public HRegionInfo getHRegionInfo() {
  final ScannerCallable active = currentScannerCallable;
  return active.getHRegionInfo();
}
/**
 * @return the current scanner callable's answer to {@code moreResultsInRegion()}
 */
public MoreResults moreResultsInRegion() {
  final ScannerCallable active = currentScannerCallable;
  return active.moreResultsInRegion();
}
/**
 * @return the current scanner callable's answer to {@code moreResultsForScan()}
 */
public MoreResults moreResultsForScan() {
  final ScannerCallable active = currentScannerCallable;
  return active.moreResultsForScan();
}
/**
 * Performs one scan round trip, falling back to the other replicas when that is permitted.
 * <ol>
 * <li>If the current callable is flagged closed, the close RPC is sent to exactly that callable
 * and the current callable is cleared.</li>
 * <li>Otherwise the RPC is submitted for the current callable and awaited for
 * {@code timeBeforeReplicas} microseconds.</li>
 * <li>If nothing came back in time (or the call failed and fallback is allowed), calls for the
 * remaining replicas are submitted unless the scan is Consistency.STRONG or targets a specific
 * replica, and the first successful answer within {@code timeout} milliseconds wins.</li>
 * </ol>
 *
 * @param timeout timeout handed to the close RPC, and the wait in milliseconds for the first
 *          successfully completed replica call
 * @return the results of the winning call; {@code null} when the current callable has already
 *         been cleared or the winning task produced no pair
 * @throws InterruptedIOException if waiting was interrupted or the awaited task was cancelled;
 *           the triggering exception is attached as the cause
 * @throws IOException if no result arrived within {@code timeout}, or a replica call failed
 */
@Override
public Result[] call(int timeout) throws IOException {
  // Read the volatile field once so the null check and the uses below see the same object.
  final ScannerCallable active = currentScannerCallable;
  if (active == null) {
    LOG.warn("Another call received, but our ScannerCallable is already null. "
      + "This shouldn't happen, but there's not much to do, so logging and returning null.");
    return null;
  }
  // If the active replica callable was closed somewhere, invoke the RPC to really close it.
  // For regular scanners: a couple of RPCs go to a RegionServer, and when that region is
  // exhausted the closed flag is set. An RPC is then required to actually close the scanner.
  if (active.closed) {
    // For closing we target that exact scanner (no replica fallback as for normal reads).
    if (LOG.isTraceEnabled()) {
      LOG.trace("Closing scanner id=" + active.scannerId);
    }
    Result[] r = active.call(timeout);
    currentScannerCallable = null;
    return r;
  }

  // We need to do the following:
  // 1. When a scan goes out to a certain replica (default or not), keep hitting that replica
  //    until there is a failure, so remember the last successfully invoked replica.
  // 2. Close the "losing" scanners (scanners other than the one we hear back from first).
  //
  // Since RegionReplication is a table attribute, it won't change as long as the table is
  // enabled, so it only needs to be computed once.
  if (regionReplication <= 0) {
    regionReplication = locateRegionReplicas().size();
  }

  // Allocate a bounded completion pool of some multiple of the number of replicas. We want to
  // accommodate some RPCs for redundant replica scans that are still in progress.
  ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
    new ResultBoundedCompletionService<>(
      RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool,
      regionReplication * 5);

  AtomicBoolean done = new AtomicBoolean(false);
  replicaSwitched.set(false);
  // submit call for the primary replica or user specified replica
  addCallsForCurrentReplica(cs);
  int startIndex = 0;

  try {
    // wait for the timeout to see whether the primary responds back
    Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
      TimeUnit.MICROSECONDS); // Yes, microseconds
    if (f != null) {
      // After poll, if f is not null, there must be a completed task
      return adoptResult(f, done); // great we got a response
    }
  } catch (ExecutionException e) {
    // We ignore the ExecutionException and continue with the replicas
    if (LOG.isDebugEnabled()) {
      LOG.debug("Scan with primary region returns " + e.getCause());
    }
    // If there is a single location, the scan's consistency is strong, or the scan is over a
    // specific replica, the exception from the primary replica has to be thrown out.
    if (regionReplication == 1 || scan.getConsistency() == Consistency.STRONG ||
        scan.getReplicaId() >= 0) {
      // Rethrow the first exception
      RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
    }
    // The task at index 0 has already failed; skip it when looking for a success below.
    startIndex = 1;
  } catch (CancellationException | InterruptedException e) {
    throw asInterruptedIOException(e);
  }

  // submit call for all of the secondaries at once
  int endIndex = regionReplication;
  if (scan.getConsistency() == Consistency.STRONG || scan.getReplicaId() >= 0) {
    // When scan's consistency is strong or scan is over specific replica region, do not send to
    // the secondaries
    endIndex = 1;
  } else {
    // TODO: this may be an overkill for large region replication
    addCallsForOtherReplicas(cs, 0, regionReplication - 1);
  }

  try {
    Future<Pair<Result[], ScannerCallable>> f = cs.pollForFirstSuccessfullyCompletedTask(timeout,
      TimeUnit.MILLISECONDS, startIndex, endIndex);
    if (f == null) {
      throw new IOException("Failed to get result within timeout, timeout=" + timeout + "ms");
    }
    return adoptResult(f, done); // great we got an answer
  } catch (ExecutionException e) {
    RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
  } catch (CancellationException | InterruptedException e) {
    throw asInterruptedIOException(e);
  } finally {
    // We get there because we were interrupted or because one or more of the
    // calls succeeded or failed. In all case, we stop all our tasks.
    cs.cancelAll();
  }
  // Only reachable if throwEnrichedException returned normally, which is not expected.
  LOG.error("Imposible? Arrive at an unreachable line..."); // unreachable
  throw new IOException("Imposible? Arrive at an unreachable line...");
}

/**
 * Looks up the region locations for the current scanner's row. When the lookup fails with
 * RetriesExhaustedException or DoNotRetryIOException (for example because the server hosting
 * meta is down), falls back to the connection's cached location if one exists.
 */
private RegionLocations locateRegionReplicas() throws IOException {
  try {
    return RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
      RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
      currentScannerCallable.getRow());
  } catch (RetriesExhaustedException | DoNotRetryIOException e) {
    // We cannot get the primary replica region location; try the cached replicas directly.
    if (cConnection instanceof ConnectionImplementation) {
      RegionLocations cached = ((ConnectionImplementation) cConnection)
        .getCachedLocation(tableName, currentScannerCallable.getRow());
      if (cached != null) {
        return cached;
      }
    }
    // Nothing cached (or no cache available on this connection type): surface the failure.
    throw e;
  }
}

/**
 * Takes the pair produced by a completed task, records the answering callable as the current
 * one, and returns the results (or {@code null} when the task produced no pair).
 */
private Result[] adoptResult(Future<Pair<Result[], ScannerCallable>> f, AtomicBoolean done)
    throws InterruptedException, ExecutionException {
  Pair<Result[], ScannerCallable> r = f.get();
  if (r == null) {
    return null;
  }
  if (r.getSecond() != null) {
    updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
  }
  return r.getFirst();
}

/**
 * Converts an interruption or cancellation into an InterruptedIOException that carries the same
 * message and keeps the original exception as its cause. The thread's interrupt status is left
 * untouched, as before.
 */
private static InterruptedIOException asInterruptedIOException(Exception e) {
  InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
  iioe.initCause(e);
  return iioe;
}
/**
 * Makes {@code scanner} the current scanner callable and schedules a close for every other
 * outstanding replica scanner. Only the first invocation that flips {@code done} from false to
 * true has any effect.
 *
 * @param scanner the callable whose RPC answered
 * @param result the results it returned; the last element, if any, is remembered as the position
 *          from which replica scanners are started later
 * @param done guard shared by the invocations belonging to one {@code call()}
 * @param pool executor on which the close RPCs of the losing scanners are submitted
 */
private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
    AtomicBoolean done, ExecutorService pool) {
  if (!done.compareAndSet(false, true)) {
    return;
  }
  if (currentScannerCallable != scanner) {
    replicaSwitched.set(true);
  }
  currentScannerCallable = scanner;
  // store where to start the replica scanner from if we need to.
  if (result != null && result.length != 0) {
    this.lastResult = result[result.length - 1];
  }
  if (LOG.isTraceEnabled()) {
    LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId +
      " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId());
  }
  // close all outstanding replica scanners but the one we heard back from
  outstandingCallables.remove(scanner);
  for (ScannerCallable s : outstandingCallables) {
    if (LOG.isTraceEnabled()) {
      // Log the replica id (not the region id) so the value matches its "replica=" label.
      LOG.trace("Closing scanner id=" + s.scannerId +
        ", replica=" + s.getHRegionInfo().getReplicaId() +
        " because slow and replica=" +
        this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded");
    }
    // Submit the "close" to the pool since this might take time, and we don't
    // want to wait for the "close" to happen yet. The "wait" will happen when
    // the table is closed (when the awaitTermination of the underlying pool is called)
    s.setClose();
    final RetryingRPC r = new RetryingRPC(s);
    pool.submit(new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        r.call(scannerTimeout);
        return null;
      }
    });
  }
  // now clear outstandingCallables since we scheduled a close for all the contained scanners
  outstandingCallables.clear();
}
/**
 * Reports whether the most recent call was answered by a callable other than the one that was
 * current before it. The upper layer {@link ClientScanner} needs to know when a scanner switches
 * in the middle of scanning (for example because the 'next' call failed).
 *
 * @return the current value of the replica-switched flag
 */
public boolean switchedToADifferentReplica() {
  final boolean switched = replicaSwitched.get();
  return switched;
}
/**
 * Heartbeat messages are sent back from the server when the processing of the scan request
 * exceeds a certain time threshold. Heartbeats allow the server to avoid timeouts during long
 * running scan operations.
 *
 * @return true when the most recent RPC response indicated that the response was a heartbeat
 *         message; false when there is no current scanner callable
 */
public boolean isHeartbeatMessage() {
  if (currentScannerCallable == null) {
    return false;
  }
  return currentScannerCallable.isHeartbeatMessage();
}
/**
 * @return the cursor reported by the current scanner callable, or {@code null} when there is no
 *         current scanner callable
 */
public Cursor getCursor() {
  if (currentScannerCallable != null) {
    return currentScannerCallable.getCursor();
  } else {
    return null;
  }
}
/**
 * Submits the RPC for the current scanner callable and tracks that callable as outstanding.
 *
 * @param cs completion service the call is submitted to
 */
private void addCallsForCurrentReplica(
    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs) {
  final ScannerCallable current = currentScannerCallable;
  final RetryingRPC rpcForCurrent = new RetryingRPC(current);
  outstandingCallables.add(current);
  // The callable's own id is used as the task id within the completion service.
  cs.submit(rpcForCurrent, scannerTimeout, current.id);
}
/**
 * Submits an RPC for every replica id in {@code [min, max]} except the id of the current scanner
 * callable, whose call has already been scheduled.
 *
 * @param cs completion service the calls are submitted to
 * @param min first replica id to consider (inclusive)
 * @param max last replica id to consider (inclusive)
 */
private void addCallsForOtherReplicas(
    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max) {
  for (int replicaId = min; replicaId <= max; replicaId++) {
    if (currentScannerCallable.id != replicaId) {
      ScannerCallable replicaCallable =
        currentScannerCallable.getScannerCallableForReplica(replicaId);
      // Position the new scanner according to the last result seen so far.
      setStartRowForReplicaCallable(replicaCallable);
      outstandingCallables.add(replicaCallable);
      cs.submit(new RetryingRPC(replicaCallable), scannerTimeout, replicaId);
    }
  }
}
/**
 * Positions a replica callable's scan relative to the last result received. Does nothing when no
 * result has been received yet or when {@code callable} is null.
 * @param callable The callable to set the start row on
 */
private void setStartRowForReplicaCallable(ScannerCallable callable) {
  if (this.lastResult != null && callable != null) {
    // The start row is always the row of the last result; what varies is whether it is inclusive:
    //  - the last result may have more cells in its row (partial): start at that row again,
    //    inclusively. If a replica switch occurs, the scanner will ensure that any accumulated
    //    partial results are cleared, and the scan can resume from this row.
    //  - the last result was complete: nothing more is needed from that row, so start after it
    //    (exclusive).
    Scan replicaScan = callable.getScan();
    byte[] lastRow = this.lastResult.getRow();
    boolean includeLastRow = this.lastResult.mayHaveMoreCellsInRow();
    replicaScan.withStartRow(lastRow, includeLastRow);
  }
}
/**
 * @return true once {@code cancel()} has run on any {@code RetryingRPC} of this instance
 */
@VisibleForTesting
boolean isAnyRPCcancelled() {
  final boolean anyCancelled = someRPCcancelled;
  return anyCancelled;
}
/**
 * Cancellable wrapper that runs a single {@link ScannerCallable} and returns its results paired
 * with the callable that produced them. All retrying-callable hooks are delegated to the wrapped
 * callable.
 */
class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
  /** The replica callable this wrapper runs. */
  final ScannerCallable callable;
  /** Caller used to execute the callable; shared or private depending on scan consistency. */
  RpcRetryingCaller<Result[]> caller;
  /** Set by {@link #cancel()}; once set, {@code call} and {@code prepare} do nothing. */
  private volatile boolean cancelled = false;

  RetryingRPC(ScannerCallable callable) {
    this.callable = callable;
    if (scan.getConsistency() == Consistency.TIMELINE) {
      // For the Consistency.TIMELINE case, we can't reuse the caller since we could be making
      // parallel RPCs (caller.callWithRetries is synchronized and we can't invoke it multiple
      // times at the same time)
      this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf)
        .<Result[]>newCaller();
    } else {
      // For the Consistency.STRONG (default case), we reuse the caller to keep compatibility
      // with what is done in the past
      this.caller = ScannerCallableWithReplicas.this.caller;
    }
  }

  /**
   * Runs the wrapped callable once, without retries.
   *
   * @return the results paired with the wrapped callable, or {@code null} if already cancelled
   */
  @Override
  public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
    // since the retries is done within the ResultBoundedCompletionService,
    // we don't invoke callWithRetries here
    if (!cancelled) {
      Result[] rows = this.caller.callWithoutRetries(this.callable, callTimeout);
      return new Pair<>(rows, this.callable);
    }
    return null;
  }

  /**
   * Prepares the wrapped callable unless this RPC was cancelled.
   *
   * @throws InterruptedIOException if the current thread was interrupted (the check clears the
   *           interrupt status)
   */
  @Override
  public void prepare(boolean reload) throws IOException {
    if (!cancelled) {
      if (Thread.interrupted()) {
        throw new InterruptedIOException();
      }
      callable.prepare(reload);
    }
  }

  @Override
  public void throwable(Throwable t, boolean retrying) {
    callable.throwable(t, retrying);
  }

  @Override
  public String getExceptionMessageAdditionalDetail() {
    return callable.getExceptionMessageAdditionalDetail();
  }

  @Override
  public long sleep(long pause, int tries) {
    return callable.sleep(pause, tries);
  }

  /**
   * Marks this RPC cancelled, cancels the caller, starts cancellation on the callable's RPC
   * controller when it has one, and records on the enclosing instance that a cancel happened.
   */
  @Override
  public void cancel() {
    cancelled = true;
    caller.cancel();
    if (callable.getRpcController() != null) {
      callable.getRpcController().startCancel();
    }
    someRPCcancelled = true;
  }

  @Override
  public boolean isCancelled() {
    return cancelled;
  }
}
/**
 * Does nothing: this wrapper has no preparation of its own. Each {@code RetryingRPC} prepares
 * the replica callable it wraps.
 */
@Override
public void prepare(boolean reload) throws IOException {
// Intentionally empty.
}
/**
 * Passes a failure on to the current scanner callable.
 */
@Override
public void throwable(Throwable t, boolean retrying) {
  final ScannerCallable active = currentScannerCallable;
  active.throwable(t, retrying);
}
/**
 * @return the additional exception detail supplied by the current scanner callable
 */
@Override
public String getExceptionMessageAdditionalDetail() {
  final ScannerCallable active = currentScannerCallable;
  return active.getExceptionMessageAdditionalDetail();
}
/**
 * @return the sleep time computed by the current scanner callable for the given pause and tries
 */
@Override
public long sleep(long pause, int tries) {
  final ScannerCallable active = currentScannerCallable;
  return active.sleep(pause, tries);
}
}