blob: 7152f0a4926bc3fdd522167371252758491cb517 [file] [log] [blame]
package org.apache.archiva.scheduler.repository;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.apache.archiva.common.ArchivaException;
import org.apache.archiva.configuration.ArchivaConfiguration;
import org.apache.archiva.configuration.ConfigurationEvent;
import org.apache.archiva.configuration.ConfigurationListener;
import org.apache.archiva.configuration.ManagedRepositoryConfiguration;
import org.apache.archiva.metadata.repository.MetadataRepository;
import org.apache.archiva.metadata.repository.MetadataRepositoryException;
import org.apache.archiva.metadata.repository.RepositorySession;
import org.apache.archiva.metadata.repository.RepositorySessionFactory;
import org.apache.archiva.metadata.repository.stats.model.RepositoryStatisticsManager;
import org.apache.archiva.redback.components.scheduler.CronExpressionValidator;
import org.apache.archiva.redback.components.scheduler.Scheduler;
import org.apache.archiva.components.taskqueue.TaskQueue;
import org.apache.archiva.components.taskqueue.TaskQueueException;
import org.apache.archiva.scheduler.repository.model.RepositoryArchivaTaskScheduler;
import org.apache.archiva.scheduler.repository.model.RepositoryTask;
import org.apache.commons.lang3.time.StopWatch;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.SchedulerException;
import org.quartz.TriggerBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Named;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* Default implementation of a scheduling component for archiva.
*/
@Service( "archivaTaskScheduler#repository" )
public class DefaultRepositoryArchivaTaskScheduler
implements RepositoryArchivaTaskScheduler, ConfigurationListener
{
// Logger named after the runtime class of this instance.
private Logger log = LoggerFactory.getLogger( getClass() );
// Scheduler on which the per-repository scan jobs are scheduled and unscheduled.
@Inject
private Scheduler scheduler;
// Validates a repository's refresh cron expression before a trigger is built from it.
@Inject
private CronExpressionValidator cronValidator;
// Queue of pending repository scan tasks; the queue object is also used as the monitor
// for the synchronized blocks that inspect or modify it.
@Inject
@Named( value = "taskQueue#repository-scanning" )
private TaskQueue<RepositoryTask> repositoryScanningQueue;
// Source of the managed repository list; this component registers itself on it as a listener at startup.
@Inject
private ArchivaConfiguration archivaConfiguration;
// Consulted to find out whether statistics already exist for a repository.
@Inject
@Named( value = "repositoryStatisticsManager#default" )
private RepositoryStatisticsManager repositoryStatisticsManager;
/**
* Factory for the repository session that is opened (and closed again) during startup.
* TODO: could have multiple implementations
*/
@Inject
private RepositorySessionFactory repositorySessionFactory;
// Scheduler group shared by every repository scan job and trigger.
private static final String REPOSITORY_SCAN_GROUP = "rg";
// Prefix of a job name; the full name is this prefix, a colon and the repository id.
private static final String REPOSITORY_JOB = "rj";
// Prefix of a trigger name; the full name is this prefix, a colon and the repository id.
private static final String REPOSITORY_JOB_TRIGGER = "rjt";
// Job data map key under which the scanning queue is stored.
static final String TASK_QUEUE = "TASK_QUEUE";
// Job data map key under which the repository id is stored.
static final String TASK_REPOSITORY = "TASK_REPOSITORY";
// Fallback cron expression used when a repository's own expression fails validation.
public static final String CRON_HOURLY = "0 0 * * * ?";
// Names of the jobs handed to the scheduler, kept so they can be unscheduled later.
private Set<String> jobs = new HashSet<>();
// Ids of the repositories for which an initial scan has been queued.
private List<String> queuedRepos = new ArrayList<>();
/**
 * Initializes the scheduler after dependency injection.
 * <p>
 * Registers this component as a configuration listener. Then, for every managed repository
 * flagged as scanned, schedules its scan job and queues an initial scan when no statistics
 * exist for it yet.
 *
 * @throws ArchivaException if the repository session cannot be created or a scan job cannot
 *                          be scheduled
 */
@PostConstruct
public void startup()
    throws ArchivaException
{
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();

    archivaConfiguration.addListener( this );

    List<ManagedRepositoryConfiguration> repositories =
        archivaConfiguration.getConfiguration().getManagedRepositories();

    RepositorySession repositorySession;
    try
    {
        repositorySession = repositorySessionFactory.createSession();
    }
    catch ( MetadataRepositoryException e )
    {
        // Surface the real cause: continuing with a null session would only fail later
        // with an uninformative NullPointerException.
        throw new ArchivaException( "Unable to create repository session: " + e.getMessage(), e );
    }

    try
    {
        MetadataRepository metadataRepository = repositorySession.getRepository();
        for ( ManagedRepositoryConfiguration repoConfig : repositories )
        {
            if ( !repoConfig.isScanned() )
            {
                continue;
            }

            try
            {
                scheduleRepositoryJobs( repoConfig );
            }
            catch ( SchedulerException e )
            {
                throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e );
            }

            try
            {
                if ( !isPreviouslyScanned( repoConfig, metadataRepository ) )
                {
                    queueInitialRepoScan( repoConfig );
                }
            }
            catch ( MetadataRepositoryException e )
            {
                // Best effort: a failed lookup only skips the initial scan for this repository.
                log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: {}",
                          e.getMessage(), e );
            }
        }
    }
    finally
    {
        repositorySession.close();
    }

    stopWatch.stop();
    log.info( "Time to initalize DefaultRepositoryArchivaTaskScheduler: {} ms", stopWatch.getTime() );
}
/**
 * Unschedules every tracked repository scan job and clears the internal bookkeeping.
 * <p>
 * Every job is attempted even if an earlier one fails, and the bookkeeping is always cleared.
 *
 * @throws SchedulerException the first failure raised while unscheduling; later failures are
 *                            attached to it as suppressed exceptions
 */
