blob: d9d77f89e39fc24c58a314fb9f708fc0e38ef7a3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.sidecar.client;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.cassandra.sidecar.client.request.ImportSSTableRequest;
import org.apache.cassandra.sidecar.client.retry.IgnoreConflictRetryPolicy;
import org.apache.cassandra.sidecar.client.retry.OncePerInstanceRetryPolicy;
import org.apache.cassandra.sidecar.client.retry.RetryPolicy;
import org.apache.cassandra.sidecar.client.retry.RunnableOnStatusCodeRetryPolicy;
import org.apache.cassandra.sidecar.client.selection.InstanceSelectionPolicy;
import org.apache.cassandra.sidecar.client.selection.RandomInstanceSelectionPolicy;
import org.apache.cassandra.sidecar.common.NodeSettings;
import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
import org.apache.cassandra.sidecar.common.data.HealthResponse;
import org.apache.cassandra.sidecar.common.data.ListSnapshotFilesResponse;
import org.apache.cassandra.sidecar.common.data.RingResponse;
import org.apache.cassandra.sidecar.common.data.SSTableImportResponse;
import org.apache.cassandra.sidecar.common.data.SchemaResponse;
import org.apache.cassandra.sidecar.common.data.TimeSkewResponse;
import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
import org.apache.cassandra.sidecar.common.utils.HttpRange;
/**
* The SidecarClient class to perform requests
*/
public class SidecarClient implements AutoCloseable
{
private static final Logger LOGGER = LoggerFactory.getLogger(SidecarClient.class);
protected RequestExecutor executor;
protected final RetryPolicy defaultRetryPolicy;
protected final RetryPolicy ignoreConflictRetryPolicy;
protected RequestContext.Builder baseBuilder;
public SidecarClient(SidecarInstancesProvider instancesProvider,
RequestExecutor requestExecutor,
SidecarClientConfig sidecarClientConfig,
RetryPolicy defaultRetryPolicy)
{
this.defaultRetryPolicy = defaultRetryPolicy;
ignoreConflictRetryPolicy = new IgnoreConflictRetryPolicy(sidecarClientConfig.maxRetries(),
sidecarClientConfig.retryDelayMillis(),
sidecarClientConfig.maxRetryDelayMillis());
baseBuilder = new RequestContext.Builder()
.instanceSelectionPolicy(new RandomInstanceSelectionPolicy(instancesProvider))
.retryPolicy(defaultRetryPolicy);
executor = requestExecutor;
}
/**
* Executes the Sidecar health request using the configured selection policy and with no retries
*
* @return a completable future of the Sidecar health response
*/
public CompletableFuture<HealthResponse> sidecarHealth()
{
return executor.executeRequestAsync(requestBuilder()
.sidecarHealthRequest()
.retryPolicy(new OncePerInstanceRetryPolicy())
.build());
}
/**
* Executes the Cassandra health request using the configured selection policy and with no retries
*
* @return a completable future of the Cassandra health response
*/
public CompletableFuture<HealthResponse> cassandraHealth()
{
return executor.executeRequestAsync(requestBuilder()
.cassandraHealthRequest()
.retryPolicy(new OncePerInstanceRetryPolicy())
.build());
}
/**
* Executes the full schema request using the default retry policy and configured selection policy
*
* @return a completable future of the full schema response
*/
public CompletableFuture<SchemaResponse> fullSchema()
{
return executor.executeRequestAsync(requestBuilder().schemaRequest().build());
}
/**
* Executes the schema request for the {@code keyspace} using the default retry policy and configured selection
* policy
*
* @param keyspace the keyspace in Cassandra
* @return a completable future of the schema response for the provided {@code keyspace}
*/
public CompletableFuture<SchemaResponse> schema(String keyspace)
{
return executor.executeRequestAsync(requestBuilder().schemaRequest(keyspace).build());
}
/**
* Executes the ring request for the {@code keyspace} using the default retry policy and configured selection
* policy
*
* @param keyspace the keyspace in Cassandra
* @return a completable future of the ring response for the provided {@code keyspace}
*/
public CompletableFuture<RingResponse> ring(String keyspace)
{
return executor.executeRequestAsync(requestBuilder().ringRequest(keyspace).build());
}
/**
* Executes the node settings request using the default retry policy and configured selection policy
*
* @return a completable future of the node settings
*/
public CompletableFuture<NodeSettings> nodeSettings()
{
return executor.executeRequestAsync(requestBuilder().nodeSettingsRequest().build());
}
/**
* Executes the node settings request using the default retry policy and provided {@code instance}
*
* @param instance the instance where the request will be executed
* @return a completable future of the node settings
*/
public CompletableFuture<NodeSettings> nodeSettings(SidecarInstance instance)
{
return executor.executeRequestAsync(requestBuilder().singleInstanceSelectionPolicy(instance)
.nodeSettingsRequest()
.build());
}
/**
* Executes the gossip info request using the default retry policy and configured selection policy
*
* @return a completable future of the gossip info
*/
public CompletableFuture<GossipInfoResponse> gossipInfo()
{
return executor.executeRequestAsync(requestBuilder().gossipInfoRequest().build());
}
/**
* Executes the time skew request using the default retry policy and configured selection policy
*
* @return a completable future of the time skew
*/
public CompletableFuture<TimeSkewResponse> timeSkew()
{
return executor.executeRequestAsync(requestBuilder().timeSkewRequest().build());
}
/**
* Executes the time skew request using the default retry policy and uses random instance selection policy
* with the provided instances
*
* @param instances the list of Sidecar instances to try for this request
* @return a completable future of the time skew
*/
public CompletableFuture<TimeSkewResponse> timeSkew(List<? extends SidecarInstance> instances)
{
SidecarInstancesProvider instancesProvider = new SimpleSidecarInstancesProvider(instances);
InstanceSelectionPolicy instanceSelectionPolicy = new RandomInstanceSelectionPolicy(instancesProvider);
return executor.executeRequestAsync(requestBuilder()
.instanceSelectionPolicy(instanceSelectionPolicy)
.timeSkewRequest()
.build());
}
/**
* Executes the token-range replicas request using the default retry policy and configured selection policy
*
* @param instances the list of Sidecar instances to try for this request
* @param keyspace the keyspace in Cassandra
* @return a completable future of the token-range replicas
*/
public CompletableFuture<TokenRangeReplicasResponse> tokenRangeReplicas(List<? extends SidecarInstance> instances,
String keyspace)
{
SidecarInstancesProvider instancesProvider = new SimpleSidecarInstancesProvider(instances);
InstanceSelectionPolicy instanceSelectionPolicy = new RandomInstanceSelectionPolicy(instancesProvider);
return executeRequestAsync(requestBuilder()
.instanceSelectionPolicy(instanceSelectionPolicy)
.tokenRangeReplicasRequest(keyspace)
.build());
}
/**
* Executes the list snapshot files request including secondary index files using the default retry policy and
* provided {@code instance}
*
* @param instance the instance where the request will be executed
* @param keyspace the keyspace in Cassandra
* @param table the table name in Cassandra
* @param snapshotName the name of the snapshot
* @return a completable future for the request
*/
public CompletableFuture<ListSnapshotFilesResponse> listSnapshotFiles(SidecarInstance instance,
String keyspace,
String table,
String snapshotName)
{
return listSnapshotFiles(instance, keyspace, table, snapshotName, true);
}
/**
* Executes the list snapshot files request using the default retry policy and provided {@code instance}
*
* @param instance the instance where the request will be executed
* @param keyspace the keyspace in Cassandra
* @param table the table name in Cassandra
* @param snapshotName the name of the snapshot
* @param includeSecondaryIndexFiles whether to include secondary index files
* @return a completable future for the request
*/
public CompletableFuture<ListSnapshotFilesResponse> listSnapshotFiles(SidecarInstance instance,
String keyspace,
String table,
String snapshotName,
boolean includeSecondaryIndexFiles)
{
return executor.executeRequestAsync(requestBuilder().singleInstanceSelectionPolicy(instance)
.listSnapshotFilesRequest(keyspace,
table,
snapshotName,
includeSecondaryIndexFiles)
.build());
}
/**
* Executes the clear snapshot request using the default retry policy and provided {@code instance}
*
* @param instance the instance where the request will be executed
* @param keyspace the keyspace in Cassandra
* @param table the table name in Cassandra
* @param snapshotName the name of the snapshot
* @return a completable future for the request
*/
public CompletableFuture<Void> clearSnapshot(SidecarInstance instance,
String keyspace,
String table,
String snapshotName)
{
return executor.executeRequestAsync(requestBuilder().singleInstanceSelectionPolicy(instance)
.clearSnapshotRequest(keyspace, table, snapshotName)
.build());
}
/**
* Executes the create snapshot request using the default retry policy and provided {@code instance}
*
* @param instance the instance where the request will be executed
* @param keyspace the keyspace in Cassandra
* @param table the table name in Cassandra
* @param snapshotName the name of the snapshot
* @return a completable future for the request
*/
public CompletableFuture<Void> createSnapshot(SidecarInstance instance,
String keyspace,
String table,
String snapshotName)
{
return executor.executeRequestAsync(requestBuilder().retryPolicy(ignoreConflictRetryPolicy)
.singleInstanceSelectionPolicy(instance)
.createSnapshotRequest(keyspace, table, snapshotName)
.build());
}
/**
* Streams the specified {@code range} of an SSTable {@code componentName} for the given {@code keyspace},
* {@code table} from an existing {@code snapshotName}, the stream is consumed by the
* {@link StreamConsumer consumer}.
*
* @param instance the instance where the request will be executed
* @param keyspace the keyspace in Cassandra
* @param table the table name in Cassandra
* @param snapshotName the name of the snapshot
* @param componentName the name of the SSTable component
* @param range the HTTP range for the request
* @param streamConsumer the object that consumes the stream
*/
public void streamSSTableComponent(SidecarInstance instance,
String keyspace,
String table,
String snapshotName,
String componentName,
HttpRange range,
StreamConsumer streamConsumer)
{
executor.streamRequest(requestBuilder()
.singleInstanceSelectionPolicy(instance)
.ssTableComponentRequest(keyspace, table, snapshotName, componentName, range)
.build(), streamConsumer);
}
/**
* Uploads the SSTable to the provided {@code instance} using the default retry policy.
*
* @param instance the instance where the request will be executed
* @param keyspace the keyspace in Cassandra
* @param table the table name in Cassandra
* @param uploadId the unique identifier for the upload
* @param componentName the name of the SSTable component
* @param checksum hash value to check integrity of SSTable component uploaded
* @param filename the path to the file to be uploaded
* @return a completable future for the request
*/
public CompletableFuture<Void> uploadSSTableRequest(SidecarInstance instance,
String keyspace,
String table,
String uploadId,
String componentName,
String checksum,
String filename)
{
return executor.executeRequestAsync(requestBuilder().singleInstanceSelectionPolicy(instance)
.uploadSSTableRequest(keyspace,
table,
uploadId,
componentName,
checksum,
filename)
.build());
}
/**
* Executes the import SSTable request using the default retry policy and provided {@code instance}
*
* @param instance the instance where the request will be executed
* @param keyspace the keyspace in Cassandra
* @param table the table name in Cassandra
* @param uploadId the unique identifier for the upload
* @param options additional options for the import process
* @return a completable future for the request
*/
public CompletableFuture<SSTableImportResponse> importSSTableRequest(SidecarInstance instance,
String keyspace,
String table,
String uploadId,
ImportSSTableRequest.ImportOptions options)
{
Runnable customLog = () ->
LOGGER.info("Request to {} ACCEPTED but not yet complete - " +
"will retry until success/failure. uploadId={}", instance, uploadId);
RetryPolicy retryPolicy = new RunnableOnStatusCodeRetryPolicy(customLog,
defaultRetryPolicy,
HttpResponseStatus.ACCEPTED.code(),
10);
return executor.executeRequestAsync(requestBuilder().singleInstanceSelectionPolicy(instance)
.retryPolicy(retryPolicy)
.importSSTableRequest(keyspace, table, uploadId, options)
.build());
}
/**
* Executes the clear upload session request using the default retry policy and provided {@code instance}
*
* @param instance the instance where the request will be executed
* @param uploadId the unique identifier for the upload
* @return a completable future for the request
*/
public CompletableFuture<Void> cleanUploadSession(SidecarInstance instance, String uploadId)
{
return executor.executeRequestAsync(requestBuilder().singleInstanceSelectionPolicy(instance)
.cleanSSTableUploadSessionRequest(uploadId)
.build());
}
/**
* Returns a copy of the request builder with the default parameters configured for the client.
*
* <p>The request builder can be used to create the request, containing default values as depicted in the example
* below:
*
* <pre>
* RequestContext requestContext = client.requestBuilder()
* .request(new new NodeSettingsRequest())
* .retryPolicy(new NoRetryPolicy())
* .build();
* </pre>
*
* <p>The example above will create a request to retrieve the node settings from a random Sidecar instance
* in the cluster. It will use the {@code NoRetryPolicy} policy. A custom retry policy can encapsulate the
* desired behavior of the client when dealing with specific status codes.
*
* @return a copy of the builder to prevent threads modifying the state of the builder
*/
public RequestContext.Builder requestBuilder()
{
return baseBuilder.copy();
}
/**
* @return the default {@link RetryPolicy} configured for the client
*/
public RetryPolicy defaultRetryPolicy()
{
return defaultRetryPolicy;
}
/**
* Returns a future with the expected instance of type {@code <T>} after executing the {@code request} and
* processing it.
*
* @param context the request context
* @param <T> the expected type for the instance
* @return a future with the expected instance of type {@code <T>} after executing the {@code request} and
* processing it
*/
public <T> CompletableFuture<T> executeRequestAsync(RequestContext context)
{
return executor.executeRequestAsync(context);
}
/**
* {@inheritDoc}
*/
@Override
public void close() throws Exception
{
executor.close();
}
}