NIFIREG-267 Updating data model to capture parameter contexts

- Introducing v3 of flow serialization to switch to serializing VersionedFlowSnapshot instead of VersionedProcessGroup
- Moving external controller service references from VersionedProcessGroup to VersionedFlowSnapshot
- Adding boolean to track whether a VersionedPort allows remote access

This closes #181.

Signed-off-by: Kevin Doran <kdoran@apache.org>
diff --git a/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java
index 76aceab..4611b9e 100644
--- a/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java
+++ b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java
@@ -25,6 +25,7 @@
 import javax.validation.constraints.NotNull;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
+import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -47,13 +48,21 @@
     @NotNull
     private VersionedProcessGroup flowContents;
 
+    // optional map of external controller service references
+    private Map<String,ExternalControllerServiceReference> externalControllerServices;
+
+    // optional parameter contexts mapped by their name
+    private Map<String,VersionedParameterContext> parameterContexts;;
+
+    // optional encoding version that clients may specify to track how the flow contents are encoded
+    private String flowEncodingVersion;
+
     // read-only, only populated from retrieval of a snapshot
     private VersionedFlow flow;
 
     // read-only, only populated from retrieval of a snapshot
     private Bucket bucket;
 
-
     @ApiModelProperty(value = "The metadata for this snapshot", required = true)
     public VersionedFlowSnapshotMetadata getSnapshotMetadata() {
         return snapshotMetadata;
@@ -72,6 +81,15 @@
         this.flowContents = flowContents;
     }
 
+    @ApiModelProperty("The information about controller services that exist outside this versioned flow, but are referenced by components within the versioned flow.")
+    public Map<String, ExternalControllerServiceReference> getExternalControllerServices() {
+        return externalControllerServices;
+    }
+
+    public void setExternalControllerServices(Map<String, ExternalControllerServiceReference> externalControllerServices) {
+        this.externalControllerServices = externalControllerServices;
+    }
+
     @ApiModelProperty(value = "The flow this snapshot is for", readOnly = true)
     public VersionedFlow getFlow() {
         return flow;
@@ -90,6 +108,26 @@
         this.bucket = bucket;
     }
 
