| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.eclipse.aether.connector.basic; |
| |
| import java.io.IOException; |
| import java.io.UncheckedIOException; |
| import java.net.URI; |
| import java.nio.file.Path; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.eclipse.aether.RepositorySystemSession; |
| import org.eclipse.aether.metadata.Metadata; |
| import org.eclipse.aether.repository.RemoteRepository; |
| import org.eclipse.aether.spi.checksums.ProvidedChecksumsSource; |
| import org.eclipse.aether.spi.connector.ArtifactDownload; |
| import org.eclipse.aether.spi.connector.ArtifactTransfer; |
| import org.eclipse.aether.spi.connector.ArtifactUpload; |
| import org.eclipse.aether.spi.connector.MetadataDownload; |
| import org.eclipse.aether.spi.connector.MetadataTransfer; |
| import org.eclipse.aether.spi.connector.MetadataUpload; |
| import org.eclipse.aether.spi.connector.RepositoryConnector; |
| import org.eclipse.aether.spi.connector.Transfer; |
| import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactory; |
| import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmHelper; |
| import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy; |
| import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider; |
| import org.eclipse.aether.spi.connector.layout.RepositoryLayout; |
| import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider; |
| import org.eclipse.aether.spi.connector.transport.GetTask; |
| import org.eclipse.aether.spi.connector.transport.PeekTask; |
| import org.eclipse.aether.spi.connector.transport.PutTask; |
| import org.eclipse.aether.spi.connector.transport.Transporter; |
| import org.eclipse.aether.spi.connector.transport.TransporterProvider; |
| import org.eclipse.aether.spi.io.ChecksumProcessor; |
| import org.eclipse.aether.transfer.ChecksumFailureException; |
| import org.eclipse.aether.transfer.NoRepositoryConnectorException; |
| import org.eclipse.aether.transfer.NoRepositoryLayoutException; |
| import org.eclipse.aether.transfer.NoTransporterException; |
| import org.eclipse.aether.transfer.TransferEvent; |
| import org.eclipse.aether.transfer.TransferResource; |
| import org.eclipse.aether.util.ConfigUtils; |
| import org.eclipse.aether.util.FileUtils; |
| import org.eclipse.aether.util.concurrency.ExecutorUtils; |
| import org.eclipse.aether.util.concurrency.RunnableErrorForwarder; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static java.util.Objects.requireNonNull; |
| import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_DOWNSTREAM_THREADS; |
| import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PARALLEL_PUT; |
| import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PERSISTED_CHECKSUMS; |
| import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_SMART_CHECKSUMS; |
| import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_THREADS; |
| import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_UPSTREAM_THREADS; |
| import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PARALLEL_PUT; |
| import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PERSISTED_CHECKSUMS; |
| import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_SMART_CHECKSUMS; |
| import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_THREADS; |
| |
| /** |
| * |
| */ |
| final class BasicRepositoryConnector implements RepositoryConnector { |
    private static final Logger LOGGER = LoggerFactory.getLogger(BasicRepositoryConnector.class);

    // Sources of externally provided artifact checksums, keyed by source name; consulted (first
    // match wins) before downloading artifacts.
    private final Map<String, ProvidedChecksumsSource> providedChecksumsSources;

    // Used by the checksum validator to read/write checksum files.
    private final ChecksumProcessor checksumProcessor;

    private final RemoteRepository repository;

    private final RepositorySystemSession session;

    // Transport for this repository; closed together with this connector.
    private final Transporter transporter;

    // Resolves remote locations and checksum locations for artifacts and metadata.
    private final RepositoryLayout layout;

    private final ChecksumPolicyProvider checksumPolicyProvider;

    // Maximum pool sizes for download (downstream) and upload (upstream) executors.
    private final int maxDownstreamThreads;

    private final int maxUpstreamThreads;

    // If true, checksums carried by the transport task itself are fed to the validator in
    // addition to those calculated while streaming (see GetTaskRunner#runTask).
    private final boolean smartChecksums;

    // If true, uploads may run in parallel; otherwise all PUTs are serialized (see put()).
    private final boolean parallelPut;

    // If true, validated checksums are committed (persisted) after a successful download.
    private final boolean persistedChecksums;

    // Lazily created thread pools, keyed by direction (true = downstream, false = upstream).
    private final ConcurrentHashMap<Boolean, Executor> executors;

    // Guards close(): ensures executor shutdown and transporter close happen exactly once.
    private final AtomicBoolean closed;
| |
    /**
     * Creates a connector for the given remote repository.
     * <p>
     * The repository layout and the transporter are resolved eagerly; failure to obtain either is
     * reported as {@link NoRepositoryConnectorException}. All relevant session configuration
     * (thread counts, smart/persisted checksums, parallel PUT) is snapshotted at construction time.
     *
     * @throws NoRepositoryConnectorException if no layout or no transporter can be obtained for the
     *         repository
     */
    BasicRepositoryConnector(
            RepositorySystemSession session,
            RemoteRepository repository,
            TransporterProvider transporterProvider,
            RepositoryLayoutProvider layoutProvider,
            ChecksumPolicyProvider checksumPolicyProvider,
            ChecksumProcessor checksumProcessor,
            Map<String, ProvidedChecksumsSource> providedChecksumsSources)
            throws NoRepositoryConnectorException {
        try {
            layout = layoutProvider.newRepositoryLayout(session, repository);
        } catch (NoRepositoryLayoutException e) {
            throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
        }
        try {
            transporter = transporterProvider.newTransporter(session, repository);
        } catch (NoTransporterException e) {
            throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
        }
        this.checksumPolicyProvider = checksumPolicyProvider;

        this.session = session;
        this.repository = repository;
        this.checksumProcessor = checksumProcessor;
        this.providedChecksumsSources = providedChecksumsSources;
        this.executors = new ConcurrentHashMap<>();
        this.closed = new AtomicBoolean(false);

        // The repository-specific key (suffixed with the repository id) is listed before the
        // generic keys — NOTE(review): presumably first-match-wins; confirm against
        // ExecutorUtils.threadCount / ConfigUtils.getBoolean semantics.
        maxUpstreamThreads = ExecutorUtils.threadCount(
                session,
                DEFAULT_THREADS,
                CONFIG_PROP_UPSTREAM_THREADS + "." + repository.getId(),
                CONFIG_PROP_UPSTREAM_THREADS,
                CONFIG_PROP_THREADS);
        maxDownstreamThreads = ExecutorUtils.threadCount(
                session,
                DEFAULT_THREADS,
                CONFIG_PROP_DOWNSTREAM_THREADS + "." + repository.getId(),
                CONFIG_PROP_DOWNSTREAM_THREADS,
                CONFIG_PROP_THREADS);
        smartChecksums = ConfigUtils.getBoolean(session, DEFAULT_SMART_CHECKSUMS, CONFIG_PROP_SMART_CHECKSUMS);
        parallelPut = ConfigUtils.getBoolean(
                session,
                DEFAULT_PARALLEL_PUT,
                CONFIG_PROP_PARALLEL_PUT + "." + repository.getId(),
                CONFIG_PROP_PARALLEL_PUT);
        persistedChecksums =
                ConfigUtils.getBoolean(session, DEFAULT_PERSISTED_CHECKSUMS, CONFIG_PROP_PERSISTED_CHECKSUMS);
    }
| |
| private Executor getExecutor(boolean downstream, int tasks) { |
| int maxThreads = downstream ? maxDownstreamThreads : maxUpstreamThreads; |
| if (maxThreads <= 1) { |
| return ExecutorUtils.DIRECT_EXECUTOR; |
| } |
| if (tasks <= 1) { |
| return ExecutorUtils.DIRECT_EXECUTOR; |
| } |
| return executors.computeIfAbsent( |
| downstream, |
| k -> ExecutorUtils.threadPool( |
| maxThreads, getClass().getSimpleName() + '-' + repository.getHost() + '-')); |
| } |
| |
| @Override |
| public void close() { |
| if (closed.compareAndSet(false, true)) { |
| for (Executor executor : executors.values()) { |
| ExecutorUtils.shutdown(executor); |
| } |
| transporter.close(); |
| } |
| } |
| |
| private void failIfClosed() { |
| if (closed.get()) { |
| throw new IllegalStateException("connector already closed"); |
| } |
| } |
| |
    /**
     * Performs the given downloads: metadata first, then artifacts. The very first transfer is
     * executed directly on the calling thread; the remaining ones are dispatched to the download
     * executor, and their errors are collected and surfaced via the error forwarder once all
     * transfers have finished.
     *
     * @throws IllegalStateException if this connector has been closed
     */
    @Override
    public void get(
            Collection<? extends ArtifactDownload> artifactDownloads,
            Collection<? extends MetadataDownload> metadataDownloads) {
        failIfClosed();

        Collection<? extends ArtifactDownload> safeArtifactDownloads = safe(artifactDownloads);
        Collection<? extends MetadataDownload> safeMetadataDownloads = safe(metadataDownloads);

        Executor executor = getExecutor(true, safeArtifactDownloads.size() + safeMetadataDownloads.size());
        RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
        List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();

        // True until the first transfer has run; that one runs synchronously on the caller thread
        // (NOTE(review): presumably to surface transport problems early before fanning out —
        // confirm intent).
        boolean first = true;

        for (MetadataDownload transfer : safeMetadataDownloads) {
            URI location = layout.getLocation(transfer.getMetadata(), false);

            TransferResource resource = newTransferResource(location, transfer);
            TransferEvent.Builder builder = newEventBuilder(resource, false, false);
            MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);

            // A null policy means checksum validation is skipped; checksum locations are only
            // resolved when validation will actually happen.
            ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
            List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
            if (checksumPolicy != null) {
                checksumLocations = layout.getChecksumLocations(transfer.getMetadata(), false, location);
            }

            Runnable task = new GetTaskRunner(
                    location,
                    transfer.getPath(),
                    checksumPolicy,
                    checksumAlgorithmFactories,
                    checksumLocations,
                    null, // no provided checksums for metadata
                    listener);
            if (first) {
                task.run();
                first = false;
            } else {
                executor.execute(errorForwarder.wrap(task));
            }
        }

        for (ArtifactDownload transfer : safeArtifactDownloads) {
            // Query each provided-checksums source in turn; the first one that yields a non-null
            // result wins.
            Map<String, String> providedChecksums = Collections.emptyMap();
            for (ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values()) {
                Map<String, String> provided = providedChecksumsSource.getProvidedArtifactChecksums(
                        session, transfer, repository, checksumAlgorithmFactories);

                if (provided != null) {
                    providedChecksums = provided;
                    break;
                }
            }

            URI location = layout.getLocation(transfer.getArtifact(), false);

            TransferResource resource = newTransferResource(location, transfer);
            TransferEvent.Builder builder = newEventBuilder(resource, false, transfer.isExistenceCheck());
            ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);

            Runnable task;
            if (transfer.isExistenceCheck()) {
                // Existence checks only peek at the remote resource; no download, no checksums.
                task = new PeekTaskRunner(location, listener);
            } else {
                ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
                List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
                if (checksumPolicy != null) {
                    checksumLocations = layout.getChecksumLocations(transfer.getArtifact(), false, location);
                }

                task = new GetTaskRunner(
                        location,
                        transfer.getPath(),
                        checksumPolicy,
                        checksumAlgorithmFactories,
                        checksumLocations,
                        providedChecksums,
                        listener);
            }
            if (first) {
                task.run();
                first = false;
            } else {
                executor.execute(errorForwarder.wrap(task));
            }
        }

        errorForwarder.await();
    }
