// blob: cf7806db453113e702b50699ffe9e8b5cccfc10f
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.kudu.client;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import com.google.common.base.Preconditions;
import com.stumbleupon.async.Deferred;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kudu.Schema;
import org.apache.kudu.master.Master.TableIdentifierPB;
/**
 * A synchronous and thread-safe client for Kudu.
 * <p>
 * This class acts as a wrapper around {@link AsyncKuduClient} which contains all the relevant
 * documentation. Every blocking method here obtains a {@link Deferred} from the wrapped
 * asynchronous client, joins it, and converts any failure into a {@link KuduException}.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class KuduClient implements AutoCloseable {

  // Log under this class's own category so messages emitted here are attributed correctly.
  private static final Logger LOG = LoggerFactory.getLogger(KuduClient.class);

  public static final long NO_TIMESTAMP = -1;

  @InterfaceAudience.LimitedPrivate("Test")
  final AsyncKuduClient asyncClient;

  /**
   * Wraps the given asynchronous client. Instances are normally obtained through
   * {@link KuduClientBuilder#build()}.
   *
   * @param asyncClient the asynchronous client every call is delegated to
   */
  KuduClient(AsyncKuduClient asyncClient) {
    this.asyncClient = asyncClient;
  }

  /**
   * Updates the last timestamp received from a server. Used for CLIENT_PROPAGATED
   * external consistency.
   *
   * @param lastPropagatedTimestamp the last timestamp received from a server.
   */
  public void updateLastPropagatedTimestamp(long lastPropagatedTimestamp) {
    asyncClient.updateLastPropagatedTimestamp(lastPropagatedTimestamp);
  }

  /**
   * Returns the last timestamp received from a server. Used for CLIENT_PROPAGATED
   * external consistency. Note that the returned timestamp is encoded and cannot be
   * interpreted as a raw timestamp.
   *
   * @return a long indicating the specially-encoded last timestamp received from a server
   */
  public long getLastPropagatedTimestamp() {
    return asyncClient.getLastPropagatedTimestamp();
  }

  /**
   * Checks if the client received any timestamps from a server. Used for
   * CLIENT_PROPAGATED external consistency.
   *
   * @return true if last propagated timestamp has been set
   */
  public boolean hasLastPropagatedTimestamp() {
    return asyncClient.hasLastPropagatedTimestamp();
  }

  /**
   * Returns a string representation of this client's location. If this
   * client was not assigned a location, returns the empty string.
   *
   * @return a string representation of this client's location
   */
  public String getLocationString() {
    return asyncClient.getLocationString();
  }

  /**
   * Returns the ID of the cluster that this client is connected to.
   * It will be an empty string if the client is not connected or
   * the client is connected to a cluster that doesn't support
   * cluster IDs.
   *
   * @return the ID of the cluster that this client is connected to
   */
  public String getClusterId() {
    return asyncClient.getClusterId();
  }

  /**
   * Returns the unique client id assigned to this client.
   *
   * @return the unique client id assigned to this client.
   */
  String getClientId() {
    return asyncClient.getClientId();
  }

  /**
   * Returns the Hive Metastore configuration of the cluster.
   *
   * @return the Hive Metastore configuration of the cluster
   * @throws KuduException if the configuration can not be retrieved
   */
  @InterfaceAudience.LimitedPrivate("Impala")
  @InterfaceStability.Unstable
  public HiveMetastoreConfig getHiveMetastoreConfig() throws KuduException {
    return joinAndHandleException(asyncClient.getHiveMetastoreConfig());
  }

  /**
   * Create a table on the cluster with the specified name, schema, and table configurations.
   *
   * @param name the table's name
   * @param schema the table's schema
   * @param builder a builder containing the table's configurations
   * @return an object to communicate with the created table
   * @throws KuduException if anything went wrong
   */
  public KuduTable createTable(String name, Schema schema, CreateTableOptions builder)
      throws KuduException {
    return joinAndHandleException(asyncClient.createTable(name, schema, builder));
  }

  /**
   * Waits for all of the tablets in a table to be created, or until the
   * default admin operation timeout is reached.
   *
   * @param name the table's name
   * @return true if the table is done being created, or false if the default
   *         admin operation timeout was reached.
   * @throws KuduException for any error returned by sending RPCs to the master
   *         (e.g. the table does not exist)
   */
  public boolean isCreateTableDone(String name) throws KuduException {
    TableIdentifierPB.Builder table = TableIdentifierPB.newBuilder().setTableName(name);
    return joinUnlessTimedOut(asyncClient.getDelayedIsCreateTableDoneDeferred(table, null, null));
  }

  /**
   * Delete a table on the cluster with the specified name.
   *
   * @param name the table's name
   * @return an rpc response object
   * @throws KuduException if anything went wrong
   */
  public DeleteTableResponse deleteTable(String name) throws KuduException {
    return joinAndHandleException(asyncClient.deleteTable(name));
  }

  /**
   * Alter a table on the cluster as specified by the builder.
   *
   * @param name the table's name (old name if the table is being renamed)
   * @param ato the alter table options
   * @return an rpc response object
   * @throws KuduException if anything went wrong
   */
  public AlterTableResponse alterTable(String name, AlterTableOptions ato) throws KuduException {
    return joinAndHandleException(asyncClient.alterTable(name, ato));
  }

  /**
   * Waits for all of the tablets in a table to be altered, or until the
   * default admin operation timeout is reached.
   *
   * @param name the table's name
   * @return true if the table is done being altered, or false if the default
   *         admin operation timeout was reached.
   * @throws KuduException for any error returned by sending RPCs to the master
   *         (e.g. the table does not exist)
   */
  public boolean isAlterTableDone(String name) throws KuduException {
    TableIdentifierPB.Builder table = TableIdentifierPB.newBuilder().setTableName(name);
    return joinUnlessTimedOut(asyncClient.getDelayedIsAlterTableDoneDeferred(table, null, null));
  }

  /**
   * Get the list of running tablet servers.
   *
   * @return a list of tablet servers
   * @throws KuduException if anything went wrong
   */
  public ListTabletServersResponse listTabletServers() throws KuduException {
    return joinAndHandleException(asyncClient.listTabletServers());
  }

  /**
   * Get the list of all the tables.
   *
   * @return a list of all the tables
   * @throws KuduException if anything went wrong
   */
  public ListTablesResponse getTablesList() throws KuduException {
    return getTablesList(null);
  }

  /**
   * Get a list of table names. Passing a null filter returns all the tables. When a filter is
   * specified, it only returns tables that satisfy a substring match.
   *
   * @param nameFilter an optional table name filter
   * @return a response containing the list of table names
   * @throws KuduException if anything went wrong
   */
  public ListTablesResponse getTablesList(String nameFilter) throws KuduException {
    return joinAndHandleException(asyncClient.getTablesList(nameFilter));
  }

  /**
   * Get table's statistics from master.
   *
   * @param name the table's name
   * @return the statistics of table
   * @throws KuduException if anything went wrong
   */
  public KuduTableStatistics getTableStatistics(String name) throws KuduException {
    return joinAndHandleException(asyncClient.getTableStatistics(name));
  }

  /**
   * Test if a table exists.
   *
   * @param name a non-null table name
   * @return true if the table exists, else false
   * @throws KuduException if anything went wrong
   */
  public boolean tableExists(String name) throws KuduException {
    Deferred<Boolean> d = asyncClient.tableExists(name);
    return joinAndHandleException(d);
  }

  /**
   * Open the table with the given id.
   *
   * @param id the id of the table to open
   * @return a KuduTable if the table exists
   * @throws KuduException if anything went wrong
   */
  KuduTable openTableById(final String id) throws KuduException {
    return joinAndHandleException(asyncClient.openTableById(id));
  }

  /**
   * Open the table with the given name.
   *
   * New range partitions created by other clients will immediately be available
   * after opening the table.
   *
   * @param name table to open
   * @return a KuduTable if the table exists
   * @throws KuduException if anything went wrong
   */
  public KuduTable openTable(final String name) throws KuduException {
    return joinAndHandleException(asyncClient.openTable(name));
  }

  /**
   * Create a new session for interacting with the cluster.
   * User is responsible for destroying the session object.
   * This is a fully local operation (no RPCs or blocking).
   *
   * @return a synchronous wrapper around KuduSession.
   */
  public KuduSession newSession() {
    AsyncKuduSession session = asyncClient.newSession();
    return new KuduSession(session);
  }

  /**
   * Start a new multi-row distributed transaction.
   * <p>
   * Start a new multi-row transaction and return a handle for the transactional
   * object to manage the newly started transaction. Under the hood, this makes
   * an RPC call to the Kudu cluster and registers a newly created transaction
   * in the system. This call is blocking.
   *
   * @return a handle to the newly started transaction in case of success
   * @throws KuduException if the transaction could not be started
   */
  public KuduTransaction newTransaction() throws KuduException {
    KuduTransaction txn = new KuduTransaction(asyncClient);
    txn.begin();
    return txn;
  }

  /**
   * Check if statistics collection is enabled for this client.
   *
   * @return true if it is enabled, else false
   */
  public boolean isStatisticsEnabled() {
    return asyncClient.isStatisticsEnabled();
  }

  /**
   * Get the statistics object of this client.
   *
   * @return this client's Statistics object
   * @throws IllegalStateException thrown if statistics collection has been disabled
   */
  public Statistics getStatistics() {
    return asyncClient.getStatistics();
  }

  /**
   * Creates a new {@link KuduScanner.KuduScannerBuilder} for a particular table.
   *
   * @param table the table you intend to scan
   * @return a new scanner builder for the table
   */
  public KuduScanner.KuduScannerBuilder newScannerBuilder(KuduTable table) {
    return new KuduScanner.KuduScannerBuilder(asyncClient, table);
  }

  /**
   * Creates a new {@link KuduScanToken.KuduScanTokenBuilder} for a particular table.
   * Used for integrations with compute frameworks.
   *
   * @param table the table you intend to scan
   * @return a new scan token builder for the table
   */
  public KuduScanToken.KuduScanTokenBuilder newScanTokenBuilder(KuduTable table) {
    return new KuduScanToken.KuduScanTokenBuilder(asyncClient, table);
  }

  /**
   * Analogous to {@link #shutdown()}.
   *
   * @throws KuduException if an error happens while closing the connections
   */
  @Override
  public void close() throws KuduException {
    try {
      asyncClient.close();
    } catch (Exception e) {
      throw KuduException.transformException(e);
    }
  }

  /**
   * Performs a graceful shutdown of this instance.
   *
   * @throws KuduException if anything went wrong
   */
  public void shutdown() throws KuduException {
    joinAndHandleException(asyncClient.shutdown());
  }

  /**
   * Export serialized authentication data that may be passed to a different
   * client instance and imported to provide that client the ability to connect
   * to the cluster.
   *
   * @return the serialized authentication data
   * @throws KuduException if the credentials could not be exported
   */
  @InterfaceStability.Unstable
  public byte[] exportAuthenticationCredentials() throws KuduException {
    return joinAndHandleException(asyncClient.exportAuthenticationCredentials());
  }

  /**
   * Import data allowing this client to authenticate to the cluster.
   * This will typically be used before making any connections to servers
   * in the cluster.
   *
   * Note that, if this client has already been used by one user, this
   * method cannot be used to switch authenticated users. Attempts to
   * do so have undefined results, and may throw an exception.
   *
   * @param authnData the authentication data provided by a prior call to
   *                  {@link #exportAuthenticationCredentials()}
   */
  @InterfaceStability.Unstable
  public void importAuthenticationCredentials(byte[] authnData) {
    asyncClient.importAuthenticationCredentials(authnData);
  }

  /**
   * Get the timeout used for operations on sessions and scanners.
   *
   * @return a timeout in milliseconds
   */
  public long getDefaultOperationTimeoutMs() {
    return asyncClient.getDefaultOperationTimeoutMs();
  }

  /**
   * Get the timeout used for admin operations.
   *
   * @return a timeout in milliseconds
   */
  public long getDefaultAdminOperationTimeoutMs() {
    return asyncClient.getDefaultAdminOperationTimeoutMs();
  }

  /**
   * @return the list of master addresses, stringified using commas to separate
   *         them
   */
  public String getMasterAddressesAsString() {
    return asyncClient.getMasterAddressesAsString();
  }

  /**
   * Sends a request to the master to check if the cluster supports ignore operations.
   *
   * @return true if the cluster supports ignore operations
   * @throws KuduException if the request to the master failed
   */
  @InterfaceAudience.Private
  public boolean supportsIgnoreOperations() throws KuduException {
    return joinAndHandleException(asyncClient.supportsIgnoreOperations());
  }

  /**
   * @return a HostAndPort describing the current leader master
   * @throws KuduException if a leader master could not be found in time
   */
  @InterfaceAudience.LimitedPrivate("Test")
  public HostAndPort findLeaderMasterServer() throws KuduException {
    // Consult the cache to determine the current leader master.
    //
    // If one isn't found, issue an RPC that retries until the leader master
    // is discovered. We don't need the RPC's results; it's just a simple way to
    // wait until a leader master is elected.
    TableLocationsCache.Entry entry = asyncClient.getTableLocationEntry(
        AsyncKuduClient.MASTER_TABLE_NAME_PLACEHOLDER, null);
    if (entry == null) {
      // If there's no leader master, this will time out and throw an exception.
      listTabletServers();
      entry = asyncClient.getTableLocationEntry(
          AsyncKuduClient.MASTER_TABLE_NAME_PLACEHOLDER, null);
    }
    Preconditions.checkNotNull(entry, "no cached location entry for the master");
    Preconditions.checkState(!entry.isNonCoveredRange(),
        "cached master location entry is a non-covered range");
    ServerInfo info = entry.getTablet().getLeaderServerInfo();
    Preconditions.checkNotNull(info, "cached master location entry has no leader server info");
    return info.getHostAndPort();
  }

  /**
   * Joins the deferred and converts any exception raised while doing so into a
   * {@link KuduException} via {@link KuduException#transformException(Exception)}.
   *
   * @param deferred the deferred to wait on
   * @param <R> the type of the deferred's result
   * @return the result the deferred completed with
   * @throws KuduException if joining the deferred raised any exception
   */
  static <R> R joinAndHandleException(Deferred<R> deferred) throws KuduException {
    try {
      return deferred.join();
    } catch (Exception e) {
      throw KuduException.transformException(e);
    }
  }

  /**
   * Joins the deferred, reporting a timed-out status as {@code false} instead of a failure.
   * Shared by {@link #isCreateTableDone(String)} and {@link #isAlterTableDone(String)}.
   *
   * @param deferred the deferred to wait on; its result is discarded
   * @return true if the deferred completed without error, false if it failed with a
   *         {@link KuduException} whose status is timed out
   * @throws KuduException for any failure whose status is not timed out
   */
  private static boolean joinUnlessTimedOut(Deferred<?> deferred) throws KuduException {
    try {
      joinAndHandleException(deferred);
    } catch (KuduException e) {
      if (e.getStatus().isTimedOut()) {
        return false;
      }
      throw e;
    }
    return true;
  }

  /**
   * Builder class to use in order to connect to Kudu.
   * All the parameters beyond those in the constructors are optional.
   */
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static final class KuduClientBuilder {

    // Assigned once in a constructor; every setter below delegates to it.
    private final AsyncKuduClient.AsyncKuduClientBuilder clientBuilder;

    /**
     * Creates a new builder for a client that will connect to the specified masters.
     *
     * @param masterAddresses comma-separated list of "host:port" pairs of the masters
     */
    public KuduClientBuilder(String masterAddresses) {
      clientBuilder = new AsyncKuduClient.AsyncKuduClientBuilder(masterAddresses);
    }

    /**
     * Creates a new builder for a client that will connect to the specified masters.
     *
     * <p>Here are some examples of recognized formats:
     * <ul>
     *   <li>example.com
     *   <li>example.com:80
     *   <li>192.0.2.1
     *   <li>192.0.2.1:80
     *   <li>[2001:db8::1]
     *   <li>[2001:db8::1]:80
     *   <li>2001:db8::1
     * </ul>
     *
     * @param masterAddresses list of master addresses
     */
    public KuduClientBuilder(List<String> masterAddresses) {
      clientBuilder = new AsyncKuduClient.AsyncKuduClientBuilder(masterAddresses);
    }

    /**
     * Sets the default timeout used for administrative operations (e.g. createTable, deleteTable,
     * etc).
     * Optional.
     * If not provided, defaults to 30s.
     * A value of 0 disables the timeout.
     *
     * @param timeoutMs a timeout in milliseconds
     * @return this builder
     */
    public KuduClientBuilder defaultAdminOperationTimeoutMs(long timeoutMs) {
      clientBuilder.defaultAdminOperationTimeoutMs(timeoutMs);
      return this;
    }

    /**
     * Sets the default timeout used for user operations (using sessions and scanners).
     * Optional.
     * If not provided, defaults to 30s.
     * A value of 0 disables the timeout.
     *
     * @param timeoutMs a timeout in milliseconds
     * @return this builder
     */
    public KuduClientBuilder defaultOperationTimeoutMs(long timeoutMs) {
      clientBuilder.defaultOperationTimeoutMs(timeoutMs);
      return this;
    }

    /**
     * Socket read timeouts are no longer used in the Java client and have no effect.
     * Setting this has no effect.
     *
     * @param timeoutMs a timeout in milliseconds (ignored)
     * @return this builder
     * @deprecated socket read timeouts are no longer used
     */
    @Deprecated
    public KuduClientBuilder defaultSocketReadTimeoutMs(long timeoutMs) {
      LOG.info("defaultSocketReadTimeoutMs is deprecated");
      return this;
    }

    /**
     * Disable this client's collection of statistics.
     * Statistics are enabled by default.
     *
     * @return this builder
     */
    public KuduClientBuilder disableStatistics() {
      clientBuilder.disableStatistics();
      return this;
    }

    /**
     * Sets the executors for the embedded Netty boss and worker threads.
     *
     * @param bossExecutor no longer used
     * @param workerExecutor the executor forwarded for the Netty workers
     * @return this builder
     * @deprecated the bossExecutor is no longer used and will have no effect if provided;
     *             use {@link #nioExecutor(Executor)} instead
     */
    @Deprecated
    public KuduClientBuilder nioExecutors(Executor bossExecutor, Executor workerExecutor) {
      clientBuilder.nioExecutors(bossExecutor, workerExecutor);
      return this;
    }

    /**
     * Set the executor which will be used for the embedded Netty workers.
     *
     * Optional.
     * If not provided, uses a simple cached threadpool. If workerExecutor is null,
     * then such a thread pool will be used.
     * Note: executor's max thread number must be greater or equal to corresponding
     * worker count, or netty cannot start enough threads, and client will get stuck.
     * If not sure, please just use CachedThreadPool.
     *
     * @param workerExecutor the executor for the Netty workers
     * @return this builder
     */
    public KuduClientBuilder nioExecutor(Executor workerExecutor) {
      clientBuilder.nioExecutor(workerExecutor);
      return this;
    }

    /**
     * Formerly set the number of boss threads; the value is now ignored.
     *
     * @param bossCount ignored
     * @return this builder
     * @deprecated the boss count is no longer used and will have no effect if provided
     */
    @Deprecated
    public KuduClientBuilder bossCount(int bossCount) {
      LOG.info("bossCount is deprecated");
      return this;
    }

    /**
     * Set the maximum number of worker threads.
     * A worker thread performs non-blocking read and write for one or more
     * Netty Channels in a non-blocking mode.
     *
     * Optional.
     * If not provided, (2 * the number of available processors) is used. If
     * this client instance will be used on a machine running many client
     * instances, it may be wise to lower this count, for example to avoid
     * resource limits, at the possible cost of some performance of this client
     * instance.
     *
     * @param workerCount the maximum number of worker threads
     * @return this builder
     */
    public KuduClientBuilder workerCount(int workerCount) {
      clientBuilder.workerCount(workerCount);
      return this;
    }

    /**
     * Creates a new client that connects to the masters.
     * Doesn't block and won't throw an exception if the masters don't exist.
     *
     * @return a new synchronous Kudu client wrapping a freshly built asynchronous client
     */
    public KuduClient build() {
      AsyncKuduClient client = clientBuilder.build();
      return new KuduClient(client);
    }
  }
}