blob: 6353894587128e66e24c1c5c02349e4e5448f4ec [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.cassandra.sidecar.db;
import java.math.BigInteger;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import com.datastax.driver.core.Row;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.common.DataObjectBuilder;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions;
import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
import org.apache.cassandra.sidecar.restore.RestoreSliceTask;
import org.apache.cassandra.sidecar.restore.RestoreSliceTracker;
import org.apache.cassandra.sidecar.restore.StorageClient;
import org.apache.cassandra.sidecar.restore.StorageClientPool;
import org.apache.cassandra.sidecar.stats.RestoreJobStats;
import org.apache.cassandra.sidecar.utils.SSTableImporter;
import org.jetbrains.annotations.NotNull;
* <p>Data object that contains all values that matter to the restore job slice.</p>
* <p>How the staged files are organized on disk? For each slice,</p>
* <ol>
* <li>the S3 object is downloaded to the path at "stageDirectory/key". It is a zip file.</li>
* <li>the zip is then extracted to the directory at "stageDirectory/keyspace/table/".
* The extracted sstables are imported into Cassandra.</li>
* </ol>
public class RestoreSlice
private final UUID jobId;
private final String keyspace;
private final String table;
private final String sliceId;
private final short bucketId;
private final String bucket;
private final String key;
private final String checksum; // etag
// The path to the directory that stores the s3 object of the slice and the sstables after unzipping.
// Its value is "baseStageDirectory/uploadId"
private final Path stageDirectory;
// The path to the staged s3 object (file). The path is inside stageDirectory.
// Its value is "stageDirectory/key"
private final Path stagedObjectPath;
private final String uploadId;
private final InstanceMetadata owner;
private final BigInteger startToken;
private final BigInteger endToken;
private final Map<String, RestoreSliceStatus> statusByReplica;
private final Set<String> replicas;
private final long creationTimeNanos;
private final long compressedSize;
private final long uncompressedSize;
private RestoreSliceTracker tracker;
// mutable states
private boolean existsOnS3 = false;
private boolean hasStaged = false;
private boolean hasImported = false;
private int downloadAttempt = 0;
private volatile boolean isCancelled = false;
public static Builder builder()
return new Builder();
private RestoreSlice(Builder builder)
this.jobId = builder.jobId;
this.keyspace = builder.keyspace;
this.table = builder.table;
this.sliceId = builder.sliceId;
this.bucketId = builder.bucketId;
this.bucket = builder.bucket;
this.key = builder.key;
this.checksum = builder.checksum;
this.stageDirectory = builder.stageDirectory;
this.stagedObjectPath = builder.stagedObjectPath;
this.uploadId = builder.uploadId;
this.owner = builder.owner;
this.startToken = builder.startToken;
this.endToken = builder.endToken;
this.statusByReplica = builder.statusByReplica;
this.replicas = builder.replicas;
this.compressedSize = builder.compressedSize;
this.uncompressedSize = builder.uncompressedSize;
this.creationTimeNanos = System.nanoTime();
public Builder unbuild()
return new Builder(this);
public int hashCode()
// Note: destinationPathInStaging and owner are not included as they are 'transient'.
// status_by_replicas and replicas are not added as instances can be added
return Objects.hash(jobId, keyspace, table, sliceId, bucketId, bucket, key,
checksum, startToken, endToken, compressedSize, uncompressedSize);
public boolean equals(Object obj)
if (obj == this)
return true;
if (!(obj instanceof RestoreSlice))
return false;
RestoreSlice that = (RestoreSlice) obj;
// Note: destinationPathInStaging and owner are not included as they are 'transient'.
// status_by_replicas and replicas are not added as instances can be added
return Objects.equals(this.jobId, that.jobId)
&& Objects.equals(this.keyspace, that.keyspace)
&& Objects.equals(this.table, that.table)
&& Objects.equals(this.sliceId, that.sliceId)
&& Objects.equals(this.bucketId, that.bucketId)
&& Objects.equals(this.bucket, that.bucket)
&& Objects.equals(this.key, that.key)
&& Objects.equals(this.checksum, that.checksum)
&& Objects.equals(this.startToken, that.startToken)
&& Objects.equals(this.endToken, that.endToken)
&& this.compressedSize == that.compressedSize
&& this.uncompressedSize == that.uncompressedSize;
* Register the {@link RestoreSliceTracker} for the slice
public void registerTracker(RestoreSliceTracker tracker)
this.tracker = tracker;
* Mark the slice as completed
public void complete()
* Mark the slice has completed the stage phase
public void completeStagePhase()
this.hasStaged = true;
* Mark the slice has completed the import phase
public void completeImportPhase()
this.hasImported = true;
public void failAtInstance(int instanceId)
statusByReplica.put(String.valueOf(instanceId), RestoreSliceStatus.FAILED);
* Fail the job, including all its owning slices, with the provided {@link RestoreJobFatalException}.
public void fail(RestoreJobFatalException exception)
public void setExistsOnS3()
this.existsOnS3 = true;
public void incrementDownloadAttempt()
* Cancel the slice to prevent processing them in the future.
public void cancel()
isCancelled = true;
* @return {@link RestoreSliceTask} of the restore slice. See {@link RestoreSliceTask} for the steps.
public Handler<Promise<RestoreSlice>> toAsyncTask(StorageClientPool s3ClientPool,
ExecutorPools.TaskExecutorPool executorPool,
SSTableImporter importer,
double requiredUsableSpacePercentage,
RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
RestoreJobStats stats)
if (isCancelled)
return promise -> promise.tryFail(RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled",
this, null));
StorageClient s3Client = s3ClientPool.storageClient(job());
return new RestoreSliceTask(this, s3Client,
executorPool, importer,
catch (IllegalStateException illegalState)
// The slice is not registered with a tracker, retry later.
return promise -> promise.tryFail(RestoreJobExceptions.ofSlice("Restore slice is not started",
this, illegalState));
catch (Exception cause)
return promise -> promise.tryFail(RestoreJobExceptions.ofFatalSlice("Restore slice is failed",
this, cause));
// -- (self-explanatory) GETTERS --
public final RestoreJob job() // disable override to always lookup from registered tracker
if (tracker == null)
throw new IllegalStateException("Restore slice is not registered with a tracker");
return tracker.restoreJob();
public UUID jobId()
return jobId;
public String keyspace()
return keyspace;
public String table()
return table;
public String sliceId()
return sliceId;
public Short bucketId()
return this.bucketId;
public String bucket()
return bucket;
public String key()
return key;
public String checksum()
return checksum;
public String uploadId()
return uploadId;
public BigInteger startToken()
return this.startToken;
public BigInteger endToken()
return this.endToken;
public Map<String, RestoreSliceStatus> statusByReplica()
return statusByReplica;
public Set<String> replicas()
return this.replicas;
* @return the path to the directory that stores the s3 object of the slice
* and the sstables after unzipping
public Path stageDirectory()
return stageDirectory;
* @return the path to the staged s3 object
public Path stagedObjectPath()
return stagedObjectPath;
public long compressedSize()
return compressedSize;
public long uncompressedSize()
return uncompressedSize;
public long creationTimeNanos()
return creationTimeNanos;
public InstanceMetadata owner()
return owner;
public boolean existsOnS3()
return existsOnS3;
public boolean hasStaged()
return hasStaged;
public boolean hasImported()
return hasImported;
public int downloadAttempt()
return downloadAttempt;
public boolean isCancelled()
return isCancelled;
// -------------
public String shortDescription()
return "SliceId: " + sliceId + ", Key: " + key + ", Bucket: " + bucket + ", Checksum: " + checksum;
public static RestoreSlice from(Row row)
Builder builder = new Builder();
builder.replicaStatus(row.getMap("status_by_replica", String.class, RestoreSliceStatus.class));
builder.replicas(row.getSet("all_replicas", String.class));
* Builder for building a {@link RestoreSlice}
public static class Builder implements DataObjectBuilder<Builder, RestoreSlice>
private UUID jobId;
private String keyspace;
private String table;
private String sliceId;
private short bucketId;
private String bucket;
private String key;
private String checksum; // etag
private Path stageDirectory;
private Path stagedObjectPath;
private String uploadId;
private InstanceMetadata owner;
private BigInteger startToken;
private BigInteger endToken;
private Map<String, RestoreSliceStatus> statusByReplica;
private Set<String> replicas;
private long compressedSize;
private long uncompressedSize;
private Builder()
private Builder(RestoreSlice slice)
this.jobId = slice.jobId;
this.keyspace = slice.keyspace;
this.table = slice.table;
this.sliceId = slice.sliceId;
this.bucketId = slice.bucketId;
this.bucket = slice.bucket;
this.key = slice.key;
this.checksum = slice.checksum;
this.stageDirectory = slice.stageDirectory;
this.uploadId = slice.uploadId;
this.owner = slice.owner;
this.startToken = slice.startToken;
this.endToken = slice.endToken;
this.statusByReplica = Collections.unmodifiableMap(slice.statusByReplica);
this.replicas = Collections.unmodifiableSet(slice.replicas);
public Builder jobId(UUID jobId)
return update(b -> b.jobId = jobId);
public Builder keyspace(String keyspace)
return update(b -> b.keyspace = keyspace);
public Builder table(String table)
return update(b -> b.table = table);
public Builder sliceId(String sliceId)
return update(b -> b.sliceId = sliceId);
public Builder bucketId(short bucketId)
return update(b -> b.bucketId = bucketId);
public Builder storageBucket(String storageBucket)
return update(b -> b.bucket = storageBucket);
public Builder storageKey(String storageKey)
return update(b -> b.key = storageKey);
public Builder checksum(String checksum)
return update(b -> b.checksum = checksum);
public Builder stageDirectory(Path basePath, String uploadId)
return update(b -> {
b.stageDirectory = basePath.resolve(uploadId);
b.uploadId = uploadId;
public Builder ownerInstance(InstanceMetadata owner)
return update(b -> b.owner = owner);
public Builder startToken(BigInteger startToken)
return update(b -> b.startToken = startToken);
public Builder endToken(BigInteger endToken)
return update(b -> b.endToken = endToken);
public Builder compressedSize(long compressedSize)
return update(b -> b.compressedSize = compressedSize);
public Builder uncompressedSize(long uncompressedSize)
return update(b -> b.uncompressedSize = uncompressedSize);
public Builder replicaStatus(Map<String, RestoreSliceStatus> statusByReplica)
return update(b -> b.statusByReplica = new HashMap<>(statusByReplica));
public Builder replicas(Set<String> replicas)
return update(b -> b.replicas = new HashSet<>(replicas));
* Bulk set fields with the supplied object {@link QualifiedTableName}
public Builder qualifiedTableName(QualifiedTableName qualifiedTableName)
return update(b -> {
b.keyspace = qualifiedTableName.keyspace();
b.table = qualifiedTableName.tableName();
* Bulk set fields with the supplied object {@link CreateSliceRequestPayload}
public Builder createSliceRequestPayload(CreateSliceRequestPayload payload)
return update(b -> {
b.sliceId = payload.sliceId();
b.bucketId = payload.bucketIdAsShort();
b.bucket = payload.bucket();
b.key = payload.key();
b.checksum = payload.checksum();
b.startToken = payload.startToken();
b.endToken = payload.endToken();
b.compressedSize = payload.compressedSizeOrZero();
b.uncompressedSize = payload.uncompressedSizeOrZero();
public RestoreSlice build()
// precompute the path to the to-be-staged object on disk
stagedObjectPath = stageDirectory.resolve(key);
return new RestoreSlice(this);
public Builder self()
return this;