/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.db;
import java.math.BigInteger;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import com.datastax.driver.core.Row;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.common.DataObjectBuilder;
import org.apache.cassandra.sidecar.common.data.CreateSliceRequestPayload;
import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
import org.apache.cassandra.sidecar.common.data.RestoreSliceStatus;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions;
import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
import org.apache.cassandra.sidecar.restore.RestoreSliceTask;
import org.apache.cassandra.sidecar.restore.RestoreSliceTracker;
import org.apache.cassandra.sidecar.restore.StorageClient;
import org.apache.cassandra.sidecar.restore.StorageClientPool;
import org.apache.cassandra.sidecar.stats.RestoreJobStats;
import org.apache.cassandra.sidecar.utils.SSTableImporter;
import org.jetbrains.annotations.NotNull;
/**
* Data object that carries all values relevant to a restore job slice
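* <p>A typical construction via the nested {@link Builder} (a minimal sketch; all
* values shown are illustrative):
* <pre>{@code
* RestoreSlice slice = RestoreSlice.builder()
*                                  .jobId(jobId)
*                                  .qualifiedTableName(tableName)
*                                  .sliceId("slice-1")
*                                  .storageBucket("my-bucket")
*                                  .storageKey("a/b/slice-1.zip")
*                                  .checksum("etag")
*                                  .build();
* }</pre>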
*/
public class RestoreSlice
{
private final UUID jobId;
private final String keyspace;
private final String table;
private final String sliceId;
private final short bucketId;
private final String bucket;
private final String key;
private final String checksum; // etag
private final Path targetPathInStaging; // the staging path where the S3 object of the slice is stored
private final String uploadId;
private final InstanceMetadata owner;
private final BigInteger startToken;
private final BigInteger endToken;
private final Map<String, RestoreSliceStatus> statusByReplica;
private final Set<String> replicas;
private final long creationTimeNanos;
private final long compressedSize;
private final long uncompressedSize;
private RestoreSliceTracker tracker;
private boolean existsOnS3 = false;
private int downloadAttempt = 0;
private volatile boolean isCancelled = false;
public static Builder builder()
{
return new Builder();
}
private RestoreSlice(Builder builder)
{
this.jobId = builder.jobId;
this.keyspace = builder.keyspace;
this.table = builder.table;
this.sliceId = builder.sliceId;
this.bucketId = builder.bucketId;
this.bucket = builder.bucket;
this.key = builder.key;
this.checksum = builder.checksum;
this.targetPathInStaging = builder.targetPathInStaging;
this.uploadId = builder.uploadId;
this.owner = builder.owner;
this.startToken = builder.startToken;
this.endToken = builder.endToken;
// copy into a mutable map (guarding against null) so that failAtInstance() can record per-instance failures
this.statusByReplica = builder.statusByReplica == null ? new HashMap<>() : new HashMap<>(builder.statusByReplica);
this.replicas = builder.replicas;
this.compressedSize = builder.compressedSize;
this.uncompressedSize = builder.uncompressedSize;
this.creationTimeNanos = System.nanoTime();
}
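/**
* @return a new {@link Builder} initialized with this slice's values, for building a modified copy
*/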
public Builder unbuild()
{
return new Builder(this);
}
@Override
public int hashCode()
{
// Note: targetPathInStaging, uploadId and owner are not included as they are 'transient'.
// statusByReplica and replicas are not included because entries can be added over time
return Objects.hash(jobId, keyspace, table, sliceId, bucketId, bucket, key,
checksum, startToken, endToken, compressedSize, uncompressedSize);
}
@Override
public boolean equals(Object obj)
{
if (obj == this)
return true;
if (!(obj instanceof RestoreSlice))
return false;
RestoreSlice that = (RestoreSlice) obj;
// Note: targetPathInStaging, uploadId and owner are not included as they are 'transient'.
// statusByReplica and replicas are not included because entries can be added over time
return Objects.equals(this.jobId, that.jobId)
&& Objects.equals(this.keyspace, that.keyspace)
&& Objects.equals(this.table, that.table)
&& Objects.equals(this.sliceId, that.sliceId)
&& Objects.equals(this.bucketId, that.bucketId)
&& Objects.equals(this.bucket, that.bucket)
&& Objects.equals(this.key, that.key)
&& Objects.equals(this.checksum, that.checksum)
&& Objects.equals(this.startToken, that.startToken)
&& Objects.equals(this.endToken, that.endToken)
&& this.compressedSize == that.compressedSize
&& this.uncompressedSize == that.uncompressedSize;
}
// -- INTERNAL FLOW CONTROL METHODS --
/**
* Register the {@link RestoreSliceTracker} for the slice
*/
public void registerTracker(RestoreSliceTracker tracker)
{
this.tracker = tracker;
}
/**
* Mark the slice as completed
*/
public void complete()
{
tracker.completeSlice(this);
}
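/**
* Record a failure of the slice on the instance with the given id
*
* @param instanceId id of the Cassandra instance on which the slice has failed
*/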
public void failAtInstance(int instanceId)
{
statusByReplica.put(String.valueOf(instanceId), RestoreSliceStatus.FAILED);
}
/**
* Fail the owning restore job, and with it all of the job's slices, with the provided {@link RestoreJobFatalException}.
*/
public void fail(RestoreJobFatalException exception)
{
tracker.fail(exception);
}
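/**
* Mark that the slice's object has been confirmed to exist on S3
*/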
public void setExistsOnS3()
{
this.existsOnS3 = true;
}
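/**
* Increment the count of download attempts made for the slice
*/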
public void incrementDownloadAttempt()
{
this.downloadAttempt++;
}
/**
* Cancel the slice to prevent it from being processed in the future.
*/
public void cancel()
{
isCancelled = true;
}
/**
* @return an async task, typically a {@link RestoreSliceTask}, that restores the slice. See {@link RestoreSliceTask} for the steps performed.
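* <p>A minimal usage sketch (assuming a Vert.x context; the pool, importer, stats
* and LOGGER identifiers are illustrative and come from the caller):
* <pre>{@code
* Handler<Promise<RestoreSlice>> task =
*     slice.toAsyncTask(s3ClientPool, executorPool, importer, requiredUsableSpacePercentage, stats);
* Promise<RestoreSlice> promise = Promise.promise();
* task.handle(promise);
* promise.future()
*        .onSuccess(s -> LOGGER.info("Slice {} completed", s.sliceId()))
*        .onFailure(t -> LOGGER.warn("Slice failed", t));
* }</pre>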
*/
public Handler<Promise<RestoreSlice>> toAsyncTask(StorageClientPool s3ClientPool,
ExecutorPools.TaskExecutorPool executorPool,
SSTableImporter importer,
double requiredUsableSpacePercentage,
RestoreJobStats stats)
{
if (isCancelled)
return promise -> promise.tryFail(RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled",
this, null));
try
{
RestoreJob restoreJob = job();
StorageClient s3Client = s3ClientPool.storageClient(restoreJob);
return new RestoreSliceTask(restoreJob, this, s3Client,
executorPool, importer, requiredUsableSpacePercentage, stats);
}
catch (IllegalStateException illegalState)
{
// The slice is not registered with a tracker, retry later.
return promise -> promise.tryFail(RestoreJobExceptions.ofSlice("Restore slice is not started",
this, illegalState));
}
catch (Exception cause)
{
return promise -> promise.tryFail(RestoreJobExceptions.ofFatalSlice("Restore slice is failed",
this, cause));
}
}
// -- (self-explanatory) GETTERS --
@NotNull
public final RestoreJob job() // final: always look up the job via the registered tracker
{
if (tracker == null)
{
throw new IllegalStateException("Restore slice is not registered with a tracker");
}
return tracker.restoreJob();
}
public UUID jobId()
{
return jobId;
}
public String keyspace()
{
return keyspace;
}
public String table()
{
return table;
}
public String sliceId()
{
return sliceId;
}
public short bucketId()
{
return this.bucketId;
}
public String bucket()
{
return bucket;
}
public String key()
{
return key;
}
public String checksum()
{
return checksum;
}
public String uploadId()
{
return uploadId;
}
public BigInteger startToken()
{
return this.startToken;
}
public BigInteger endToken()
{
return this.endToken;
}
public Map<String, RestoreSliceStatus> statusByReplica()
{
return statusByReplica;
}
public Set<String> replicas()
{
return this.replicas;
}
public Path targetPathInStaging()
{
return targetPathInStaging;
}
public long compressedSize()
{
return compressedSize;
}
public long uncompressedSize()
{
return uncompressedSize;
}
public long creationTimeNanos()
{
return creationTimeNanos;
}
public InstanceMetadata owner()
{
return owner;
}
public boolean existsOnS3()
{
return existsOnS3;
}
public int downloadAttempt()
{
return downloadAttempt;
}
public boolean isCancelled()
{
return isCancelled;
}
// -------------
public String shortDescription()
{
return "SliceId: " + sliceId + ", Key: " + key + ", Bucket: " + bucket + ", Checksum: " + checksum;
}
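/**
* Build a {@link RestoreSlice} from a database {@link Row}.
* Note that jobId, keyspace, table and the staging path are not read from the row;
* when required, they can be set on a copy obtained via {@link #unbuild()}.
*/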
public static RestoreSlice from(Row row)
{
Builder builder = new Builder();
builder.sliceId(row.getString("slice_id"));
builder.bucketId(row.getShort("bucket_id"));
builder.storageBucket(row.getString("bucket"));
builder.storageKey(row.getString("key"));
builder.checksum(row.getString("checksum"));
builder.startToken(row.getVarint("start_token"));
builder.endToken(row.getVarint("end_token"));
builder.compressedSize(row.getLong("compressed_size"));
builder.uncompressedSize(row.getLong("uncompressed_size"));
builder.replicaStatus(row.getMap("status_by_replica", String.class, RestoreSliceStatus.class));
builder.replicas(row.getSet("all_replicas", String.class));
return builder.build();
}
/**
* Builder for building a {@link RestoreSlice}
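* <p>An existing slice can be copied and amended via {@link RestoreSlice#unbuild()},
* for example (values illustrative):
* <pre>{@code
* RestoreSlice updated = slice.unbuild()
*                             .checksum(newEtag)
*                             .build();
* }</pre>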
*/
public static class Builder implements DataObjectBuilder<Builder, RestoreSlice>
{
private UUID jobId;
private String keyspace;
private String table;
private String sliceId;
private short bucketId;
private String bucket;
private String key;
private String checksum; // etag
private Path targetPathInStaging; // the staging path where the S3 object of the slice is stored
private String uploadId;
private InstanceMetadata owner;
private BigInteger startToken;
private BigInteger endToken;
private Map<String, RestoreSliceStatus> statusByReplica;
private Set<String> replicas;
private long compressedSize;
private long uncompressedSize;
private Builder()
{
}
private Builder(RestoreSlice slice)
{
this.jobId = slice.jobId;
this.keyspace = slice.keyspace;
this.table = slice.table;
this.sliceId = slice.sliceId;
this.bucketId = slice.bucketId;
this.bucket = slice.bucket;
this.key = slice.key;
this.checksum = slice.checksum;
this.targetPathInStaging = slice.targetPathInStaging;
this.uploadId = slice.uploadId;
this.owner = slice.owner;
this.startToken = slice.startToken;
this.endToken = slice.endToken;
this.statusByReplica = Collections.unmodifiableMap(slice.statusByReplica);
this.replicas = Collections.unmodifiableSet(slice.replicas);
// also copy the sizes so that unbuild().build() round-trips equals() and hashCode()
this.compressedSize = slice.compressedSize;
this.uncompressedSize = slice.uncompressedSize;
}
public Builder jobId(UUID jobId)
{
return update(b -> b.jobId = jobId);
}
public Builder keyspace(String keyspace)
{
return update(b -> b.keyspace = keyspace);
}
public Builder table(String table)
{
return update(b -> b.table = table);
}
public Builder sliceId(String sliceId)
{
return update(b -> b.sliceId = sliceId);
}
public Builder bucketId(short bucketId)
{
return update(b -> b.bucketId = bucketId);
}
public Builder storageBucket(String storageBucket)
{
return update(b -> b.bucket = storageBucket);
}
public Builder storageKey(String storageKey)
{
return update(b -> b.key = storageKey);
}
public Builder checksum(String checksum)
{
return update(b -> b.checksum = checksum);
}
public Builder targetPathInStaging(Path basePath, String uploadId)
{
return update(b -> {
b.targetPathInStaging = basePath.resolve(uploadId);
b.uploadId = uploadId;
});
}
public Builder ownerInstance(InstanceMetadata owner)
{
return update(b -> b.owner = owner);
}
public Builder startToken(BigInteger startToken)
{
return update(b -> b.startToken = startToken);
}
public Builder endToken(BigInteger endToken)
{
return update(b -> b.endToken = endToken);
}
public Builder compressedSize(long compressedSize)
{
return update(b -> b.compressedSize = compressedSize);
}
public Builder uncompressedSize(long uncompressedSize)
{
return update(b -> b.uncompressedSize = uncompressedSize);
}
public Builder replicaStatus(Map<String, RestoreSliceStatus> statusByReplica)
{
return update(b -> b.statusByReplica = Collections.unmodifiableMap(statusByReplica));
}
public Builder replicas(Set<String> replicas)
{
return update(b -> b.replicas = Collections.unmodifiableSet(replicas));
}
/**
* Bulk-set the keyspace and table fields from the supplied {@link QualifiedTableName}
*/
public Builder qualifiedTableName(QualifiedTableName qualifiedTableName)
{
return update(b -> {
b.keyspace = qualifiedTableName.keyspace();
b.table = qualifiedTableName.tableName();
});
}
/**
* Bulk-set the slice fields from the supplied {@link CreateSliceRequestPayload}
*/
public Builder createSliceRequestPayload(CreateSliceRequestPayload payload)
{
return update(b -> {
b.sliceId = payload.sliceId();
b.bucketId = payload.bucketIdAsShort();
b.bucket = payload.bucket();
b.key = payload.key();
b.checksum = payload.checksum();
b.startToken = payload.startToken();
b.endToken = payload.endToken();
b.compressedSize = payload.compressedSizeOrZero();
b.uncompressedSize = payload.uncompressedSizeOrZero();
});
}
@Override
public RestoreSlice build()
{
return new RestoreSlice(this);
}
@Override
public Builder self()
{
return this;
}
}
}