blob: 21d3b43299d96ea7d78c2cfe76f1fd37e250da9a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.serialization;
import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertTrue;
public class TestFlowContentSerializer {
private FlowContentSerializer serializer;
@Before
public void setup() {
serializer = new FlowContentSerializer();
}
@Test
public void testSerializeDeserializeFlowContent() {
final VersionedProcessor processor1 = new VersionedProcessor();
processor1.setIdentifier("processor1");
processor1.setName("My Processor 1");
final VersionedProcessGroup processGroup1 = new VersionedProcessGroup();
processGroup1.setIdentifier("pg1");
processGroup1.setName("My Process Group");
processGroup1.getProcessors().add(processor1);
final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
snapshot.setFlowContents(processGroup1);
final FlowContent flowContent = new FlowContent();
flowContent.setFlowSnapshot(snapshot);
final ByteArrayOutputStream out = new ByteArrayOutputStream();
serializer.serializeFlowContent(flowContent, out);
//final String json = new String(out.toByteArray(), StandardCharsets.UTF_8);
//System.out.println(json);
final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
// make sure we can read the version from the input stream and it should be the current version
final Integer version = serializer.readDataModelVersion(in);
assertEquals(serializer.getCurrentDataModelVersion(), version);
assertEquals(false, serializer.isProcessGroupVersion(version));
// make sure we can deserialize back to FlowContent
final FlowContent deserializedFlowContent = serializer.deserializeFlowContent(version, in);
assertNotNull(deserializedFlowContent);
final VersionedFlowSnapshot deserializedSnapshot = deserializedFlowContent.getFlowSnapshot();
assertNotNull(deserializedSnapshot);
final VersionedProcessGroup deserializedProcessGroup1 = deserializedSnapshot.getFlowContents();
assertNotNull(deserializedProcessGroup1);
assertEquals(processGroup1.getIdentifier(), deserializedProcessGroup1.getIdentifier());
assertEquals(processGroup1.getName(), deserializedProcessGroup1.getName());
assertEquals(1, deserializedProcessGroup1.getProcessors().size());
final VersionedProcessor deserializedProcessor1 = deserializedProcessGroup1.getProcessors().iterator().next();
assertEquals(processor1.getIdentifier(), deserializedProcessor1.getIdentifier());
assertEquals(processor1.getName(), deserializedProcessor1.getName());
}
@Test
public void testSerializeDeserializeWithExternalServices() throws SerializationException {
final VersionedProcessGroup processGroup1 = new VersionedProcessGroup();
processGroup1.setIdentifier("pg1");
processGroup1.setName("My Process Group");
final ExternalControllerServiceReference serviceReference1 = new ExternalControllerServiceReference();
serviceReference1.setIdentifier("1");
serviceReference1.setName("Service 1");
final ExternalControllerServiceReference serviceReference2 = new ExternalControllerServiceReference();
serviceReference2.setIdentifier("2");
serviceReference2.setName("Service 2");
final Map<String,ExternalControllerServiceReference> serviceReferences = new HashMap<>();
serviceReferences.put(serviceReference1.getIdentifier(), serviceReference1);
serviceReferences.put(serviceReference2.getIdentifier(), serviceReference2);
final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
snapshot.setFlowContents(processGroup1);
snapshot.setExternalControllerServices(serviceReferences);
final FlowContent flowContent = new FlowContent();
flowContent.setFlowSnapshot(snapshot);
final ByteArrayOutputStream out = new ByteArrayOutputStream();
serializer.serializeFlowContent(flowContent, out);
final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
// make sure we can read the version from the input stream and it should be the current version
final Integer version = serializer.readDataModelVersion(in);
assertEquals(serializer.getCurrentDataModelVersion(), version);
// make sure we can deserialize back to FlowContent
final FlowContent deserializedFlowContent = serializer.deserializeFlowContent(version, in);
assertNotNull(deserializedFlowContent);
final VersionedFlowSnapshot deserializedSnapshot = deserializedFlowContent.getFlowSnapshot();
assertNotNull(deserializedSnapshot);
final VersionedProcessGroup deserializedProcessGroup = deserializedSnapshot.getFlowContents();
assertEquals(processGroup1.getIdentifier(), deserializedProcessGroup.getIdentifier());
assertEquals(processGroup1.getName(), deserializedProcessGroup.getName());
final Map<String,ExternalControllerServiceReference> deserializedServiceReferences = deserializedSnapshot.getExternalControllerServices();
assertNotNull(deserializedServiceReferences);
assertEquals(2, deserializedServiceReferences.size());
final ExternalControllerServiceReference deserializedServiceReference1 = deserializedServiceReferences.get(serviceReference1.getIdentifier());
assertNotNull(deserializedServiceReference1);
assertEquals(serviceReference1.getIdentifier(), deserializedServiceReference1.getIdentifier());
assertEquals(serviceReference1.getName(), deserializedServiceReference1.getName());
}
@Test
public void testDeserializeJsonNonIntegerVersion() throws IOException {
final String file = "/serialization/json/non-integer-version.snapshot";
try (final InputStream is = this.getClass().getResourceAsStream(file)) {
try {
serializer.readDataModelVersion(is);
fail("Should fail");
} catch (SerializationException e) {
assertEquals("Unable to read the data model version for the flow content.", e.getMessage());
}
}
}
@Test
public void testDeserializeJsonNoVersion() throws IOException {
final String file = "/serialization/json/no-version.snapshot";
try (final InputStream is = this.getClass().getResourceAsStream(file)) {
try {
serializer.readDataModelVersion(is);
fail("Should fail");
} catch (SerializationException e) {
assertEquals("Unable to read the data model version for the flow content.", e.getMessage());
}
}
}
@Test
public void testDeserializeVer1() throws IOException {
final String file = "/serialization/ver1.snapshot";
final VersionedProcessGroup processGroup;
try (final InputStream is = this.getClass().getResourceAsStream(file)) {
final Integer version = serializer.readDataModelVersion(is);
assertNotNull(version);
assertEquals(1, version.intValue());
if (serializer.isProcessGroupVersion(version)) {
processGroup = serializer.deserializeProcessGroup(version, is);
} else {
processGroup = null;
}
}
assertNotNull(processGroup);
assertNotNull(processGroup.getProcessors());
assertTrue(processGroup.getProcessors().size() > 0);
//System.out.printf("processGroup=" + processGroup);
}
@Test
public void testDeserializeVer2() throws IOException {
final String file = "/serialization/ver2.snapshot";
final VersionedProcessGroup processGroup;
try (final InputStream is = this.getClass().getResourceAsStream(file)) {
final Integer version = serializer.readDataModelVersion(is);
assertNotNull(version);
assertEquals(2, version.intValue());
if (serializer.isProcessGroupVersion(version)) {
processGroup = serializer.deserializeProcessGroup(version, is);
} else {
processGroup = null;
}
}
assertNotNull(processGroup);
assertNotNull(processGroup.getProcessors());
assertTrue(processGroup.getProcessors().size() > 0);
//System.out.printf("processGroup=" + processGroup);
}
@Test
public void testDeserializeVer3() throws IOException {
final String file = "/serialization/ver3.snapshot";
try (final InputStream is = this.getClass().getResourceAsStream(file)) {
final Integer version = serializer.readDataModelVersion(is);
assertNotNull(version);
assertEquals(3, version.intValue());
assertFalse(serializer.isProcessGroupVersion(version));
final FlowContent flowContent = serializer.deserializeFlowContent(version, is);
assertNotNull(flowContent);
final VersionedFlowSnapshot flowSnapshot = flowContent.getFlowSnapshot();
assertNotNull(flowSnapshot);
final VersionedProcessGroup processGroup = flowSnapshot.getFlowContents();
assertNotNull(processGroup);
assertNotNull(processGroup.getProcessors());
assertEquals(1, processGroup.getProcessors().size());
}
}
@Test
public void testDeserializeVer9999() throws IOException {
final String file = "/serialization/ver9999.snapshot";
try (final InputStream is = this.getClass().getResourceAsStream(file)) {
final Integer version = serializer.readDataModelVersion(is);
assertNotNull(version);
assertEquals(9999, version.intValue());
assertFalse(serializer.isProcessGroupVersion(version));
try {
serializer.deserializeFlowContent(version, is);
fail("Should fail");
} catch (IllegalArgumentException e) {
assertEquals("No FlowContent serializer exists for data model version: " + version, e.getMessage());
}
try {
serializer.deserializeProcessGroup(version, is);
fail("Should fail");
} catch (IllegalArgumentException e) {
assertEquals("No VersionedProcessGroup serializer exists for data model version: " + version, e.getMessage());
}
}
}
@Test
public void testDeserializeProcessGroupAsFlowContent() throws IOException {
final String file = "/serialization/ver2.snapshot";
try (final InputStream is = this.getClass().getResourceAsStream(file)) {
final Integer version = serializer.readDataModelVersion(is);
assertNotNull(version);
assertEquals(2, version.intValue());
assertTrue(serializer.isProcessGroupVersion(version));
final VersionedProcessGroup processGroup = serializer.deserializeProcessGroup(version, is);
assertNotNull(processGroup);
}
}
}