| |
    /**
     * Performs the given uploads: all artifacts first (fully awaited before any metadata upload
     * starts), then metadata group-by-group in the level order established by
     * {@link #groupUploads(Collection)}, awaiting each group before starting the next. The very
     * first transfer overall runs on the calling thread; parallelism applies only when parallel
     * PUT is enabled.
     *
     * @throws IllegalStateException if this connector has been closed
     */
    @Override
    public void put(
            Collection<? extends ArtifactUpload> artifactUploads,
            Collection<? extends MetadataUpload> metadataUploads) {
        failIfClosed();

        Collection<? extends ArtifactUpload> safeArtifactUploads = safe(artifactUploads);
        Collection<? extends MetadataUpload> safeMetadataUploads = safe(metadataUploads);

        // With parallel PUT disabled the task count is forced to 1, which makes getExecutor
        // return the direct (caller-thread) executor and thus serializes all uploads.
        Executor executor =
                getExecutor(false, parallelPut ? safeArtifactUploads.size() + safeMetadataUploads.size() : 1);
        RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();

        // True until the first transfer has run; that one runs synchronously on the caller thread.
        boolean first = true;

        for (ArtifactUpload transfer : safeArtifactUploads) {
            URI location = layout.getLocation(transfer.getArtifact(), true);

            TransferResource resource = newTransferResource(location, transfer);
            TransferEvent.Builder builder = newEventBuilder(resource, true, false);
            ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);

            List<RepositoryLayout.ChecksumLocation> checksumLocations =
                    layout.getChecksumLocations(transfer.getArtifact(), true, location);

            Runnable task = new PutTaskRunner(location, transfer.getPath(), checksumLocations, listener);
            if (first) {
                task.run();
                first = false;
            } else {
                executor.execute(errorForwarder.wrap(task));
            }
        }

        errorForwarder.await(); // make sure all artifacts are PUT before we go with Metadata

        for (List<? extends MetadataUpload> transferGroup : groupUploads(safeMetadataUploads)) {
            for (MetadataUpload transfer : transferGroup) {
                URI location = layout.getLocation(transfer.getMetadata(), true);

                TransferResource resource = newTransferResource(location, transfer);
                TransferEvent.Builder builder = newEventBuilder(resource, true, false);
                MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);

                List<RepositoryLayout.ChecksumLocation> checksumLocations =
                        layout.getChecksumLocations(transfer.getMetadata(), true, location);

                Runnable task = new PutTaskRunner(location, transfer.getPath(), checksumLocations, listener);
                if (first) {
                    task.run();
                    first = false;
                } else {
                    executor.execute(errorForwarder.wrap(task));
                }
            }

            errorForwarder.await(); // make sure each group is done before starting next group
        }
    }