@PreDestroy
public void stop()
    throws SchedulerException
{
    SchedulerException firstFailure = null;
    for ( String job : jobs )
    {
        try
        {
            scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
        }
        catch ( SchedulerException e )
        {
            // Keep going so one bad job does not leave the remaining ones scheduled.
            log.warn( "Unable to unschedule repository scanning job '{}': {}", job, e.getMessage() );
            if ( firstFailure == null )
            {
                firstFailure = e;
            }
            else
            {
                firstFailure.addSuppressed( e );
            }
        }
    }

    jobs.clear();
    queuedRepos.clear();

    if ( firstFailure != null )
    {
        throw firstFailure;
    }
}
/**
 * Tells whether the repository scanning queue currently holds a task for the given repository.
 *
 * @param repositoryId id of the repository to look for
 * @return {@code true} if a task in the current queue snapshot carries this repository id;
 *         {@code false} otherwise, including when the snapshot cannot be obtained
 */
@Override
public boolean isProcessingRepositoryTask( String repositoryId )
{
    synchronized ( repositoryScanningQueue )
    {
        List<RepositoryTask> queue;
        try
        {
            queue = repositoryScanningQueue.getQueueSnapshot();
        }
        catch ( TaskQueueException e )
        {
            // Without a snapshot nothing can be reported as queued; answer "no" instead of
            // dereferencing a null list.
            log.warn( "Unable to read the repository scanning queue: {}", e.getMessage(), e );
            return false;
        }

        if ( queue == null || repositoryId == null )
        {
            return false;
        }

        for ( RepositoryTask queuedTask : queue )
        {
            // Compare from the non-null side so a task without a repository id cannot throw.
            if ( repositoryId.equals( queuedTask.getRepositoryId() ) )
            {
                return true;
            }
        }
        return false;
    }
}
/**
 * Tells whether the repository scanning queue currently holds a task equal to the given one.
 *
 * @param task the task to look for
 * @return {@code true} if the current queue snapshot contains a task for which
 *         {@code task.equals(queuedTask)} holds; {@code false} otherwise, including when the
 *         snapshot cannot be obtained
 */
