blob: c6cbb5e41f84cbc799d478dd6078415619944acd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.restore;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.cassandra.sidecar.cluster.InstancesConfig;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.config.RestoreJobConfiguration;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
import org.apache.cassandra.sidecar.db.RestoreJob;
import org.apache.cassandra.sidecar.db.RestoreSlice;
import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor;
/**
* The group of managers of all instances
*/
@Singleton
public class RestoreJobManagerGroup
{
private final RestoreJobConfiguration restoreJobConfig;
// instance id --> RestoreJobManager
private final Map<Integer, RestoreJobManager> managerGroup = new ConcurrentHashMap<>();
private final RestoreProcessor restoreProcessor;
private final RestoreJobDiscoverer jobDiscoverer;
private final ExecutorPools executorPools;
@Inject
public RestoreJobManagerGroup(SidecarConfiguration configuration,
InstancesConfig instancesConfig,
ExecutorPools executorPools,
PeriodicTaskExecutor periodicTaskExecutor,
RestoreProcessor restoreProcessor,
RestoreJobDiscoverer jobDiscoverer)
{
this.restoreJobConfig = configuration.restoreJobConfiguration();
this.restoreProcessor = restoreProcessor;
this.jobDiscoverer = jobDiscoverer;
this.executorPools = executorPools;
initializeManagers(instancesConfig);
jobDiscoverer.signalRefresh();
periodicTaskExecutor.schedule(jobDiscoverer);
periodicTaskExecutor.schedule(restoreProcessor);
}
/**
* Simply delegates to {@link RestoreJobManager#trySubmit(RestoreSlice, RestoreJob)}
*
* @param instance the cassandra instance to submit the slice to
* @param slice restore slice
* @param restoreJob the restore job instance
* @return status of the submitted slice
* @throws RestoreJobFatalException the job has failed
*/
public RestoreSliceTracker.Status trySubmit(InstanceMetadata instance, RestoreSlice slice, RestoreJob restoreJob)
throws RestoreJobFatalException
{
return getManager(instance).trySubmit(slice, restoreJob);
}
/**
* Send a signal to the restore job discovery loop to refresh
*/
public void signalRefreshRestoreJob()
{
jobDiscoverer.signalRefresh();
}
/**
* Remove the tracker of the job when it is completed and delete its data on disk. The method internal.
* It should only be called by the background task, when it discovers the job is
* in the final {@link RestoreJobStatus}, i.e. SUCCEEDED or FAILED.
* If the restore job is not cached, it is a no-op.
*
* @param jobId id of the restore job
*/
void removeJobInternal(UUID jobId)
{
managerGroup.values().forEach(manager -> manager.removeJobInternal(jobId));
}
/**
* Similar to {@link RestoreJobManager#updateRestoreJob(RestoreJob)}.
* Update the restore job for each instance.
* It should only be called by the background task, when it discovers the job is
* in the CREATED job status.
*
* @param restoreJob restore job to update
*/
void updateRestoreJob(RestoreJob restoreJob)
{
if (restoreJob.status != RestoreJobStatus.CREATED)
{
throw new IllegalStateException("Cannot update with a non-created restore job");
}
managerGroup.values().forEach(manager -> manager.updateRestoreJob(restoreJob));
}
/**
* @return the {@link RestoreJobManager} of the instance
*/
private RestoreJobManager getManager(InstanceMetadata instance)
{
return managerGroup.computeIfAbsent(instance.id(),
id -> new RestoreJobManager(restoreJobConfig, instance,
executorPools, restoreProcessor));
}
// Create RestoreJobManager instances eagerly
private void initializeManagers(InstancesConfig instancesConfig)
{
// todo: allow register listener for instances list changes in the instancesConfig?
instancesConfig.instances().forEach(this::getManager);
}
}