| |
| /** |
| * This method "groups" the Metadata to be uploaded by their level (version, artifact, group and root). This is MUST |
| * as clients consume metadata in opposite order (root, group, artifact, version), and hence, we must deploy and |
| * ensure (in case of parallel deploy) that all V level metadata is deployed before we start deploying A level, etc. |
| */ |
| private static List<List<MetadataUpload>> groupUploads(Collection<? extends MetadataUpload> metadataUploads) { |
| ArrayList<MetadataUpload> v = new ArrayList<>(); |
| ArrayList<MetadataUpload> a = new ArrayList<>(); |
| ArrayList<MetadataUpload> g = new ArrayList<>(); |
| ArrayList<MetadataUpload> r = new ArrayList<>(); |
| |
| for (MetadataUpload transfer : metadataUploads) { |
| Metadata metadata = transfer.getMetadata(); |
| if (!"".equals(metadata.getVersion())) { |
| v.add(transfer); |
| } else if (!"".equals(metadata.getArtifactId())) { |
| a.add(transfer); |
| } else if (!"".equals(metadata.getGroupId())) { |
| g.add(transfer); |
| } else { |
| r.add(transfer); |
| } |
| } |
| |
| List<List<MetadataUpload>> result = new ArrayList<>(4); |
| if (!v.isEmpty()) { |
| result.add(v); |
| } |
| if (!a.isEmpty()) { |
| result.add(a); |
| } |
| if (!g.isEmpty()) { |
| result.add(g); |
| } |
| if (!r.isEmpty()) { |
| result.add(r); |
| } |
| return result; |
| } |
| |
| private static <T> Collection<T> safe(Collection<T> items) { |
| return (items != null) ? items : Collections.emptyList(); |
| } |
| |
| private TransferResource newTransferResource(URI path, Transfer transfer) { |
| if (transfer instanceof ArtifactTransfer) { |
| ArtifactTransfer artifactTransfer = (ArtifactTransfer) transfer; |
| return new TransferResource( |
| repository.getId(), |
| repository.getUrl(), |
| path.toString(), |
| artifactTransfer.getPath(), |
| artifactTransfer.getArtifact(), |
| artifactTransfer.getTrace()); |
| } else if (transfer instanceof MetadataTransfer) { |
| MetadataTransfer metadataTransfer = (MetadataTransfer) transfer; |
| return new TransferResource( |
| repository.getId(), |
| repository.getUrl(), |
| path.toString(), |
| metadataTransfer.getPath(), |
| metadataTransfer.getMetadata(), |
| metadataTransfer.getTrace()); |
| } else { |
| throw new IllegalArgumentException("Accepting only artifact or metadata transfers"); |
| } |
| } |
| |
| private TransferEvent.Builder newEventBuilder(TransferResource resource, boolean upload, boolean peek) { |
| TransferEvent.Builder builder = new TransferEvent.Builder(session, resource); |
| if (upload) { |
| builder.setRequestType(TransferEvent.RequestType.PUT); |
| } else if (!peek) { |
| builder.setRequestType(TransferEvent.RequestType.GET); |
| } else { |
| builder.setRequestType(TransferEvent.RequestType.GET_EXISTENCE); |
| } |
| return builder; |
| } |
| |
    /**
     * Obtains the effective checksum policy for the given resource from the provider; callers
     * treat a {@code null} result as "do not validate checksums" (see {@link #get}).
     */
    private ChecksumPolicy newChecksumPolicy(String policy, TransferResource resource) {
        return checksumPolicyProvider.newChecksumPolicy(session, repository, resource, policy);
    }