@Override
public boolean isProcessingRepositoryTask( RepositoryTask task )
{
    synchronized ( repositoryScanningQueue )
    {
        List<RepositoryTask> queue;
        try
        {
            queue = repositoryScanningQueue.getQueueSnapshot();
        }
        catch ( TaskQueueException e )
        {
            // Without a snapshot nothing can be reported as queued; answer "no" instead of
            // dereferencing a null list.
            log.warn( "Unable to read the repository scanning queue: {}", e.getMessage(), e );
            return false;
        }

        if ( queue == null )
        {
            return false;
        }

        for ( RepositoryTask queuedTask : queue )
        {
            if ( task.equals( queuedTask ) )
            {
                return true;
            }
        }
        return false;
    }
}
/**
 * Adds a task to the repository scanning queue unless an equal task is already queued.
 *
 * @param task the task to enqueue
 * @throws TaskQueueException if the queue rejects the task
 */
@Override
public void queueTask( RepositoryTask task )
    throws TaskQueueException
{
    synchronized ( repositoryScanningQueue )
    {
        final boolean alreadyQueued = isProcessingRepositoryTask( task );
        if ( alreadyQueued )
        {
            log.debug( "Repository task '{}' is already queued. Skipping task.", task );
            return;
        }

        // add check if the task is already queued if it is a file scan
        repositoryScanningQueue.put( task );
    }
}
/**
 * Removes a task from the repository scanning queue if it is currently queued.
 *
 * @param task the task to remove
 * @return the result of the queue's remove operation, or {@code false} when the task was not
 *         queued in the first place
 * @throws TaskQueueException if the queue fails while removing the task
 */
@Override
public boolean unQueueTask( RepositoryTask task )
    throws TaskQueueException
{
    synchronized ( repositoryScanningQueue )
    {
        if ( isProcessingRepositoryTask( task ) )
        {
            return repositoryScanningQueue.remove( task );
        }

        log.info( "cannot unqueue Repository task '{}' not already queued.", task );
        return false;
    }
}
/**
 * Reacts to configuration changes: when the configuration has been saved, all tracked scan
 * jobs are unscheduled and jobs are scheduled again from the current managed repository list.
 * Failures are logged, with their cause, and do not stop the remaining repositories from
 * being processed.
 *
 * @param event the configuration event; only events of type {@link ConfigurationEvent#SAVED}
 *              are acted upon
 */
@Override
public void configurationEvent( ConfigurationEvent event )
{
    if ( event.getType() != ConfigurationEvent.SAVED )
    {
        return;
    }

    for ( String job : jobs )
    {
        try
        {
            scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
        }
        catch ( SchedulerException e )
        {
            // Pass the exception as the last argument so the cause and stack trace are logged.
            log.error( "Error restarting the repository scanning job after property change.", e );
        }
    }
    jobs.clear();

    List<ManagedRepositoryConfiguration> repositories =
        archivaConfiguration.getConfiguration().getManagedRepositories();

    for ( ManagedRepositoryConfiguration repoConfig : repositories )
    {
        if ( repoConfig.getRefreshCronExpression() != null )
        {
            try
            {
                scheduleRepositoryJobs( repoConfig );
            }
            catch ( SchedulerException e )
            {
                log.error( "error restarting job: '{}' : '{}'", REPOSITORY_JOB, repoConfig.getId(), e );
            }
        }
    }
}
/**
 * Asks the statistics manager whether statistics exist for the repository and logs, at debug
 * level, the answer together with the time the lookup took.
 *
 * @param repoConfig         configuration of the repository to check
 * @param metadataRepository not consulted by this lookup
 * @return the value reported by the statistics manager for the repository id
 * @throws MetadataRepositoryException if the statistics lookup fails
 */
