blob: 250b8a74f03004a0c520d84579671382f9248207 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.thrift2.client;
import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_SOCKET_TIMEOUT_CONNECT;
import static org.apache.hadoop.hbase.ipc.RpcClient.SOCKET_TIMEOUT_CONNECT;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import javax.net.ssl.SSLException;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableBuilder;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.thrift.Constants;
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.http.HttpRequest;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.utils.HttpClientUtils;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.protocol.HttpContext;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.THttpClient;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.layered.TFramedTransport;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@InterfaceAudience.Private
public class ThriftConnection implements Connection {
private static final Logger LOG = LoggerFactory.getLogger(ThriftConnection.class);
private Configuration conf;
private User user;
// For HTTP protocol
private HttpClient httpClient;
private boolean httpClientCreated = false;
private boolean isClosed = false;
private String host;
private int port;
private boolean isFramed = false;
private boolean isCompact = false;
// TODO: We can rip out the ThriftClient piece of it rather than creating a new client every time.
ThriftClientBuilder clientBuilder;
private int operationTimeout;
private int connectTimeout;
public ThriftConnection(Configuration conf, ExecutorService pool, final User user)
throws IOException {
this.conf = conf;
this.user = user;
this.host = conf.get(Constants.HBASE_THRIFT_SERVER_NAME);
this.port = conf.getInt(Constants.HBASE_THRIFT_SERVER_PORT, -1);
Preconditions.checkArgument(port > 0);
Preconditions.checkArgument(host != null);
this.isFramed = conf.getBoolean(Constants.FRAMED_CONF_KEY, Constants.FRAMED_CONF_DEFAULT);
this.isCompact = conf.getBoolean(Constants.COMPACT_CONF_KEY, Constants.COMPACT_CONF_DEFAULT);
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.connectTimeout = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
String className = conf.get(Constants.HBASE_THRIFT_CLIENT_BUIDLER_CLASS,
DefaultThriftClientBuilder.class.getName());
try {
Class<?> clazz = Class.forName(className);
Constructor<?> constructor = clazz.getDeclaredConstructor(ThriftConnection.class);
constructor.setAccessible(true);
clientBuilder = (ThriftClientBuilder) constructor.newInstance(this);
} catch (Exception e) {
throw new IOException(e);
}
}
public synchronized void setHttpClient(HttpClient httpClient) {
this.httpClient = httpClient;
}
@Override
public Configuration getConfiguration() {
return conf;
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
public boolean isFramed() {
return isFramed;
}
public boolean isCompact() {
return isCompact;
}
public int getOperationTimeout() {
return operationTimeout;
}
public int getConnectTimeout() {
return connectTimeout;
}
/**
* the default thrift client builder. One can extend the ThriftClientBuilder to builder custom
* client, implement features like authentication(hbase-examples/thrift/DemoClient)
*/
public static class DefaultThriftClientBuilder extends ThriftClientBuilder {
@Override
public Pair<THBaseService.Client, TTransport> getClient() throws IOException {
TTransport tTransport = null;
try {
TSocket sock = new TSocket(connection.getHost(), connection.getPort());
sock.setSocketTimeout(connection.getOperationTimeout());
sock.setConnectTimeout(connection.getConnectTimeout());
tTransport = sock;
if (connection.isFramed()) {
tTransport = new TFramedTransport(tTransport);
}
sock.open();
} catch (TTransportException e) {
throw new IOException(e);
}
TProtocol prot;
if (connection.isCompact()) {
prot = new TCompactProtocol(tTransport);
} else {
prot = new TBinaryProtocol(tTransport);
}
THBaseService.Client client = new THBaseService.Client(prot);
return new Pair<>(client, tTransport);
}
public DefaultThriftClientBuilder(ThriftConnection connection) {
super(connection);
}
}
/**
* the default thrift http client builder. One can extend the ThriftClientBuilder to builder
* custom http client, implement features like authentication or
* 'DoAs'(hbase-examples/thrift/HttpDoAsClient)
*/
public static class HTTPThriftClientBuilder extends ThriftClientBuilder {
Map<String, String> customHeader = new HashMap<>();
public HTTPThriftClientBuilder(ThriftConnection connection) {
super(connection);
}
public void addCostumHeader(String key, String value) {
customHeader.put(key, value);
}
@Override
public Pair<THBaseService.Client, TTransport> getClient() throws IOException {
Preconditions.checkArgument(connection.getHost().startsWith("http"),
"http client host must start with http or https");
String url = connection.getHost() + ":" + connection.getPort();
try {
THttpClient httpClient = new THttpClient(url, connection.getHttpClient());
for (Map.Entry<String, String> header : customHeader.entrySet()) {
httpClient.setCustomHeader(header.getKey(), header.getValue());
}
httpClient.open();
TProtocol prot = new TBinaryProtocol(httpClient);
THBaseService.Client client = new THBaseService.Client(prot);
return new Pair<>(client, httpClient);
} catch (TTransportException e) {
throw new IOException(e);
}
}
}
/**
* Get a ThriftAdmin, ThriftAdmin is NOT thread safe
* @return a ThriftAdmin
* @throws IOException IOException
*/
@Override
public Admin getAdmin() throws IOException {
Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient();
return new ThriftAdmin(client.getFirst(), client.getSecond(), conf);
}
public static class DelayRetryHandler extends DefaultHttpRequestRetryHandler {
private long pause;
public DelayRetryHandler(int retryCount, long pause) {
super(retryCount, true, Arrays.asList(InterruptedIOException.class,
UnknownHostException.class, SSLException.class));
this.pause = pause;
}
@Override
public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
// Don't sleep for retrying the first time
if (executionCount > 1 && pause > 0) {
try {
long sleepTime = ConnectionUtils.getPauseTime(pause, executionCount - 1);
Thread.sleep(sleepTime);
} catch (InterruptedException ie) {
// reset interrupt marker
Thread.currentThread().interrupt();
}
}
return super.retryRequest(exception, executionCount, context);
}
@Override
protected boolean handleAsIdempotent(HttpRequest request) {
return true;
}
}
public synchronized HttpClient getHttpClient() {
if (httpClient != null) {
return httpClient;
}
int retry = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 5);
HttpClientBuilder builder = HttpClientBuilder.create();
RequestConfig.Builder requestBuilder = RequestConfig.custom();
requestBuilder = requestBuilder.setConnectTimeout(getConnectTimeout());
requestBuilder = requestBuilder.setSocketTimeout(getOperationTimeout());
builder.setRetryHandler(new DelayRetryHandler(retry, pause));
builder.setDefaultRequestConfig(requestBuilder.build());
httpClient = builder.build();
httpClientCreated = true;
return httpClient;
}
@Override
public synchronized void close() throws IOException {
if (httpClient != null && httpClientCreated) {
HttpClientUtils.closeQuietly(httpClient);
}
isClosed = true;
}
@Override
public boolean isClosed() {
return isClosed;
}
/**
* Get a TableBuider to build ThriftTable, ThriftTable is NOT thread safe
* @return a TableBuilder
* @throws IOException IOException
*/
@Override
public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
return new TableBuilder() {
@Override
public TableBuilder setOperationTimeout(int timeout) {
return this;
}
@Override
public TableBuilder setRpcTimeout(int timeout) {
return this;
}
@Override
public TableBuilder setReadRpcTimeout(int timeout) {
return this;
}
@Override
public TableBuilder setWriteRpcTimeout(int timeout) {
return this;
}
@Override
public Table build() {
try {
Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient();
return new ThriftTable(tableName, client.getFirst(), client.getSecond(), conf);
} catch (IOException ioE) {
throw new RuntimeException(ioE);
}
}
};
}
@Override
public void abort(String why, Throwable e) {
}
@Override
public boolean isAborted() {
return false;
}
@Override
public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable");
}
@Override
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable");
}
@Override
public RegionLocator getRegionLocator(TableName tableName) throws IOException {
throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable");
}
@Override
public void clearRegionLocationCache() {
throw new NotImplementedException("clearRegionLocationCache not supported in ThriftTable");
}
@Override
public AsyncConnection toAsyncConnection() {
throw new NotImplementedException("toAsyncConnection not supported in ThriftTable");
}
@Override
public String getClusterId() {
try {
Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient();
return client.getFirst().getClusterId();
} catch (TException | IOException e) {
LOG.error("Error fetching cluster ID: ", e);
}
return null;
}
}