NIFIREG-209 Rebuild metadata DB from FlowPersistenceProvider when empty DB AND when instance of MetadataAwareFlowPersistenceProvider
This closes #144.
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FlowMetadataSynchronizer.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FlowMetadataSynchronizer.java
new file mode 100644
index 0000000..3b8a214
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FlowMetadataSynchronizer.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow;
+
+import org.apache.nifi.registry.db.entity.BucketEntity;
+import org.apache.nifi.registry.db.entity.BucketItemEntityType;
+import org.apache.nifi.registry.db.entity.FlowEntity;
+import org.apache.nifi.registry.db.entity.FlowSnapshotEntity;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.flow.MetadataAwareFlowPersistenceProvider;
+import org.apache.nifi.registry.metadata.BucketMetadata;
+import org.apache.nifi.registry.metadata.FlowMetadata;
+import org.apache.nifi.registry.metadata.FlowSnapshotMetadata;
+import org.apache.nifi.registry.service.MetadataService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.List;
+
+@Component
+public class FlowMetadataSynchronizer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlowMetadataSynchronizer.class);
+
+ private MetadataService metadataService;
+ private FlowPersistenceProvider persistenceProvider;
+
+ @Autowired
+ public FlowMetadataSynchronizer(final MetadataService metadataService,
+ final FlowPersistenceProvider persistenceProvider) {
+ this.metadataService = metadataService;
+ this.persistenceProvider = persistenceProvider;
+ }
+
+ @EventListener(ContextRefreshedEvent.class)
+ public void synchronize() {
+ LOGGER.info("**************************************************");
+
+ if (!(persistenceProvider instanceof MetadataAwareFlowPersistenceProvider)) {
+ LOGGER.info("* FlowPersistenceProvider is not metadata-aware, nothing to synchronize");
+ LOGGER.info("**************************************************");
+ return;
+ } else {
+ LOGGER.info("* Found metadata-aware FlowPersistenceProvider...");
+ }
+
+ if (!metadataService.getAllBuckets().isEmpty()) {
+ LOGGER.info("* Found existing buckets, will not synchronize metadata");
+ LOGGER.info("**************************************************");
+ return;
+ }
+
+ final MetadataAwareFlowPersistenceProvider metadataAwareFlowPersistenceProvider = (MetadataAwareFlowPersistenceProvider) persistenceProvider;
+ LOGGER.info("* Synchronizing metadata from FlowPersistenceProvider to metadata database...");
+
+ final List<BucketMetadata> metadata = metadataAwareFlowPersistenceProvider.getMetadata();
+ LOGGER.info("* Synchronizing {} bucket(s)", new Object[]{metadata.size()});
+
+ for (final BucketMetadata bucketMetadata : metadata) {
+ final BucketEntity bucketEntity = new BucketEntity();
+ bucketEntity.setId(bucketMetadata.getIdentifier());
+ bucketEntity.setName(bucketMetadata.getName());
+ bucketEntity.setDescription(bucketMetadata.getDescription());
+ bucketEntity.setCreated(new Date());
+ metadataService.createBucket(bucketEntity);
+ createFlows(bucketMetadata);
+ }
+
+ LOGGER.info("* Done synchronizing metadata!");
+ LOGGER.info("**************************************************");
+ }
+
+ private void createFlows(final BucketMetadata bucketMetadata) {
+ LOGGER.info("* Synchronizing {} flow(s) for bucket {}",
+ new Object[]{bucketMetadata.getFlowMetadata().size(), bucketMetadata.getIdentifier()});
+
+ for (final FlowMetadata flowMetadata : bucketMetadata.getFlowMetadata()) {
+ final FlowEntity flowEntity = new FlowEntity();
+ flowEntity.setType(BucketItemEntityType.FLOW);
+ flowEntity.setId(flowMetadata.getIdentifier());
+ flowEntity.setName(flowMetadata.getName());
+ flowEntity.setDescription(flowMetadata.getDescription());
+ flowEntity.setBucketId(bucketMetadata.getIdentifier());
+ flowEntity.setCreated(new Date());
+ flowEntity.setModified(new Date());
+ metadataService.createFlow(flowEntity);
+
+ createFlowSnapshots(flowMetadata);
+ }
+ }
+
+ private void createFlowSnapshots(final FlowMetadata flowMetadata) {
+ LOGGER.info("* Synchronizing {} version(s) for flow {}",
+ new Object[]{flowMetadata.getFlowSnapshotMetadata().size(),
+ flowMetadata.getIdentifier()});
+
+ for (final FlowSnapshotMetadata snapshotMetadata : flowMetadata.getFlowSnapshotMetadata()) {
+ final FlowSnapshotEntity snapshotEntity = new FlowSnapshotEntity();
+ snapshotEntity.setFlowId(flowMetadata.getIdentifier());
+ snapshotEntity.setVersion(snapshotMetadata.getVersion());
+ snapshotEntity.setComments(snapshotMetadata.getComments());
+
+ String author = snapshotMetadata.getAuthor();
+ if (author == null) {
+ author = "unknown";
+ }
+ snapshotEntity.setCreatedBy(author);
+
+ Long created = snapshotMetadata.getCreated();
+ if (created == null) {
+ created = Long.valueOf(System.currentTimeMillis());
+ }
+ snapshotEntity.setCreated(new Date(created));
+
+ metadataService.createFlowSnapshot(snapshotEntity);
+ }
+ }
+
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java
index 1728513..fcd35ee 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java
@@ -31,6 +31,7 @@
private final String bucketName;
private final String flowId;
private final String flowName;
+ private final String flowDescription;
private final int version;
private final String comments;
private final String author;
@@ -41,6 +42,7 @@
this.bucketName = builder.bucketName;
this.flowId = builder.flowId;
this.flowName = builder.flowName;
+ this.flowDescription = builder.flowDescription;
this.version = builder.version;
this.comments = builder.comments;
this.author = builder.author;
@@ -75,6 +77,11 @@
}
@Override
+ public String getFlowDescription() {
+ return flowDescription;
+ }
+
+ @Override
public int getVersion() {
return version;
}
@@ -103,6 +110,7 @@
private String bucketName;
private String flowId;
private String flowName;
+ private String flowDescription;
private int version;
private String comments;
private String author;
@@ -117,6 +125,7 @@
bucketName(bucket.getName());
flowId(snapshotMetadata.getFlowIdentifier());
flowName(versionedFlow.getName());
+ flowDescription(versionedFlow.getDescription());
version(snapshotMetadata.getVersion());
comments(snapshotMetadata.getComments());
author(snapshotMetadata.getAuthor());
@@ -143,6 +152,11 @@
return this;
}
+ public Builder flowDescription(final String flowDescription) {
+ this.flowDescription = flowDescription;
+ return this;
+ }
+
public Builder version(final int version) {
this.version = version;
return this;
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java
index 3595d84..5772d23 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java
@@ -70,6 +70,10 @@
return flows.isEmpty();
}
+ Map<String, Flow> getFlows() {
+ return flows;
+ }
+
/**
* Serialize the latest version of this Bucket meta data.
* @return serialized bucket
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java
index 1bc7f3f..488a619 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java
@@ -47,11 +47,22 @@
versions.put(version, pointer);
}
+ Map<Integer, FlowPointer> getVersions() {
+ return versions;
+ }
+
public static class FlowPointer {
private String gitRev;
private String objectId;
private final String fileName;
+ // May not be populated pre-0.3.0
+ private String flowName;
+ private String flowDescription;
+ private String author;
+ private String comment;
+ private Long created;
+
/**
* Create new FlowPointer instance.
* @param fileName The filename must be sanitized, use {@link org.apache.nifi.registry.util.FileUtils#sanitizeFilename(String)} to do so.
@@ -79,6 +90,46 @@
public void setObjectId(String objectId) {
this.objectId = objectId;
}
+
+ public String getFlowName() {
+ return flowName;
+ }
+
+ public void setFlowName(String flowName) {
+ this.flowName = flowName;
+ }
+
+ public String getFlowDescription() {
+ return flowDescription;
+ }
+
+ public void setFlowDescription(String flowDescription) {
+ this.flowDescription = flowDescription;
+ }
+
+ public String getAuthor() {
+ return author;
+ }
+
+ public void setAuthor(String author) {
+ this.author = author;
+ }
+
+ public String getComment() {
+ return comment;
+ }
+
+ public void setComment(String comment) {
+ this.comment = comment;
+ }
+
+ public Long getCreated() {
+ return created;
+ }
+
+ public void setCreated(Long created) {
+ this.created = created;
+ }
}
/**
@@ -91,9 +142,28 @@
if (!latestVerOpt.isPresent()) {
throw new IllegalStateException("Flow version is not added yet, can not be serialized.");
}
+
final Integer latestVer = latestVerOpt.get();
+ final Flow.FlowPointer latestFlowPointer = versions.get(latestVer);
+
map.put(GitFlowMetaData.VER, latestVer);
- map.put(GitFlowMetaData.FILE, versions.get(latestVer).fileName);
+ map.put(GitFlowMetaData.FILE, latestFlowPointer.fileName);
+
+ if (latestFlowPointer.flowName != null) {
+ map.put(GitFlowMetaData.FLOW_NAME, latestFlowPointer.flowName);
+ }
+ if (latestFlowPointer.flowDescription != null) {
+ map.put(GitFlowMetaData.FLOW_DESC, latestFlowPointer.flowDescription);
+ }
+ if (latestFlowPointer.author != null) {
+ map.put(GitFlowMetaData.AUTHOR, latestFlowPointer.author);
+ }
+ if (latestFlowPointer.comment != null) {
+ map.put(GitFlowMetaData.COMMENTS, latestFlowPointer.comment);
+ }
+ if (latestFlowPointer.created != null) {
+ map.put(GitFlowMetaData.CREATED, latestFlowPointer.created);
+ }
return map;
}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java
index 4faf007..c5ee277 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java
@@ -67,6 +67,11 @@
static final String FLOWS = "flows";
static final String VER = "ver";
static final String FILE = "file";
+ static final String FLOW_NAME = "flowName";
+ static final String FLOW_DESC = "flowDesc";
+ static final String AUTHOR = "author";
+ static final String COMMENTS = "comments";
+ static final String CREATED = "created";
static final String BUCKET_FILENAME = "bucket.yml";
private static final Logger logger = LoggerFactory.getLogger(GitFlowMetaData.class);
@@ -318,6 +323,23 @@
}
pointer.setGitRev(commit.getName());
pointer.setObjectId(objectId.getName());
+
+ if (flowMeta.containsKey(FLOW_NAME)) {
+ pointer.setFlowName((String)flowMeta.get(FLOW_NAME));
+ }
+ if (flowMeta.containsKey(FLOW_DESC)) {
+ pointer.setFlowDescription((String)flowMeta.get(FLOW_DESC));
+ }
+ if (flowMeta.containsKey(AUTHOR)) {
+ pointer.setAuthor((String)flowMeta.get(AUTHOR));
+ }
+ if (flowMeta.containsKey(COMMENTS)) {
+ pointer.setComment((String)flowMeta.get(COMMENTS));
+ }
+ if (flowMeta.containsKey(CREATED)) {
+ pointer.setCreated((long)flowMeta.get(CREATED));
+ }
+
flow.putVersion(version, pointer);
}
}
@@ -341,6 +363,9 @@
return Optional.ofNullable(buckets.get(bucketId));
}
+ Map<String, Bucket> getBuckets() {
+ return buckets;
+ }
void saveBucket(final Bucket bucket, final File bucketDir) throws IOException {
final Yaml yaml = new Yaml();
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java
index e34c86f..08fa467 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java
@@ -17,8 +17,11 @@
package org.apache.nifi.registry.provider.flow.git;
import org.apache.nifi.registry.flow.FlowPersistenceException;
-import org.apache.nifi.registry.flow.FlowPersistenceProvider;
import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.apache.nifi.registry.flow.MetadataAwareFlowPersistenceProvider;
+import org.apache.nifi.registry.metadata.BucketMetadata;
+import org.apache.nifi.registry.metadata.FlowMetadata;
+import org.apache.nifi.registry.metadata.FlowSnapshotMetadata;
import org.apache.nifi.registry.provider.ProviderConfigurationContext;
import org.apache.nifi.registry.provider.ProviderCreationException;
import org.apache.nifi.registry.util.FileUtils;
@@ -30,6 +33,9 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -37,7 +43,7 @@
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.nifi.registry.util.FileUtils.sanitizeFilename;
-public class GitFlowPersistenceProvider implements FlowPersistenceProvider {
+public class GitFlowPersistenceProvider implements MetadataAwareFlowPersistenceProvider {
private static final Logger logger = LoggerFactory.getLogger(GitFlowMetaData.class);
static final String FLOW_STORAGE_DIR_PROP = "Flow Storage Directory";
@@ -117,6 +123,12 @@
// Add new version.
final Flow.FlowPointer flowPointer = new Flow.FlowPointer(flowSnapshotFilename);
+ flowPointer.setFlowName(context.getFlowName());
+ flowPointer.setFlowDescription(context.getFlowDescription());
+ flowPointer.setAuthor(context.getAuthor());
+ flowPointer.setComment(context.getComments());
+ flowPointer.setCreated(context.getSnapshotTimestamp());
+
flow.putVersion(context.getVersion(), flowPointer);
final File bucketDir = new File(flowStorageDir, bucketDirName);
@@ -262,4 +274,77 @@
// TODO: Do nothing? This signature is not used. Actually there's nothing to do to the old versions as those exist in old commits even if this method is called.
}
+ @Override
+ public List<BucketMetadata> getMetadata() {
+ final Map<String, Bucket> gitBuckets = flowMetaData.getBuckets();
+ if (gitBuckets == null || gitBuckets.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ final List<BucketMetadata> bucketMetadataList = new ArrayList<>();
+ for (Map.Entry<String,Bucket> bucketEntry : gitBuckets.entrySet()) {
+ final String bucketId = bucketEntry.getKey();
+ final Bucket gitBucket = bucketEntry.getValue();
+
+ final BucketMetadata bucketMetadata = new BucketMetadata();
+ bucketMetadata.setIdentifier(bucketId);
+ bucketMetadata.setName(gitBucket.getBucketDirName());
+ bucketMetadata.setFlowMetadata(createFlowMetadata(gitBucket));
+ bucketMetadataList.add(bucketMetadata);
+ }
+ return bucketMetadataList;
+ }
+
+ private List<FlowMetadata> createFlowMetadata(final Bucket bucket) {
+ if (bucket.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ final List<FlowMetadata> flowMetadataList = new ArrayList<>();
+ for (Map.Entry<String, Flow> flowEntry : bucket.getFlows().entrySet()) {
+ final String flowId = flowEntry.getKey();
+ final Flow flow = flowEntry.getValue();
+
+ final Optional<Integer> latestVersion = flow.getLatestVersion();
+ if (latestVersion.isPresent()) {
+ final Flow.FlowPointer latestFlowPointer = flow.getFlowVersion(latestVersion.get());
+
+ String flowName = latestFlowPointer.getFlowName();
+ if (flowName == null) {
+ flowName = latestFlowPointer.getFileName();
+ if (flowName.endsWith(".snapshot")) {
+ flowName = flowName.substring(0, flowName.lastIndexOf("."));
+ }
+ }
+
+
+ final FlowMetadata flowMetadata = new FlowMetadata();
+ flowMetadata.setIdentifier(flowId);
+ flowMetadata.setName(flowName);
+ flowMetadata.setDescription(latestFlowPointer.getFlowDescription());
+ flowMetadata.setFlowSnapshotMetadata(createFlowSnapshotMetdata(flow));
+ flowMetadataList.add(flowMetadata);
+ }
+ }
+ return flowMetadataList;
+ }
+
+ private List<FlowSnapshotMetadata> createFlowSnapshotMetdata(final Flow flow) {
+ final List<FlowSnapshotMetadata> flowSnapshotMetadataList = new ArrayList<>();
+
+ final Map<Integer, Flow.FlowPointer> versions = flow.getVersions();
+ for (Map.Entry<Integer, Flow.FlowPointer> entry : versions.entrySet()) {
+ final Integer version = entry.getKey();
+ final Flow.FlowPointer flowPointer = entry.getValue();
+
+ final FlowSnapshotMetadata snapshotMetadata = new FlowSnapshotMetadata();
+ snapshotMetadata.setVersion(version);
+ snapshotMetadata.setAuthor(flowPointer.getAuthor());
+ snapshotMetadata.setComments(flowPointer.getComment());
+ snapshotMetadata.setCreated(flowPointer.getCreated());
+ flowSnapshotMetadataList.add(snapshotMetadata);
+ }
+
+ return flowSnapshotMetadataList;
+ }
}
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestFlowMetadataSynchronizer.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestFlowMetadataSynchronizer.java
new file mode 100644
index 0000000..6ee5e62
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestFlowMetadataSynchronizer.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow;
+
+import org.apache.nifi.registry.db.entity.BucketEntity;
+import org.apache.nifi.registry.db.entity.FlowEntity;
+import org.apache.nifi.registry.db.entity.FlowSnapshotEntity;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.flow.MetadataAwareFlowPersistenceProvider;
+import org.apache.nifi.registry.metadata.BucketMetadata;
+import org.apache.nifi.registry.metadata.FlowMetadata;
+import org.apache.nifi.registry.metadata.FlowSnapshotMetadata;
+import org.apache.nifi.registry.service.MetadataService;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestFlowMetadataSynchronizer {
+
+ private MetadataService metadataService;
+ private MetadataAwareFlowPersistenceProvider metadataAwareflowPersistenceProvider;
+ private FlowPersistenceProvider standardFlowPersistenceProvider;
+ private List<BucketMetadata> metadata;
+ private FlowMetadataSynchronizer synchronizer;
+
+ @Before
+ public void setup() {
+ metadataService = mock(MetadataService.class);
+ metadataAwareflowPersistenceProvider = mock(MetadataAwareFlowPersistenceProvider.class);
+ standardFlowPersistenceProvider = mock(FlowPersistenceProvider.class);
+ synchronizer = new FlowMetadataSynchronizer(metadataService, metadataAwareflowPersistenceProvider);
+
+ final FlowSnapshotMetadata snapshotMetadata1 = new FlowSnapshotMetadata();
+ snapshotMetadata1.setVersion(1);
+ snapshotMetadata1.setAuthor("user1");
+ snapshotMetadata1.setComments("This is v1");
+ snapshotMetadata1.setCreated(System.currentTimeMillis());
+
+ final FlowSnapshotMetadata snapshotMetadata2 = new FlowSnapshotMetadata();
+ snapshotMetadata2.setVersion(2);
+ snapshotMetadata2.setAuthor("user1");
+ snapshotMetadata2.setComments("This is v2");
+ snapshotMetadata2.setCreated(System.currentTimeMillis());
+
+ final List<FlowSnapshotMetadata> snapshotMetadata = Arrays.asList(snapshotMetadata1, snapshotMetadata2);
+
+ final FlowMetadata flowMetadata1 = new FlowMetadata();
+ flowMetadata1.setIdentifier("1");
+ flowMetadata1.setName("Flow 1");
+ flowMetadata1.setDescription("This is flow 1");
+ flowMetadata1.setFlowSnapshotMetadata(snapshotMetadata);
+
+ final List<FlowMetadata> flowMetadata = Arrays.asList(flowMetadata1);
+
+ final BucketMetadata bucketMetadata = new BucketMetadata();
+ bucketMetadata.setIdentifier("1");
+ bucketMetadata.setName("Bucket 1");
+ bucketMetadata.setDescription("This is bucket 1");
+ bucketMetadata.setFlowMetadata(flowMetadata);
+
+ metadata = Arrays.asList(bucketMetadata);
+ when(metadataAwareflowPersistenceProvider.getMetadata()).thenReturn(metadata);
+ }
+
+ @Test
+ public void testWhenMetadataAwareAndHasDataShouldSynchronize() {
+ when(metadataService.getAllBuckets()).thenReturn(Collections.emptyList());
+
+ synchronizer.synchronize();
+ verify(metadataService, times(1)).createBucket(any(BucketEntity.class));
+ verify(metadataService, times(1)).createFlow(any(FlowEntity.class));
+ verify(metadataService, times(2)).createFlowSnapshot(any(FlowSnapshotEntity.class));
+ }
+
+ @Test
+ public void testWhenMetadataAwareAndDatabaseNotEmptyShouldNotSynchronize() {
+ final BucketEntity bucketEntity = new BucketEntity();
+ bucketEntity.setId("1");
+ when(metadataService.getAllBuckets()).thenReturn(Collections.singletonList(bucketEntity));
+
+ synchronizer.synchronize();
+ verify(metadataService, times(0)).createBucket(any(BucketEntity.class));
+ verify(metadataService, times(0)).createFlow(any(FlowEntity.class));
+ verify(metadataService, times(0)).createFlowSnapshot(any(FlowSnapshotEntity.class));
+ }
+
+ @Test
+ public void testWhenNotMetadataAwareShouldNotSynchronize() {
+ when(metadataService.getAllBuckets()).thenReturn(Collections.emptyList());
+ synchronizer = new FlowMetadataSynchronizer(metadataService, standardFlowPersistenceProvider);
+ synchronizer.synchronize();
+
+ verify(metadataService, times(0)).createBucket(any(BucketEntity.class));
+ verify(metadataService, times(0)).createFlow(any(FlowEntity.class));
+ verify(metadataService, times(0)).createFlowSnapshot(any(FlowSnapshotEntity.class));
+ }
+}
diff --git a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java
index 9c2a818..569de8c 100644
--- a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java
+++ b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java
@@ -42,6 +42,11 @@
String getFlowName();
/**
+ * @return the description of the flow this snapshot belongs to
+ */
+ String getFlowDescription();
+
+ /**
* @return the version of the snapshot
*/
int getVersion();
diff --git a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/MetadataAwareFlowPersistenceProvider.java b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/MetadataAwareFlowPersistenceProvider.java
new file mode 100644
index 0000000..b769380
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/MetadataAwareFlowPersistenceProvider.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+import org.apache.nifi.registry.metadata.BucketMetadata;
+
+import java.util.List;
+
+/**
+ * A FlowPersistenceProvider that is able to provide metadata about the flows and buckets.
+ *
+ * If the application is started with an empty metadata database AND a MetadataAwareFlowPersistenceProvider,
+ * then the application will use this information to rebuild the database.
+ *
+ * NOTE: Some information will be lost, such as created date, last modified date, and original author.
+ */
+public interface MetadataAwareFlowPersistenceProvider extends FlowPersistenceProvider {
+
+ /**
+ * @return the list of metadata for each bucket
+ */
+ List<BucketMetadata> getMetadata();
+
+}
diff --git a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/BucketMetadata.java b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/BucketMetadata.java
new file mode 100644
index 0000000..a145c5a
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/BucketMetadata.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.metadata;
+
+import java.util.List;
+
+/**
+ * Meatadata about a bucket returned from MetadataAwareFlowPersistenceProvider.
+ */
+public class BucketMetadata {
+
+ private String identifier;
+
+ private String name;
+
+ private String description;
+
+ private List<FlowMetadata> flowMetadata;
+
+
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ public void setIdentifier(String identifier) {
+ this.identifier = identifier;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ public List<FlowMetadata> getFlowMetadata() {
+ return flowMetadata;
+ }
+
+ public void setFlowMetadata(List<FlowMetadata> flowMetadata) {
+ this.flowMetadata = flowMetadata;
+ }
+
+}
diff --git a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/FlowMetadata.java b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/FlowMetadata.java
new file mode 100644
index 0000000..da49565
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/FlowMetadata.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.metadata;
+
+import java.util.List;
+
+/**
+ * Meatadata about a flow returned from MetadataAwareFlowPersistenceProvider.
+ */
+public class FlowMetadata {
+
+ private String identifier;
+
+ private String name;
+
+ private String description;
+
+ private List<FlowSnapshotMetadata> flowSnapshotMetadata;
+
+
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ public void setIdentifier(String identifier) {
+ this.identifier = identifier;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ public List<FlowSnapshotMetadata> getFlowSnapshotMetadata() {
+ return flowSnapshotMetadata;
+ }
+
+ public void setFlowSnapshotMetadata(List<FlowSnapshotMetadata> flowSnapshotMetadata) {
+ this.flowSnapshotMetadata = flowSnapshotMetadata;
+ }
+
+}
diff --git a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/FlowSnapshotMetadata.java b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/FlowSnapshotMetadata.java
new file mode 100644
index 0000000..2d27df1
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/FlowSnapshotMetadata.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.metadata;
+
+/**
+ * Meatadata about a snapshot returned from MetadataAwareFlowPersistenceProvider.
+ */
+public class FlowSnapshotMetadata {
+
+ private Integer version;
+
+ private String author;
+
+ private String comments;
+
+ private Long created;
+
+
+ public Integer getVersion() {
+ return version;
+ }
+
+ public void setVersion(Integer version) {
+ this.version = version;
+ }
+
+ public String getAuthor() {
+ return author;
+ }
+
+ public void setAuthor(String author) {
+ this.author = author;
+ }
+
+ public String getComments() {
+ return comments;
+ }
+
+ public void setComments(String comments) {
+ this.comments = comments;
+ }
+
+ public Long getCreated() {
+ return created;
+ }
+
+ public void setCreated(Long created) {
+ this.created = created;
+ }
+}