private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig,
                                     MetadataRepository metadataRepository )
    throws MetadataRepositoryException
{
    final long startedAt = System.currentTimeMillis();
    final boolean hasStats = repositoryStatisticsManager.hasStatistics( repoConfig.getId() );
    final long elapsedMillis = System.currentTimeMillis() - startedAt;

    log.debug( "isPreviouslyScanned repo {} {} time: {} ms", repoConfig.getId(), hasStats, elapsedMillis );
    return hasStats;
}
/**
 * Queues a first scan for a repository that has not been queued by this method before.
 * <p>
 * MRM-848: Pre-configured repository initially appear to be empty.
 * <p>
 * The repository id is remembered only once the task has been handed to the queue without
 * error, so a failed attempt does not mark the repository as queued.
 *
 * @param repoConfig configuration of the repository to scan
 */
private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
{
    String repoId = repoConfig.getId();
    if ( queuedRepos.contains( repoId ) )
    {
        return;
    }

    log.info( "Repository [{}] is queued to be scanned as it hasn't been previously.", repoId );

    RepositoryTask task = new RepositoryTask();
    task.setRepositoryId( repoId );
    try
    {
        this.queueTask( task );
        queuedRepos.add( repoId );
    }
    catch ( TaskQueueException e )
    {
        // Argument order follows the placeholders (repository first, then the message);
        // the trailing exception makes the logger record the stack trace as well.
        log.error( "Error occurred while queueing repository [{}] task : {}", repoId, e.getMessage(), e );
    }
}
/**
 * Schedules the cron-driven scan job for one repository.
 * <p>
 * Nothing is scheduled when the repository has no cron expression or is not flagged as
 * scanned. An expression rejected by the validator is replaced with {@link #CRON_HOURLY}.
 * A runtime failure while building the trigger or scheduling the job is logged and swallowed.
 *
 * @param repoConfig configuration of the repository to schedule
 * @throws SchedulerException if the scheduler reports a failure while scheduling the job
 */
private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
    throws SchedulerException
{
    if ( repoConfig.getRefreshCronExpression() == null )
    {
        log.warn( "Skipping job, no cron expression for {}", repoConfig.getId() );
        return;
    }

    if ( !repoConfig.isScanned() )
    {
        log.warn( "Skipping job, repository scannable has been disabled for {}", repoConfig.getId() );
        return;
    }

    // cron expression driving this repository's scan job
    String cronString = repoConfig.getRefreshCronExpression();
    if ( !cronValidator.validate( cronString ) )
    {
        log.warn( "Cron expression [{}] for repository [{}] is invalid. Defaulting to hourly.", cronString,
                  repoConfig.getId() );
        cronString = CRON_HOURLY;
    }

    String repoId = repoConfig.getId();
    String jobName = REPOSITORY_JOB + ":" + repoId;

    // the job reads the target queue and repository id from its data map
    JobDataMap jobDataMap = new JobDataMap( );
    jobDataMap.put( TASK_QUEUE, repositoryScanningQueue );
    jobDataMap.put( TASK_REPOSITORY, repoId );

    // set up the repository scan job
    JobDetail repositoryJob = JobBuilder.newJob( RepositoryTaskJob.class )
        .withIdentity( jobName, REPOSITORY_SCAN_GROUP )
        .setJobData( jobDataMap )
        .build();

    try
    {
        CronTrigger trigger = TriggerBuilder.newTrigger()
            .withIdentity( REPOSITORY_JOB_TRIGGER + ":" + repoId, REPOSITORY_SCAN_GROUP )
            .withSchedule( CronScheduleBuilder.cronSchedule( cronString ) )
            .build();

        // The name is tracked before scheduling so the job can still be unscheduled later
        // even if the scheduler call below fails.
        jobs.add( jobName );
        scheduler.scheduleJob( repositoryJob, trigger );
    }
    catch ( RuntimeException e )
    {
        // The trailing exception argument makes the logger record the full stack trace.
        log.error(
            "ParseException in repository scanning cron expression, disabling repository scanning for '{}': {}",
            repoId, e.getMessage(), e );
    }
}
}