| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.registry.service; |
| |
| import org.apache.commons.lang3.ObjectUtils; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.commons.lang3.Validate; |
| import org.apache.nifi.registry.bucket.Bucket; |
| import org.apache.nifi.registry.bucket.BucketItem; |
| import org.apache.nifi.registry.db.entity.BucketEntity; |
| import org.apache.nifi.registry.db.entity.BucketItemEntity; |
| import org.apache.nifi.registry.db.entity.BundleEntity; |
| import org.apache.nifi.registry.db.entity.FlowEntity; |
| import org.apache.nifi.registry.db.entity.FlowSnapshotEntity; |
| import org.apache.nifi.registry.diff.ComponentDifferenceGroup; |
| import org.apache.nifi.registry.diff.VersionedFlowDifference; |
| import org.apache.nifi.registry.exception.ResourceNotFoundException; |
| import org.apache.nifi.registry.extension.BundleCoordinate; |
| import org.apache.nifi.registry.extension.BundlePersistenceProvider; |
| import org.apache.nifi.registry.extension.bundle.Bundle; |
| import org.apache.nifi.registry.extension.bundle.BundleFilterParams; |
| import org.apache.nifi.registry.extension.bundle.BundleType; |
| import org.apache.nifi.registry.extension.bundle.BundleVersion; |
| import org.apache.nifi.registry.extension.bundle.BundleVersionFilterParams; |
| import org.apache.nifi.registry.extension.bundle.BundleVersionMetadata; |
| import org.apache.nifi.registry.extension.component.ExtensionFilterParams; |
| import org.apache.nifi.registry.extension.component.ExtensionMetadata; |
| import org.apache.nifi.registry.extension.component.TagCount; |
| import org.apache.nifi.registry.extension.component.manifest.Extension; |
| import org.apache.nifi.registry.extension.component.manifest.ProvidedServiceAPI; |
| import org.apache.nifi.registry.extension.repo.ExtensionRepoArtifact; |
| import org.apache.nifi.registry.extension.repo.ExtensionRepoBucket; |
| import org.apache.nifi.registry.extension.repo.ExtensionRepoGroup; |
| import org.apache.nifi.registry.extension.repo.ExtensionRepoVersionSummary; |
| import org.apache.nifi.registry.flow.FlowPersistenceProvider; |
| import org.apache.nifi.registry.flow.FlowSnapshotContext; |
| import org.apache.nifi.registry.flow.VersionedComponent; |
| import org.apache.nifi.registry.flow.VersionedFlow; |
| import org.apache.nifi.registry.flow.VersionedFlowSnapshot; |
| import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; |
| import org.apache.nifi.registry.flow.VersionedProcessGroup; |
| import org.apache.nifi.registry.flow.diff.ComparableDataFlow; |
| import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor; |
| import org.apache.nifi.registry.flow.diff.FlowComparator; |
| import org.apache.nifi.registry.flow.diff.FlowComparison; |
| import org.apache.nifi.registry.flow.diff.FlowDifference; |
| import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow; |
| import org.apache.nifi.registry.flow.diff.StandardFlowComparator; |
| import org.apache.nifi.registry.provider.extension.StandardBundleCoordinate; |
| import org.apache.nifi.registry.provider.flow.StandardFlowSnapshotContext; |
| import org.apache.nifi.registry.serialization.FlowContent; |
| import org.apache.nifi.registry.serialization.FlowContentSerializer; |
| import org.apache.nifi.registry.service.alias.RegistryUrlAliasService; |
| import org.apache.nifi.registry.service.extension.ExtensionService; |
| import org.apache.nifi.registry.service.mapper.BucketMappings; |
| import org.apache.nifi.registry.service.mapper.ExtensionMappings; |
| import org.apache.nifi.registry.service.mapper.FlowMappings; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.beans.factory.annotation.Autowired; |
| import org.springframework.stereotype.Service; |
| import org.springframework.transaction.annotation.Transactional; |
| |
| import javax.validation.ConstraintViolation; |
| import javax.validation.ConstraintViolationException; |
| import javax.validation.Validator; |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.SortedSet; |
| import java.util.TreeSet; |
| import java.util.UUID; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.stream.Collectors; |
| |
| /** |
| * Main service for all back-end operations, REST resources should only interact with this service. |
| * |
| * This service is marked as @Transactional so that Spring will automatically start a transaction upon entering |
| * any method, and will rollback the transaction if any Exception is thrown out of a method. |
| * |
| */ |
| @Service |
| @Transactional(rollbackFor = Exception.class) |
| public class RegistryService { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(RegistryService.class); |
| |
| private final MetadataService metadataService; |
| private final FlowPersistenceProvider flowPersistenceProvider; |
| private final BundlePersistenceProvider bundlePersistenceProvider; |
| private final FlowContentSerializer flowContentSerializer; |
| private final ExtensionService extensionService; |
| private final Validator validator; |
| private final RegistryUrlAliasService registryUrlAliasService; |
| |
| private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); |
| private final Lock readLock = lock.readLock(); |
| private final Lock writeLock = lock.writeLock(); |
| |
| @Autowired |
| public RegistryService(final MetadataService metadataService, |
| final FlowPersistenceProvider flowPersistenceProvider, |
| final BundlePersistenceProvider bundlePersistenceProvider, |
| final FlowContentSerializer flowContentSerializer, |
| final ExtensionService extensionService, |
| final Validator validator, |
| final RegistryUrlAliasService registryUrlAliasService) { |
| this.metadataService = Validate.notNull(metadataService); |
| this.flowPersistenceProvider = Validate.notNull(flowPersistenceProvider); |
| this.bundlePersistenceProvider = Validate.notNull(bundlePersistenceProvider); |
| this.flowContentSerializer = Validate.notNull(flowContentSerializer); |
| this.extensionService = Validate.notNull(extensionService); |
| this.validator = Validate.notNull(validator); |
| this.registryUrlAliasService = Validate.notNull(registryUrlAliasService); |
| } |
| |
| private <T> void validate(T t, String invalidMessage) { |
| final Set<ConstraintViolation<T>> violations = validator.validate(t); |
| if (violations.size() > 0) { |
| throw new ConstraintViolationException(invalidMessage, violations); |
| } |
| } |
| |
| // ---------------------- Bucket methods --------------------------------------------- |
| |
| public Bucket createBucket(final Bucket bucket) { |
| if (bucket == null) { |
| throw new IllegalArgumentException("Bucket cannot be null"); |
| } |
| |
| // set an id, the created time, and clear out the flows since its read-only |
| bucket.setIdentifier(UUID.randomUUID().toString()); |
| bucket.setCreatedTimestamp(System.currentTimeMillis()); |
| |
| if (bucket.isAllowBundleRedeploy() == null) { |
| bucket.setAllowBundleRedeploy(false); |
| } |
| |
| if (bucket.isAllowPublicRead() == null) { |
| bucket.setAllowPublicRead(false); |
| } |
| |
| validate(bucket, "Cannot create Bucket"); |
| |
| writeLock.lock(); |
| try { |
| final List<BucketEntity> bucketsWithSameName = metadataService.getBucketsByName(bucket.getName()); |
| if (bucketsWithSameName.size() > 0) { |
| throw new IllegalStateException("A bucket with the same name already exists"); |
| } |
| |
| final BucketEntity createdBucket = metadataService.createBucket(BucketMappings.map(bucket)); |
| return BucketMappings.map(createdBucket); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| public Bucket getBucket(final String bucketIdentifier) { |
| if (bucketIdentifier == null) { |
| throw new IllegalArgumentException("Bucket identifier cannot be null"); |
| } |
| |
| readLock.lock(); |
| try { |
| final BucketEntity bucket = metadataService.getBucketById(bucketIdentifier); |
| if (bucket == null) { |
| LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier); |
| throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry."); |
| } |
| |
| return BucketMappings.map(bucket); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public Bucket getBucketByName(final String bucketName) { |
| if (bucketName == null) { |
| throw new IllegalArgumentException("Bucket name cannot be null"); |
| } |
| |
| readLock.lock(); |
| try { |
| final List<BucketEntity> buckets = metadataService.getBucketsByName(bucketName); |
| if (buckets.isEmpty()) { |
| LOGGER.warn("The specified bucket name [{}] does not exist.", bucketName); |
| throw new ResourceNotFoundException("The specified bucket name does not exist in this registry."); |
| } |
| |
| return BucketMappings.map(buckets.get(0)); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public List<Bucket> getBuckets() { |
| readLock.lock(); |
| try { |
| final List<BucketEntity> buckets = metadataService.getAllBuckets(); |
| return buckets.stream().map(b -> BucketMappings.map(b)).collect(Collectors.toList()); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public List<Bucket> getBuckets(final Set<String> bucketIds) { |
| readLock.lock(); |
| try { |
| final List<BucketEntity> buckets = metadataService.getBuckets(bucketIds); |
| return buckets.stream().map(b -> BucketMappings.map(b)).collect(Collectors.toList()); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public Bucket updateBucket(final Bucket bucket) { |
| if (bucket == null) { |
| throw new IllegalArgumentException("Bucket cannot be null"); |
| } |
| |
| if (bucket.getIdentifier() == null) { |
| throw new IllegalArgumentException("Bucket identifier cannot be null"); |
| } |
| |
| if (bucket.getName() != null && StringUtils.isBlank(bucket.getName())) { |
| throw new IllegalArgumentException("Bucket name cannot be blank"); |
| } |
| |
| writeLock.lock(); |
| try { |
| // ensure a bucket with the given id exists |
| final BucketEntity existingBucketById = metadataService.getBucketById(bucket.getIdentifier()); |
| if (existingBucketById == null) { |
| LOGGER.warn("The specified bucket id [{}] does not exist.", bucket.getIdentifier()); |
| throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry."); |
| } |
| |
| // ensure a different bucket with the same name does not exist |
| // since we're allowing partial updates here, only check this if a non-null name is provided |
| if (StringUtils.isNotBlank(bucket.getName())) { |
| final List<BucketEntity> bucketsWithSameName = metadataService.getBucketsByName(bucket.getName()); |
| if (bucketsWithSameName != null) { |
| for (final BucketEntity bucketWithSameName : bucketsWithSameName) { |
| if (!bucketWithSameName.getId().equals(existingBucketById.getId())){ |
| throw new IllegalStateException("A bucket with the same name already exists - " + bucket.getName()); |
| } |
| } |
| } |
| } |
| |
| // transfer over the new values to the existing bucket |
| if (StringUtils.isNotBlank(bucket.getName())) { |
| existingBucketById.setName(bucket.getName()); |
| } |
| |
| if (bucket.getDescription() != null) { |
| existingBucketById.setDescription(bucket.getDescription()); |
| } |
| |
| if (bucket.isAllowBundleRedeploy() != null) { |
| existingBucketById.setAllowExtensionBundleRedeploy(bucket.isAllowBundleRedeploy()); |
| } |
| |
| if (bucket.isAllowPublicRead() != null) { |
| existingBucketById.setAllowPublicRead(bucket.isAllowPublicRead()); |
| } |
| |
| // perform the actual update |
| final BucketEntity updatedBucket = metadataService.updateBucket(existingBucketById); |
| return BucketMappings.map(updatedBucket); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| public Bucket deleteBucket(final String bucketIdentifier) { |
| if (bucketIdentifier == null) { |
| throw new IllegalArgumentException("Bucket identifier cannot be null"); |
| } |
| |
| writeLock.lock(); |
| try { |
| // ensure the bucket exists |
| final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier); |
| if (existingBucket == null) { |
| LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier); |
| throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry."); |
| } |
| |
| // for each flow in the bucket, delete all snapshots from the flow persistence provider |
| for (final FlowEntity flowEntity : metadataService.getFlowsByBucket(existingBucket.getId())) { |
| flowPersistenceProvider.deleteAllFlowContent(bucketIdentifier, flowEntity.getId()); |
| } |
| |
| // for each bundle in the bucket, delete all versions from the bundle persistence provider |
| for (final BundleEntity bundleEntity : metadataService.getBundlesByBucket(existingBucket.getId())) { |
| final BundleCoordinate bundleCoordinate = new StandardBundleCoordinate.Builder() |
| .bucketId(bundleEntity.getBucketId()) |
| .groupId(bundleEntity.getGroupId()) |
| .artifactId(bundleEntity.getArtifactId()) |
| .build(); |
| bundlePersistenceProvider.deleteAllBundleVersions(bundleCoordinate); |
| } |
| |
| // now delete the bucket from the metadata provider, which deletes all flows referencing it |
| metadataService.deleteBucket(existingBucket); |
| |
| return BucketMappings.map(existingBucket); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| // ---------------------- BucketItem methods --------------------------------------------- |
| |
| public List<BucketItem> getBucketItems(final String bucketIdentifier) { |
| if (bucketIdentifier == null) { |
| throw new IllegalArgumentException("Bucket identifier cannot be null"); |
| } |
| |
| readLock.lock(); |
| try { |
| final BucketEntity bucket = metadataService.getBucketById(bucketIdentifier); |
| if (bucket == null) { |
| LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier); |
| throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry."); |
| } |
| |
| final List<BucketItem> bucketItems = new ArrayList<>(); |
| metadataService.getBucketItems(bucket.getId()).stream().forEach(b -> addBucketItem(bucketItems, b)); |
| return bucketItems; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public List<BucketItem> getBucketItems(final Set<String> bucketIdentifiers) { |
| if (bucketIdentifiers == null || bucketIdentifiers.isEmpty()) { |
| throw new IllegalArgumentException("Bucket identifiers cannot be null or empty"); |
| } |
| |
| readLock.lock(); |
| try { |
| final List<BucketItem> bucketItems = new ArrayList<>(); |
| metadataService.getBucketItems(bucketIdentifiers).stream().forEach(b -> addBucketItem(bucketItems, b)); |
| return bucketItems; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| private void addBucketItem(final List<BucketItem> bucketItems, final BucketItemEntity itemEntity) { |
| // Currently we don't populate the bucket name for items so we pass in null in the map methods |
| if (itemEntity instanceof FlowEntity) { |
| final FlowEntity flowEntity = (FlowEntity) itemEntity; |
| bucketItems.add(FlowMappings.map(null, flowEntity)); |
| } else if (itemEntity instanceof BundleEntity) { |
| final BundleEntity bundleEntity = (BundleEntity) itemEntity; |
| bucketItems.add(ExtensionMappings.map(null, bundleEntity)); |
| } else { |
| LOGGER.error("Unknown type of BucketItemEntity: " + itemEntity.getClass().getCanonicalName()); |
| } |
| } |
| |
| // ---------------------- VersionedFlow methods --------------------------------------------- |
| |
| public VersionedFlow createFlow(final String bucketIdentifier, final VersionedFlow versionedFlow) { |
| if (StringUtils.isBlank(bucketIdentifier)) { |
| throw new IllegalArgumentException("Bucket identifier cannot be null or blank"); |
| } |
| |
| if (versionedFlow == null) { |
| throw new IllegalArgumentException("Versioned flow cannot be null"); |
| } |
| |
| if (versionedFlow.getBucketIdentifier() != null && !bucketIdentifier.equals(versionedFlow.getBucketIdentifier())) { |
| throw new IllegalArgumentException("Bucket identifiers must match"); |
| } |
| |
| if (versionedFlow.getBucketIdentifier() == null) { |
| versionedFlow.setBucketIdentifier(bucketIdentifier); |
| } |
| |
| versionedFlow.setIdentifier(UUID.randomUUID().toString()); |
| |
| final long timestamp = System.currentTimeMillis(); |
| versionedFlow.setCreatedTimestamp(timestamp); |
| versionedFlow.setModifiedTimestamp(timestamp); |
| |
| validate(versionedFlow, "Cannot create versioned flow"); |
| |
| writeLock.lock(); |
| try { |
| // ensure the bucket exists |
| final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier); |
| if (existingBucket == null) { |
| LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier); |
| throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry."); |
| } |
| |
| // ensure another flow with the same name doesn't exist |
| final List<FlowEntity> flowsWithSameName = metadataService.getFlowsByName(existingBucket.getId(), versionedFlow.getName()); |
| if (flowsWithSameName != null && flowsWithSameName.size() > 0) { |
| throw new IllegalStateException("A versioned flow with the same name already exists in the selected bucket"); |
| } |
| |
| // convert from dto to entity and set the bucket relationship |
| final FlowEntity flowEntity = FlowMappings.map(versionedFlow); |
| flowEntity.setBucketId(existingBucket.getId()); |
| |
| // persist the flow and return the created entity |
| final FlowEntity createdFlow = metadataService.createFlow(flowEntity); |
| return FlowMappings.map(existingBucket, createdFlow); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| public VersionedFlow getFlow(final String bucketIdentifier, final String flowIdentifier) { |
| if (StringUtils.isBlank(bucketIdentifier)) { |
| throw new IllegalArgumentException("Bucket identifier cannot be null or blank"); |
| } |
| |
| if (StringUtils.isBlank(flowIdentifier)) { |
| throw new IllegalArgumentException("Versioned flow identifier cannot be null or blank"); |
| } |
| |
| readLock.lock(); |
| try { |
| // ensure the bucket exists |
| final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier); |
| if (existingBucket == null) { |
| LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier); |
| throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry."); |
| } |
| |
| final FlowEntity existingFlow = metadataService.getFlowByIdWithSnapshotCounts(flowIdentifier); |
| if (existingFlow == null) { |
| LOGGER.warn("The specified flow id [{}] does not exist.", flowIdentifier); |
| throw new ResourceNotFoundException("The specified flow ID does not exist in this bucket."); |
| } |
| |
| if (!existingBucket.getId().equals(existingFlow.getBucketId())) { |
| throw new IllegalStateException("The requested flow is not located in the given bucket"); |
| } |
| |
| return FlowMappings.map(existingBucket, existingFlow); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public VersionedFlow getFlow(final String flowIdentifier) { |
| if (StringUtils.isBlank(flowIdentifier)) { |
| throw new IllegalArgumentException("Versioned flow identifier cannot be null or blank"); |
| } |
| |
| readLock.lock(); |
| try { |
| final FlowEntity existingFlow = metadataService.getFlowByIdWithSnapshotCounts(flowIdentifier); |
| if (existingFlow == null) { |
| LOGGER.warn("The specified flow id [{}] does not exist.", flowIdentifier); |
| throw new ResourceNotFoundException("The specified flow ID does not exist."); |
| } |
| |
| final BucketEntity existingBucket = metadataService.getBucketById(existingFlow.getBucketId()); |
| return FlowMappings.map(existingBucket, existingFlow); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public List<VersionedFlow> getFlows(final String bucketId) { |
| if (StringUtils.isBlank(bucketId)) { |
| throw new IllegalArgumentException("Bucket identifier cannot be null"); |
| } |
| |
| readLock.lock(); |
| try { |
| final BucketEntity existingBucket = metadataService.getBucketById(bucketId); |
| if (existingBucket == null) { |
| LOGGER.warn("The specified bucket id [{}] does not exist.", bucketId); |
| throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry."); |
| } |
| |
| // return non-verbose set of flows for the given bucket |
| final List<FlowEntity> flows = metadataService.getFlowsByBucket(existingBucket.getId()); |
| return flows.stream().map(f -> FlowMappings.map(existingBucket, f)).collect(Collectors.toList()); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public VersionedFlow updateFlow(final VersionedFlow versionedFlow) { |
| if (versionedFlow == null) { |
| throw new IllegalArgumentException("Versioned flow cannot be null"); |
| } |
| |
| if (StringUtils.isBlank(versionedFlow.getIdentifier())) { |
| throw new IllegalArgumentException("Versioned flow identifier cannot be null or blank"); |
| } |
| |
| if (StringUtils.isBlank(versionedFlow.getBucketIdentifier())) { |
| throw new IllegalArgumentException("Versioned flow bucket identifier cannot be null or blank"); |
| } |
| |
| if (versionedFlow.getName() != null && StringUtils.isBlank(versionedFlow.getName())) { |
| throw new IllegalArgumentException("Versioned flow name cannot be blank"); |
| } |
| |
| writeLock.lock(); |
| try { |
| // ensure the bucket exists |
| final BucketEntity existingBucket = metadataService.getBucketById(versionedFlow.getBucketIdentifier()); |
| if (existingBucket == null) { |
| LOGGER.warn("The specified bucket id [{}] does not exist.", versionedFlow.getBucketIdentifier()); |
| throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry."); |
| } |
| |
| final FlowEntity existingFlow = metadataService.getFlowByIdWithSnapshotCounts(versionedFlow.getIdentifier()); |
| if (existingFlow == null) { |
| LOGGER.warn("The specified flow id [{}] does not exist.", versionedFlow.getIdentifier()); |
| throw new ResourceNotFoundException("The specified flow ID does not exist in this bucket."); |
| } |
| |
| if (!existingBucket.getId().equals(existingFlow.getBucketId())) { |
| throw new IllegalStateException("The requested flow is not located in the given bucket"); |
| } |
| |
| // ensure a different flow with the same name does not exist |
| // since we're allowing partial updates here, only check this if a non-null name is provided |
| if (StringUtils.isNotBlank(versionedFlow.getName())) { |
| final List<FlowEntity> flowsWithSameName = metadataService.getFlowsByName(existingBucket.getId(), versionedFlow.getName()); |
| if (flowsWithSameName != null) { |
| for (final FlowEntity flowWithSameName : flowsWithSameName) { |
| if(!flowWithSameName.getId().equals(existingFlow.getId())) { |
| throw new IllegalStateException("A versioned flow with the same name already exists in the selected bucket"); |
| } |
| } |
| } |
| } |
| |
| // transfer over the new values to the existing flow |
| if (StringUtils.isNotBlank(versionedFlow.getName())) { |
| existingFlow.setName(versionedFlow.getName()); |
| } |
| |
| if (versionedFlow.getDescription() != null) { |
| existingFlow.setDescription(versionedFlow.getDescription()); |
| } |
| |
| // perform the actual update |
| final FlowEntity updatedFlow = metadataService.updateFlow(existingFlow); |
| return FlowMappings.map(existingBucket, updatedFlow); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| public VersionedFlow deleteFlow(final String bucketIdentifier, final String flowIdentifier) { |
| if (StringUtils.isBlank(bucketIdentifier)) { |
| throw new IllegalArgumentException("Bucket identifier cannot be null or blank"); |
| } |
| if (StringUtils.isBlank(flowIdentifier)) { |
| throw new IllegalArgumentException("Flow identifier cannot be null or blank"); |
| } |
| |
| writeLock.lock(); |
| try { |
| // ensure the bucket exists |
| final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier); |
| if (existingBucket == null) { |
| LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier); |
| throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry."); |
| } |
| |
| // ensure the flow exists |
| final FlowEntity existingFlow = metadataService.getFlowById(flowIdentifier); |
| if (existingFlow == null) { |
| LOGGER.warn("The specified flow id [{}] does not exist.", flowIdentifier); |
| throw new ResourceNotFoundException("The specified flow ID does not exist in this bucket."); |
| } |
| |
| if (!existingBucket.getId().equals(existingFlow.getBucketId())) { |
| throw new IllegalStateException("The requested flow is not located in the given bucket"); |
| } |
| |
| // delete all snapshots from the flow persistence provider |
| flowPersistenceProvider.deleteAllFlowContent(existingFlow.getBucketId(), existingFlow.getId()); |
| |
| // now delete the flow from the metadata provider |
| metadataService.deleteFlow(existingFlow); |
| |
| return FlowMappings.map(existingBucket, existingFlow); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| // ---------------------- VersionedFlowSnapshot methods --------------------------------------------- |
| |
| public VersionedFlowSnapshot createFlowSnapshot(final VersionedFlowSnapshot flowSnapshot) { |
| if (flowSnapshot == null) { |
| throw new IllegalArgumentException("Versioned flow snapshot cannot be null"); |
| } |
| |
| // validation will ensure that the metadata and contents are not null |
| if (flowSnapshot.getSnapshotMetadata() != null) { |
| flowSnapshot.getSnapshotMetadata().setTimestamp(System.currentTimeMillis()); |
| } |
| |
| // these fields aren't used for creation |
| flowSnapshot.setFlow(null); |
| flowSnapshot.setBucket(null); |
| |
| validate(flowSnapshot, "Cannot create versioned flow snapshot"); |
| |
| writeLock.lock(); |
| try { |
| final VersionedFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata(); |
| |
| // ensure the bucket exists |
| final BucketEntity existingBucket = metadataService.getBucketById(snapshotMetadata.getBucketIdentifier()); |
| if (existingBucket == null) { |
| LOGGER.warn("The specified bucket id [{}] does not exist.", snapshotMetadata.getBucketIdentifier()); |
| throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry."); |
| } |
| |
| // ensure the flow exists |
| final FlowEntity existingFlow = metadataService.getFlowById(snapshotMetadata.getFlowIdentifier()); |
| if (existingFlow == null) { |
| LOGGER.warn("The specified flow id [{}] does not exist.", snapshotMetadata.getFlowIdentifier()); |
| throw new ResourceNotFoundException("The specified flow ID does not exist in this bucket."); |
| } |
| |
| if (!existingBucket.getId().equals(existingFlow.getBucketId())) { |
| throw new IllegalStateException("The requested flow is not located in the given bucket"); |
| } |
| |
| if (snapshotMetadata.getVersion() == 0) { |
| throw new IllegalArgumentException("Version must be greater than zero, or use -1 to indicate latest version"); |
| } |
| |
| // convert the set of FlowSnapshotEntity to set of VersionedFlowSnapshotMetadata |
| final SortedSet<VersionedFlowSnapshotMetadata> sortedSnapshots = new TreeSet<>(); |
| final List<FlowSnapshotEntity> existingFlowSnapshots = metadataService.getSnapshots(existingFlow.getId()); |
| if (existingFlowSnapshots != null) { |
| existingFlowSnapshots.stream().forEach(s -> sortedSnapshots.add(FlowMappings.map(existingBucket, s))); |
| } |
| |
| // if we already have snapshots we need to verify the new one has the correct version |
| if (sortedSnapshots.size() > 0) { |
| final VersionedFlowSnapshotMetadata lastSnapshot = sortedSnapshots.last(); |
| |
| // if we have existing versions and a client sends -1, then make this the latest version |
| if (snapshotMetadata.getVersion() == -1) { |
| snapshotMetadata.setVersion(lastSnapshot.getVersion() + 1); |
| } else if (snapshotMetadata.getVersion() <= lastSnapshot.getVersion()) { |
| throw new IllegalStateException("A Versioned flow snapshot with the same version already exists: " + snapshotMetadata.getVersion()); |
| } else if (snapshotMetadata.getVersion() > (lastSnapshot.getVersion() + 1)) { |
| throw new IllegalStateException("Version must be a one-up number, last version was " + lastSnapshot.getVersion() |
| + " and version for this snapshot was " + snapshotMetadata.getVersion()); |
| } |
| |
| } else if (snapshotMetadata.getVersion() == -1) { |
| // if we have no existing versions and a client sends -1, then this is the first version |
| snapshotMetadata.setVersion(1); |
| } else if (snapshotMetadata.getVersion() != 1) { |
| throw new IllegalStateException("Version of first snapshot must be 1"); |
| } |
| |
| // serialize the snapshot |
| final ByteArrayOutputStream out = new ByteArrayOutputStream(); |
| registryUrlAliasService.setInternal(flowSnapshot.getFlowContents()); |
| |
| final FlowContent flowContent = new FlowContent(); |
| flowContent.setFlowSnapshot(flowSnapshot); |
| |
| // temporarily remove the metadata so it isn't serialized, but then put it back for returning the response |
| flowSnapshot.setSnapshotMetadata(null); |
| flowContentSerializer.serializeFlowContent(flowContent, out); |
| flowSnapshot.setSnapshotMetadata(snapshotMetadata); |
| |
| // save the serialized snapshot to the persistence provider |
| final Bucket bucket = BucketMappings.map(existingBucket); |
| final VersionedFlow versionedFlow = FlowMappings.map(existingBucket, existingFlow); |
| final FlowSnapshotContext context = new StandardFlowSnapshotContext.Builder(bucket, versionedFlow, snapshotMetadata).build(); |
| flowPersistenceProvider.saveFlowContent(context, out.toByteArray()); |
| |
| // create snapshot in the metadata provider |
| metadataService.createFlowSnapshot(FlowMappings.map(snapshotMetadata)); |
| |
| // update the modified date on the flow |
| metadataService.updateFlow(existingFlow); |
| |
| // get the updated flow, we need to use "with counts" here so we can return this is a part of the response |
| final FlowEntity updatedFlow = metadataService.getFlowByIdWithSnapshotCounts(snapshotMetadata.getFlowIdentifier()); |
| if (updatedFlow == null) { |
| throw new ResourceNotFoundException("Versioned flow does not exist for identifier " + snapshotMetadata.getFlowIdentifier()); |
| } |
| final VersionedFlow updatedVersionedFlow = FlowMappings.map(existingBucket, updatedFlow); |
| |
| flowSnapshot.setBucket(bucket); |
| flowSnapshot.setFlow(updatedVersionedFlow); |
| registryUrlAliasService.setExternal(flowSnapshot.getFlowContents()); |
| return flowSnapshot; |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| public VersionedFlowSnapshot getFlowSnapshot(final String bucketIdentifier, final String flowIdentifier, final Integer version) { |
| if (StringUtils.isBlank(bucketIdentifier)) { |
| throw new IllegalArgumentException("Bucket identifier cannot be null or blank"); |
| } |
| |
| if (StringUtils.isBlank(flowIdentifier)) { |
| throw new IllegalArgumentException("Flow identifier cannot be null or blank"); |
| } |
| |
| if (version == null) { |
| throw new IllegalArgumentException("Version cannot be null or blank"); |
| } |
| |
| readLock.lock(); |
| try { |
| final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier); |
| if (existingBucket == null) { |
| LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier); |
| throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry."); |
| } |
| |
| // we need to populate the version count here so we have to do this retrieval instead of snapshotEntity.getFlow() |
| final FlowEntity flowEntityWithCount = metadataService.getFlowByIdWithSnapshotCounts(flowIdentifier); |
| if (flowEntityWithCount == null) { |
| LOGGER.warn("The specified flow id [{}] does not exist.", flowIdentifier); |
| throw new ResourceNotFoundException("The specified flow ID does not exist in this bucket."); |
| } |
| |
| if (!existingBucket.getId().equals(flowEntityWithCount.getBucketId())) { |
| throw new IllegalStateException("The requested flow is not located in the given bucket"); |
| } |
| |
| return getVersionedFlowSnapshot(existingBucket, flowEntityWithCount, version); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| private VersionedFlowSnapshot getVersionedFlowSnapshot(final BucketEntity bucketEntity, final FlowEntity flowEntity, final Integer version) { |
| // ensure the snapshot exists |
| final FlowSnapshotEntity snapshotEntity = metadataService.getFlowSnapshot(flowEntity.getId(), version); |
| if (snapshotEntity == null) { |
| LOGGER.warn("The specified flow snapshot id [{}] does not exist for version [{}].", flowEntity.getId(), version); |
| throw new ResourceNotFoundException("The specified versioned flow snapshot does not exist for this flow."); |
| } |
| |
| // get the serialized bytes of the snapshot |
| final byte[] serializedSnapshot = flowPersistenceProvider.getFlowContent(bucketEntity.getId(), flowEntity.getId(), version); |
| |
| if (serializedSnapshot == null || serializedSnapshot.length == 0) { |
| throw new IllegalStateException("No serialized content found for snapshot with flow identifier " |
| + flowEntity.getId() + " and version " + version); |
| } |
| |
| // deserialize the content |
| final InputStream input = new ByteArrayInputStream(serializedSnapshot); |
| final VersionedFlowSnapshot snapshot = deserializeFlowContent(input); |
| |
| // map entities to data model |
| final Bucket bucket = BucketMappings.map(bucketEntity); |
| final VersionedFlow versionedFlow = FlowMappings.map(bucketEntity, flowEntity); |
| final VersionedFlowSnapshotMetadata snapshotMetadata = FlowMappings.map(bucketEntity, snapshotEntity); |
| |
| // create the snapshot to return |
| registryUrlAliasService.setExternal(snapshot.getFlowContents()); |
| snapshot.setSnapshotMetadata(snapshotMetadata); |
| snapshot.setFlow(versionedFlow); |
| snapshot.setBucket(bucket); |
| return snapshot; |
| } |
| |
| private VersionedFlowSnapshot deserializeFlowContent(final InputStream input) { |
| // attempt to read the version header from the serialized content |
| final int dataModelVersion = flowContentSerializer.readDataModelVersion(input); |
| |
| // determine how to do deserialize based on the data model version |
| if (flowContentSerializer.isProcessGroupVersion(dataModelVersion)) { |
| final VersionedProcessGroup processGroup = flowContentSerializer.deserializeProcessGroup(dataModelVersion, input); |
| final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot(); |
| snapshot.setFlowContents(processGroup); |
| return snapshot; |
| } else { |
| final FlowContent flowContent = flowContentSerializer.deserializeFlowContent(dataModelVersion, input); |
| return flowContent.getFlowSnapshot(); |
| } |
| } |
| |
| /** |
| * Returns all versions of a flow, sorted newest to oldest. |
| * |
| * @param bucketIdentifier the id of the bucket to search for the flowIdentifier |
| * @param flowIdentifier the id of the flow to retrieve from the specified bucket |
| * @return all versions of the specified flow, sorted newest to oldest |
| */ |
| public SortedSet<VersionedFlowSnapshotMetadata> getFlowSnapshots(final String bucketIdentifier, final String flowIdentifier) { |
| if (StringUtils.isBlank(bucketIdentifier)) { |
| throw new IllegalArgumentException("Bucket identifier cannot be null or blank"); |
| } |
| |
| if (StringUtils.isBlank(flowIdentifier)) { |
| throw new IllegalArgumentException("Flow identifier cannot be null or blank"); |
| } |
| |
| readLock.lock(); |
| try { |
| // ensure the bucket exists |
| final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier); |
| if (existingBucket == null) { |
| LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier); |
| throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry."); |
| } |
| |
| // ensure the flow exists |
| final FlowEntity existingFlow = metadataService.getFlowById(flowIdentifier); |
| if (existingFlow == null) { |
| LOGGER.warn("The specified flow id [{}] does not exist.", flowIdentifier); |
| throw new ResourceNotFoundException("The specified flow ID does not exist in this bucket."); |
| } |
| |
| if (!existingBucket.getId().equals(existingFlow.getBucketId())) { |
| throw new IllegalStateException("The requested flow is not located in the given bucket"); |
| } |
| |
| // convert the set of FlowSnapshotEntity to set of VersionedFlowSnapshotMetadata, ordered by version descending |
| final SortedSet<VersionedFlowSnapshotMetadata> sortedSnapshots = new TreeSet<>(Collections.reverseOrder()); |
| final List<FlowSnapshotEntity> existingFlowSnapshots = metadataService.getSnapshots(existingFlow.getId()); |
| if (existingFlowSnapshots != null) { |
| existingFlowSnapshots.stream().forEach(s -> sortedSnapshots.add(FlowMappings.map(existingBucket, s))); |
| } |
| |
| return sortedSnapshots; |
| |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public VersionedFlowSnapshotMetadata getLatestFlowSnapshotMetadata(final String bucketIdentifier, final String flowIdentifier) { |
| if (StringUtils.isBlank(bucketIdentifier)) { |
| throw new IllegalArgumentException("Bucket identifier cannot be null or blank"); |
| } |
| |
| if (StringUtils.isBlank(flowIdentifier)) { |
| throw new IllegalArgumentException("Flow identifier cannot be null or blank"); |
| } |
| |
| readLock.lock(); |
| try { |
| // ensure the bucket exists |
| final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier); |
| if (existingBucket == null) { |
| LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier); |
| throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry."); |
| } |
| |
| // ensure the flow exists |
| final FlowEntity existingFlow = metadataService.getFlowById(flowIdentifier); |
| if (existingFlow == null) { |
| LOGGER.warn("The specified flow id [{}] does not exist.", flowIdentifier); |
| throw new ResourceNotFoundException("The specified flow ID does not exist in this bucket."); |
| } |
| |
| if (!existingBucket.getId().equals(existingFlow.getBucketId())) { |
| throw new IllegalStateException("The requested flow is not located in the given bucket"); |
| } |
| |
| // get latest snapshot for the flow |
| final FlowSnapshotEntity latestSnapshot = metadataService.getLatestSnapshot(existingFlow.getId()); |
| if (latestSnapshot == null) { |
| throw new ResourceNotFoundException("The specified flow ID has no versions"); |
| } |
| |
| return FlowMappings.map(existingBucket, latestSnapshot); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public VersionedFlowSnapshotMetadata getLatestFlowSnapshotMetadata(final String flowIdentifier) { |
| if (StringUtils.isBlank(flowIdentifier)) { |
| throw new IllegalArgumentException("Flow identifier cannot be null or blank"); |
| } |
| |
| readLock.lock(); |
| try { |
| // ensure the flow exists |
| final FlowEntity existingFlow = metadataService.getFlowById(flowIdentifier); |
| if (existingFlow == null) { |
| LOGGER.warn("The specified flow id [{}] does not exist.", flowIdentifier); |
| throw new ResourceNotFoundException("The specified flow ID does not exist in this bucket."); |
| } |
| |
| // ensure the bucket exists |
| final BucketEntity existingBucket = metadataService.getBucketById(existingFlow.getBucketId()); |
| if (existingBucket == null) { |
| LOGGER.warn("The specified bucket id [{}] does not exist.", existingFlow.getBucketId()); |
| throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry."); |
| } |
| |
| // get latest snapshot for the flow |
| final FlowSnapshotEntity latestSnapshot = metadataService.getLatestSnapshot(existingFlow.getId()); |
| if (latestSnapshot == null) { |
| throw new ResourceNotFoundException("The specified flow ID has no versions"); |
| } |
| |
| return FlowMappings.map(existingBucket, latestSnapshot); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public VersionedFlowSnapshotMetadata deleteFlowSnapshot(final String bucketIdentifier, final String flowIdentifier, final Integer version) { |
| if (StringUtils.isBlank(bucketIdentifier)) { |
| throw new IllegalArgumentException("Bucket identifier cannot be null or blank"); |
| } |
| |
| if (StringUtils.isBlank(flowIdentifier)) { |
| throw new IllegalArgumentException("Flow identifier cannot be null or blank"); |
| } |
| |
| if (version == null) { |
| throw new IllegalArgumentException("Version cannot be null or blank"); |
| } |
| |
| writeLock.lock(); |
| try { |
| // ensure the bucket exists |
| final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier); |
| if (existingBucket == null) { |
| LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier); |
| throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry."); |
| } |
| |
| // ensure the flow exists |
| final FlowEntity existingFlow = metadataService.getFlowById(flowIdentifier); |
| if (existingFlow == null) { |
| LOGGER.warn("The specified flow id [{}] does not exist.", flowIdentifier); |
| throw new ResourceNotFoundException("The specified flow ID does not exist in this bucket."); |
| } |
| |
| if (!existingBucket.getId().equals(existingFlow.getBucketId())) { |
| throw new IllegalStateException("The requested flow is not located in the given bucket"); |
| } |
| |
| // ensure the snapshot exists |
| final FlowSnapshotEntity snapshotEntity = metadataService.getFlowSnapshot(flowIdentifier, version); |
| if (snapshotEntity == null) { |
| throw new ResourceNotFoundException("Versioned flow snapshot does not exist for flow " |
| + flowIdentifier + " and version " + version); |
| } |
| |
| // delete the content of the snapshot |
| flowPersistenceProvider.deleteFlowContent(bucketIdentifier, flowIdentifier, version); |
| |
| // delete the snapshot itself |
| metadataService.deleteFlowSnapshot(snapshotEntity); |
| return FlowMappings.map(existingBucket, snapshotEntity); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| /** |
| * Returns the differences between two specified versions of a flow. |
| * |
| * @param bucketIdentifier the id of the bucket the flow exists in |
| * @param flowIdentifier the flow to be examined |
| * @param versionA the first version of the comparison |
| * @param versionB the second version of the comparison |
| * @return The differences between two specified versions, grouped by component. |
| */ |
| public VersionedFlowDifference getFlowDiff(final String bucketIdentifier, final String flowIdentifier, |
| final Integer versionA, final Integer versionB) { |
| if (StringUtils.isBlank(bucketIdentifier)) { |
| throw new IllegalArgumentException("Bucket identifier cannot be null or blank"); |
| } |
| |
| if (StringUtils.isBlank(flowIdentifier)) { |
| throw new IllegalArgumentException("Flow identifier cannot be null or blank"); |
| } |
| |
| if (versionA == null || versionB == null) { |
| throw new IllegalArgumentException("Version cannot be null or blank"); |
| } |
| // older version is always the lower, regardless of the order supplied |
| final Integer older = Math.min(versionA, versionB); |
| final Integer newer = Math.max(versionA, versionB); |
| |
| readLock.lock(); |
| try { |
| // Get the content for both versions of the flow |
| final byte[] serializedSnapshotA = flowPersistenceProvider.getFlowContent(bucketIdentifier, flowIdentifier, older); |
| if (serializedSnapshotA == null || serializedSnapshotA.length == 0) { |
| throw new IllegalStateException("No serialized content found for snapshot with flow identifier " |
| + flowIdentifier + " and version " + older); |
| } |
| |
| final byte[] serializedSnapshotB = flowPersistenceProvider.getFlowContent(bucketIdentifier, flowIdentifier, newer); |
| if (serializedSnapshotB == null || serializedSnapshotB.length == 0) { |
| throw new IllegalStateException("No serialized content found for snapshot with flow identifier " |
| + flowIdentifier + " and version " + newer); |
| } |
| |
| // deserialize the contents |
| final InputStream inputA = new ByteArrayInputStream(serializedSnapshotA); |
| final VersionedFlowSnapshot snapshotA = deserializeFlowContent(inputA); |
| final VersionedProcessGroup flowContentsA = snapshotA.getFlowContents(); |
| |
| final InputStream inputB = new ByteArrayInputStream(serializedSnapshotB); |
| final VersionedFlowSnapshot snapshotB = deserializeFlowContent(inputB); |
| final VersionedProcessGroup flowContentsB = snapshotB.getFlowContents(); |
| |
| final ComparableDataFlow comparableFlowA = new StandardComparableDataFlow(String.format("Version %d", older), flowContentsA); |
| final ComparableDataFlow comparableFlowB = new StandardComparableDataFlow(String.format("Version %d", newer), flowContentsB); |
| |
| // Compare the two versions of the flow |
| final FlowComparator flowComparator = new StandardFlowComparator(comparableFlowA, comparableFlowB, |
| null, new ConciseEvolvingDifferenceDescriptor()); |
| final FlowComparison flowComparison = flowComparator.compare(); |
| |
| final VersionedFlowDifference result = new VersionedFlowDifference(); |
| result.setBucketId(bucketIdentifier); |
| result.setFlowId(flowIdentifier); |
| result.setVersionA(older); |
| result.setVersionB(newer); |
| |
| final Set<ComponentDifferenceGroup> differenceGroups = getStringComponentDifferenceGroupMap(flowComparison.getDifferences()); |
| result.setComponentDifferenceGroups(differenceGroups); |
| |
| return result; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| /** |
| * Group the differences in the comparison by component |
| * @param flowDifferences The differences to group together by component |
| * @return A set of componentDifferenceGroups where each entry contains a set of differences specific to that group |
| */ |
| private Set<ComponentDifferenceGroup> getStringComponentDifferenceGroupMap(Set<FlowDifference> flowDifferences) { |
| Map<String, ComponentDifferenceGroup> differenceGroups = new HashMap<>(); |
| for (FlowDifference diff : flowDifferences) { |
| ComponentDifferenceGroup group; |
| // A component may only exist on only one version for new/removed components |
| VersionedComponent component = ObjectUtils.firstNonNull(diff.getComponentA(), diff.getComponentB()); |
| if(differenceGroups.containsKey(component.getIdentifier())){ |
| group = differenceGroups.get(component.getIdentifier()); |
| }else{ |
| group = FlowMappings.map(component); |
| differenceGroups.put(component.getIdentifier(), group); |
| } |
| group.getDifferences().add(FlowMappings.map(diff)); |
| } |
| return differenceGroups.values().stream().collect(Collectors.toSet()); |
| } |
| |
| // ---------------------- Bundle methods --------------------------------------------- |
| |
| public BundleVersion createBundleVersion(final String bucketIdentifier, final BundleType bundleType, |
| final InputStream inputStream, final String clientSha256) throws IOException { |
| writeLock.lock(); |
| try { |
| return extensionService.createBundleVersion(bucketIdentifier, bundleType, inputStream, clientSha256); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| public List<Bundle> getBundles(final Set<String> bucketIdentifiers, final BundleFilterParams filterParams) { |
| readLock.lock(); |
| try { |
| return extensionService.getBundles(bucketIdentifiers, filterParams); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public List<Bundle> getBundlesByBucket(final String bucketIdentifier) { |
| readLock.lock(); |
| try { |
| return extensionService.getBundlesByBucket(bucketIdentifier); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public Bundle getBundle(final String extensionBundleId) { |
| readLock.lock(); |
| try { |
| return extensionService.getBundle(extensionBundleId); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public Bundle deleteBundle(final Bundle bundle) { |
| writeLock.lock(); |
| try { |
| return extensionService.deleteBundle(bundle); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| public SortedSet<BundleVersionMetadata> getBundleVersions(final Set<String> bucketIdentifiers, final BundleVersionFilterParams filterParams) { |
| readLock.lock(); |
| try { |
| return extensionService.getBundleVersions(bucketIdentifiers, filterParams); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| |
| public SortedSet<BundleVersionMetadata> getBundleVersions(final String extensionBundleIdentifier) { |
| readLock.lock(); |
| try { |
| return extensionService.getBundleVersions(extensionBundleIdentifier); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public BundleVersion getBundleVersion(final String bucketId, final String bundleId, final String version) { |
| readLock.lock(); |
| try { |
| return extensionService.getBundleVersion(bucketId, bundleId, version); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public BundleVersion getBundleVersion(final String bucketId, final String groupId, final String artifactId, final String version) { |
| readLock.lock(); |
| try { |
| return extensionService.getBundleVersion(bucketId, groupId, artifactId, version); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public void writeBundleVersionContent(final BundleVersion bundleVersion, final OutputStream out) { |
| readLock.lock(); |
| try { |
| extensionService.writeBundleVersionContent(bundleVersion, out); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public BundleVersion deleteBundleVersion(final BundleVersion bundleVersion) { |
| writeLock.lock(); |
| try { |
| return extensionService.deleteBundleVersion(bundleVersion); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| // ---------------------- Extension methods --------------------------------------------- |
| |
| public SortedSet<ExtensionMetadata> getExtensionMetadata(final Set<String> bucketIdentifiers, final ExtensionFilterParams filterParams) { |
| readLock.lock(); |
| try { |
| return extensionService.getExtensionMetadata(bucketIdentifiers, filterParams); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public SortedSet<ExtensionMetadata> getExtensionMetadata(final Set<String> bucketIdentifiers, final ProvidedServiceAPI serviceAPI) { |
| readLock.lock(); |
| try { |
| return extensionService.getExtensionMetadata(bucketIdentifiers, serviceAPI); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public SortedSet<ExtensionMetadata> getExtensionMetadata(final BundleVersion bundleVersion) { |
| readLock.lock(); |
| try { |
| return extensionService.getExtensionMetadata(bundleVersion); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public Extension getExtension(final BundleVersion bundleVersion, final String name) { |
| readLock.lock(); |
| try { |
| return extensionService.getExtension(bundleVersion, name); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public void writeExtensionDocs(final BundleVersion bundleVersion, final String name, final OutputStream outputStream) |
| throws IOException { |
| readLock.lock(); |
| try { |
| extensionService.writeExtensionDocs(bundleVersion, name, outputStream); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public void writeAdditionalDetailsDocs(final BundleVersion bundleVersion, final String name, final OutputStream outputStream) |
| throws IOException { |
| readLock.lock(); |
| try { |
| extensionService.writeAdditionalDetailsDocs(bundleVersion, name, outputStream); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public SortedSet<TagCount> getExtensionTags() { |
| readLock.lock(); |
| try { |
| return extensionService.getExtensionTags(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| // ---------------------- Extension Repository methods --------------------------------------------- |
| |
| public SortedSet<ExtensionRepoBucket> getExtensionRepoBuckets(final Set<String> bucketIds) { |
| readLock.lock(); |
| try { |
| return extensionService.getExtensionRepoBuckets(bucketIds); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public SortedSet<ExtensionRepoGroup> getExtensionRepoGroups(final Bucket bucket) { |
| readLock.lock(); |
| try { |
| return extensionService.getExtensionRepoGroups(bucket); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public SortedSet<ExtensionRepoArtifact> getExtensionRepoArtifacts(final Bucket bucket, final String groupId) { |
| readLock.lock(); |
| try { |
| return extensionService.getExtensionRepoArtifacts(bucket, groupId); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public SortedSet<ExtensionRepoVersionSummary> getExtensionRepoVersions(final Bucket bucket, final String groupId, final String artifactId) { |
| readLock.lock(); |
| try { |
| return extensionService.getExtensionRepoVersions(bucket, groupId, artifactId); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| // ---------------------- Field methods --------------------------------------------- |
| |
| public Set<String> getBucketFields() { |
| return metadataService.getBucketFields(); |
| } |
| |
| public Set<String> getBucketItemFields() { |
| return metadataService.getBucketItemFields(); |
| } |
| |
| public Set<String> getFlowFields() { |
| return metadataService.getFlowFields(); |
| } |
| |
| } |