| |
| @Override |
| public String toString() { |
| return String.valueOf(repository); |
| } |
| |
    /**
     * Base class for transfer tasks: wraps the concrete transfer work in the listener lifecycle
     * (initiated / succeeded / failed) and routes any failure — together with its transporter
     * classification — to the listener instead of letting it propagate out of {@link #run()}.
     */
    abstract class TaskRunner implements Runnable {

        // Remote location of the resource being transferred.
        protected final URI path;

        // Receives the transfer lifecycle events and records the outcome.
        protected final TransferTransportListener<?> listener;

        TaskRunner(URI path, TransferTransportListener<?> listener) {
            this.path = path;
            this.listener = listener;
        }

        @Override
        public void run() {
            try {
                listener.transferInitiated();
                runTask();
                listener.transferSucceeded();
            } catch (Exception e) {
                // Failures are classified by the transporter (e.g. "not found" vs. other errors)
                // and reported to the listener rather than rethrown.
                listener.transferFailed(e, transporter.classify(e));
            }
        }

        /** Performs the actual transfer work. */
        protected abstract void runTask() throws Exception;
    }
| |
| class PeekTaskRunner extends TaskRunner { |
| |
| PeekTaskRunner(URI path, TransferTransportListener<?> listener) { |
| super(path, listener); |
| } |
| |
| @Override |
| protected void runTask() throws Exception { |
| transporter.peek(new PeekTask(path)); |
| } |
| } |
| |
    /**
     * Task that downloads a file into a collocated temporary file, validates its checksums
     * (retrying the download once on a retry-worthy checksum failure), and only then moves the
     * temporary file into the final destination. Also serves as the validator's checksum fetcher
     * for retrieving remote checksum files.
     */
    class GetTaskRunner extends TaskRunner implements ChecksumValidator.ChecksumFetcher {

        // Final destination of the download.
        private final Path file;

        private final ChecksumValidator checksumValidator;

        GetTaskRunner(
                URI path,
                Path file,
                ChecksumPolicy checksumPolicy,
                List<ChecksumAlgorithmFactory> checksumAlgorithmFactories,
                List<RepositoryLayout.ChecksumLocation> checksumLocations,
                Map<String, String> providedChecksums,
                TransferTransportListener<?> listener) {
            super(path, listener);
            this.file = requireNonNull(file, "destination file cannot be null");
            checksumValidator = new ChecksumValidator(
                    file,
                    checksumAlgorithmFactories,
                    checksumProcessor,
                    this,
                    checksumPolicy,
                    providedChecksums,
                    safe(checksumLocations));
        }

        /**
         * Fetches a remote checksum file into {@code local}; returns {@code false} when the remote
         * checksum does not exist, and rethrows any other transport failure.
         */
        @Override
        public boolean fetchChecksum(URI remote, Path local) throws Exception {
            try {
                transporter.get(new GetTask(remote).setDataPath(local));
            } catch (Exception e) {
                if (transporter.classify(e) == Transporter.ERROR_NOT_FOUND) {
                    return false;
                }
                throw e;
            }
            return true;
        }

        @Override
        protected void runTask() throws Exception {
            // Download into a temp file collocated with the destination; only move it into place
            // once checksum validation has passed (see FileUtils.CollocatedTempFile for the move
            // semantics).
            try (FileUtils.CollocatedTempFile tempFile = FileUtils.newTempFile(file)) {
                final Path tmp = tempFile.getPath();
                listener.setChecksumCalculator(checksumValidator.newChecksumCalculator(tmp));
                // At most two attempts (firstTrial..lastTrial): a retry-worthy checksum failure
                // triggers exactly one re-download.
                for (int firstTrial = 0, lastTrial = 1, trial = firstTrial; ; trial++) {
                    GetTask task = new GetTask(path).setDataPath(tmp, false).setListener(listener);
                    transporter.get(task);
                    try {
                        // Validate against checksums calculated while streaming and — with
                        // smartChecksums enabled — those carried by the transport task itself.
                        checksumValidator.validate(
                                listener.getChecksums(), smartChecksums ? task.getChecksums() : null);
                        break;
                    } catch (ChecksumFailureException e) {
                        boolean retry = trial < lastTrial && e.isRetryWorthy();
                        // Not retrying and the policy cannot tolerate this failure: give up.
                        if (!retry && !checksumValidator.handle(e)) {
                            throw e;
                        }
                        listener.transferCorrupted(e);
                        if (retry) {
                            checksumValidator.retry();
                        } else {
                            break;
                        }
                    }
                }
                tempFile.move();
                if (persistedChecksums) {
                    checksumValidator.commit();
                }
            }
        }
    }
