blob: 0a2a66eecaeb49b8b7959dfe5025c9efc8f74b73 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.setCoprocessorError;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The table implementation based on {@link AsyncTable}.
 */
class TableOverAsyncTable implements Table {
private static final Logger LOG = LoggerFactory.getLogger(TableOverAsyncTable.class);
// Connection used below for admin access, region lookups, coprocessor channels and the
// closed-connection check.
private final AsyncConnectionImpl conn;
// The asynchronous table that the operations in this class delegate to.
private final AsyncTable<?> table;
// Source of the ExecutorService; it is only read by the private coprocssorService helper.
private final IOExceptionSupplier<ExecutorService> poolSupplier;
// Stores the three collaborators as-is; no other work is done at construction time.
TableOverAsyncTable(AsyncConnectionImpl conn, AsyncTable<?> table,
IOExceptionSupplier<ExecutorService> poolSupplier) {
this.conn = conn;
this.table = table;
this.poolSupplier = poolSupplier;
/** Returns the table name reported by the wrapped async table. */
public TableName getName() {
return table.getName();
/** Returns the configuration reported by the wrapped async table. */
public Configuration getConfiguration() {
return table.getConfiguration();
/**
 * Requests this table's descriptor from the connection's admin and resolves the returned
 * future with {@code FutureUtils.get}.
 * @throws IOException if resolving the future fails
 */
public TableDescriptor getDescriptor() throws IOException {
return FutureUtils.get(conn.getAdmin().getDescriptor(getName()));
/**
 * Resolves the wrapped table's {@code exists(get)} future and returns its value.
 * @param get the lookup to test
 * @throws IOException if resolving the future fails
 */
public boolean exists(Get get) throws IOException {
return FutureUtils.get(table.exists(get));
/**
 * Multi-get variant of {@link #exists(Get)}: resolves {@code existsAll(gets)} on the wrapped
 * table and converts the resulting collection to a primitive array.
 * @param gets the lookups to test
 * @throws IOException if resolving the future fails
 */
public boolean[] exists(List<Get> gets) throws IOException {
return Booleans.toArray(FutureUtils.get(table.existsAll(gets)));
public void batch(List<? extends Row> actions, Object[] results) throws IOException {
if (ArrayUtils.isEmpty(results)) {
List<ThrowableWithExtraContext> errors = new ArrayList<>();
List<CompletableFuture<Object>> futures = table.batch(actions);
for (int i = 0, n = results.length; i < n; i++) {
try {
results[i] = FutureUtils.get(futures.get(i));
} catch (IOException e) {
results[i] = e;
errors.add(new ThrowableWithExtraContext(e, EnvironmentEdgeManager.currentTime(),
"Error when processing " + actions.get(i)));
if (!errors.isEmpty()) {
throw new RetriesExhaustedException(errors.size(), errors);
public <R> void batchCallback(List<? extends Row> actions, Object[] results, Callback<R> callback)
throws IOException, InterruptedException {
ConcurrentLinkedQueue<ThrowableWithExtraContext> errors = new ConcurrentLinkedQueue<>();
CountDownLatch latch = new CountDownLatch(actions.size());
AsyncTableRegionLocator locator = conn.getRegionLocator(getName());
List<CompletableFuture<R>> futures = table.<R> batch(actions);
for (int i = 0, n = futures.size(); i < n; i++) {
final int index = i;
FutureUtils.addListener(futures.get(i), (r, e) -> {
if (e != null) {
errors.add(new ThrowableWithExtraContext(e, EnvironmentEdgeManager.currentTime(),
"Error when processing " + actions.get(index)));
if (!ArrayUtils.isEmpty(results)) {
results[index] = e;
} else {
if (!ArrayUtils.isEmpty(results)) {
results[index] = r;
(l, le) -> {
if (le != null) {
errors.add(new ThrowableWithExtraContext(le, EnvironmentEdgeManager.currentTime(),
"Error when finding the region for row " +
} else {
callback.update(l.getRegion().getRegionName(), actions.get(index).getRow(), r);
if (!errors.isEmpty()) {
throw new RetriesExhaustedException(errors.size(),;
/**
 * Resolves the wrapped table's {@code get(get)} future and returns its value.
 * @param get the lookup to perform
 * @throws IOException if resolving the future fails
 */
public Result get(Get get) throws IOException {
return FutureUtils.get(table.get(get));
/**
 * Multi-get variant: resolves {@code getAll(gets)} on the wrapped table and copies the
 * resulting list into an array.
 * @param gets the lookups to perform
 * @throws IOException if resolving the future fails
 */
public Result[] get(List<Get> gets) throws IOException {
return FutureUtils.get(table.getAll(gets)).toArray(new Result[0]);
/** Returns the scanner the wrapped async table creates for {@code scan}. */
public ResultScanner getScanner(Scan scan) throws IOException {
return table.getScanner(scan);
/** Returns the scanner the wrapped async table creates for the given column family. */
public ResultScanner getScanner(byte[] family) throws IOException {
return table.getScanner(family);
/** Returns the scanner the wrapped async table creates for the given family and qualifier. */
public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
return table.getScanner(family, qualifier);
public void put(Put put) throws IOException {
public void put(List<Put> puts) throws IOException {
public void delete(Delete delete) throws IOException {
public void delete(List<Delete> deletes) throws IOException {
private static final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
private final AsyncTable.CheckAndMutateBuilder builder;
public CheckAndMutateBuilderImpl(
org.apache.hadoop.hbase.client.AsyncTable.CheckAndMutateBuilder builder) {
this.builder = builder;
public CheckAndMutateBuilder qualifier(byte[] qualifier) {
return this;
public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
return this;
public CheckAndMutateBuilder ifNotExists() {
return this;
public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
builder.ifMatches(compareOp, value);
return this;
public boolean thenPut(Put put) throws IOException {
return FutureUtils.get(builder.thenPut(put));
public boolean thenDelete(Delete delete) throws IOException {
return FutureUtils.get(builder.thenDelete(delete));
public boolean thenMutate(RowMutations mutation) throws IOException {
return FutureUtils.get(builder.thenMutate(mutation));
/**
 * Starts a check-and-mutate on the given row and family by wrapping the async table's builder
 * in a {@code CheckAndMutateBuilderImpl}.
 */
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
return new CheckAndMutateBuilderImpl(table.checkAndMutate(row, family));
public void mutateRow(RowMutations rm) throws IOException {
/**
 * Resolves the wrapped table's {@code append(append)} future and returns its value.
 * @throws IOException if resolving the future fails
 */
public Result append(Append append) throws IOException {
return FutureUtils.get(table.append(append));
/**
 * Resolves the wrapped table's {@code increment(increment)} future and returns its value.
 * @throws IOException if resolving the future fails
 */
public Result increment(Increment increment) throws IOException {
return FutureUtils.get(table.increment(increment));
/**
 * Resolves the wrapped table's four-argument {@code incrementColumnValue} future and returns
 * its value.
 * @throws IOException if resolving the future fails
 */
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
throws IOException {
return FutureUtils.get(table.incrementColumnValue(row, family, qualifier, amount));
/**
 * Same as the four-argument overload, but also passes {@code durability} through to the
 * wrapped table.
 * @throws IOException if resolving the future fails
 */
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
Durability durability) throws IOException {
return FutureUtils.get(table.incrementColumnValue(row, family, qualifier, amount, durability));
// NOTE(review): no statements are visible for this method. Presumably closing is a no-op
// because the connection and pool supplier are handed in from outside — confirm.
public void close() {
private static final class RegionCoprocessorRpcChannel extends RegionCoprocessorRpcChannelImpl
implements CoprocessorRpcChannel {
RegionCoprocessorRpcChannel(AsyncConnectionImpl conn, TableName tableName, RegionInfo region,
byte[] row, long rpcTimeoutNs, long operationTimeoutNs) {
super(conn, tableName, region, row, rpcTimeoutNs, operationTimeoutNs);
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) {
ClientCoprocessorRpcController c = new ClientCoprocessorRpcController();
CoprocessorBlockingRpcCallback<Message> callback = new CoprocessorBlockingRpcCallback<>();
super.callMethod(method, c, request, responsePrototype, callback);
Message ret;
try {
ret = callback.get();
} catch (IOException e) {
setCoprocessorError(controller, e);
if (c.failed()) {
setCoprocessorError(controller, c.getFailed());
public Message callBlockingMethod(MethodDescriptor method, RpcController controller,
Message request, Message responsePrototype) throws ServiceException {
ClientCoprocessorRpcController c = new ClientCoprocessorRpcController();
CoprocessorBlockingRpcCallback<Message> done = new CoprocessorBlockingRpcCallback<>();
callMethod(method, c, request, responsePrototype, done);
Message ret;
try {
ret = done.get();
} catch (IOException e) {
throw new ServiceException(e);
if (c.failed()) {
setCoprocessorError(controller, c.getFailed());
throw new ServiceException(c.getFailed());
return ret;
/**
 * Creates a coprocessor channel for the given row. The region argument is passed as
 * {@code null}, and the timeouts are this table's RPC and operation timeouts in nanoseconds.
 */
public RegionCoprocessorRpcChannel coprocessorService(byte[] row) {
return new RegionCoprocessorRpcChannel(conn, getName(), null, row,
getRpcTimeout(TimeUnit.NANOSECONDS), getOperationTimeout(TimeUnit.NANOSECONDS));
/**
 * Get the corresponding start keys and regions for an arbitrary range of keys.
 * <p>
 * @param startKey Starting row in range, inclusive
 * @param endKey Ending row in range
 * @param includeEndKey true if endKey is inclusive, false if exclusive
 * @return A pair of list of start keys and list of HRegionLocations that contain the specified
 *         range
 * @throws IOException if a remote or network exception occurs
 */
private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey,
final byte[] endKey, final boolean includeEndKey) throws IOException {
// Overload without the reload flag: forwards to the four-argument form with reload = false.
return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
* Get the corresponding start keys and regions for an arbitrary range of keys.
* <p>
* @param startKey Starting row in range, inclusive
* @param endKey Ending row in range
* @param includeEndKey true if endRow is inclusive, false if exclusive
* @param reload true to reload information or false to use cached information
* @return A pair of list of start keys and list of HRegionLocations that contain the specified
* range
* @throws IOException if a remote or network exception occurs
private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey,
final byte[] endKey, final boolean includeEndKey, final boolean reload) throws IOException {
final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
throw new IllegalArgumentException(
"Invalid range: " + Bytes.toStringBinary(startKey) + " > " + Bytes.toStringBinary(endKey));
List<byte[]> keysInRange = new ArrayList<>();
List<HRegionLocation> regionsInRange = new ArrayList<>();
byte[] currentKey = startKey;
do {
HRegionLocation regionLocation =
FutureUtils.get(conn.getRegionLocator(getName()).getRegionLocation(currentKey, reload));
currentKey = regionLocation.getRegion().getEndKey();
} while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) &&
(endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0 ||
(includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
return new Pair<>(keysInRange, regionsInRange);
/**
 * Returns the first element (the key list) of {@code getKeysAndRegionsInRange} for the given
 * range, with the end key treated as inclusive.
 * @param start first row of the range; {@code null} is replaced by the empty start row
 * @param end last row of the range; {@code null} is replaced by the empty end row
 * @throws IOException if the region lookup fails
 */
private List<byte[]> getStartKeysInRange(byte[] start, byte[] end) throws IOException {
if (start == null) {
start = HConstants.EMPTY_START_ROW;
if (end == null) {
end = HConstants.EMPTY_END_ROW;
return getKeysAndRegionsInRange(start, end, true).getFirst();
/**
 * A unit of work that is run against one coprocessor channel and produces a result of type
 * {@code R}; it may throw any exception.
 */
private interface StubCall<R> {
R call(RegionCoprocessorRpcChannel channel) throws Exception;
private <R> void coprocssorService(String serviceName, byte[] startKey, byte[] endKey,
Callback<R> callback, StubCall<R> call) throws Throwable {
// get regions covered by the row range
ExecutorService pool = this.poolSupplier.get();
List<byte[]> keys = getStartKeysInRange(startKey, endKey);
Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
try {
for (byte[] r : keys) {
RegionCoprocessorRpcChannel channel = coprocessorService(r);
Future<R> future = pool.submit(new Callable<R>() {
public R call() throws Exception {
R result =;
byte[] region = channel.getLastRegion();
if (callback != null) {
callback.update(region, r, result);
return result;
futures.put(r, future);
} catch (RejectedExecutionException e) {
// maybe the connection has been closed, let's check
if (conn.isClosed()) {
throw new DoNotRetryIOException("Connection is closed", e);
} else {
throw new HBaseIOException("Coprocessor operation is rejected", e);
for (Map.Entry<byte[], Future<R>> e : futures.entrySet()) {
try {
} catch (ExecutionException ee) {
LOG.warn("Error calling coprocessor service " + serviceName + " for row " +
Bytes.toStringBinary(e.getKey()), ee);
throw ee.getCause();
} catch (InterruptedException ie) {
throw new InterruptedIOException("Interrupted calling coprocessor service " + serviceName +
" for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey,
byte[] endKey, Call<T, R> callable, Callback<R> callback) throws ServiceException, Throwable {
coprocssorService(service.getName(), startKey, endKey, callback, channel -> {
T instance = org.apache.hadoop.hbase.protobuf.ProtobufUtil.newServiceStub(service, channel);
/**
 * Calls the given coprocessor method on every region in the row range, via the private
 * {@code coprocssorService} helper. The method's full name is used as the service name in
 * messages.
 * @param methodDescriptor the method to call
 * @param request the request message sent to each region
 * @param startKey first row of the range, or null
 * @param endKey last row of the range, or null
 * @param responsePrototype prototype passed to the channel for the response
 * @param callback receiver of per-region responses
 */
public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
throws ServiceException, Throwable {
coprocssorService(methodDescriptor.getFullName(), startKey, endKey, callback, channel -> {
// The controller argument is null; the response is cast to R without a runtime check.
return (R) channel.callBlockingMethod(methodDescriptor, null, request, responsePrototype);
/** Returns the wrapped async table's RPC timeout in the requested unit. */
public long getRpcTimeout(TimeUnit unit) {
return table.getRpcTimeout(unit);
/** Returns the wrapped async table's read RPC timeout in the requested unit. */
public long getReadRpcTimeout(TimeUnit unit) {
return table.getReadRpcTimeout(unit);
/** Returns the wrapped async table's write RPC timeout in the requested unit. */
public long getWriteRpcTimeout(TimeUnit unit) {
return table.getWriteRpcTimeout(unit);
/** Returns the wrapped async table's operation timeout in the requested unit. */
public long getOperationTimeout(TimeUnit unit) {
return table.getOperationTimeout(unit);
/**
 * Returns a region locator for this table, obtained from the connection's
 * {@code toConnection()} view.
 */
public RegionLocator getRegionLocator() throws IOException {
return conn.toConnection().getRegionLocator(getName());