+    @ApiModelProperty(value = "The parameter contexts referenced by process groups in the flow contents. " +
+            "The mapping is from the name of the context to the context instance, and it is expected that any " +
+            "context in this map is referenced by at least one process group in this flow.")
+    public Map<String,VersionedParameterContext> getParameterContexts() {
+        return parameterContexts;
+    }
+
+    public void setParameterContexts(Map<String,VersionedParameterContext> parameterContexts) {
+        this.parameterContexts = parameterContexts;
+    }
+
+    @ApiModelProperty(value = "The optional encoding version of the flow contents.")
+    public String getFlowEncodingVersion() {
+        return flowEncodingVersion;
+    }
+
+    public void setFlowEncodingVersion(String flowEncodingVersion) {
+        this.flowEncodingVersion = flowEncodingVersion;
+    }
+
     /**
      * This is a convenience method that will return true when flow is populated and when the flow's versionCount
      * is equal to the version of this snapshot.
diff --git a/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameter.java b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameter.java
new file mode 100644
index 0000000..857dd16
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import java.util.Objects;
+
+public class VersionedParameter {
+
+    private String name;
+    private String description;
+    private boolean sensitive;
+    private String value;
+
+    @ApiModelProperty("The name of the parameter")
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    @ApiModelProperty("The description of the param")
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    @ApiModelProperty("Whether or not the parameter value is sensitive")
+    public boolean isSensitive() {
+        return sensitive;
+    }
+
+    public void setSensitive(boolean sensitive) {
+        this.sensitive = sensitive;
+    }
+
+    @ApiModelProperty("The value of the parameter")
+    public String getValue() {
+        return value;
+    }
+
+    public void setValue(String value) {
+        this.value = value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        VersionedParameter that = (VersionedParameter) o;
+        return Objects.equals(name, that.name);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name);
+    }
+}
diff --git a/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameterContext.java b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameterContext.java
new file mode 100644
index 0000000..5294c61
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameterContext.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import java.util.Set;
+
+public class VersionedParameterContext {
+
+    private String name;
+    private Set<VersionedParameter> parameters;
+
+    @ApiModelProperty("The name of the context")
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    @ApiModelProperty("The parameters in the context")
+    public Set<VersionedParameter> getParameters() {
+        return parameters;
+    }
+
+    public void setParameters(Set<VersionedParameter> parameters) {
+        this.parameters = parameters;
+    }
+}
diff --git a/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedPort.java b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedPort.java
index 2b7cccd..947b52e 100644
--- a/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedPort.java
+++ b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedPort.java
@@ -23,6 +23,7 @@
     private PortType type;
     private Integer concurrentlySchedulableTaskCount;
     private ScheduledState scheduledState;
+    private boolean allowRemoteAccess;
 
     @ApiModelProperty("The number of tasks that should be concurrently scheduled for the port.")
     public Integer getConcurrentlySchedulableTaskCount() {
@@ -51,6 +52,15 @@
         this.scheduledState = scheduledState;
     }
 
+    @ApiModelProperty("Whether or not this port allows remote access for site-to-site")
+    public boolean isAllowRemoteAccess() {
+        return allowRemoteAccess;
+    }
+
+    public void setAllowRemoteAccess(boolean allowRemoteAccess) {
+        this.allowRemoteAccess = allowRemoteAccess;
+    }
+
     @Override
     public ComponentType getComponentType() {
         if (type == PortType.OUTPUT_PORT) {
diff --git a/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedProcessGroup.java b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedProcessGroup.java
index 071574d..ef0d3fe 100644
--- a/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedProcessGroup.java
+++ b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedProcessGroup.java
@@ -40,7 +40,8 @@
     private VersionedFlowCoordinates versionedFlowCoordinates = null;
 
     private Map<String, String> variables = new HashMap<>();
-    private Map<String,ExternalControllerServiceReference> externalControllerServices;
+
+    private String parameterContextName;
 
     @ApiModelProperty("The child Process Groups")
     public Set<VersionedProcessGroup> getProcessGroups() {
@@ -146,12 +147,13 @@
         return versionedFlowCoordinates;
     }
 
-    @ApiModelProperty("The information about controller services that exist outside the versioned process group, but are referenced by components within the versioned process group.")
-    public Map<String, ExternalControllerServiceReference> getExternalControllerServices() {
-        return externalControllerServices;
+    @ApiModelProperty("The name of the parameter context used by this process group")
+    public String getParameterContextName() {
+        return parameterContextName;
     }
 
-    public void setExternalControllerServices(Map<String, ExternalControllerServiceReference> externalControllerServices) {
-        this.externalControllerServices = externalControllerServices;
+    public void setParameterContextName(String parameterContextName) {
+        this.parameterContextName = parameterContextName;
     }
+
 }
diff --git a/nifi-registry-core/nifi-registry-framework/pom.xml b/nifi-registry-core/nifi-registry-framework/pom.xml
index f5f1d65..d427678 100644
--- a/nifi-registry-core/nifi-registry-framework/pom.xml
+++ b/nifi-registry-core/nifi-registry-framework/pom.xml
@@ -156,6 +156,7 @@
                         <exclude>src/test/resources/serialization/ver1.snapshot</exclude>
                         <exclude>src/test/resources/serialization/ver2.snapshot</exclude>
                         <exclude>src/test/resources/serialization/ver3.snapshot</exclude>
+                        <exclude>src/test/resources/serialization/ver9999.snapshot</exclude>
                         <exclude>src/test/resources/extensions/ConsumeKafkaRecord_1_0.json</exclude>
                     </excludes>
                 </configuration>
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowContent.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowContent.java
new file mode 100644
index 0000000..8340bdd
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowContent.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization;
+
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+
+/**
+ * Wrapper element to contain everything that is serialized for a given version of a flow.
+ */
+public class FlowContent {
+
+    private VersionedFlowSnapshot flowSnapshot;
+
+    public VersionedFlowSnapshot getFlowSnapshot() {
+        return flowSnapshot;
+    }
+
+    public void setFlowSnapshot(VersionedFlowSnapshot flowSnapshot) {
+        this.flowSnapshot = flowSnapshot;
+    }
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowContentSerializer.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowContentSerializer.java
new file mode 100644
index 0000000..3466aa9
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowContentSerializer.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization;
+
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.serialization.jackson.JacksonFlowContentSerializer;
+import org.apache.nifi.registry.serialization.jackson.JacksonVersionedProcessGroupSerializer;
+import org.apache.nifi.registry.serialization.jaxb.JAXBVersionedProcessGroupSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serializer that handles versioned serialization for flow content.
+ *
+ * <p>
+ * Current data model version is 3.
+ * Data Model Version Histories:
+ * <ul>
+ *     <li>version 3: Serialized by {@link JacksonFlowContentSerializer}</li>
+ *     <li>version 2: Serialized by {@link JacksonVersionedProcessGroupSerializer}</li>
+ *     <li>version 1: Serialized by {@link JAXBVersionedProcessGroupSerializer}</li>
+ * </ul>
+ * </p>
+ */
+@Service
+public class FlowContentSerializer {
+
+    private static final Logger logger = LoggerFactory.getLogger(FlowContentSerializer.class);
+
+    static final Integer START_USING_SNAPSHOT_VERSION = 3;
+    static final Integer CURRENT_DATA_MODEL_VERSION = 3;
+
+    private final Map<Integer, VersionedSerializer<VersionedProcessGroup>> processGroupSerializers;
+    private final Map<Integer, VersionedSerializer<FlowContent>> flowContentSerializers;
+    private final Map<Integer, VersionedSerializer<?>> allSerializers;
+
+    private final List<Integer> descendingVersions;
+
+    public FlowContentSerializer() {
+        final Map<Integer, VersionedSerializer<FlowContent>> tempFlowContentSerializers = new HashMap<>();
+        tempFlowContentSerializers.put(3, new JacksonFlowContentSerializer());
+        flowContentSerializers = Collections.unmodifiableMap(tempFlowContentSerializers);
+
+        final Map<Integer, VersionedSerializer<VersionedProcessGroup>> tempProcessGroupSerializers = new HashMap<>();
+        tempProcessGroupSerializers.put(2, new JacksonVersionedProcessGroupSerializer());
+        tempProcessGroupSerializers.put(1, new JAXBVersionedProcessGroupSerializer());
+        processGroupSerializers = Collections.unmodifiableMap(tempProcessGroupSerializers);
+
+        final Map<Integer,VersionedSerializer<?>> tempAllSerializers = new HashMap<>();
+        tempAllSerializers.putAll(processGroupSerializers);
+        tempAllSerializers.putAll(flowContentSerializers);
+        allSerializers = Collections.unmodifiableMap(tempAllSerializers);
+
+        final List<Integer> sortedVersions = new ArrayList<>(allSerializers.keySet());
+        sortedVersions.sort(Collections.reverseOrder(Integer::compareTo));
+        this.descendingVersions = sortedVersions;
+    }
+
+    /**
+     * Tries to read a data model version using each VersionedSerializer, in descending version order.
+     * If no version could be read from any serializer, then a SerializationException is thrown.
+     *
+     * When deserializing, clients are expected to call this method to obtain the version, then call
+     * {@method isProcessGroupVersion}, which then determines if {@method deserializeProcessGroup}
+     * should be used, or if {@method deserializeFlowContent} should be used.
+     *
+     * @param input the input stream containing serialized flow content
+     * @return the data model version from the input stream
+     * @throws SerializationException if the data model version could not be read with any serializer
+     */
+    public int readDataModelVersion(final InputStream input) throws SerializationException {
+        final InputStream markSupportedInput = input.markSupported() ? input : new BufferedInputStream(input);
+
+        // Mark the beginning of the stream.
+        markSupportedInput.mark(SerializationConstants.MAX_HEADER_BYTES);
+
+        // Try each serializer in descending version order
+        for (final int serializerVersion : descendingVersions) {
+            final VersionedSerializer<?> serializer = allSerializers.get(serializerVersion);
+            try {
+                return serializer.readDataModelVersion(markSupportedInput);
+            } catch (SerializationException e) {
+                if (logger.isDebugEnabled()) {
+                    logger.error("Unable to read the data model version due to: {}", e.getMessage());
+                }
+                continue;
+            } finally {
+                // Reset the stream position.
+                try {
+                    markSupportedInput.reset();
+                } catch (IOException resetException) {
+                    // Should not happen.
+                    logger.error("Unable to reset the input stream.", resetException);
+                }
+            }
+        }
+
+        throw new SerializationException("Unable to read the data model version for the flow content.");
+    }
+
+    public Integer getCurrentDataModelVersion() {
+        return CURRENT_DATA_MODEL_VERSION;
+    }
+
+    public boolean isProcessGroupVersion(final int dataModelVersion) {
+        return dataModelVersion < START_USING_SNAPSHOT_VERSION;
+    }
+
+    public VersionedProcessGroup deserializeProcessGroup(final int dataModelVersion, final InputStream input) throws SerializationException {
+        final VersionedSerializer<VersionedProcessGroup> serializer = processGroupSerializers.get(dataModelVersion);
+        if (serializer == null) {
+            throw new IllegalArgumentException("No VersionedProcessGroup serializer exists for data model version: " + dataModelVersion);
+        }
+
+        return serializer.deserialize(input);
+    }
+
+    public FlowContent deserializeFlowContent(final int dataModelVersion, final InputStream input) throws SerializationException {
+        final VersionedSerializer<FlowContent> serializer = flowContentSerializers.get(dataModelVersion);
+        if (serializer == null) {
+            throw new IllegalArgumentException("No FlowContent serializer exists for data model version: " + dataModelVersion);
+        }
+
+        return serializer.deserialize(input);
+    }
+
+    public void serializeFlowContent(final FlowContent flowContent, final OutputStream out) throws SerializationException {
+        final VersionedSerializer<FlowContent> serializer = flowContentSerializers.get(CURRENT_DATA_MODEL_VERSION);
+        serializer.serialize(CURRENT_DATA_MODEL_VERSION, flowContent, out);
+    }
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/SerializationConstants.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/SerializationConstants.java
new file mode 100644
index 0000000..9c9c384
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/SerializationConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization;
+
+public interface SerializationConstants {
+
+    int MAX_HEADER_BYTES = 1024;
+
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/VersionedProcessGroupSerializer.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/VersionedProcessGroupSerializer.java
deleted file mode 100644
index 8d7cb7b..0000000
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/VersionedProcessGroupSerializer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.registry.serialization;
-
-import org.apache.nifi.registry.flow.VersionedProcessGroup;
-import org.apache.nifi.registry.serialization.jackson.JacksonVersionedProcessGroupSerializer;
-import org.apache.nifi.registry.serialization.jaxb.JAXBVersionedProcessGroupSerializer;
-import org.springframework.stereotype.Service;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * See {@link AbstractMultiVersionSerializer} for further information.
- *
- * <p>
- * Current data model version is 2.
- * Data Model Version Histories:
- * <ul>
- *     <li>version 2: Serialized by {@link JacksonVersionedProcessGroupSerializer}</li>
- *     <li>version 1: Serialized by {@link JAXBVersionedProcessGroupSerializer}</li>
- * </ul>
- * </p>
- */
-@Service
-public class VersionedProcessGroupSerializer extends AbstractMultiVersionSerializer<VersionedProcessGroup> {
-
-    static final Integer CURRENT_DATA_MODEL_VERSION = 2;
-
-    @Override
-    protected Map<Integer, VersionedSerializer<VersionedProcessGroup>> createVersionedSerializers() {
-        final Map<Integer, VersionedSerializer<VersionedProcessGroup>> tempSerializers = new HashMap<>();
-        tempSerializers.put(2, new JacksonVersionedProcessGroupSerializer());
-        tempSerializers.put(1, new JAXBVersionedProcessGroupSerializer());
-        return tempSerializers;
-    }
-
-    @Override
-    protected int getCurrentDataModelVersion() {
-        return CURRENT_DATA_MODEL_VERSION;
-    }
-
-}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonFlowContentSerializer.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonFlowContentSerializer.java
new file mode 100644
index 0000000..77dd37a
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonFlowContentSerializer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization.jackson;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.nifi.registry.serialization.FlowContent;
+import org.apache.nifi.registry.serialization.SerializationException;
+
+/**
+ * A Jackson serializer for FlowContent.
+ */
+public class JacksonFlowContentSerializer extends JacksonSerializer<FlowContent> {
+
+    @Override
+    TypeReference<SerializationContainer<FlowContent>> getDeserializeTypeRef() throws SerializationException {
+        return new TypeReference<SerializationContainer<org.apache.nifi.registry.serialization.FlowContent>>() {};
+    }
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonSerializer.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonSerializer.java
index 4098c77..d159ea3 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonSerializer.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonSerializer.java
@@ -18,6 +18,7 @@
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.registry.serialization.SerializationConstants;
 import org.apache.nifi.registry.serialization.SerializationException;
 import org.apache.nifi.registry.serialization.VersionedSerializer;
 import org.slf4j.Logger;
@@ -30,8 +31,6 @@
 import java.util.Collections;
 import java.util.HashMap;
 
-import static org.apache.nifi.registry.serialization.VersionedProcessGroupSerializer.MAX_HEADER_BYTES;
-
 /**
  * A Serializer that uses Jackson for serializing/deserializing.
  */
@@ -80,7 +79,7 @@
 
     @Override
     public int readDataModelVersion(InputStream input) throws SerializationException {
-        final byte[] headerBytes = new byte[MAX_HEADER_BYTES];
+        final byte[] headerBytes = new byte[SerializationConstants.MAX_HEADER_BYTES];
         final int readHeaderBytes;
         try {
             readHeaderBytes = input.read(headerBytes);
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonVersionedProcessGroupSerializer.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonVersionedProcessGroupSerializer.java
index 21bdecc..4bd763e 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonVersionedProcessGroupSerializer.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonVersionedProcessGroupSerializer.java
@@ -21,11 +21,10 @@
 import org.apache.nifi.registry.serialization.SerializationException;
 
 /**
- * A Jackson serializer for VersionedFlowSnapshots.
+ * A Jackson serializer for VersionedProcessGroups.
  */
 public class JacksonVersionedProcessGroupSerializer extends JacksonSerializer<VersionedProcessGroup> {
 
-
     @Override
     TypeReference<SerializationContainer<VersionedProcessGroup>> getDeserializeTypeRef() throws SerializationException {
         return new TypeReference<SerializationContainer<VersionedProcessGroup>>() {};
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
index 7a87ec0..26057ae 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
@@ -37,10 +37,10 @@
 import org.apache.nifi.registry.extension.bundle.BundleVersion;
 import org.apache.nifi.registry.extension.bundle.BundleVersionFilterParams;
 import org.apache.nifi.registry.extension.bundle.BundleVersionMetadata;
-import org.apache.nifi.registry.extension.component.manifest.Extension;
 import org.apache.nifi.registry.extension.component.ExtensionFilterParams;
 import org.apache.nifi.registry.extension.component.ExtensionMetadata;
 import org.apache.nifi.registry.extension.component.TagCount;
+import org.apache.nifi.registry.extension.component.manifest.Extension;
 import org.apache.nifi.registry.extension.component.manifest.ProvidedServiceAPI;
 import org.apache.nifi.registry.extension.repo.ExtensionRepoArtifact;
 import org.apache.nifi.registry.extension.repo.ExtensionRepoBucket;
@@ -62,7 +62,8 @@
 import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
 import org.apache.nifi.registry.provider.extension.StandardBundleCoordinate;
 import org.apache.nifi.registry.provider.flow.StandardFlowSnapshotContext;
-import org.apache.nifi.registry.serialization.Serializer;
+import org.apache.nifi.registry.serialization.FlowContent;
+import org.apache.nifi.registry.serialization.FlowContentSerializer;
 import org.apache.nifi.registry.service.alias.RegistryUrlAliasService;
 import org.apache.nifi.registry.service.extension.ExtensionService;
 import org.apache.nifi.registry.service.mapper.BucketMappings;
@@ -111,7 +112,7 @@
     private final MetadataService metadataService;
     private final FlowPersistenceProvider flowPersistenceProvider;
     private final BundlePersistenceProvider bundlePersistenceProvider;
-    private final Serializer<VersionedProcessGroup> processGroupSerializer;
+    private final FlowContentSerializer flowContentSerializer;
     private final ExtensionService extensionService;
     private final Validator validator;
     private final RegistryUrlAliasService registryUrlAliasService;
@@ -124,14 +125,14 @@
     public RegistryService(final MetadataService metadataService,
                            final FlowPersistenceProvider flowPersistenceProvider,
                            final BundlePersistenceProvider bundlePersistenceProvider,
-                           final Serializer<VersionedProcessGroup> processGroupSerializer,
+                           final FlowContentSerializer flowContentSerializer,
                            final ExtensionService extensionService,
                            final Validator validator,
                            final RegistryUrlAliasService registryUrlAliasService) {
         this.metadataService = Validate.notNull(metadataService);
         this.flowPersistenceProvider = Validate.notNull(flowPersistenceProvider);
         this.bundlePersistenceProvider = Validate.notNull(bundlePersistenceProvider);
-        this.processGroupSerializer = Validate.notNull(processGroupSerializer);
+        this.flowContentSerializer = Validate.notNull(flowContentSerializer);
         this.extensionService = Validate.notNull(extensionService);
         this.validator = Validate.notNull(validator);
         this.registryUrlAliasService = Validate.notNull(registryUrlAliasService);
@@ -688,7 +689,14 @@
             // serialize the snapshot
             final ByteArrayOutputStream out = new ByteArrayOutputStream();
             registryUrlAliasService.setInternal(flowSnapshot.getFlowContents());
-            processGroupSerializer.serialize(flowSnapshot.getFlowContents(), out);
+
+            final FlowContent flowContent = new FlowContent();
+            flowContent.setFlowSnapshot(flowSnapshot);
+
+            // temporarily remove the metadata so it isn't serialized, but then put it back for returning the response
+            flowSnapshot.setSnapshotMetadata(null);
+            flowContentSerializer.serializeFlowContent(flowContent, out);
+            flowSnapshot.setSnapshotMetadata(snapshotMetadata);
 
             // save the serialized snapshot to the persistence provider
             final Bucket bucket = BucketMappings.map(existingBucket);
@@ -772,9 +780,9 @@
                     + flowEntity.getId() + " and version " + version);
         }
 
-        // deserialize the contents
+        // deserialize the content
         final InputStream input = new ByteArrayInputStream(serializedSnapshot);
-        final VersionedProcessGroup flowContents = processGroupSerializer.deserialize(input);
+        final VersionedFlowSnapshot snapshot = deserializeFlowContent(input);
 
         // map entities to data model
         final Bucket bucket = BucketMappings.map(bucketEntity);
@@ -782,15 +790,29 @@
         final VersionedFlowSnapshotMetadata snapshotMetadata = FlowMappings.map(bucketEntity, snapshotEntity);
 
         // create the snapshot to return
-        final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
-        registryUrlAliasService.setExternal(flowContents);
-        snapshot.setFlowContents(flowContents);
+        registryUrlAliasService.setExternal(snapshot.getFlowContents());
         snapshot.setSnapshotMetadata(snapshotMetadata);
         snapshot.setFlow(versionedFlow);
         snapshot.setBucket(bucket);
         return snapshot;
     }
 
+    private VersionedFlowSnapshot deserializeFlowContent(final InputStream input) {
+        // attempt to read the version header from the serialized content
+        final int dataModelVersion = flowContentSerializer.readDataModelVersion(input);
+
+        // determine how to do deserialize based on the data model version
+        if (flowContentSerializer.isProcessGroupVersion(dataModelVersion)) {
+            final VersionedProcessGroup processGroup = flowContentSerializer.deserializeProcessGroup(dataModelVersion, input);
+            final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
+            snapshot.setFlowContents(processGroup);
+            return snapshot;
+        } else {
+            final FlowContent flowContent = flowContentSerializer.deserializeFlowContent(dataModelVersion, input);
+            return flowContent.getFlowSnapshot();
+        }
+    }
+
     /**
      * Returns all versions of a flow, sorted newest to oldest.
      *
@@ -1009,9 +1031,12 @@
 
             // deserialize the contents
             final InputStream inputA = new ByteArrayInputStream(serializedSnapshotA);
-            final VersionedProcessGroup flowContentsA = processGroupSerializer.deserialize(inputA);
+            final VersionedFlowSnapshot snapshotA = deserializeFlowContent(inputA);
+            final VersionedProcessGroup flowContentsA = snapshotA.getFlowContents();
+
             final InputStream inputB = new ByteArrayInputStream(serializedSnapshotB);
-            final VersionedProcessGroup flowContentsB = processGroupSerializer.deserialize(inputB);
+            final VersionedFlowSnapshot snapshotB = deserializeFlowContent(inputB);
+            final VersionedProcessGroup flowContentsB = snapshotB.getFlowContents();
 
             final ComparableDataFlow comparableFlowA = new StandardComparableDataFlow(String.format("Version %d", older), flowContentsA);
             final ComparableDataFlow comparableFlowB = new StandardComparableDataFlow(String.format("Version %d", newer), flowContentsB);
@@ -1021,13 +1046,13 @@
                     null, new ConciseEvolvingDifferenceDescriptor());
             final FlowComparison flowComparison = flowComparator.compare();
 
-            VersionedFlowDifference result = new VersionedFlowDifference();
+            final VersionedFlowDifference result = new VersionedFlowDifference();
             result.setBucketId(bucketIdentifier);
             result.setFlowId(flowIdentifier);
             result.setVersionA(older);
             result.setVersionB(newer);
 
-            Set<ComponentDifferenceGroup> differenceGroups = getStringComponentDifferenceGroupMap(flowComparison.getDifferences());
+            final Set<ComponentDifferenceGroup> differenceGroups = getStringComponentDifferenceGroupMap(flowComparison.getDifferences());
             result.setComponentDifferenceGroups(differenceGroups);
 
             return result;
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowContentSerializer.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowContentSerializer.java
new file mode 100644
index 0000000..21d3b43
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowContentSerializer.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization;
+
+import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.flow.VersionedProcessor;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
+
+public class TestFlowContentSerializer {
+
+    private FlowContentSerializer serializer;
+
+    @Before
+    public void setup() {
+        serializer = new FlowContentSerializer();
+    }
+
+    @Test
+    public void testSerializeDeserializeFlowContent() {
+        final VersionedProcessor processor1 = new VersionedProcessor();
+        processor1.setIdentifier("processor1");
+        processor1.setName("My Processor 1");
+
+        final VersionedProcessGroup processGroup1 = new VersionedProcessGroup();
+        processGroup1.setIdentifier("pg1");
+        processGroup1.setName("My Process Group");
+        processGroup1.getProcessors().add(processor1);
+
+        final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
+        snapshot.setFlowContents(processGroup1);
+
+        final FlowContent flowContent = new FlowContent();
+        flowContent.setFlowSnapshot(snapshot);
+
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        serializer.serializeFlowContent(flowContent, out);
+
+        //final String json = new String(out.toByteArray(), StandardCharsets.UTF_8);
+        //System.out.println(json);
+
+        final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+
+        // make sure we can read the version from the input stream and it should be the current version
+        final Integer version = serializer.readDataModelVersion(in);
+        assertEquals(serializer.getCurrentDataModelVersion(), version);
+        assertEquals(false, serializer.isProcessGroupVersion(version));
+
+        // make sure we can deserialize back to FlowContent
+        final FlowContent deserializedFlowContent = serializer.deserializeFlowContent(version, in);
+        assertNotNull(deserializedFlowContent);
+
+        final VersionedFlowSnapshot deserializedSnapshot = deserializedFlowContent.getFlowSnapshot();
+        assertNotNull(deserializedSnapshot);
+
+        final VersionedProcessGroup deserializedProcessGroup1 = deserializedSnapshot.getFlowContents();
+        assertNotNull(deserializedProcessGroup1);
+        assertEquals(processGroup1.getIdentifier(), deserializedProcessGroup1.getIdentifier());
+        assertEquals(processGroup1.getName(), deserializedProcessGroup1.getName());
+
+        assertEquals(1, deserializedProcessGroup1.getProcessors().size());
+
+        final VersionedProcessor deserializedProcessor1 = deserializedProcessGroup1.getProcessors().iterator().next();
+        assertEquals(processor1.getIdentifier(), deserializedProcessor1.getIdentifier());
+        assertEquals(processor1.getName(), deserializedProcessor1.getName());
+    }
+
+    @Test
+    public void testSerializeDeserializeWithExternalServices() throws SerializationException {
+        final VersionedProcessGroup processGroup1 = new VersionedProcessGroup();
+        processGroup1.setIdentifier("pg1");
+        processGroup1.setName("My Process Group");
+
+        final ExternalControllerServiceReference serviceReference1 = new ExternalControllerServiceReference();
+        serviceReference1.setIdentifier("1");
+        serviceReference1.setName("Service 1");
+
+        final ExternalControllerServiceReference serviceReference2 = new ExternalControllerServiceReference();
+        serviceReference2.setIdentifier("2");
+        serviceReference2.setName("Service 2");
+
+        final Map<String,ExternalControllerServiceReference> serviceReferences = new HashMap<>();
+        serviceReferences.put(serviceReference1.getIdentifier(), serviceReference1);
+        serviceReferences.put(serviceReference2.getIdentifier(), serviceReference2);
+
+        final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
+        snapshot.setFlowContents(processGroup1);
+        snapshot.setExternalControllerServices(serviceReferences);
+
+        final FlowContent flowContent = new FlowContent();
+        flowContent.setFlowSnapshot(snapshot);
+
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        serializer.serializeFlowContent(flowContent, out);
+
+        final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+
+        // make sure we can read the version from the input stream and it should be the current version
+        final Integer version = serializer.readDataModelVersion(in);
+        assertEquals(serializer.getCurrentDataModelVersion(), version);
+
+        // make sure we can deserialize back to FlowContent
+        final FlowContent deserializedFlowContent = serializer.deserializeFlowContent(version, in);
+        assertNotNull(deserializedFlowContent);
+
+        final VersionedFlowSnapshot deserializedSnapshot = deserializedFlowContent.getFlowSnapshot();
+        assertNotNull(deserializedSnapshot);
+
+        final VersionedProcessGroup deserializedProcessGroup = deserializedSnapshot.getFlowContents();
+        assertEquals(processGroup1.getIdentifier(), deserializedProcessGroup.getIdentifier());
+        assertEquals(processGroup1.getName(), deserializedProcessGroup.getName());
+
+        final Map<String,ExternalControllerServiceReference> deserializedServiceReferences = deserializedSnapshot.getExternalControllerServices();
+        assertNotNull(deserializedServiceReferences);
+        assertEquals(2, deserializedServiceReferences.size());
+
+        final ExternalControllerServiceReference deserializedServiceReference1 = deserializedServiceReferences.get(serviceReference1.getIdentifier());
+        assertNotNull(deserializedServiceReference1);
+        assertEquals(serviceReference1.getIdentifier(), deserializedServiceReference1.getIdentifier());
+        assertEquals(serviceReference1.getName(), deserializedServiceReference1.getName());
+    }
+
+    @Test
+    public void testDeserializeJsonNonIntegerVersion() throws IOException {
+        final String file = "/serialization/json/non-integer-version.snapshot";
+        try (final InputStream is = this.getClass().getResourceAsStream(file)) {
+            try {
+                serializer.readDataModelVersion(is);
+                fail("Should fail");
+            } catch (SerializationException e) {
+                assertEquals("Unable to read the data model version for the flow content.", e.getMessage());
+            }
+        }
+    }
+
+    @Test
+    public void testDeserializeJsonNoVersion() throws IOException {
+        final String file = "/serialization/json/no-version.snapshot";
+        try (final InputStream is = this.getClass().getResourceAsStream(file)) {
+            try {
+                serializer.readDataModelVersion(is);
+                fail("Should fail");
+            } catch (SerializationException e) {
+                assertEquals("Unable to read the data model version for the flow content.", e.getMessage());
+            }
+        }
+    }
+
+    @Test
+    public void testDeserializeVer1() throws IOException {
+        final String file = "/serialization/ver1.snapshot";
+        final VersionedProcessGroup processGroup;
+        try (final InputStream is = this.getClass().getResourceAsStream(file)) {
+            final Integer version = serializer.readDataModelVersion(is);
+            assertNotNull(version);
+            assertEquals(1, version.intValue());
+
+            if (serializer.isProcessGroupVersion(version)) {
+                processGroup = serializer.deserializeProcessGroup(version, is);
+            } else {
+                processGroup = null;
+            }
+        }
+
+        assertNotNull(processGroup);
+        assertNotNull(processGroup.getProcessors());
+        assertTrue(processGroup.getProcessors().size() > 0);
+        //System.out.printf("processGroup=" + processGroup);
+    }
+
+    @Test
+    public void testDeserializeVer2() throws IOException {
+        final String file = "/serialization/ver2.snapshot";
+        final VersionedProcessGroup processGroup;
+        try (final InputStream is = this.getClass().getResourceAsStream(file)) {
+            final Integer version = serializer.readDataModelVersion(is);
+            assertNotNull(version);
+            assertEquals(2, version.intValue());
+
+            if (serializer.isProcessGroupVersion(version)) {
+                processGroup = serializer.deserializeProcessGroup(version, is);
+            } else {
+                processGroup = null;
+            }
+        }
+
+        assertNotNull(processGroup);
+        assertNotNull(processGroup.getProcessors());
+        assertTrue(processGroup.getProcessors().size() > 0);
+        //System.out.printf("processGroup=" + processGroup);
+    }
+
+    @Test
+    public void testDeserializeVer3() throws IOException {
+        final String file = "/serialization/ver3.snapshot";
+        try (final InputStream is = this.getClass().getResourceAsStream(file)) {
+            final Integer version = serializer.readDataModelVersion(is);
+            assertNotNull(version);
+            assertEquals(3, version.intValue());
+            assertFalse(serializer.isProcessGroupVersion(version));
+
+            final FlowContent flowContent = serializer.deserializeFlowContent(version, is);
+            assertNotNull(flowContent);
+
+            final VersionedFlowSnapshot flowSnapshot = flowContent.getFlowSnapshot();
+            assertNotNull(flowSnapshot);
+
+            final VersionedProcessGroup processGroup = flowSnapshot.getFlowContents();
+            assertNotNull(processGroup);
+            assertNotNull(processGroup.getProcessors());
+            assertEquals(1, processGroup.getProcessors().size());
+        }
+    }
+
+    @Test
+    public void testDeserializeVer9999() throws IOException {
+        final String file = "/serialization/ver9999.snapshot";
+        try (final InputStream is = this.getClass().getResourceAsStream(file)) {
+            final Integer version = serializer.readDataModelVersion(is);
+            assertNotNull(version);
+            assertEquals(9999, version.intValue());
+            assertFalse(serializer.isProcessGroupVersion(version));
+
+            try {
+                serializer.deserializeFlowContent(version, is);
+                fail("Should fail");
+            } catch (IllegalArgumentException e) {
+                assertEquals("No FlowContent serializer exists for data model version: " + version, e.getMessage());
+            }
+
+            try {
+                serializer.deserializeProcessGroup(version, is);
+                fail("Should fail");
+            } catch (IllegalArgumentException e) {
+                assertEquals("No VersionedProcessGroup serializer exists for data model version: " + version, e.getMessage());
+            }
+        }
+    }
+
+    @Test
+    public void testDeserializeProcessGroupAsFlowContent() throws IOException {
+        final String file = "/serialization/ver2.snapshot";
+        try (final InputStream is = this.getClass().getResourceAsStream(file)) {
+            final Integer version = serializer.readDataModelVersion(is);
+            assertNotNull(version);
+            assertEquals(2, version.intValue());
+            assertTrue(serializer.isProcessGroupVersion(version));
+
+            final VersionedProcessGroup processGroup = serializer.deserializeProcessGroup(version, is);
+            assertNotNull(processGroup);
+        }
+    }
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java
deleted file mode 100644
index 145204b..0000000
--- a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.registry.serialization;
-
-import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
-import org.apache.nifi.registry.flow.VersionedProcessGroup;
-import org.apache.nifi.registry.flow.VersionedProcessor;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class TestVersionedProcessGroupSerializer {
-
-    @Test
-    public void testSerializeDeserializeFlowSnapshot() throws SerializationException {
-        final Serializer<VersionedProcessGroup> serializer = new VersionedProcessGroupSerializer();
-
-        final VersionedProcessGroup processGroup1 = new VersionedProcessGroup();
-        processGroup1.setIdentifier("pg1");
-        processGroup1.setName("My Process Group");
-
-        final VersionedProcessor processor1 = new VersionedProcessor();
-        processor1.setIdentifier("processor1");
-        processor1.setName("My Processor 1");
-
-        // make sure nested objects are serialized/deserialized
-        processGroup1.getProcessors().add(processor1);
-
-        final ByteArrayOutputStream out = new ByteArrayOutputStream();
-        serializer.serialize(processGroup1, out);
-
-        final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
-        final VersionedProcessGroup deserializedProcessGroup1 = serializer.deserialize(in);
-
-        Assert.assertEquals(processGroup1.getIdentifier(), deserializedProcessGroup1.getIdentifier());
-        Assert.assertEquals(processGroup1.getName(), deserializedProcessGroup1.getName());
-
-        Assert.assertEquals(1, deserializedProcessGroup1.getProcessors().size());
-
-        final VersionedProcessor deserializedProcessor1 = deserializedProcessGroup1.getProcessors().iterator().next();
-        Assert.assertEquals(processor1.getIdentifier(), deserializedProcessor1.getIdentifier());
-        Assert.assertEquals(processor1.getName(), deserializedProcessor1.getName());
-
-    }
-
-    @Test
-    public void testSerializeDeserializeWithExternalServices() throws SerializationException {
-        final Serializer<VersionedProcessGroup> serializer = new VersionedProcessGroupSerializer();
-
-        final VersionedProcessGroup processGroup1 = new VersionedProcessGroup();
-        processGroup1.setIdentifier("pg1");
-        processGroup1.setName("My Process Group");
-
-        final ExternalControllerServiceReference serviceReference1 = new ExternalControllerServiceReference();
-        serviceReference1.setIdentifier("1");
-        serviceReference1.setName("Service 1");
-
-        final ExternalControllerServiceReference serviceReference2 = new ExternalControllerServiceReference();
-        serviceReference2.setIdentifier("2");
-        serviceReference2.setName("Service 2");
-
-        final Map<String,ExternalControllerServiceReference> serviceReferences = new HashMap<>();
-        serviceReferences.put(serviceReference1.getIdentifier(), serviceReference1);
-        serviceReferences.put(serviceReference2.getIdentifier(), serviceReference2);
-
-        processGroup1.setExternalControllerServices(serviceReferences);
-
-        final ByteArrayOutputStream out = new ByteArrayOutputStream();
-        serializer.serialize(processGroup1, out);
-
-        final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
-        final VersionedProcessGroup deserializedProcessGroup = serializer.deserialize(in);
-
-        Assert.assertEquals(processGroup1.getIdentifier(), deserializedProcessGroup.getIdentifier());
-        Assert.assertEquals(processGroup1.getName(), deserializedProcessGroup.getName());
-
-        final Map<String,ExternalControllerServiceReference> deserializedServiceReferences = deserializedProcessGroup.getExternalControllerServices();
-        Assert.assertNotNull(deserializedServiceReferences);
-        Assert.assertEquals(2, deserializedServiceReferences.size());
-
-        final ExternalControllerServiceReference deserializedServiceReference1 = deserializedServiceReferences.get(serviceReference1.getIdentifier());
-        Assert.assertNotNull(deserializedServiceReference1);
-        Assert.assertEquals(serviceReference1.getIdentifier(), deserializedServiceReference1.getIdentifier());
-        Assert.assertEquals(serviceReference1.getName(), deserializedServiceReference1.getName());
-    }
-
-    @Test
-    public void testDeserializeJsonNonIntegerVersion() throws IOException {
-        final String file = "/serialization/json/non-integer-version.snapshot";
-        final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer();
-        try (final InputStream is = this.getClass().getResourceAsStream(file)) {
-            try {
-                serializer.deserialize(is);
-                fail("Should fail");
-            } catch (SerializationException e) {
-                assertEquals("Unable to find a serializer compatible with the input.", e.getMessage());
-            }
-        }
-    }
-
-    @Test
-    public void testDeserializeJsonNoVersion() throws IOException {
-        final String file = "/serialization/json/no-version.snapshot";
-        final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer();
-        try (final InputStream is = this.getClass().getResourceAsStream(file)) {
-            try {
-                serializer.deserialize(is);
-                fail("Should fail");
-            } catch (SerializationException e) {
-                assertEquals("Unable to find a serializer compatible with the input.", e.getMessage());
-            }
-        }
-    }
-
-    @Test
-    public void testDeserializeVer1() throws IOException {
-        final String file = "/serialization/ver1.snapshot";
-        final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer();
-        final VersionedProcessGroup processGroup;
-        try (final InputStream is = this.getClass().getResourceAsStream(file)) {
-            processGroup = serializer.deserialize(is);
-        }
-        System.out.printf("processGroup=" + processGroup);
-    }
-
-    @Test
-    public void testDeserializeVer2() throws IOException {
-        final String file = "/serialization/ver2.snapshot";
-        final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer();
-        final VersionedProcessGroup processGroup;
-        try (final InputStream is = this.getClass().getResourceAsStream(file)) {
-            processGroup = serializer.deserialize(is);
-        }
-        System.out.printf("processGroup=" + processGroup);
-    }
-
-    @Test
-    public void testDeserializeVer3() throws IOException {
-        final String file = "/serialization/ver3.snapshot";
-        final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer();
-        try (final InputStream is = this.getClass().getResourceAsStream(file)) {
-            try {
-                serializer.deserialize(is);
-                fail("Should fail");
-            } catch (SerializationException e) {
-                assertEquals("Unable to find a serializer compatible with the input.", e.getMessage());
-            }
-        }
-    }
-
-}
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java
index baeb949..4e43645 100644
--- a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java
+++ b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java
@@ -31,8 +31,8 @@
 import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
 import org.apache.nifi.registry.flow.VersionedProcessGroup;
 import org.apache.nifi.registry.flow.VersionedProcessor;
-import org.apache.nifi.registry.serialization.Serializer;
-import org.apache.nifi.registry.serialization.VersionedProcessGroupSerializer;
+import org.apache.nifi.registry.serialization.FlowContent;
+import org.apache.nifi.registry.serialization.FlowContentSerializer;
 import org.apache.nifi.registry.service.alias.RegistryUrlAliasService;
 import org.apache.nifi.registry.service.extension.ExtensionService;
 import org.apache.nifi.registry.service.extension.StandardExtensionService;
@@ -77,7 +77,7 @@
     private MetadataService metadataService;
     private FlowPersistenceProvider flowPersistenceProvider;
     private BundlePersistenceProvider bundlePersistenceProvider;
-    private Serializer<VersionedProcessGroup> snapshotSerializer;
+    private FlowContentSerializer flowContentSerializer;
     private ExtensionService extensionService;
     private Validator validator;
     private RegistryUrlAliasService registryUrlAliasService;
@@ -89,7 +89,7 @@
         metadataService = mock(MetadataService.class);
         flowPersistenceProvider = mock(FlowPersistenceProvider.class);
         bundlePersistenceProvider = mock(BundlePersistenceProvider.class);
-        snapshotSerializer = mock(VersionedProcessGroupSerializer.class);
+        flowContentSerializer = mock(FlowContentSerializer.class);
         extensionService = mock(StandardExtensionService.class);
         registryUrlAliasService = mock(RegistryUrlAliasService.class);
 
@@ -97,7 +97,7 @@
         validator = validatorFactory.getValidator();
 
         registryService = new RegistryService(metadataService, flowPersistenceProvider, bundlePersistenceProvider,
-                snapshotSerializer, extensionService, validator, registryUrlAliasService);
+                flowContentSerializer, extensionService, validator, registryUrlAliasService);
     }
 
     // ---------------------- Test Bucket methods ---------------------------------------------
@@ -800,7 +800,7 @@
         assertNotNull(createdSnapshot.getFlow());
         assertNotNull(createdSnapshot.getBucket());
 
-        verify(snapshotSerializer, times(1)).serialize(eq(snapshot.getFlowContents()), any(OutputStream.class));
+        verify(flowContentSerializer, times(1)).serializeFlowContent(any(FlowContent.class), any(OutputStream.class));
         verify(flowPersistenceProvider, times(1)).saveFlowContent(any(), any());
         verify(metadataService, times(1)).createFlowSnapshot(any(FlowSnapshotEntity.class));
     }
@@ -1150,8 +1150,10 @@
                 existingSnapshot.getVersion()
         )).thenReturn(new byte[10]);
 
-        final VersionedFlowSnapshot snapshotToDeserialize = createSnapshot();
-        when(snapshotSerializer.deserialize(any(InputStream.class))).thenReturn(snapshotToDeserialize.getFlowContents());
+        final FlowContent flowContent = new FlowContent();
+        flowContent.setFlowSnapshot(createSnapshot());
+        when(flowContentSerializer.readDataModelVersion(any(InputStream.class))).thenReturn(3);
+        when(flowContentSerializer.deserializeFlowContent(eq(3), any(InputStream.class))).thenReturn(flowContent);
 
         final VersionedFlowSnapshot returnedSnapshot = registryService.getFlowSnapshot(
                 existingBucket.getId(), existingSnapshot.getFlowId(), existingSnapshot.getVersion());
@@ -1253,7 +1255,9 @@
 
         final VersionedProcessGroup pgA = createVersionedProcessGroupA();
         final VersionedProcessGroup pgB = createVersionedProcessGroupB();
-        when(snapshotSerializer.deserialize(any())).thenReturn(pgA, pgB);
+        when(flowContentSerializer.readDataModelVersion(any(InputStream.class))).thenReturn(2);
+        when(flowContentSerializer.isProcessGroupVersion(eq(2))).thenReturn(true);
+        when(flowContentSerializer.deserializeProcessGroup(eq(2),any())).thenReturn(pgA, pgB);
 
         final VersionedFlowDifference diff = registryService.getFlowDiff(
                 "bucketIdentifier", "flowIdentifier", 1, 2);
@@ -1274,7 +1278,9 @@
 
         final VersionedProcessGroup pgA = createVersionedProcessGroupA();
         final VersionedProcessGroup pgB = createVersionedProcessGroupB();
-        when(snapshotSerializer.deserialize(any())).thenReturn(pgA, pgB);
+        when(flowContentSerializer.readDataModelVersion(any(InputStream.class))).thenReturn(2);
+        when(flowContentSerializer.isProcessGroupVersion(eq(2))).thenReturn(true);
+        when(flowContentSerializer.deserializeProcessGroup(eq(2),any())).thenReturn(pgA, pgB);
 
         // getFlowDiff orders the changes in ascending order of version number regardless of param order
         final VersionedFlowDifference diff = registryService.getFlowDiff(
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/resources/serialization/ver3.snapshot b/nifi-registry-core/nifi-registry-framework/src/test/resources/serialization/ver3.snapshot
index 574fe56..3611014 100644
--- a/nifi-registry-core/nifi-registry-framework/src/test/resources/serialization/ver3.snapshot
+++ b/nifi-registry-core/nifi-registry-framework/src/test/resources/serialization/ver3.snapshot
@@ -1,6 +1,28 @@
 {
-  "header": {
-    "dataModelVersion": 3
-  },
-  "content": {}
+    "header" : {
+        "dataModelVersion" : "3"
+    },
+    "content" : {
+        "flowSnapshot" : {
+            "flowContents" : {
+                "componentType" : "PROCESS_GROUP",
+                "connections" : [ ],
+                "controllerServices" : [ ],
+                "funnels" : [ ],
+                "identifier" : "pg1",
+                "inputPorts" : [ ],
+                "labels" : [ ],
+                "name" : "My Process Group",
+                "outputPorts" : [ ],
+                "processGroups" : [ ],
+                "processors" : [ {
+                    "componentType" : "PROCESSOR",
+                    "identifier" : "processor1",
+                    "name" : "My Processor 1"
+                } ],
+                "remoteProcessGroups" : [ ],
+                "variables" : { }
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/resources/serialization/ver9999.snapshot b/nifi-registry-core/nifi-registry-framework/src/test/resources/serialization/ver9999.snapshot
new file mode 100644
index 0000000..743401b
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/test/resources/serialization/ver9999.snapshot
@@ -0,0 +1,6 @@
+{
+    "header": {
+        "dataModelVersion": 9999
+    },
+    "content": {}
+}
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/UnsecuredNiFiRegistryClientIT.java b/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/UnsecuredNiFiRegistryClientIT.java
index e16fa79..d5e0bc7 100644
--- a/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/UnsecuredNiFiRegistryClientIT.java
+++ b/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/UnsecuredNiFiRegistryClientIT.java
@@ -60,9 +60,12 @@
 import org.apache.nifi.registry.extension.repo.ExtensionRepoVersion;
 import org.apache.nifi.registry.extension.repo.ExtensionRepoVersionSummary;
 import org.apache.nifi.registry.field.Fields;
+import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
 import org.apache.nifi.registry.flow.VersionedFlow;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.flow.VersionedParameter;
+import org.apache.nifi.registry.flow.VersionedParameterContext;
 import org.apache.nifi.registry.flow.VersionedProcessGroup;
 import org.apache.nifi.registry.flow.VersionedProcessor;
 import org.apache.nifi.registry.flow.VersionedPropertyDescriptor;
@@ -86,12 +89,15 @@
 import java.nio.charset.StandardCharsets;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -270,6 +276,9 @@
         final VersionedFlow snapshotFlow = flow1;
 
         final VersionedFlowSnapshot snapshot1 = createSnapshot(snapshotClient, snapshotFlow, 1);
+        assertNotNull(snapshot1);
+        assertNotNull(snapshot1.getSnapshotMetadata());
+        assertEquals(snapshotFlow.getIdentifier(), snapshot1.getSnapshotMetadata().getFlowIdentifier());
         LOGGER.info("Created snapshot # 1 with version " + snapshot1.getSnapshotMetadata().getVersion());
 
         final VersionedFlowSnapshot snapshot2 = createSnapshot(snapshotClient, snapshotFlow, 2);
@@ -805,6 +814,78 @@
 
     }
 
+    @Test
+    public void testFlowSnapshotsWithParameterContextAndEncodingVersion() throws IOException, NiFiRegistryException {
+        // Create a bucket
+        final Bucket bucket = new Bucket();
+        bucket.setName("Test Bucket");
+
+        final Bucket createdBucket = client.getBucketClient().create(bucket);
+        assertNotNull(createdBucket);
+
+        // Create the flow
+        final VersionedFlow flow = new VersionedFlow();
+        flow.setName("My Flow");
+        flow.setBucketIdentifier(createdBucket.getIdentifier());
+
+        final VersionedFlow createdFlow = client.getFlowClient().create(flow);
+        assertNotNull(createdFlow);
+
+        // Create a param context
+        final VersionedParameter param1 = new VersionedParameter();
+        param1.setName("Param 1");
+        param1.setValue("Param 1 Value");
+        param1.setDescription("Description");
+
+        final VersionedParameter param2 = new VersionedParameter();
+        param2.setName("Param 2");
+        param2.setValue("Param 2 Value");
+        param2.setDescription("Description");
+        param2.setSensitive(true);
+
+        final VersionedParameterContext context1 = new VersionedParameterContext();
+        context1.setName("Parameter Context 1");
+        context1.setParameters(new HashSet<>(Arrays.asList(param1, param2)));
+
+        final Map<String,VersionedParameterContext> contexts = new HashMap<>();
+        contexts.put(context1.getName(), context1);
+
+        // Create an external controller service reference
+        final ExternalControllerServiceReference serviceReference = new ExternalControllerServiceReference();
+        serviceReference.setName("External Service 1");
+        serviceReference.setIdentifier(UUID.randomUUID().toString());
+
+        final Map<String,ExternalControllerServiceReference> serviceReferences = new HashMap<>();
+        serviceReferences.put(serviceReference.getIdentifier(), serviceReference);
+
+        // Create the snapshot
+        final VersionedFlowSnapshot snapshot = buildSnapshot(createdFlow, 1);
+        snapshot.setFlowEncodingVersion("2.0.0");
+        snapshot.setParameterContexts(contexts);
+        snapshot.setExternalControllerServices(serviceReferences);
+
+        final VersionedFlowSnapshot createdSnapshot = client.getFlowSnapshotClient().create(snapshot);
+        assertNotNull(createdSnapshot);
+        assertNotNull(createdSnapshot.getFlowEncodingVersion());
+        assertNotNull(createdSnapshot.getParameterContexts());
+        assertNotNull(createdSnapshot.getExternalControllerServices());
+        assertEquals(snapshot.getFlowEncodingVersion(), createdSnapshot.getFlowEncodingVersion());
+        assertEquals(1, createdSnapshot.getParameterContexts().size());
+        assertEquals(1, createdSnapshot.getExternalControllerServices().size());
+
+        // Retrieve the snapshot
+        final VersionedFlowSnapshot retrievedSnapshot = client.getFlowSnapshotClient().get(
+                createdSnapshot.getSnapshotMetadata().getFlowIdentifier(),
+                createdSnapshot.getSnapshotMetadata().getVersion());
+        assertNotNull(retrievedSnapshot);
+        assertNotNull(retrievedSnapshot.getFlowEncodingVersion());
+        assertNotNull(retrievedSnapshot.getParameterContexts());
+        assertNotNull(retrievedSnapshot.getExternalControllerServices());
+        assertEquals(snapshot.getFlowEncodingVersion(), retrievedSnapshot.getFlowEncodingVersion());
+        assertEquals(1, retrievedSnapshot.getParameterContexts().size());
+        assertEquals(1, retrievedSnapshot.getExternalControllerServices().size());
+    }
+
     private void checkExtensionMetadata(Collection<ExtensionMetadata> extensions) {
         extensions.forEach(e -> {
             assertNotNull(e.getBundleInfo());