blob: 3d20810adfcb87816e67a9baf442d458aa7735f7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.restore;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import com.datastax.driver.core.utils.UUIDs;
import com.google.inject.Guice;
import com.google.inject.Injector;
import io.vertx.core.Vertx;
import org.apache.cassandra.sidecar.ExecutorPoolsHelper;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.config.RestoreJobConfiguration;
import org.apache.cassandra.sidecar.db.RestoreJob;
import org.apache.cassandra.sidecar.db.RestoreJobTest;
import org.apache.cassandra.sidecar.db.RestoreSlice;
import org.apache.cassandra.sidecar.exceptions.RestoreJobException;
import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
import org.apache.cassandra.sidecar.server.MainModule;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
class RestoreJobManagerTest
{
private RestoreJobManager manager;
private Vertx vertx;
private ExecutorPools executorPools;
private static final int jobRecencyDays = 1;
@TempDir
private Path testDir;
@BeforeEach
void setup()
{
Injector injector = Guice.createInjector(new MainModule());
vertx = injector.getInstance(Vertx.class);
executorPools = ExecutorPoolsHelper.createdSharedTestPool(vertx);
RestoreProcessor processor = mock(RestoreProcessor.class);
InstanceMetadata instanceMetadata = mock(InstanceMetadata.class);
when(instanceMetadata.stagingDir()).thenReturn(testDir.toString());
RestoreJobConfiguration restoreJobConfiguration = mock(RestoreJobConfiguration.class);
when(restoreJobConfiguration.jobDiscoveryActiveLoopDelayMillis()).thenReturn(0L);
when(restoreJobConfiguration.jobDiscoveryIdleLoopDelayMillis()).thenReturn(0L);
when(restoreJobConfiguration.jobDiscoveryRecencyDays()).thenReturn(jobRecencyDays);
when(restoreJobConfiguration.processMaxConcurrency()).thenReturn(0);
when(restoreJobConfiguration.restoreJobTablesTtlSeconds()).thenReturn(TimeUnit.DAYS.toSeconds(14) + 1);
manager = new RestoreJobManager(restoreJobConfiguration,
instanceMetadata,
executorPools,
processor);
}
@AfterEach
void teardown()
{
// close in the fire-and-forget way
executorPools.close();
vertx.close();
}
@Test
void testTrySubmit() throws RestoreJobException
{
// submit the first time
RestoreSlice slice = getTestSlice();
assertThat(manager.trySubmit(slice, slice.job()))
.isEqualTo(RestoreSliceTracker.Status.CREATED);
// submit twice
assertThat(manager.trySubmit(slice, slice.job()))
.isEqualTo(RestoreSliceTracker.Status.PENDING);
slice.complete();
assertThat(manager.trySubmit(slice, slice.job()))
.isEqualTo(RestoreSliceTracker.Status.COMPLETED);
}
@Test
void testTrySubmitAfterJobFailure() throws RestoreJobException
{
RestoreSlice slice = getTestSlice();
assertThat(manager.trySubmit(slice, slice.job()))
.isEqualTo(RestoreSliceTracker.Status.CREATED);
RestoreJobFatalException failure = new RestoreJobFatalException("fatal");
slice.fail(failure);
assertThatThrownBy(() -> manager.trySubmit(slice, slice.job()))
.isSameAs(failure);
// submitting other slices in the same job should fail too
RestoreSlice anotherSlice = getTestSlice(slice.job());
assertThatThrownBy(() -> manager.trySubmit(anotherSlice, anotherSlice.job()))
.describedAs("Once a slice failed, no more slice can be submitted")
.isSameAs(failure);
// however, slices from a different job are still permitted
RestoreSlice sliceOfDifferentJob = getTestSlice();
assertThat(manager.trySubmit(sliceOfDifferentJob, sliceOfDifferentJob.job()))
.isEqualTo(RestoreSliceTracker.Status.CREATED);
}
@Test
void testRemoveJobInternal() throws RestoreJobException
{
RestoreSlice slice = getTestSlice();
assertThat(manager.trySubmit(slice, slice.job()))
.isEqualTo(RestoreSliceTracker.Status.CREATED);
assertThat(slice.isCancelled()).isFalse();
manager.removeJobInternal(slice.jobId()); // it cancels the non-completed slices
assertThat(slice.isCancelled()).isTrue();
}
@Test
void testUpdateRestoreJobForSubmittedSlice() throws RestoreJobFatalException
{
// test setup and submit slice
RestoreSlice slice = getTestSlice();
RestoreJob job = slice.job();
assertThat(manager.trySubmit(slice, job))
.isEqualTo(RestoreSliceTracker.Status.CREATED);
assertThat(slice.job()).isNotNull();
assertThat(slice.job()).isSameAs(job);
// update with the same job, it should read the same job reference back from the slice
manager.updateRestoreJob(job);
assertThat(slice.job()).isSameAs(job);
// update with the updated job, it should read the reference of the update job from the slice
RestoreJob updatedJob = RestoreJobTest.createNewTestingJob(slice.jobId());
manager.updateRestoreJob(updatedJob);
assertThat(slice.job()).isNotSameAs(job);
assertThat(slice.job()).isSameAs(updatedJob);
}
@Test
void testCheckDirectoryIsObsolete() throws IOException
{
Path jobDir = newDir(RestoreJobUtil.prefixedJobId(UUIDs.startOf(System.currentTimeMillis())));
// not old enough
assertThat(manager.isObsoleteRestoreJobDir(jobDir)).isFalse();
// still not old enough (not 1 day yet)
jobDir = newDir(RestoreJobUtil.prefixedJobId(UUIDs.startOf(System.currentTimeMillis()
- TimeUnit.DAYS.toMillis(jobRecencyDays)
+ 9000)));
assertThat(manager.isObsoleteRestoreJobDir(jobDir)).isFalse();
// invalid format: missing 'restore-' prefix
jobDir = newDir(UUIDs.startOf(System.currentTimeMillis()
- TimeUnit.DAYS.toMillis(jobRecencyDays + 1)).toString());
assertThat(manager.isObsoleteRestoreJobDir(jobDir)).isFalse();
// invalid format
jobDir = newDir("foo");
assertThat(manager.isObsoleteRestoreJobDir(jobDir)).isFalse();
// invalid format: not timeuuid
jobDir = newDir(RestoreJobUtil.prefixedJobId(UUID.randomUUID()));
assertThat(manager.isObsoleteRestoreJobDir(jobDir)).isFalse();
// dir not exist
jobDir = testDir.resolve("I_do_not_exist");
assertThat(manager.isObsoleteRestoreJobDir(jobDir)).isFalse();
// it is not a directory
jobDir = testDir.resolve("I_am_file");
assertThat(jobDir.toFile().createNewFile()).isTrue();
assertThat(manager.isObsoleteRestoreJobDir(jobDir)).isFalse();
// format is good; directory is older than jobRecencyDays
jobDir = newDir(RestoreJobUtil.prefixedJobId(UUIDs.startOf(System.currentTimeMillis()
- TimeUnit.DAYS.toMillis(jobRecencyDays)
- 1)));
assertThat(manager.isObsoleteRestoreJobDir(jobDir)).isTrue();
}
@Test
void testDeleteObsoleteData() throws ExecutionException, InterruptedException, TimeoutException, IOException
{
Path oldJobDir = newDir(RestoreJobUtil.prefixedJobId(UUIDs.startOf(System.currentTimeMillis()
- TimeUnit.DAYS.toMillis(jobRecencyDays)
- 1)));
createFileInDirectory(oldJobDir, 5);
Path olderJobDir
= newDir(RestoreJobUtil.prefixedJobId(UUIDs.startOf(System.currentTimeMillis()
- TimeUnit.DAYS.toMillis(jobRecencyDays + 1))));
createFileInDirectory(olderJobDir, 5);
Path newJobDir = newDir(RestoreJobUtil.prefixedJobId(UUIDs.startOf(System.currentTimeMillis())));
createFileInDirectory(newJobDir, 5);
manager.deleteObsoleteDataAsync()
.toCompletionStage()
.toCompletableFuture()
.get(5, TimeUnit.SECONDS);
assertThat(Files.exists(oldJobDir)).describedAs("Should be deleted").isFalse();
assertThat(Files.exists(olderJobDir)).describedAs("Should be deleted").isFalse();
assertThat(Files.exists(newJobDir)).describedAs("Should survive").isTrue();
assertThat(newJobDir.toFile().list())
.describedAs("Should have 5 files intact")
.hasSize(5);
}
private RestoreSlice getTestSlice()
{
return getTestSlice(RestoreJobTest.createNewTestingJob(UUIDs.timeBased()));
}
private RestoreSlice getTestSlice(RestoreJob job)
{
InstanceMetadata owner = mock(InstanceMetadata.class);
when(owner.id()).thenReturn(1);
RestoreSlice slice = RestoreSlice
.builder()
.jobId(job.jobId)
.bucketId((short) 0)
.stageDirectory(testDir, "uploadId")
.storageKey("storageKey")
.storageBucket("storageBucket")
.ownerInstance(owner)
.replicaStatus(Collections.emptyMap())
.replicas(Collections.emptySet())
.build();
RestoreSliceTracker tracker = new RestoreSliceTracker(job, mock(RestoreProcessor.class));
slice.registerTracker(tracker);
return slice;
}
private Path newDir(String name) throws IOException
{
Path dir = testDir.resolve(name);
Files.createDirectories(dir);
return dir;
}
private void createFileInDirectory(Path path, int nFiles) throws IOException
{
for (int i = 0; i < nFiles; i++)
{
Files.createFile(Paths.get(path.toString(), UUID.randomUUID().toString()));
}
assertThat(path.toFile().list()).hasSize(nFiles);
}
}