| package org.eclipse.aether.connector.basic; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import static java.util.Objects.requireNonNull; |
| import java.util.Set; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.eclipse.aether.ConfigurationProperties; |
| import org.eclipse.aether.RepositorySystemSession; |
| import org.eclipse.aether.RequestTrace; |
| import org.eclipse.aether.repository.RemoteRepository; |
| import org.eclipse.aether.spi.connector.ArtifactDownload; |
| import org.eclipse.aether.spi.connector.ArtifactUpload; |
| import org.eclipse.aether.spi.connector.MetadataDownload; |
| import org.eclipse.aether.spi.connector.MetadataUpload; |
| import org.eclipse.aether.spi.connector.RepositoryConnector; |
| import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy; |
| import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider; |
| import org.eclipse.aether.spi.connector.layout.RepositoryLayout; |
| import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider; |
| import org.eclipse.aether.spi.connector.transport.GetTask; |
| import org.eclipse.aether.spi.connector.transport.PeekTask; |
| import org.eclipse.aether.spi.connector.transport.PutTask; |
| import org.eclipse.aether.spi.connector.transport.Transporter; |
| import org.eclipse.aether.spi.connector.transport.TransporterProvider; |
| import org.eclipse.aether.spi.io.FileProcessor; |
| import org.eclipse.aether.spi.log.Logger; |
| import org.eclipse.aether.transfer.ChecksumFailureException; |
| import org.eclipse.aether.transfer.NoRepositoryConnectorException; |
| import org.eclipse.aether.transfer.NoRepositoryLayoutException; |
| import org.eclipse.aether.transfer.NoTransporterException; |
| import org.eclipse.aether.transfer.TransferEvent; |
| import org.eclipse.aether.transfer.TransferResource; |
| import org.eclipse.aether.util.ChecksumUtils; |
| import org.eclipse.aether.util.ConfigUtils; |
| import org.eclipse.aether.util.concurrency.RunnableErrorForwarder; |
| import org.eclipse.aether.util.concurrency.WorkerThreadFactory; |
| |
| /** |
| */ |
| final class BasicRepositoryConnector |
| implements RepositoryConnector |
| { |
| |
| private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads"; |
| |
| private static final String CONFIG_PROP_RESUME = "aether.connector.resumeDownloads"; |
| |
| private static final String CONFIG_PROP_RESUME_THRESHOLD = "aether.connector.resumeThreshold"; |
| |
| private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums"; |
| |
| private final Logger logger; |
| |
| private final FileProcessor fileProcessor; |
| |
| private final RemoteRepository repository; |
| |
| private final RepositorySystemSession session; |
| |
| private final Transporter transporter; |
| |
| private final RepositoryLayout layout; |
| |
| private final ChecksumPolicyProvider checksumPolicyProvider; |
| |
| private final PartialFile.Factory partialFileFactory; |
| |
| private final int maxThreads; |
| |
| private final boolean smartChecksums; |
| |
| private final boolean persistedChecksums; |
| |
| private Executor executor; |
| |
| private boolean closed; |
| |
| public BasicRepositoryConnector( RepositorySystemSession session, RemoteRepository repository, |
| TransporterProvider transporterProvider, RepositoryLayoutProvider layoutProvider, |
| ChecksumPolicyProvider checksumPolicyProvider, FileProcessor fileProcessor, |
| Logger logger ) |
| throws NoRepositoryConnectorException |
| { |
| try |
| { |
| layout = layoutProvider.newRepositoryLayout( session, repository ); |
| } |
| catch ( NoRepositoryLayoutException e ) |
| { |
| throw new NoRepositoryConnectorException( repository, e.getMessage(), e ); |
| } |
| try |
| { |
| transporter = transporterProvider.newTransporter( session, repository ); |
| } |
| catch ( NoTransporterException e ) |
| { |
| throw new NoRepositoryConnectorException( repository, e.getMessage(), e ); |
| } |
| this.checksumPolicyProvider = checksumPolicyProvider; |
| |
| this.session = session; |
| this.repository = repository; |
| this.fileProcessor = fileProcessor; |
| this.logger = logger; |
| |
| maxThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" ); |
| smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS ); |
| persistedChecksums = |
| ConfigUtils.getBoolean( session, ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS, |
| ConfigurationProperties.PERSISTED_CHECKSUMS ); |
| |
| boolean resumeDownloads = |
| ConfigUtils.getBoolean( session, true, CONFIG_PROP_RESUME + '.' + repository.getId(), CONFIG_PROP_RESUME ); |
| long resumeThreshold = |
| ConfigUtils.getLong( session, 64 * 1024, CONFIG_PROP_RESUME_THRESHOLD + '.' + repository.getId(), |
| CONFIG_PROP_RESUME_THRESHOLD ); |
| int requestTimeout = |
| ConfigUtils.getInteger( session, ConfigurationProperties.DEFAULT_REQUEST_TIMEOUT, |
| ConfigurationProperties.REQUEST_TIMEOUT + '.' + repository.getId(), |
| ConfigurationProperties.REQUEST_TIMEOUT ); |
| partialFileFactory = new PartialFile.Factory( resumeDownloads, resumeThreshold, requestTimeout, logger ); |
| } |
| |
| private Executor getExecutor( Collection<?> artifacts, Collection<?> metadatas ) |
| { |
| if ( maxThreads <= 1 ) |
| { |
| return DirectExecutor.INSTANCE; |
| } |
| int tasks = safe( artifacts ).size() + safe( metadatas ).size(); |
| if ( tasks <= 1 ) |
| { |
| return DirectExecutor.INSTANCE; |
| } |
| if ( executor == null ) |
| { |
| executor = |
| new ThreadPoolExecutor( maxThreads, maxThreads, 3L, TimeUnit.SECONDS, |
| new LinkedBlockingQueue<Runnable>(), |
| new WorkerThreadFactory( getClass().getSimpleName() + '-' |
| + repository.getHost() + '-' ) ); |
| } |
| return executor; |
| } |
| |
| @Override |
| protected void finalize() |
| throws Throwable |
| { |
| try |
| { |
| close(); |
| } |
| finally |
| { |
| super.finalize(); |
| } |
| } |
| |
| public void close() |
| { |
| if ( !closed ) |
| { |
| closed = true; |
| if ( executor instanceof ExecutorService ) |
| { |
| ( (ExecutorService) executor ).shutdown(); |
| } |
| transporter.close(); |
| } |
| } |
| |
| public void get( Collection<? extends ArtifactDownload> artifactDownloads, |
| Collection<? extends MetadataDownload> metadataDownloads ) |
| { |
| if ( closed ) |
| { |
| throw new IllegalStateException( "connector closed" ); |
| } |
| |
| Executor executor = getExecutor( artifactDownloads, metadataDownloads ); |
| RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder(); |
| |
| for ( MetadataDownload transfer : safe( metadataDownloads ) ) |
| { |
| URI location = layout.getLocation( transfer.getMetadata(), false ); |
| |
| TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() ); |
| TransferEvent.Builder builder = newEventBuilder( resource, false, false ); |
| MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder ); |
| |
| ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource ); |
| List<RepositoryLayout.Checksum> checksums = null; |
| if ( checksumPolicy != null ) |
| { |
| checksums = layout.getChecksums( transfer.getMetadata(), false, location ); |
| } |
| |
| Runnable task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksums, listener ); |
| executor.execute( errorForwarder.wrap( task ) ); |
| } |
| |
| for ( ArtifactDownload transfer : safe( artifactDownloads ) ) |
| { |
| URI location = layout.getLocation( transfer.getArtifact(), false ); |
| |
| TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() ); |
| TransferEvent.Builder builder = newEventBuilder( resource, false, transfer.isExistenceCheck() ); |
| ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder ); |
| |
| Runnable task; |
| if ( transfer.isExistenceCheck() ) |
| { |
| task = new PeekTaskRunner( location, listener ); |
| } |
| else |
| { |
| ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource ); |
| List<RepositoryLayout.Checksum> checksums = null; |
| if ( checksumPolicy != null ) |
| { |
| checksums = layout.getChecksums( transfer.getArtifact(), false, location ); |
| } |
| |
| task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksums, listener ); |
| } |
| executor.execute( errorForwarder.wrap( task ) ); |
| } |
| |
| errorForwarder.await(); |
| } |
| |
| public void put( Collection<? extends ArtifactUpload> artifactUploads, |
| Collection<? extends MetadataUpload> metadataUploads ) |
| { |
| if ( closed ) |
| { |
| throw new IllegalStateException( "connector closed" ); |
| } |
| |
| for ( ArtifactUpload transfer : safe( artifactUploads ) ) |
| { |
| URI location = layout.getLocation( transfer.getArtifact(), true ); |
| |
| TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() ); |
| TransferEvent.Builder builder = newEventBuilder( resource, true, false ); |
| ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder ); |
| |
| List<RepositoryLayout.Checksum> checksums = layout.getChecksums( transfer.getArtifact(), true, location ); |
| |
| Runnable task = new PutTaskRunner( location, transfer.getFile(), checksums, listener ); |
| task.run(); |
| } |
| |
| for ( MetadataUpload transfer : safe( metadataUploads ) ) |
| { |
| URI location = layout.getLocation( transfer.getMetadata(), true ); |
| |
| TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() ); |
| TransferEvent.Builder builder = newEventBuilder( resource, true, false ); |
| MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder ); |
| |
| List<RepositoryLayout.Checksum> checksums = layout.getChecksums( transfer.getMetadata(), true, location ); |
| |
| Runnable task = new PutTaskRunner( location, transfer.getFile(), checksums, listener ); |
| task.run(); |
| } |
| } |
| |
| private static <T> Collection<T> safe( Collection<T> items ) |
| { |
| return ( items != null ) ? items : Collections.<T>emptyList(); |
| } |
| |
| private TransferResource newTransferResource( URI path, File file, RequestTrace trace ) |
| { |
| return new TransferResource( repository.getId(), repository.getUrl(), path.toString(), file, trace ); |
| } |
| |
| private TransferEvent.Builder newEventBuilder( TransferResource resource, boolean upload, boolean peek ) |
| { |
| TransferEvent.Builder builder = new TransferEvent.Builder( session, resource ); |
| if ( upload ) |
| { |
| builder.setRequestType( TransferEvent.RequestType.PUT ); |
| } |
| else if ( !peek ) |
| { |
| builder.setRequestType( TransferEvent.RequestType.GET ); |
| } |
| else |
| { |
| builder.setRequestType( TransferEvent.RequestType.GET_EXISTENCE ); |
| } |
| return builder; |
| } |
| |
| private ChecksumPolicy newChecksumPolicy( String policy, TransferResource resource ) |
| { |
| return checksumPolicyProvider.newChecksumPolicy( session, repository, resource, policy ); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return String.valueOf( repository ); |
| } |
| |
| abstract class TaskRunner |
| implements Runnable |
| { |
| |
| protected final URI path; |
| |
| protected final TransferTransportListener<?> listener; |
| |
| public TaskRunner( URI path, TransferTransportListener<?> listener ) |
| { |
| this.path = path; |
| this.listener = listener; |
| } |
| |
| public void run() |
| { |
| try |
| { |
| listener.transferInitiated(); |
| runTask(); |
| listener.transferSucceeded(); |
| } |
| catch ( Exception e ) |
| { |
| listener.transferFailed( e, transporter.classify( e ) ); |
| } |
| } |
| |
| protected abstract void runTask() |
| throws Exception; |
| |
| } |
| |
| class PeekTaskRunner |
| extends TaskRunner |
| { |
| |
| public PeekTaskRunner( URI path, TransferTransportListener<?> listener ) |
| { |
| super( path, listener ); |
| } |
| |
| protected void runTask() |
| throws Exception |
| { |
| transporter.peek( new PeekTask( path ) ); |
| } |
| |
| } |
| |
| class GetTaskRunner |
| extends TaskRunner |
| implements PartialFile.RemoteAccessChecker, ChecksumValidator.ChecksumFetcher |
| { |
| |
| private final File file; |
| |
| private final ChecksumValidator checksumValidator; |
| |
| public GetTaskRunner( URI path, File file, ChecksumPolicy checksumPolicy, |
| List<RepositoryLayout.Checksum> checksums, TransferTransportListener<?> listener ) |
| { |
| super( path, listener ); |
| this.file = requireNonNull( file, "destination file cannot be null" ); |
| checksumValidator = |
| new ChecksumValidator( logger, file, fileProcessor, this, checksumPolicy, safe( checksums ) ); |
| } |
| |
| public void checkRemoteAccess() |
| throws Exception |
| { |
| transporter.peek( new PeekTask( path ) ); |
| } |
| |
| public boolean fetchChecksum( URI remote, File local ) |
| throws Exception |
| { |
| try |
| { |
| transporter.get( new GetTask( remote ).setDataFile( local ) ); |
| } |
| catch ( Exception e ) |
| { |
| if ( transporter.classify( e ) == Transporter.ERROR_NOT_FOUND ) |
| { |
| return false; |
| } |
| throw e; |
| } |
| return true; |
| } |
| |
| protected void runTask() |
| throws Exception |
| { |
| fileProcessor.mkdirs( file.getParentFile() ); |
| |
| PartialFile partFile = partialFileFactory.newInstance( file, this ); |
| if ( partFile == null ) |
| { |
| logger.debug( "Concurrent download of " + file + " just finished, skipping download" ); |
| return; |
| } |
| |
| try |
| { |
| File tmp = partFile.getFile(); |
| listener.setChecksumCalculator( checksumValidator.newChecksumCalculator( tmp ) ); |
| for ( int firstTrial = 0, lastTrial = 1, trial = firstTrial;; trial++ ) |
| { |
| boolean resume = partFile.isResume() && trial <= firstTrial; |
| GetTask task = new GetTask( path ).setDataFile( tmp, resume ).setListener( listener ); |
| transporter.get( task ); |
| try |
| { |
| checksumValidator.validate( listener.getChecksums(), smartChecksums ? task.getChecksums() |
| : null ); |
| break; |
| } |
| catch ( ChecksumFailureException e ) |
| { |
| boolean retry = trial < lastTrial && e.isRetryWorthy(); |
| if ( !retry && !checksumValidator.handle( e ) ) |
| { |
| throw e; |
| } |
| listener.transferCorrupted( e ); |
| if ( retry ) |
| { |
| checksumValidator.retry(); |
| } |
| else |
| { |
| break; |
| } |
| } |
| } |
| fileProcessor.move( tmp, file ); |
| if ( persistedChecksums ) |
| { |
| checksumValidator.commit(); |
| } |
| } |
| finally |
| { |
| partFile.close(); |
| checksumValidator.close(); |
| } |
| } |
| |
| } |
| |
| class PutTaskRunner |
| extends TaskRunner |
| { |
| |
| private final File file; |
| |
| private final Collection<RepositoryLayout.Checksum> checksums; |
| |
| public PutTaskRunner( URI path, File file, List<RepositoryLayout.Checksum> checksums, |
| TransferTransportListener<?> listener ) |
| { |
| super( path, listener ); |
| this.file = requireNonNull( file, "source file cannot be null" ); |
| this.checksums = safe( checksums ); |
| } |
| |
| protected void runTask() |
| throws Exception |
| { |
| transporter.put( new PutTask( path ).setDataFile( file ).setListener( listener ) ); |
| uploadChecksums( file, path ); |
| } |
| |
| private void uploadChecksums( File file, URI location ) |
| { |
| if ( checksums.isEmpty() ) |
| { |
| return; |
| } |
| try |
| { |
| Set<String> algos = new HashSet<String>(); |
| for ( RepositoryLayout.Checksum checksum : checksums ) |
| { |
| algos.add( checksum.getAlgorithm() ); |
| } |
| Map<String, Object> sumsByAlgo = ChecksumUtils.calc( file, algos ); |
| for ( RepositoryLayout.Checksum checksum : checksums ) |
| { |
| uploadChecksum( checksum.getLocation(), sumsByAlgo.get( checksum.getAlgorithm() ) ); |
| } |
| } |
| catch ( IOException e ) |
| { |
| String msg = "Failed to upload checksums for " + file + ": " + e.getMessage(); |
| if ( logger.isDebugEnabled() ) |
| { |
| logger.warn( msg, e ); |
| } |
| else |
| { |
| logger.warn( msg ); |
| } |
| } |
| } |
| |
| private void uploadChecksum( URI location, Object checksum ) |
| { |
| try |
| { |
| if ( checksum instanceof Exception ) |
| { |
| throw (Exception) checksum; |
| } |
| transporter.put( new PutTask( location ).setDataString( (String) checksum ) ); |
| } |
| catch ( Exception e ) |
| { |
| String msg = "Failed to upload checksum " + location + ": " + e.getMessage(); |
| if ( logger.isDebugEnabled() ) |
| { |
| logger.warn( msg, e ); |
| } |
| else |
| { |
| logger.warn( msg ); |
| } |
| } |
| } |
| |
| } |
| |
| private static class DirectExecutor |
| implements Executor |
| { |
| |
| static final Executor INSTANCE = new DirectExecutor(); |
| |
| public void execute( Runnable command ) |
| { |
| command.run(); |
| } |
| |
| } |
| |
| } |