| |
| class PutTaskRunner extends TaskRunner { |
| |
| private final Path file; |
| |
| private final Collection<RepositoryLayout.ChecksumLocation> checksumLocations; |
| |
| PutTaskRunner( |
| URI path, |
| Path file, |
| List<RepositoryLayout.ChecksumLocation> checksumLocations, |
| TransferTransportListener<?> listener) { |
| super(path, listener); |
| this.file = requireNonNull(file, "source file cannot be null"); |
| this.checksumLocations = safe(checksumLocations); |
| } |
| |
| @SuppressWarnings("checkstyle:innerassignment") |
| @Override |
| protected void runTask() throws Exception { |
| transporter.put(new PutTask(path).setDataPath(file).setListener(listener)); |
| uploadChecksums(file, null); |
| } |
| |
| /** |
| * @param path source |
| * @param bytes transformed data from file or {@code null} |
| */ |
| private void uploadChecksums(Path path, byte[] bytes) { |
| if (checksumLocations.isEmpty()) { |
| return; |
| } |
| try { |
| ArrayList<ChecksumAlgorithmFactory> algorithms = new ArrayList<>(); |
| for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) { |
| algorithms.add(checksumLocation.getChecksumAlgorithmFactory()); |
| } |
| |
| Map<String, String> sumsByAlgo; |
| if (bytes != null) { |
| sumsByAlgo = ChecksumAlgorithmHelper.calculate(bytes, algorithms); |
| } else { |
| sumsByAlgo = ChecksumAlgorithmHelper.calculate(path, algorithms); |
| } |
| |
| for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) { |
| uploadChecksum( |
| checksumLocation.getLocation(), |
| sumsByAlgo.get(checksumLocation |
| .getChecksumAlgorithmFactory() |
| .getName())); |
| } |
| } catch (IOException e) { |
| LOGGER.warn("Failed to upload checksums for {}", file, e); |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| private void uploadChecksum(URI location, Object checksum) { |
| try { |
| if (checksum instanceof Exception) { |
| throw (Exception) checksum; |
| } |
| transporter.put(new PutTask(location).setDataString((String) checksum)); |
| } catch (Exception e) { |
| LOGGER.warn("Failed to upload checksum to {}", location, e); |
| } |
| } |
| } |
| } |