NIFIREG-267 Updating data model to capture parameter contexts
- Introducing v3 of flow serialization to switch to serializing VersionedFlowSnapshot instead of VersionedProcessGroup
- Moving external controller service references from VersionedProcessGroup to VersionedFlowSnapshot
- Adding boolean to track whether a VersionedPort allows remote access
This closes #181.
Signed-off-by: Kevin Doran <kdoran@apache.org>
diff --git a/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java
index 76aceab..4611b9e 100644
--- a/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java
+++ b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java
@@ -25,6 +25,7 @@
import javax.validation.constraints.NotNull;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
+import java.util.Map;
import java.util.Objects;
/**
@@ -47,13 +48,21 @@
@NotNull
private VersionedProcessGroup flowContents;
+ // optional map of external controller service references
+ private Map<String,ExternalControllerServiceReference> externalControllerServices;
+
+ // optional parameter contexts mapped by their name
+ private Map<String,VersionedParameterContext> parameterContexts;;
+
+ // optional encoding version that clients may specify to track how the flow contents are encoded
+ private String flowEncodingVersion;
+
// read-only, only populated from retrieval of a snapshot
private VersionedFlow flow;
// read-only, only populated from retrieval of a snapshot
private Bucket bucket;
-
@ApiModelProperty(value = "The metadata for this snapshot", required = true)
public VersionedFlowSnapshotMetadata getSnapshotMetadata() {
return snapshotMetadata;
@@ -72,6 +81,15 @@
this.flowContents = flowContents;
}
+ @ApiModelProperty("The information about controller services that exist outside this versioned flow, but are referenced by components within the versioned flow.")
+ public Map<String, ExternalControllerServiceReference> getExternalControllerServices() {
+ return externalControllerServices;
+ }
+
+ public void setExternalControllerServices(Map<String, ExternalControllerServiceReference> externalControllerServices) {
+ this.externalControllerServices = externalControllerServices;
+ }
+
@ApiModelProperty(value = "The flow this snapshot is for", readOnly = true)
public VersionedFlow getFlow() {
return flow;
@@ -90,6 +108,26 @@
this.bucket = bucket;
}
+ @ApiModelProperty(value = "The parameter contexts referenced by process groups in the flow contents. " +
+ "The mapping is from the name of the context to the context instance, and it is expected that any " +
+ "context in this map is referenced by at least one process group in this flow.")
+ public Map<String,VersionedParameterContext> getParameterContexts() {
+ return parameterContexts;
+ }
+
+ public void setParameterContexts(Map<String,VersionedParameterContext> parameterContexts) {
+ this.parameterContexts = parameterContexts;
+ }
+
+ @ApiModelProperty(value = "The optional encoding version of the flow contents.")
+ public String getFlowEncodingVersion() {
+ return flowEncodingVersion;
+ }
+
+ public void setFlowEncodingVersion(String flowEncodingVersion) {
+ this.flowEncodingVersion = flowEncodingVersion;
+ }
+
/**
* This is a convenience method that will return true when flow is populated and when the flow's versionCount
* is equal to the version of this snapshot.
diff --git a/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameter.java b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameter.java
new file mode 100644
index 0000000..857dd16
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import java.util.Objects;
+
+public class VersionedParameter {
+
+ private String name;
+ private String description;
+ private boolean sensitive;
+ private String value;
+
+ @ApiModelProperty("The name of the parameter")
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @ApiModelProperty("The description of the param")
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ @ApiModelProperty("Whether or not the parameter value is sensitive")
+ public boolean isSensitive() {
+ return sensitive;
+ }
+
+ public void setSensitive(boolean sensitive) {
+ this.sensitive = sensitive;
+ }
+
+ @ApiModelProperty("The value of the parameter")
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ VersionedParameter that = (VersionedParameter) o;
+ return Objects.equals(name, that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name);
+ }
+}
diff --git a/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameterContext.java b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameterContext.java
new file mode 100644
index 0000000..5294c61
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameterContext.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import java.util.Set;
+
+public class VersionedParameterContext {
+
+ private String name;
+ private Set<VersionedParameter> parameters;
+
+ @ApiModelProperty("The name of the context")
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @ApiModelProperty("The parameters in the context")
+ public Set<VersionedParameter> getParameters() {
+ return parameters;
+ }
+
+ public void setParameters(Set<VersionedParameter> parameters) {
+ this.parameters = parameters;
+ }
+}
diff --git a/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedPort.java b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedPort.java
index 2b7cccd..947b52e 100644
--- a/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedPort.java
+++ b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedPort.java
@@ -23,6 +23,7 @@
private PortType type;
private Integer concurrentlySchedulableTaskCount;
private ScheduledState scheduledState;
+ private boolean allowRemoteAccess;
@ApiModelProperty("The number of tasks that should be concurrently scheduled for the port.")
public Integer getConcurrentlySchedulableTaskCount() {
@@ -51,6 +52,15 @@
this.scheduledState = scheduledState;
}
+ @ApiModelProperty("Whether or not this port allows remote access for site-to-site")
+ public boolean isAllowRemoteAccess() {
+ return allowRemoteAccess;
+ }
+
+ public void setAllowRemoteAccess(boolean allowRemoteAccess) {
+ this.allowRemoteAccess = allowRemoteAccess;
+ }
+
@Override
public ComponentType getComponentType() {
if (type == PortType.OUTPUT_PORT) {
diff --git a/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedProcessGroup.java b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedProcessGroup.java
index 071574d..ef0d3fe 100644
--- a/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedProcessGroup.java
+++ b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedProcessGroup.java
@@ -40,7 +40,8 @@
private VersionedFlowCoordinates versionedFlowCoordinates = null;
private Map<String, String> variables = new HashMap<>();
- private Map<String,ExternalControllerServiceReference> externalControllerServices;
+
+ private String parameterContextName;
@ApiModelProperty("The child Process Groups")
public Set<VersionedProcessGroup> getProcessGroups() {
@@ -146,12 +147,13 @@
return versionedFlowCoordinates;
}
- @ApiModelProperty("The information about controller services that exist outside the versioned process group, but are referenced by components within the versioned process group.")
- public Map<String, ExternalControllerServiceReference> getExternalControllerServices() {
- return externalControllerServices;
+ @ApiModelProperty("The name of the parameter context used by this process group")
+ public String getParameterContextName() {
+ return parameterContextName;
}
- public void setExternalControllerServices(Map<String, ExternalControllerServiceReference> externalControllerServices) {
- this.externalControllerServices = externalControllerServices;
+ public void setParameterContextName(String parameterContextName) {
+ this.parameterContextName = parameterContextName;
}
+
}
diff --git a/nifi-registry-core/nifi-registry-framework/pom.xml b/nifi-registry-core/nifi-registry-framework/pom.xml
index f5f1d65..d427678 100644
--- a/nifi-registry-core/nifi-registry-framework/pom.xml
+++ b/nifi-registry-core/nifi-registry-framework/pom.xml
@@ -156,6 +156,7 @@
<exclude>src/test/resources/serialization/ver1.snapshot</exclude>
<exclude>src/test/resources/serialization/ver2.snapshot</exclude>
<exclude>src/test/resources/serialization/ver3.snapshot</exclude>
+ <exclude>src/test/resources/serialization/ver9999.snapshot</exclude>
<exclude>src/test/resources/extensions/ConsumeKafkaRecord_1_0.json</exclude>
</excludes>
</configuration>
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowContent.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowContent.java
new file mode 100644
index 0000000..8340bdd
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowContent.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization;
+
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+
+/**
+ * Wrapper element to contain everything that is serialized for a given version of a flow.
+ */
+public class FlowContent {
+
+ private VersionedFlowSnapshot flowSnapshot;
+
+ public VersionedFlowSnapshot getFlowSnapshot() {
+ return flowSnapshot;
+ }
+
+ public void setFlowSnapshot(VersionedFlowSnapshot flowSnapshot) {
+ this.flowSnapshot = flowSnapshot;
+ }
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowContentSerializer.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowContentSerializer.java
new file mode 100644
index 0000000..3466aa9
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowContentSerializer.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization;
+
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.serialization.jackson.JacksonFlowContentSerializer;
+import org.apache.nifi.registry.serialization.jackson.JacksonVersionedProcessGroupSerializer;
+import org.apache.nifi.registry.serialization.jaxb.JAXBVersionedProcessGroupSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serializer that handles versioned serialization for flow content.
+ *
+ * <p>
+ * Current data model version is 3.
+ * Data Model Version Histories:
+ * <ul>
+ * <li>version 3: Serialized by {@link JacksonFlowContentSerializer}</li>
+ * <li>version 2: Serialized by {@link JacksonVersionedProcessGroupSerializer}</li>
+ * <li>version 1: Serialized by {@link JAXBVersionedProcessGroupSerializer}</li>
+ * </ul>
+ * </p>
+ */
+@Service
+public class FlowContentSerializer {
+
+ private static final Logger logger = LoggerFactory.getLogger(FlowContentSerializer.class);
+
+ static final Integer START_USING_SNAPSHOT_VERSION = 3;
+ static final Integer CURRENT_DATA_MODEL_VERSION = 3;
+
+ private final Map<Integer, VersionedSerializer<VersionedProcessGroup>> processGroupSerializers;
+ private final Map<Integer, VersionedSerializer<FlowContent>> flowContentSerializers;
+ private final Map<Integer, VersionedSerializer<?>> allSerializers;
+
+ private final List<Integer> descendingVersions;
+
+ public FlowContentSerializer() {
+ final Map<Integer, VersionedSerializer<FlowContent>> tempFlowContentSerializers = new HashMap<>();
+ tempFlowContentSerializers.put(3, new JacksonFlowContentSerializer());
+ flowContentSerializers = Collections.unmodifiableMap(tempFlowContentSerializers);
+
+ final Map<Integer, VersionedSerializer<VersionedProcessGroup>> tempProcessGroupSerializers = new HashMap<>();
+ tempProcessGroupSerializers.put(2, new JacksonVersionedProcessGroupSerializer());
+ tempProcessGroupSerializers.put(1, new JAXBVersionedProcessGroupSerializer());
+ processGroupSerializers = Collections.unmodifiableMap(tempProcessGroupSerializers);
+
+ final Map<Integer,VersionedSerializer<?>> tempAllSerializers = new HashMap<>();
+ tempAllSerializers.putAll(processGroupSerializers);
+ tempAllSerializers.putAll(flowContentSerializers);
+ allSerializers = Collections.unmodifiableMap(tempAllSerializers);
+
+ final List<Integer> sortedVersions = new ArrayList<>(allSerializers.keySet());
+ sortedVersions.sort(Collections.reverseOrder(Integer::compareTo));
+ this.descendingVersions = sortedVersions;
+ }
+
+ /**
+ * Tries to read a data model version using each VersionedSerializer, in descending version order.
+ * If no version could be read from any serializer, then a SerializationException is thrown.
+ *
+ * When deserializing, clients are expected to call this method to obtain the version, then call
+ * {@method isProcessGroupVersion}, which then determines if {@method deserializeProcessGroup}
+ * should be used, or if {@method deserializeFlowContent} should be used.
+ *
+ * @param input the input stream containing serialized flow content
+ * @return the data model version from the input stream
+ * @throws SerializationException if the data model version could not be read with any serializer
+ */
+ public int readDataModelVersion(final InputStream input) throws SerializationException {
+ final InputStream markSupportedInput = input.markSupported() ? input : new BufferedInputStream(input);
+
+ // Mark the beginning of the stream.
+ markSupportedInput.mark(SerializationConstants.MAX_HEADER_BYTES);
+
+ // Try each serializer in descending version order
+ for (final int serializerVersion : descendingVersions) {
+ final VersionedSerializer<?> serializer = allSerializers.get(serializerVersion);
+ try {
+ return serializer.readDataModelVersion(markSupportedInput);
+ } catch (SerializationException e) {
+ if (logger.isDebugEnabled()) {
+ logger.error("Unable to read the data model version due to: {}", e.getMessage());
+ }
+ continue;
+ } finally {
+ // Reset the stream position.
+ try {
+ markSupportedInput.reset();
+ } catch (IOException resetException) {
+ // Should not happen.
+ logger.error("Unable to reset the input stream.", resetException);
+ }
+ }
+ }
+
+ throw new SerializationException("Unable to read the data model version for the flow content.");
+ }
+
+ public Integer getCurrentDataModelVersion() {
+ return CURRENT_DATA_MODEL_VERSION;
+ }
+
+ public boolean isProcessGroupVersion(final int dataModelVersion) {
+ return dataModelVersion < START_USING_SNAPSHOT_VERSION;
+ }
+
+ public VersionedProcessGroup deserializeProcessGroup(final int dataModelVersion, final InputStream input) throws SerializationException {
+ final VersionedSerializer<VersionedProcessGroup> serializer = processGroupSerializers.get(dataModelVersion);
+ if (serializer == null) {
+ throw new IllegalArgumentException("No VersionedProcessGroup serializer exists for data model version: " + dataModelVersion);
+ }
+
+ return serializer.deserialize(input);
+ }
+
+ public FlowContent deserializeFlowContent(final int dataModelVersion, final InputStream input) throws SerializationException {
+ final VersionedSerializer<FlowContent> serializer = flowContentSerializers.get(dataModelVersion);
+ if (serializer == null) {
+ throw new IllegalArgumentException("No FlowContent serializer exists for data model version: " + dataModelVersion);
+ }
+
+ return serializer.deserialize(input);
+ }
+
+ public void serializeFlowContent(final FlowContent flowContent, final OutputStream out) throws SerializationException {
+ final VersionedSerializer<FlowContent> serializer = flowContentSerializers.get(CURRENT_DATA_MODEL_VERSION);
+ serializer.serialize(CURRENT_DATA_MODEL_VERSION, flowContent, out);
+ }
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/SerializationConstants.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/SerializationConstants.java
new file mode 100644
index 0000000..9c9c384
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/SerializationConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization;
+
+public interface SerializationConstants {
+
+ int MAX_HEADER_BYTES = 1024;
+
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/VersionedProcessGroupSerializer.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/VersionedProcessGroupSerializer.java
deleted file mode 100644
index 8d7cb7b..0000000
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/VersionedProcessGroupSerializer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.registry.serialization;
-
-import org.apache.nifi.registry.flow.VersionedProcessGroup;
-import org.apache.nifi.registry.serialization.jackson.JacksonVersionedProcessGroupSerializer;
-import org.apache.nifi.registry.serialization.jaxb.JAXBVersionedProcessGroupSerializer;
-import org.springframework.stereotype.Service;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * See {@link AbstractMultiVersionSerializer} for further information.
- *
- * <p>
- * Current data model version is 2.
- * Data Model Version Histories:
- * <ul>
- * <li>version 2: Serialized by {@link JacksonVersionedProcessGroupSerializer}</li>
- * <li>version 1: Serialized by {@link JAXBVersionedProcessGroupSerializer}</li>
- * </ul>
- * </p>
- */
-@Service
-public class VersionedProcessGroupSerializer extends AbstractMultiVersionSerializer<VersionedProcessGroup> {
-
- static final Integer CURRENT_DATA_MODEL_VERSION = 2;
-
- @Override
- protected Map<Integer, VersionedSerializer<VersionedProcessGroup>> createVersionedSerializers() {
- final Map<Integer, VersionedSerializer<VersionedProcessGroup>> tempSerializers = new HashMap<>();
- tempSerializers.put(2, new JacksonVersionedProcessGroupSerializer());
- tempSerializers.put(1, new JAXBVersionedProcessGroupSerializer());
- return tempSerializers;
- }
-
- @Override
- protected int getCurrentDataModelVersion() {
- return CURRENT_DATA_MODEL_VERSION;
- }
-
-}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonFlowContentSerializer.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonFlowContentSerializer.java
new file mode 100644
index 0000000..77dd37a
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonFlowContentSerializer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization.jackson;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.nifi.registry.serialization.FlowContent;
+import org.apache.nifi.registry.serialization.SerializationException;
+
+/**
+ * A Jackson serializer for FlowContent.
+ */
+public class JacksonFlowContentSerializer extends JacksonSerializer<FlowContent> {
+
+ @Override
+ TypeReference<SerializationContainer<FlowContent>> getDeserializeTypeRef() throws SerializationException {
+ return new TypeReference<SerializationContainer<org.apache.nifi.registry.serialization.FlowContent>>() {};
+ }
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonSerializer.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonSerializer.java
index 4098c77..d159ea3 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonSerializer.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonSerializer.java
@@ -18,6 +18,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.registry.serialization.SerializationConstants;
import org.apache.nifi.registry.serialization.SerializationException;
import org.apache.nifi.registry.serialization.VersionedSerializer;
import org.slf4j.Logger;
@@ -30,8 +31,6 @@
import java.util.Collections;
import java.util.HashMap;
-import static org.apache.nifi.registry.serialization.VersionedProcessGroupSerializer.MAX_HEADER_BYTES;
-
/**
* A Serializer that uses Jackson for serializing/deserializing.
*/
@@ -80,7 +79,7 @@
@Override
public int readDataModelVersion(InputStream input) throws SerializationException {
- final byte[] headerBytes = new byte[MAX_HEADER_BYTES];
+ final byte[] headerBytes = new byte[SerializationConstants.MAX_HEADER_BYTES];
final int readHeaderBytes;
try {
readHeaderBytes = input.read(headerBytes);
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonVersionedProcessGroupSerializer.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonVersionedProcessGroupSerializer.java
index 21bdecc..4bd763e 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonVersionedProcessGroupSerializer.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonVersionedProcessGroupSerializer.java
@@ -21,11 +21,10 @@
import org.apache.nifi.registry.serialization.SerializationException;
/**
- * A Jackson serializer for VersionedFlowSnapshots.
+ * A Jackson serializer for VersionedProcessGroups.
*/
public class JacksonVersionedProcessGroupSerializer extends JacksonSerializer<VersionedProcessGroup> {
-
@Override
TypeReference<SerializationContainer<VersionedProcessGroup>> getDeserializeTypeRef() throws SerializationException {
return new TypeReference<SerializationContainer<VersionedProcessGroup>>() {};
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
index 7a87ec0..26057ae 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
@@ -37,10 +37,10 @@
import org.apache.nifi.registry.extension.bundle.BundleVersion;
import org.apache.nifi.registry.extension.bundle.BundleVersionFilterParams;
import org.apache.nifi.registry.extension.bundle.BundleVersionMetadata;
-import org.apache.nifi.registry.extension.component.manifest.Extension;
import org.apache.nifi.registry.extension.component.ExtensionFilterParams;
import org.apache.nifi.registry.extension.component.ExtensionMetadata;
import org.apache.nifi.registry.extension.component.TagCount;
+import org.apache.nifi.registry.extension.component.manifest.Extension;
import org.apache.nifi.registry.extension.component.manifest.ProvidedServiceAPI;
import org.apache.nifi.registry.extension.repo.ExtensionRepoArtifact;
import org.apache.nifi.registry.extension.repo.ExtensionRepoBucket;
@@ -62,7 +62,8 @@
import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
import org.apache.nifi.registry.provider.extension.StandardBundleCoordinate;
import org.apache.nifi.registry.provider.flow.StandardFlowSnapshotContext;
-import org.apache.nifi.registry.serialization.Serializer;
+import org.apache.nifi.registry.serialization.FlowContent;
+import org.apache.nifi.registry.serialization.FlowContentSerializer;
import org.apache.nifi.registry.service.alias.RegistryUrlAliasService;
import org.apache.nifi.registry.service.extension.ExtensionService;
import org.apache.nifi.registry.service.mapper.BucketMappings;
@@ -111,7 +112,7 @@
private final MetadataService metadataService;
private final FlowPersistenceProvider flowPersistenceProvider;
private final BundlePersistenceProvider bundlePersistenceProvider;
- private final Serializer<VersionedProcessGroup> processGroupSerializer;
+ private final FlowContentSerializer flowContentSerializer;
private final ExtensionService extensionService;
private final Validator validator;
private final RegistryUrlAliasService registryUrlAliasService;
@@ -124,14 +125,14 @@
public RegistryService(final MetadataService metadataService,
final FlowPersistenceProvider flowPersistenceProvider,
final BundlePersistenceProvider bundlePersistenceProvider,
- final Serializer<VersionedProcessGroup> processGroupSerializer,
+ final FlowContentSerializer flowContentSerializer,
final ExtensionService extensionService,
final Validator validator,
final RegistryUrlAliasService registryUrlAliasService) {
this.metadataService = Validate.notNull(metadataService);
this.flowPersistenceProvider = Validate.notNull(flowPersistenceProvider);
this.bundlePersistenceProvider = Validate.notNull(bundlePersistenceProvider);
- this.processGroupSerializer = Validate.notNull(processGroupSerializer);
+ this.flowContentSerializer = Validate.notNull(flowContentSerializer);
this.extensionService = Validate.notNull(extensionService);
this.validator = Validate.notNull(validator);
this.registryUrlAliasService = Validate.notNull(registryUrlAliasService);
@@ -688,7 +689,14 @@
// serialize the snapshot
final ByteArrayOutputStream out = new ByteArrayOutputStream();
registryUrlAliasService.setInternal(flowSnapshot.getFlowContents());
- processGroupSerializer.serialize(flowSnapshot.getFlowContents(), out);
+
+ final FlowContent flowContent = new FlowContent();
+ flowContent.setFlowSnapshot(flowSnapshot);
+
+ // temporarily remove the metadata so it isn't serialized, but then put it back for returning the response
+ flowSnapshot.setSnapshotMetadata(null);
+ flowContentSerializer.serializeFlowContent(flowContent, out);
+ flowSnapshot.setSnapshotMetadata(snapshotMetadata);
// save the serialized snapshot to the persistence provider
final Bucket bucket = BucketMappings.map(existingBucket);
@@ -772,9 +780,9 @@
+ flowEntity.getId() + " and version " + version);
}
- // deserialize the contents
+ // deserialize the content
final InputStream input = new ByteArrayInputStream(serializedSnapshot);
- final VersionedProcessGroup flowContents = processGroupSerializer.deserialize(input);
+ final VersionedFlowSnapshot snapshot = deserializeFlowContent(input);
// map entities to data model
final Bucket bucket = BucketMappings.map(bucketEntity);
@@ -782,15 +790,29 @@
final VersionedFlowSnapshotMetadata snapshotMetadata = FlowMappings.map(bucketEntity, snapshotEntity);
// create the snapshot to return
- final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
- registryUrlAliasService.setExternal(flowContents);
- snapshot.setFlowContents(flowContents);
+ registryUrlAliasService.setExternal(snapshot.getFlowContents());
snapshot.setSnapshotMetadata(snapshotMetadata);
snapshot.setFlow(versionedFlow);
snapshot.setBucket(bucket);
return snapshot;
}
+ private VersionedFlowSnapshot deserializeFlowContent(final InputStream input) {
+ // attempt to read the version header from the serialized content
+ final int dataModelVersion = flowContentSerializer.readDataModelVersion(input);
+
+ // determine how to do deserialize based on the data model version
+ if (flowContentSerializer.isProcessGroupVersion(dataModelVersion)) {
+ final VersionedProcessGroup processGroup = flowContentSerializer.deserializeProcessGroup(dataModelVersion, input);
+ final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
+ snapshot.setFlowContents(processGroup);
+ return snapshot;
+ } else {
+ final FlowContent flowContent = flowContentSerializer.deserializeFlowContent(dataModelVersion, input);
+ return flowContent.getFlowSnapshot();
+ }
+ }
+
/**
* Returns all versions of a flow, sorted newest to oldest.
*
@@ -1009,9 +1031,12 @@
// deserialize the contents
final InputStream inputA = new ByteArrayInputStream(serializedSnapshotA);
- final VersionedProcessGroup flowContentsA = processGroupSerializer.deserialize(inputA);
+ final VersionedFlowSnapshot snapshotA = deserializeFlowContent(inputA);
+ final VersionedProcessGroup flowContentsA = snapshotA.getFlowContents();
+
final InputStream inputB = new ByteArrayInputStream(serializedSnapshotB);
- final VersionedProcessGroup flowContentsB = processGroupSerializer.deserialize(inputB);
+ final VersionedFlowSnapshot snapshotB = deserializeFlowContent(inputB);
+ final VersionedProcessGroup flowContentsB = snapshotB.getFlowContents();
final ComparableDataFlow comparableFlowA = new StandardComparableDataFlow(String.format("Version %d", older), flowContentsA);
final ComparableDataFlow comparableFlowB = new StandardComparableDataFlow(String.format("Version %d", newer), flowContentsB);
@@ -1021,13 +1046,13 @@
null, new ConciseEvolvingDifferenceDescriptor());
final FlowComparison flowComparison = flowComparator.compare();
- VersionedFlowDifference result = new VersionedFlowDifference();
+ final VersionedFlowDifference result = new VersionedFlowDifference();
result.setBucketId(bucketIdentifier);
result.setFlowId(flowIdentifier);
result.setVersionA(older);
result.setVersionB(newer);
- Set<ComponentDifferenceGroup> differenceGroups = getStringComponentDifferenceGroupMap(flowComparison.getDifferences());
+ final Set<ComponentDifferenceGroup> differenceGroups = getStringComponentDifferenceGroupMap(flowComparison.getDifferences());
result.setComponentDifferenceGroups(differenceGroups);
return result;
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowContentSerializer.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowContentSerializer.java
new file mode 100644
index 0000000..21d3b43
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowContentSerializer.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization;
+
+import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.flow.VersionedProcessor;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
+
+public class TestFlowContentSerializer {
+
+ private FlowContentSerializer serializer;
+
+ @Before
+ public void setup() {
+ serializer = new FlowContentSerializer();
+ }
+
+ @Test
+ public void testSerializeDeserializeFlowContent() {
+ final VersionedProcessor processor1 = new VersionedProcessor();
+ processor1.setIdentifier("processor1");
+ processor1.setName("My Processor 1");
+
+ final VersionedProcessGroup processGroup1 = new VersionedProcessGroup();
+ processGroup1.setIdentifier("pg1");
+ processGroup1.setName("My Process Group");
+ processGroup1.getProcessors().add(processor1);
+
+ final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
+ snapshot.setFlowContents(processGroup1);
+
+ final FlowContent flowContent = new FlowContent();
+ flowContent.setFlowSnapshot(snapshot);
+
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ serializer.serializeFlowContent(flowContent, out);
+
+ //final String json = new String(out.toByteArray(), StandardCharsets.UTF_8);
+ //System.out.println(json);
+
+ final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+
+ // make sure we can read the version from the input stream and it should be the current version
+ final Integer version = serializer.readDataModelVersion(in);
+ assertEquals(serializer.getCurrentDataModelVersion(), version);
+ assertEquals(false, serializer.isProcessGroupVersion(version));
+
+ // make sure we can deserialize back to FlowContent
+ final FlowContent deserializedFlowContent = serializer.deserializeFlowContent(version, in);
+ assertNotNull(deserializedFlowContent);
+
+ final VersionedFlowSnapshot deserializedSnapshot = deserializedFlowContent.getFlowSnapshot();
+ assertNotNull(deserializedSnapshot);
+
+ final VersionedProcessGroup deserializedProcessGroup1 = deserializedSnapshot.getFlowContents();
+ assertNotNull(deserializedProcessGroup1);
+ assertEquals(processGroup1.getIdentifier(), deserializedProcessGroup1.getIdentifier());
+ assertEquals(processGroup1.getName(), deserializedProcessGroup1.getName());
+
+ assertEquals(1, deserializedProcessGroup1.getProcessors().size());
+
+ final VersionedProcessor deserializedProcessor1 = deserializedProcessGroup1.getProcessors().iterator().next();
+ assertEquals(processor1.getIdentifier(), deserializedProcessor1.getIdentifier());
+ assertEquals(processor1.getName(), deserializedProcessor1.getName());
+ }
+
+ @Test
+ public void testSerializeDeserializeWithExternalServices() throws SerializationException {
+ final VersionedProcessGroup processGroup1 = new VersionedProcessGroup();
+ processGroup1.setIdentifier("pg1");
+ processGroup1.setName("My Process Group");
+
+ final ExternalControllerServiceReference serviceReference1 = new ExternalControllerServiceReference();
+ serviceReference1.setIdentifier("1");
+ serviceReference1.setName("Service 1");
+
+ final ExternalControllerServiceReference serviceReference2 = new ExternalControllerServiceReference();
+ serviceReference2.setIdentifier("2");
+ serviceReference2.setName("Service 2");
+
+ final Map<String,ExternalControllerServiceReference> serviceReferences = new HashMap<>();
+ serviceReferences.put(serviceReference1.getIdentifier(), serviceReference1);
+ serviceReferences.put(serviceReference2.getIdentifier(), serviceReference2);
+
+ final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
+ snapshot.setFlowContents(processGroup1);
+ snapshot.setExternalControllerServices(serviceReferences);
+
+ final FlowContent flowContent = new FlowContent();
+ flowContent.setFlowSnapshot(snapshot);
+
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ serializer.serializeFlowContent(flowContent, out);
+
+ final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+
+ // make sure we can read the version from the input stream and it should be the current version
+ final Integer version = serializer.readDataModelVersion(in);
+ assertEquals(serializer.getCurrentDataModelVersion(), version);
+
+ // make sure we can deserialize back to FlowContent
+ final FlowContent deserializedFlowContent = serializer.deserializeFlowContent(version, in);
+ assertNotNull(deserializedFlowContent);
+
+ final VersionedFlowSnapshot deserializedSnapshot = deserializedFlowContent.getFlowSnapshot();
+ assertNotNull(deserializedSnapshot);
+
+ final VersionedProcessGroup deserializedProcessGroup = deserializedSnapshot.getFlowContents();
+ assertEquals(processGroup1.getIdentifier(), deserializedProcessGroup.getIdentifier());
+ assertEquals(processGroup1.getName(), deserializedProcessGroup.getName());
+
+ final Map<String,ExternalControllerServiceReference> deserializedServiceReferences = deserializedSnapshot.getExternalControllerServices();
+ assertNotNull(deserializedServiceReferences);
+ assertEquals(2, deserializedServiceReferences.size());
+
+ final ExternalControllerServiceReference deserializedServiceReference1 = deserializedServiceReferences.get(serviceReference1.getIdentifier());
+ assertNotNull(deserializedServiceReference1);
+ assertEquals(serviceReference1.getIdentifier(), deserializedServiceReference1.getIdentifier());
+ assertEquals(serviceReference1.getName(), deserializedServiceReference1.getName());
+ }
+
+ @Test
+ public void testDeserializeJsonNonIntegerVersion() throws IOException {
+ final String file = "/serialization/json/non-integer-version.snapshot";
+ try (final InputStream is = this.getClass().getResourceAsStream(file)) {
+ try {
+ serializer.readDataModelVersion(is);
+ fail("Should fail");
+ } catch (SerializationException e) {
+ assertEquals("Unable to read the data model version for the flow content.", e.getMessage());
+ }
+ }
+ }
+
+ @Test
+ public void testDeserializeJsonNoVersion() throws IOException {
+ final String file = "/serialization/json/no-version.snapshot";
+ try (final InputStream is = this.getClass().getResourceAsStream(file)) {
+ try {
+ serializer.readDataModelVersion(is);
+ fail("Should fail");
+ } catch (SerializationException e) {
+ assertEquals("Unable to read the data model version for the flow content.", e.getMessage());
+ }
+ }
+ }
+
+ @Test
+ public void testDeserializeVer1() throws IOException {
+ final String file = "/serialization/ver1.snapshot";
+ final VersionedProcessGroup processGroup;
+ try (final InputStream is = this.getClass().getResourceAsStream(file)) {
+ final Integer version = serializer.readDataModelVersion(is);
+ assertNotNull(version);
+ assertEquals(1, version.intValue());
+
+ if (serializer.isProcessGroupVersion(version)) {
+ processGroup = serializer.deserializeProcessGroup(version, is);
+ } else {
+ processGroup = null;
+ }
+ }
+
+ assertNotNull(processGroup);
+ assertNotNull(processGroup.getProcessors());
+ assertTrue(processGroup.getProcessors().size() > 0);
+ //System.out.printf("processGroup=" + processGroup);
+ }
+
+ @Test
+ public void testDeserializeVer2() throws IOException {
+ final String file = "/serialization/ver2.snapshot";
+ final VersionedProcessGroup processGroup;
+ try (final InputStream is = this.getClass().getResourceAsStream(file)) {
+ final Integer version = serializer.readDataModelVersion(is);
+ assertNotNull(version);
+ assertEquals(2, version.intValue());
+
+ if (serializer.isProcessGroupVersion(version)) {
+ processGroup = serializer.deserializeProcessGroup(version, is);
+ } else {
+ processGroup = null;
+ }
+ }
+
+ assertNotNull(processGroup);
+ assertNotNull(processGroup.getProcessors());
+ assertTrue(processGroup.getProcessors().size() > 0);
+ //System.out.printf("processGroup=" + processGroup);
+ }
+
+ @Test
+ public void testDeserializeVer3() throws IOException {
+ final String file = "/serialization/ver3.snapshot";
+ try (final InputStream is = this.getClass().getResourceAsStream(file)) {
+ final Integer version = serializer.readDataModelVersion(is);
+ assertNotNull(version);
+ assertEquals(3, version.intValue());
+ assertFalse(serializer.isProcessGroupVersion(version));
+
+ final FlowContent flowContent = serializer.deserializeFlowContent(version, is);
+ assertNotNull(flowContent);
+
+ final VersionedFlowSnapshot flowSnapshot = flowContent.getFlowSnapshot();
+ assertNotNull(flowSnapshot);
+
+ final VersionedProcessGroup processGroup = flowSnapshot.getFlowContents();
+ assertNotNull(processGroup);
+ assertNotNull(processGroup.getProcessors());
+ assertEquals(1, processGroup.getProcessors().size());
+ }
+ }
+
+ @Test
+ public void testDeserializeVer9999() throws IOException {
+ final String file = "/serialization/ver9999.snapshot";
+ try (final InputStream is = this.getClass().getResourceAsStream(file)) {
+ final Integer version = serializer.readDataModelVersion(is);
+ assertNotNull(version);
+ assertEquals(9999, version.intValue());
+ assertFalse(serializer.isProcessGroupVersion(version));
+
+ try {
+ serializer.deserializeFlowContent(version, is);
+ fail("Should fail");
+ } catch (IllegalArgumentException e) {
+ assertEquals("No FlowContent serializer exists for data model version: " + version, e.getMessage());
+ }
+
+ try {
+ serializer.deserializeProcessGroup(version, is);
+ fail("Should fail");
+ } catch (IllegalArgumentException e) {
+ assertEquals("No VersionedProcessGroup serializer exists for data model version: " + version, e.getMessage());
+ }
+ }
+ }
+
+ @Test
+ public void testDeserializeProcessGroupAsFlowContent() throws IOException {
+ final String file = "/serialization/ver2.snapshot";
+ try (final InputStream is = this.getClass().getResourceAsStream(file)) {
+ final Integer version = serializer.readDataModelVersion(is);
+ assertNotNull(version);
+ assertEquals(2, version.intValue());
+ assertTrue(serializer.isProcessGroupVersion(version));
+
+ final VersionedProcessGroup processGroup = serializer.deserializeProcessGroup(version, is);
+ assertNotNull(processGroup);
+ }
+ }
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java
deleted file mode 100644
index 145204b..0000000
--- a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.registry.serialization;
-
-import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
-import org.apache.nifi.registry.flow.VersionedProcessGroup;
-import org.apache.nifi.registry.flow.VersionedProcessor;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class TestVersionedProcessGroupSerializer {
-
- @Test
- public void testSerializeDeserializeFlowSnapshot() throws SerializationException {
- final Serializer<VersionedProcessGroup> serializer = new VersionedProcessGroupSerializer();
-
- final VersionedProcessGroup processGroup1 = new VersionedProcessGroup();
- processGroup1.setIdentifier("pg1");
- processGroup1.setName("My Process Group");
-
- final VersionedProcessor processor1 = new VersionedProcessor();
- processor1.setIdentifier("processor1");
- processor1.setName("My Processor 1");
-
- // make sure nested objects are serialized/deserialized
- processGroup1.getProcessors().add(processor1);
-
- final ByteArrayOutputStream out = new ByteArrayOutputStream();
- serializer.serialize(processGroup1, out);
-
- final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
- final VersionedProcessGroup deserializedProcessGroup1 = serializer.deserialize(in);
-
- Assert.assertEquals(processGroup1.getIdentifier(), deserializedProcessGroup1.getIdentifier());
- Assert.assertEquals(processGroup1.getName(), deserializedProcessGroup1.getName());
-
- Assert.assertEquals(1, deserializedProcessGroup1.getProcessors().size());
-
- final VersionedProcessor deserializedProcessor1 = deserializedProcessGroup1.getProcessors().iterator().next();
- Assert.assertEquals(processor1.getIdentifier(), deserializedProcessor1.getIdentifier());
- Assert.assertEquals(processor1.getName(), deserializedProcessor1.getName());
-
- }
-
- @Test
- public void testSerializeDeserializeWithExternalServices() throws SerializationException {
- final Serializer<VersionedProcessGroup> serializer = new VersionedProcessGroupSerializer();
-
- final VersionedProcessGroup processGroup1 = new VersionedProcessGroup();
- processGroup1.setIdentifier("pg1");
- processGroup1.setName("My Process Group");
-
- final ExternalControllerServiceReference serviceReference1 = new ExternalControllerServiceReference();
- serviceReference1.setIdentifier("1");
- serviceReference1.setName("Service 1");
-
- final ExternalControllerServiceReference serviceReference2 = new ExternalControllerServiceReference();
- serviceReference2.setIdentifier("2");
- serviceReference2.setName("Service 2");
-
- final Map<String,ExternalControllerServiceReference> serviceReferences = new HashMap<>();
- serviceReferences.put(serviceReference1.getIdentifier(), serviceReference1);
- serviceReferences.put(serviceReference2.getIdentifier(), serviceReference2);
-
- processGroup1.setExternalControllerServices(serviceReferences);
-
- final ByteArrayOutputStream out = new ByteArrayOutputStream();
- serializer.serialize(processGroup1, out);
-
- final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
- final VersionedProcessGroup deserializedProcessGroup = serializer.deserialize(in);
-
- Assert.assertEquals(processGroup1.getIdentifier(), deserializedProcessGroup.getIdentifier());
- Assert.assertEquals(processGroup1.getName(), deserializedProcessGroup.getName());
-
- final Map<String,ExternalControllerServiceReference> deserializedServiceReferences = deserializedProcessGroup.getExternalControllerServices();
- Assert.assertNotNull(deserializedServiceReferences);
- Assert.assertEquals(2, deserializedServiceReferences.size());
-
- final ExternalControllerServiceReference deserializedServiceReference1 = deserializedServiceReferences.get(serviceReference1.getIdentifier());
- Assert.assertNotNull(deserializedServiceReference1);
- Assert.assertEquals(serviceReference1.getIdentifier(), deserializedServiceReference1.getIdentifier());
- Assert.assertEquals(serviceReference1.getName(), deserializedServiceReference1.getName());
- }
-
- @Test
- public void testDeserializeJsonNonIntegerVersion() throws IOException {
- final String file = "/serialization/json/non-integer-version.snapshot";
- final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer();
- try (final InputStream is = this.getClass().getResourceAsStream(file)) {
- try {
- serializer.deserialize(is);
- fail("Should fail");
- } catch (SerializationException e) {
- assertEquals("Unable to find a serializer compatible with the input.", e.getMessage());
- }
- }
- }
-
- @Test
- public void testDeserializeJsonNoVersion() throws IOException {
- final String file = "/serialization/json/no-version.snapshot";
- final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer();
- try (final InputStream is = this.getClass().getResourceAsStream(file)) {
- try {
- serializer.deserialize(is);
- fail("Should fail");
- } catch (SerializationException e) {
- assertEquals("Unable to find a serializer compatible with the input.", e.getMessage());
- }
- }
- }
-
- @Test
- public void testDeserializeVer1() throws IOException {
- final String file = "/serialization/ver1.snapshot";
- final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer();
- final VersionedProcessGroup processGroup;
- try (final InputStream is = this.getClass().getResourceAsStream(file)) {
- processGroup = serializer.deserialize(is);
- }
- System.out.printf("processGroup=" + processGroup);
- }
-
- @Test
- public void testDeserializeVer2() throws IOException {
- final String file = "/serialization/ver2.snapshot";
- final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer();
- final VersionedProcessGroup processGroup;
- try (final InputStream is = this.getClass().getResourceAsStream(file)) {
- processGroup = serializer.deserialize(is);
- }
- System.out.printf("processGroup=" + processGroup);
- }
-
- @Test
- public void testDeserializeVer3() throws IOException {
- final String file = "/serialization/ver3.snapshot";
- final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer();
- try (final InputStream is = this.getClass().getResourceAsStream(file)) {
- try {
- serializer.deserialize(is);
- fail("Should fail");
- } catch (SerializationException e) {
- assertEquals("Unable to find a serializer compatible with the input.", e.getMessage());
- }
- }
- }
-
-}
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java
index baeb949..4e43645 100644
--- a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java
+++ b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java
@@ -31,8 +31,8 @@
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.VersionedProcessor;
-import org.apache.nifi.registry.serialization.Serializer;
-import org.apache.nifi.registry.serialization.VersionedProcessGroupSerializer;
+import org.apache.nifi.registry.serialization.FlowContent;
+import org.apache.nifi.registry.serialization.FlowContentSerializer;
import org.apache.nifi.registry.service.alias.RegistryUrlAliasService;
import org.apache.nifi.registry.service.extension.ExtensionService;
import org.apache.nifi.registry.service.extension.StandardExtensionService;
@@ -77,7 +77,7 @@
private MetadataService metadataService;
private FlowPersistenceProvider flowPersistenceProvider;
private BundlePersistenceProvider bundlePersistenceProvider;
- private Serializer<VersionedProcessGroup> snapshotSerializer;
+ private FlowContentSerializer flowContentSerializer;
private ExtensionService extensionService;
private Validator validator;
private RegistryUrlAliasService registryUrlAliasService;
@@ -89,7 +89,7 @@
metadataService = mock(MetadataService.class);
flowPersistenceProvider = mock(FlowPersistenceProvider.class);
bundlePersistenceProvider = mock(BundlePersistenceProvider.class);
- snapshotSerializer = mock(VersionedProcessGroupSerializer.class);
+ flowContentSerializer = mock(FlowContentSerializer.class);
extensionService = mock(StandardExtensionService.class);
registryUrlAliasService = mock(RegistryUrlAliasService.class);
@@ -97,7 +97,7 @@
validator = validatorFactory.getValidator();
registryService = new RegistryService(metadataService, flowPersistenceProvider, bundlePersistenceProvider,
- snapshotSerializer, extensionService, validator, registryUrlAliasService);
+ flowContentSerializer, extensionService, validator, registryUrlAliasService);
}
// ---------------------- Test Bucket methods ---------------------------------------------
@@ -800,7 +800,7 @@
assertNotNull(createdSnapshot.getFlow());
assertNotNull(createdSnapshot.getBucket());
- verify(snapshotSerializer, times(1)).serialize(eq(snapshot.getFlowContents()), any(OutputStream.class));
+ verify(flowContentSerializer, times(1)).serializeFlowContent(any(FlowContent.class), any(OutputStream.class));
verify(flowPersistenceProvider, times(1)).saveFlowContent(any(), any());
verify(metadataService, times(1)).createFlowSnapshot(any(FlowSnapshotEntity.class));
}
@@ -1150,8 +1150,10 @@
existingSnapshot.getVersion()
)).thenReturn(new byte[10]);
- final VersionedFlowSnapshot snapshotToDeserialize = createSnapshot();
- when(snapshotSerializer.deserialize(any(InputStream.class))).thenReturn(snapshotToDeserialize.getFlowContents());
+ final FlowContent flowContent = new FlowContent();
+ flowContent.setFlowSnapshot(createSnapshot());
+ when(flowContentSerializer.readDataModelVersion(any(InputStream.class))).thenReturn(3);
+ when(flowContentSerializer.deserializeFlowContent(eq(3), any(InputStream.class))).thenReturn(flowContent);
final VersionedFlowSnapshot returnedSnapshot = registryService.getFlowSnapshot(
existingBucket.getId(), existingSnapshot.getFlowId(), existingSnapshot.getVersion());
@@ -1253,7 +1255,9 @@
final VersionedProcessGroup pgA = createVersionedProcessGroupA();
final VersionedProcessGroup pgB = createVersionedProcessGroupB();
- when(snapshotSerializer.deserialize(any())).thenReturn(pgA, pgB);
+ when(flowContentSerializer.readDataModelVersion(any(InputStream.class))).thenReturn(2);
+ when(flowContentSerializer.isProcessGroupVersion(eq(2))).thenReturn(true);
+ when(flowContentSerializer.deserializeProcessGroup(eq(2),any())).thenReturn(pgA, pgB);
final VersionedFlowDifference diff = registryService.getFlowDiff(
"bucketIdentifier", "flowIdentifier", 1, 2);
@@ -1274,7 +1278,9 @@
final VersionedProcessGroup pgA = createVersionedProcessGroupA();
final VersionedProcessGroup pgB = createVersionedProcessGroupB();
- when(snapshotSerializer.deserialize(any())).thenReturn(pgA, pgB);
+ when(flowContentSerializer.readDataModelVersion(any(InputStream.class))).thenReturn(2);
+ when(flowContentSerializer.isProcessGroupVersion(eq(2))).thenReturn(true);
+ when(flowContentSerializer.deserializeProcessGroup(eq(2),any())).thenReturn(pgA, pgB);
// getFlowDiff orders the changes in ascending order of version number regardless of param order
final VersionedFlowDifference diff = registryService.getFlowDiff(
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/resources/serialization/ver3.snapshot b/nifi-registry-core/nifi-registry-framework/src/test/resources/serialization/ver3.snapshot
index 574fe56..3611014 100644
--- a/nifi-registry-core/nifi-registry-framework/src/test/resources/serialization/ver3.snapshot
+++ b/nifi-registry-core/nifi-registry-framework/src/test/resources/serialization/ver3.snapshot
@@ -1,6 +1,28 @@
{
- "header": {
- "dataModelVersion": 3
- },
- "content": {}
+ "header" : {
+ "dataModelVersion" : "3"
+ },
+ "content" : {
+ "flowSnapshot" : {
+ "flowContents" : {
+ "componentType" : "PROCESS_GROUP",
+ "connections" : [ ],
+ "controllerServices" : [ ],
+ "funnels" : [ ],
+ "identifier" : "pg1",
+ "inputPorts" : [ ],
+ "labels" : [ ],
+ "name" : "My Process Group",
+ "outputPorts" : [ ],
+ "processGroups" : [ ],
+ "processors" : [ {
+ "componentType" : "PROCESSOR",
+ "identifier" : "processor1",
+ "name" : "My Processor 1"
+ } ],
+ "remoteProcessGroups" : [ ],
+ "variables" : { }
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/resources/serialization/ver9999.snapshot b/nifi-registry-core/nifi-registry-framework/src/test/resources/serialization/ver9999.snapshot
new file mode 100644
index 0000000..743401b
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/test/resources/serialization/ver9999.snapshot
@@ -0,0 +1,6 @@
+{
+ "header": {
+ "dataModelVersion": 9999
+ },
+ "content": {}
+}
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/UnsecuredNiFiRegistryClientIT.java b/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/UnsecuredNiFiRegistryClientIT.java
index e16fa79..d5e0bc7 100644
--- a/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/UnsecuredNiFiRegistryClientIT.java
+++ b/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/UnsecuredNiFiRegistryClientIT.java
@@ -60,9 +60,12 @@
import org.apache.nifi.registry.extension.repo.ExtensionRepoVersion;
import org.apache.nifi.registry.extension.repo.ExtensionRepoVersionSummary;
import org.apache.nifi.registry.field.Fields;
+import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.flow.VersionedParameter;
+import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.registry.flow.VersionedPropertyDescriptor;
@@ -86,12 +89,15 @@
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -270,6 +276,9 @@
final VersionedFlow snapshotFlow = flow1;
final VersionedFlowSnapshot snapshot1 = createSnapshot(snapshotClient, snapshotFlow, 1);
+ assertNotNull(snapshot1);
+ assertNotNull(snapshot1.getSnapshotMetadata());
+ assertEquals(snapshotFlow.getIdentifier(), snapshot1.getSnapshotMetadata().getFlowIdentifier());
LOGGER.info("Created snapshot # 1 with version " + snapshot1.getSnapshotMetadata().getVersion());
final VersionedFlowSnapshot snapshot2 = createSnapshot(snapshotClient, snapshotFlow, 2);
@@ -805,6 +814,78 @@
}
+ @Test
+ public void testFlowSnapshotsWithParameterContextAndEncodingVersion() throws IOException, NiFiRegistryException {
+ // Create a bucket
+ final Bucket bucket = new Bucket();
+ bucket.setName("Test Bucket");
+
+ final Bucket createdBucket = client.getBucketClient().create(bucket);
+ assertNotNull(createdBucket);
+
+ // Create the flow
+ final VersionedFlow flow = new VersionedFlow();
+ flow.setName("My Flow");
+ flow.setBucketIdentifier(createdBucket.getIdentifier());
+
+ final VersionedFlow createdFlow = client.getFlowClient().create(flow);
+ assertNotNull(createdFlow);
+
+ // Create a param context
+ final VersionedParameter param1 = new VersionedParameter();
+ param1.setName("Param 1");
+ param1.setValue("Param 1 Value");
+ param1.setDescription("Description");
+
+ final VersionedParameter param2 = new VersionedParameter();
+ param2.setName("Param 2");
+ param2.setValue("Param 2 Value");
+ param2.setDescription("Description");
+ param2.setSensitive(true);
+
+ final VersionedParameterContext context1 = new VersionedParameterContext();
+ context1.setName("Parameter Context 1");
+ context1.setParameters(new HashSet<>(Arrays.asList(param1, param2)));
+
+ final Map<String,VersionedParameterContext> contexts = new HashMap<>();
+ contexts.put(context1.getName(), context1);
+
+ // Create an external controller service reference
+ final ExternalControllerServiceReference serviceReference = new ExternalControllerServiceReference();
+ serviceReference.setName("External Service 1");
+ serviceReference.setIdentifier(UUID.randomUUID().toString());
+
+ final Map<String,ExternalControllerServiceReference> serviceReferences = new HashMap<>();
+ serviceReferences.put(serviceReference.getIdentifier(), serviceReference);
+
+ // Create the snapshot
+ final VersionedFlowSnapshot snapshot = buildSnapshot(createdFlow, 1);
+ snapshot.setFlowEncodingVersion("2.0.0");
+ snapshot.setParameterContexts(contexts);
+ snapshot.setExternalControllerServices(serviceReferences);
+
+ final VersionedFlowSnapshot createdSnapshot = client.getFlowSnapshotClient().create(snapshot);
+ assertNotNull(createdSnapshot);
+ assertNotNull(createdSnapshot.getFlowEncodingVersion());
+ assertNotNull(createdSnapshot.getParameterContexts());
+ assertNotNull(createdSnapshot.getExternalControllerServices());
+ assertEquals(snapshot.getFlowEncodingVersion(), createdSnapshot.getFlowEncodingVersion());
+ assertEquals(1, createdSnapshot.getParameterContexts().size());
+ assertEquals(1, createdSnapshot.getExternalControllerServices().size());
+
+ // Retrieve the snapshot
+ final VersionedFlowSnapshot retrievedSnapshot = client.getFlowSnapshotClient().get(
+ createdSnapshot.getSnapshotMetadata().getFlowIdentifier(),
+ createdSnapshot.getSnapshotMetadata().getVersion());
+ assertNotNull(retrievedSnapshot);
+ assertNotNull(retrievedSnapshot.getFlowEncodingVersion());
+ assertNotNull(retrievedSnapshot.getParameterContexts());
+ assertNotNull(retrievedSnapshot.getExternalControllerServices());
+ assertEquals(snapshot.getFlowEncodingVersion(), retrievedSnapshot.getFlowEncodingVersion());
+ assertEquals(1, retrievedSnapshot.getParameterContexts().size());
+ assertEquals(1, retrievedSnapshot.getExternalControllerServices().size());
+ }
+
private void checkExtensionMetadata(Collection<ExtensionMetadata> extensions) {
extensions.forEach(e -> {
assertNotNull(e.getBundleInfo());