blob: 2f865ff9e075f9d80dce5efbe70f39e5a451e9c9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.restore;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.LocalDate;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
import io.vertx.core.Promise;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
import org.apache.cassandra.sidecar.config.RestoreJobConfiguration;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
import org.apache.cassandra.sidecar.db.RestoreJob;
import org.apache.cassandra.sidecar.db.RestoreJobDatabaseAccessor;
import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor;
import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
import org.apache.cassandra.sidecar.locator.CachedLocalTokenRanges;
import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider;
import org.apache.cassandra.sidecar.locator.TokenRange;
import org.apache.cassandra.sidecar.metrics.RestoreMetrics;
import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
import org.apache.cassandra.sidecar.tasks.PeriodicTask;
import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
/**
* {@link RestoreJobDiscoverer} handles background restore job discovery and handling it according to job status
*/
@Singleton
public class RestoreJobDiscoverer implements PeriodicTask
{
private static final Logger LOGGER = LoggerFactory.getLogger(RestoreJobDiscoverer.class);
private final RestoreJobConfiguration restoreJobConfig;
private final SidecarSchema sidecarSchema;
private final RestoreJobDatabaseAccessor restoreJobDatabaseAccessor;
private final RestoreSliceDatabaseAccessor restoreSliceDatabaseAccessor;
private final Provider<RestoreJobManagerGroup> restoreJobManagerGroupSingleton;
private final LocalTokenRangesProvider localTokenRangesProvider;
private final InstanceMetadataFetcher instanceMetadataFetcher;
private final RestoreMetrics metrics;
private final JobIdsByDay jobIdsByDay;
private volatile boolean refreshSignaled = true;
private int inflightJobsCount = 0;
private int jobDiscoveryRecencyDays;
private PeriodicTaskExecutor periodicTaskExecutor;
@Inject
public RestoreJobDiscoverer(SidecarConfiguration config,
SidecarSchema sidecarSchema,
RestoreJobDatabaseAccessor restoreJobDatabaseAccessor,
RestoreSliceDatabaseAccessor restoreSliceDatabaseAccessor,
Provider<RestoreJobManagerGroup> restoreJobManagerGroupProvider,
CachedLocalTokenRanges cachedLocalTokenRanges,
InstanceMetadataFetcher instanceMetadataFetcher,
SidecarMetrics metrics)
{
this(config.restoreJobConfiguration(),
sidecarSchema,
restoreJobDatabaseAccessor,
restoreSliceDatabaseAccessor,
restoreJobManagerGroupProvider,
cachedLocalTokenRanges,
instanceMetadataFetcher,
metrics);
}
@VisibleForTesting
RestoreJobDiscoverer(RestoreJobConfiguration restoreJobConfig,
SidecarSchema sidecarSchema,
RestoreJobDatabaseAccessor restoreJobDatabaseAccessor,
RestoreSliceDatabaseAccessor restoreSliceDatabaseAccessor,
Provider<RestoreJobManagerGroup> restoreJobManagerGroupProvider,
LocalTokenRangesProvider cachedLocalTokenRanges,
InstanceMetadataFetcher instanceMetadataFetcher,
SidecarMetrics metrics)
{
this.restoreJobConfig = restoreJobConfig;
this.sidecarSchema = sidecarSchema;
this.restoreJobDatabaseAccessor = restoreJobDatabaseAccessor;
this.restoreSliceDatabaseAccessor = restoreSliceDatabaseAccessor;
this.jobDiscoveryRecencyDays = restoreJobConfig.jobDiscoveryRecencyDays();
this.restoreJobManagerGroupSingleton = restoreJobManagerGroupProvider;
this.localTokenRangesProvider = cachedLocalTokenRanges;
this.instanceMetadataFetcher = instanceMetadataFetcher;
this.metrics = metrics.server().restore();
this.jobIdsByDay = new JobIdsByDay();
}
@Override
public boolean shouldSkip()
{
boolean shouldSkip = !sidecarSchema.isInitialized();
if (shouldSkip)
{
LOGGER.trace("Skipping restore job discovering");
}
// skip the task when sidecar schema is not initialized
return shouldSkip;
}
@Override
public long delay()
{
// delay value is re-evaluated when rescheduling
return hasInflightJobs()
? restoreJobConfig.jobDiscoveryActiveLoopDelayMillis()
: restoreJobConfig.jobDiscoveryIdleLoopDelayMillis();
}
@Override
public void execute(Promise<Void> promise)
{
executeInternal();
promise.tryComplete();
}
public void executeInternal()
{
Preconditions.checkState(periodicTaskExecutor != null, "Loop executor is not registered");
// Log one message every few minutes should be acceptable
LOGGER.info("Discovering restore jobs. " +
"refreshSignaled={} inflightJobsCount={} delayMs={} jobDiscoveryRecencyDays={}",
refreshSignaled, inflightJobsCount, delay(), jobDiscoveryRecencyDays);
boolean hasInflightJobBefore = hasInflightJobs();
// reset in-flight jobs
inflightJobsCount = 0;
List<RestoreJob> restoreJobs = restoreJobDatabaseAccessor.findAllRecent(jobDiscoveryRecencyDays);
long nowMillis = System.currentTimeMillis();
LocalDate today = LocalDate.fromMillisSinceEpoch(nowMillis);
RestoreJobManagerGroup restoreJobManagers = restoreJobManagerGroupSingleton.get();
int days = 0;
int abortedJobs = 0;
int expiredJobs = 0;
for (RestoreJob job : restoreJobs)
{
if (jobIdsByDay.shouldLogJob(job))
{
LOGGER.info("Found job. jobId={} job={}", job.jobId, job);
}
try
{
switch (job.status)
{
case CREATED:
case STAGED:
if (job.expireAt == null // abort all old jobs that has no expireAt value
|| job.expireAt.getTime() < nowMillis)
{
expiredJobs += 1;
boolean aborted = abortJob(job);
abortedJobs += (aborted ? 1 : 0);
break; // do not proceed further if the job has expired
}
// find the oldest non-completed job
days = Math.max(days, delta(today, job.createdAt));
restoreJobManagers.updateRestoreJob(job);
if (job.isManagedBySidecar())
{
// todo: potential exceedingly number of queries
findSlicesAndSubmit(job);
}
inflightJobsCount += 1;
break;
case FAILED:
case ABORTED:
case SUCCEEDED:
restoreJobManagers.removeJobInternal(job.jobId);
break;
default:
LOGGER.warn("Encountered unknown job status. jobId={} status={}", job.jobId, job.status);
}
}
catch (Exception exception) // do not fail on the job. Continue to drain the entire list
{
LOGGER.warn("Exception on processing job. jobId: {}", job.jobId, exception);
}
}
jobIdsByDay.cleanupMaybe();
// shrink recency to lookup less data next time
jobDiscoveryRecencyDays = Math.min(days, jobDiscoveryRecencyDays);
// reset the external refresh signal if any
refreshSignaled = false;
LOGGER.info("Exit job discovery. " +
"refreshSignaled={} " +
"inflightJobsCount={} " +
"jobDiscoveryRecencyDays={} " +
"expiredJobs={} " +
"abortedJobs={}",
refreshSignaled, inflightJobsCount, jobDiscoveryRecencyDays, expiredJobs, abortedJobs);
metrics.activeJobs.metric.setValue(inflightJobsCount);
boolean hasInflightJobsNow = hasInflightJobs();
// need to update delay time; reschedule self
if (hasInflightJobBefore != hasInflightJobsNow)
{
periodicTaskExecutor.reschedule(this);
}
}
@Override
public void registerPeriodicTaskExecutor(PeriodicTaskExecutor executor)
{
this.periodicTaskExecutor = executor;
}
/**
* Signal the job discovery loop to refresh in the next execution
*/
public void signalRefresh()
{
refreshSignaled = true;
}
// find all slices of the job that should be downloaded to the local instances,
// according to the cluster token ownership
private void findSlicesAndSubmit(RestoreJob restoreJob)
{
localTokenRangesProvider.localTokenRanges(restoreJob.keyspaceName)
.forEach((key, ranges) -> {
int instanceId = key;
InstanceMetadata instance = instanceMetadataFetcher.instance(instanceId);
ranges.forEach(range -> findSlicesOfRangeAndSubmit(instance, restoreJob, range));
});
}
// try to submit the slice.
// If it is already exist, it is a no-op.
// If the submission fails, the slice status of the instance is updated.
private void findSlicesOfRangeAndSubmit(InstanceMetadata instance, RestoreJob restoreJob, TokenRange range)
{
short bucketId = 0; // TODO: update the implementation to pick proper bucketId
restoreSliceDatabaseAccessor
.selectByJobByBucketByTokenRange(restoreJob.jobId, bucketId, range.start, range.end)
.forEach(slice -> {
// set the owner instance, which is not read from database
slice = slice.unbuild().ownerInstance(instance).build();
try
{
// todo: do not re-submit for download if the slice is staged (when job status is before staged)
// or imported (when job status is staged) on the instance already
restoreJobManagerGroupSingleton.get().trySubmit(instance, slice, restoreJob);
}
catch (RestoreJobFatalException e)
{
slice.fail(e); // TODO: is it still needed? no, remove it later.
slice.failAtInstance(instance.id());
restoreSliceDatabaseAccessor.updateStatus(slice);
}
});
}
private boolean abortJob(RestoreJob job)
{
LOGGER.info("Abort expired job. jobId={} job={}", job.jobId, job);
try
{
restoreJobDatabaseAccessor.abort(job.jobId);
return true;
}
catch (Exception exception) // do not fail on the job. Continue to drain the entire list
{
LOGGER.warn("Exception on aborting job. jobId: " + job.jobId, exception);
}
return false;
}
// get the number of days delta between 2 dates. Always return non-negative values
private int delta(LocalDate date1, LocalDate date2)
{
return Math.abs(date1.getDaysSinceEpoch() - date2.getDaysSinceEpoch());
}
static class JobIdsByDay
{
private final Map<Integer, Map<UUID, RestoreJobStatus>> jobsByDay = new HashMap<>();
private final Set<Integer> discoveredDays = new HashSet<>(); // contains the days of the jobs seen from the current round of discovery
/**
* Log the jobs when any of the condition is met:
* - newly discovered
* - in CREATED status
* - status changed
*
* @return true to log the job
*/
boolean shouldLogJob(RestoreJob job)
{
int day = job.createdAt.getDaysSinceEpoch();
discoveredDays.add(day);
Map<UUID, RestoreJobStatus> jobs = jobsByDay.computeIfAbsent(day, key -> new HashMap<>());
RestoreJobStatus oldStatus = jobs.put(job.jobId, job.status);
return oldStatus == null || job.status == RestoreJobStatus.CREATED || oldStatus != job.status;
}
void cleanupMaybe()
{
// remove all the jobIds of the days that are not discovered
jobsByDay.keySet().removeIf(day -> !discoveredDays.contains(day));
discoveredDays.clear();
}
@VisibleForTesting
Map<Integer, Map<UUID, RestoreJobStatus>> jobsByDay()
{
return jobsByDay;
}
}
@VisibleForTesting
boolean hasInflightJobs()
{
return inflightJobsCount != 0;
}
@VisibleForTesting
int jobDiscoveryRecencyDays()
{
return jobDiscoveryRecencyDays;
}
@VisibleForTesting
boolean isRefreshSignaled()
{
return refreshSignaled;
}
}