blob: ae7ffa7bed772ac5c1ff3ee50d1d33c85bdc451a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.bootstrap.util;
import org.apache.commons.io.FileUtils;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
import org.apache.nifi.minifi.bootstrap.exception.InvalidConfigurationException;
import org.apache.nifi.minifi.commons.schema.ConfigSchema;
import org.apache.nifi.minifi.commons.schema.ConnectionSchema;
import org.apache.nifi.minifi.commons.schema.ControllerServiceSchema;
import org.apache.nifi.minifi.commons.schema.FunnelSchema;
import org.apache.nifi.minifi.commons.schema.PortSchema;
import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema;
import org.apache.nifi.minifi.commons.schema.ProcessorSchema;
import org.apache.nifi.minifi.commons.schema.RemotePortSchema;
import org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema;
import org.apache.nifi.minifi.commons.schema.ReportingSchema;
import org.apache.nifi.minifi.commons.schema.common.StringUtil;
import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException;
import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringBufferInputStream;
import java.io.StringReader;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class ConfigTransformerTest {
/**
 * Maps each order-checked child element name of a process group to its rank;
 * {@code checkOrderOfChildren} fails when a lower-ranked element follows a higher-ranked one.
 */
public static final Map<String, Integer> PG_ELEMENT_ORDER_MAP = generateOrderMap(Arrays.asList(
        "processor",
        "inputPort",
        "outputPort",
        "funnel",
        "processGroup",
        "remoteProcessGroup",
        "connection"));

/** JUnit-managed temporary directory, created before and removed after each test. */
@Rule
public final TemporaryFolder tempOutputFolder = new TemporaryFolder();

// Per-test XML fixtures; all four are (re)assigned in setup().
private DocumentBuilder documentBuilder;
private Document document;
private Element config;
private XPathFactory xPathFactory;
/**
 * Builds fresh XML fixtures before every test: a document builder, an empty document,
 * a detached {@code config} element and an XPath factory.
 */
@Before
public void setup() throws ParserConfigurationException {
    final DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance();
    documentBuilder = builderFactory.newDocumentBuilder();
    document = documentBuilder.newDocument();
    // The element is created but not appended; tests attach children to it directly.
    config = document.createElement("config");
    xPathFactory = XPathFactory.newInstance();
}
/**
 * A connection built from an empty map (no prioritizer key) must not produce a
 * {@code queuePrioritizerClass} element.
 */
@Test
public void testNullQueuePrioritizerNotWritten() throws ConfigurationChangeException, XPathExpressionException {
    final ConnectionSchema connection = new ConnectionSchema(Collections.emptyMap());
    final ProcessGroupSchema rootGroup = new ProcessGroupSchema(Collections.emptyMap(), ConfigSchema.TOP_LEVEL_NAME);
    ConfigTransformer.addConnection(config, connection, new ParentGroupIdResolver(rootGroup));

    final Object prioritizerNode = xPathFactory.newXPath()
            .evaluate("connection/queuePrioritizerClass", config, XPathConstants.NODE);
    assertNull(prioritizerNode);
}
/**
 * A connection whose prioritizer class is the empty string must not produce a
 * {@code queuePrioritizerClass} element.
 */
@Test
public void testEmptyQueuePrioritizerNotWritten() throws ConfigurationChangeException, XPathExpressionException {
    final Map<String, Object> connectionProps = new HashMap<>();
    connectionProps.put(ConnectionSchema.QUEUE_PRIORITIZER_CLASS_KEY, "");

    final ConnectionSchema connection = new ConnectionSchema(connectionProps);
    final ProcessGroupSchema rootGroup = new ProcessGroupSchema(Collections.emptyMap(), ConfigSchema.TOP_LEVEL_NAME);
    ConfigTransformer.addConnection(config, connection, new ParentGroupIdResolver(rootGroup));

    final Object prioritizerNode = xPathFactory.newXPath()
            .evaluate("connection/queuePrioritizerClass", config, XPathConstants.NODE);
    assertNull(prioritizerNode);
}
/**
 * A connection with a non-empty prioritizer class must write that class name as the text of
 * its {@code queuePrioritizerClass} element.
 */
@Test
public void testQueuePrioritizerWritten() throws ConfigurationChangeException, XPathExpressionException {
    final Map<String, Object> connectionProps = new HashMap<>();
    connectionProps.put(ConnectionSchema.QUEUE_PRIORITIZER_CLASS_KEY, "org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer");

    final ConnectionSchema connection = new ConnectionSchema(connectionProps);
    final ProcessGroupSchema rootGroup = new ProcessGroupSchema(Collections.emptyMap(), ConfigSchema.TOP_LEVEL_NAME);
    ConfigTransformer.addConnection(config, connection, new ParentGroupIdResolver(rootGroup));

    final Object writtenClass = xPathFactory.newXPath()
            .evaluate("connection/queuePrioritizerClass/text()", config, XPathConstants.STRING);
    assertEquals("org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer", writtenClass);
}
/** Runs the flow-XML round-trip check against {@code config-reporting-task.yml}. */
@Test
public void testReportingTasksTransform() throws Exception {
    final String yamlResource = "config-reporting-task.yml";
    testConfigFileTransform(yamlResource);
}
/** Runs the flow-XML round-trip check against {@code config-process-groups.yml}. */
@Test
public void testProcessGroupsTransform() throws Exception {
    final String yamlResource = "config-process-groups.yml";
    testConfigFileTransform(yamlResource);
}
/** Runs the flow-XML round-trip check against {@code stress-test-framework-funnel.yml}. */
@Test
public void testFunnelsTransform() throws Exception {
    final String yamlResource = "stress-test-framework-funnel.yml";
    testConfigFileTransform(yamlResource);
}
/** Runs the flow-XML round-trip check against {@code config-funnel-and-rpg.yml}. */
@Test
public void testFunnelAndRpgTransform() throws Exception {
    final String yamlResource = "config-funnel-and-rpg.yml";
    testConfigFileTransform(yamlResource);
}
/** Runs the flow-XML round-trip check against {@code config-multiple-RPGs.yml}. */
@Test
public void testRpgTransform() throws Exception {
    final String yamlResource = "config-multiple-RPGs.yml";
    testConfigFileTransform(yamlResource);
}
/** Runs the flow-XML round-trip check against {@code InvokeHttpMiNiFiProxyNoPasswordTemplateTest.yml}. */
@Test
public void testRpgProxyNoPassTransform() throws Exception {
    final String yamlResource = "InvokeHttpMiNiFiProxyNoPasswordTemplateTest.yml";
    testConfigFileTransform(yamlResource);
}
/** Runs the flow-XML round-trip check against {@code InvokeHttpMiNiFiProxyPasswordTemplateTest.yml}. */
@Test
public void testRpgProxyPassTransform() throws Exception {
    final String yamlResource = "InvokeHttpMiNiFiProxyPasswordTemplateTest.yml";
    testConfigFileTransform(yamlResource);
}
/** Runs the flow-XML round-trip check against {@code SimpleRPGToLogAttributes.yml}. */
@Test
public void testRpgOutputPort() throws Exception {
    final String yamlResource = "SimpleRPGToLogAttributes.yml";
    testConfigFileTransform(yamlResource);
}
/**
 * Writes nifi.properties for {@code MINIFI-216/config.yml} and checks that every key of the
 * reference file {@code MINIFI-216/nifi.properties.before} (plus the NiFi version key) has the
 * same value in the output.
 */
@Test
public void testNifiPropertiesNoOverrides() throws IOException, ConfigurationChangeException, SchemaLoaderException {
    final ClassLoader resources = ConfigTransformerTest.class.getClassLoader();

    final Properties expected = new Properties();
    try (InputStream referenceStream = resources.getResourceAsStream("MINIFI-216/nifi.properties.before")) {
        expected.load(referenceStream);
    }
    expected.setProperty(ConfigTransformer.NIFI_VERSION_KEY, ConfigTransformer.NIFI_VERSION);

    final ByteArrayOutputStream written = new ByteArrayOutputStream();
    try (InputStream yamlStream = resources.getResourceAsStream("MINIFI-216/config.yml")) {
        final ConfigSchema schema = SchemaLoader.loadConfigSchemaFromYaml(yamlStream);
        ConfigTransformer.writeNiFiProperties(schema, written);
    }

    final Properties actual = new Properties();
    actual.load(new ByteArrayInputStream(written.toByteArray()));

    // Only keys present in the reference are compared; extra generated keys are ignored.
    expected.stringPropertyNames().forEach(name ->
            assertEquals("Property key " + name + " doesn't match.", expected.getProperty(name), actual.getProperty(name)));
}
/**
 * Writes nifi.properties for {@code MINIFI-216/configOverrides.yml} and checks the output against
 * the reference file with the schema's own overrides applied on top.
 */
@Test
public void testNifiPropertiesOverrides() throws IOException, ConfigurationChangeException, SchemaLoaderException {
    final ClassLoader resources = ConfigTransformerTest.class.getClassLoader();

    final Properties expected = new Properties();
    try (InputStream referenceStream = resources.getResourceAsStream("MINIFI-216/nifi.properties.before")) {
        expected.load(referenceStream);
    }
    expected.setProperty(ConfigTransformer.NIFI_VERSION_KEY, ConfigTransformer.NIFI_VERSION);

    final ByteArrayOutputStream written = new ByteArrayOutputStream();
    try (InputStream yamlStream = resources.getResourceAsStream("MINIFI-216/configOverrides.yml")) {
        final ConfigSchema schema = SchemaLoader.loadConfigSchemaFromYaml(yamlStream);
        // The fixture is only meaningful if it actually declares overrides.
        assertTrue(!schema.getNifiPropertiesOverrides().isEmpty());
        // Fold the overrides into the expectation before generating the real output.
        schema.getNifiPropertiesOverrides().forEach(expected::setProperty);
        ConfigTransformer.writeNiFiProperties(schema, written);
    }

    final Properties actual = new Properties();
    actual.load(new ByteArrayInputStream(written.toByteArray()));

    expected.stringPropertyNames().forEach(name ->
            assertEquals("Property key " + name + " doesn't match.", expected.getProperty(name), actual.getProperty(name)));
}
/**
 * Writes nifi.properties for {@code MINIFI-277/config.yml} and checks that every key of
 * {@code MINIFI-277/nifi.properties} (plus the NiFi version key) has the same value in the output.
 */
@Test
public void testNifiPropertiesVariableRegistry() throws IOException, ConfigurationChangeException, SchemaLoaderException {
    final ClassLoader resources = ConfigTransformerTest.class.getClassLoader();

    final Properties expected = new Properties();
    try (InputStream referenceStream = resources.getResourceAsStream("MINIFI-277/nifi.properties")) {
        expected.load(referenceStream);
    }
    expected.setProperty(ConfigTransformer.NIFI_VERSION_KEY, ConfigTransformer.NIFI_VERSION);

    final ByteArrayOutputStream written = new ByteArrayOutputStream();
    try (InputStream yamlStream = resources.getResourceAsStream("MINIFI-277/config.yml")) {
        final ConfigSchema schema = SchemaLoader.loadConfigSchemaFromYaml(yamlStream);
        ConfigTransformer.writeNiFiProperties(schema, written);
    }

    final Properties actual = new Properties();
    actual.load(new ByteArrayInputStream(written.toByteArray()));

    expected.stringPropertyNames().forEach(name ->
            assertEquals("Property key " + name + " doesn't match.", expected.getProperty(name), actual.getProperty(name)));
}
/**
 * Transforms {@code config.yml} into {@code ./target/} and checks that readable
 * {@code nifi.properties} and {@code flow.xml.gz} files result.
 */
@Test
public void doesTransformFile() throws Exception {
    File inputFile = new File("./src/test/resources/config.yml");
    // Close the source stream deterministically instead of leaking the file handle.
    try (FileInputStream configStream = new FileInputStream(inputFile)) {
        ConfigTransformer.transformConfigFile(configStream, "./target/", null);
    }
    File nifiPropertiesFile = new File("./target/nifi.properties");
    assertTrue(nifiPropertiesFile.exists());
    assertTrue(nifiPropertiesFile.canRead());
    nifiPropertiesFile.deleteOnExit();
    File flowXml = new File("./target/flow.xml.gz");
    assertTrue(flowXml.exists());
    assertTrue(flowXml.canRead());
    flowXml.deleteOnExit();
}
/**
 * Transforms {@code config-v1.yml} into {@code ./target/} and checks that readable
 * {@code nifi.properties} and {@code flow.xml.gz} files result.
 */
@Test
public void doesTransformV1File() throws Exception {
    File inputFile = new File("./src/test/resources/config-v1.yml");
    // Close the source stream deterministically instead of leaking the file handle.
    try (FileInputStream configStream = new FileInputStream(inputFile)) {
        ConfigTransformer.transformConfigFile(configStream, "./target/", null);
    }
    File nifiPropertiesFile = new File("./target/nifi.properties");
    assertTrue(nifiPropertiesFile.exists());
    assertTrue(nifiPropertiesFile.canRead());
    nifiPropertiesFile.deleteOnExit();
    File flowXml = new File("./target/flow.xml.gz");
    assertTrue(flowXml.exists());
    assertTrue(flowXml.canRead());
    flowXml.deleteOnExit();
}
/**
 * Transforms {@code config.yml}, supplied as an input stream, into {@code ./target/} and checks
 * that readable {@code nifi.properties} and {@code flow.xml.gz} files result.
 */
@Test
public void doesTransformInputStream() throws Exception {
    File inputFile = new File("./src/test/resources/config.yml");
    // Close the source stream deterministically instead of leaking the file handle.
    try (FileInputStream configStream = new FileInputStream(inputFile)) {
        ConfigTransformer.transformConfigFile(configStream, "./target/", null);
    }
    File nifiPropertiesFile = new File("./target/nifi.properties");
    assertTrue(nifiPropertiesFile.exists());
    assertTrue(nifiPropertiesFile.canRead());
    nifiPropertiesFile.deleteOnExit();
    File flowXml = new File("./target/flow.xml.gz");
    assertTrue(flowXml.exists());
    assertTrue(flowXml.canRead());
    flowXml.deleteOnExit();
}
/**
 * Transforms {@code default.yml} into {@code ./target/} and checks that readable
 * {@code nifi.properties} and {@code flow.xml.gz} files result.
 */
@Test
public void doesTransformOnDefaultFile() throws Exception {
    File inputFile = new File("./src/test/resources/default.yml");
    // Close the source stream deterministically instead of leaking the file handle.
    try (FileInputStream configStream = new FileInputStream(inputFile)) {
        ConfigTransformer.transformConfigFile(configStream, "./target/", null);
    }
    File nifiPropertiesFile = new File("./target/nifi.properties");
    assertTrue(nifiPropertiesFile.exists());
    assertTrue(nifiPropertiesFile.canRead());
    nifiPropertiesFile.deleteOnExit();
    File flowXml = new File("./target/flow.xml.gz");
    assertTrue(flowXml.exists());
    assertTrue(flowXml.canRead());
    flowXml.deleteOnExit();
}
/**
 * Transforms {@code config-multiple-processors.yml} into {@code ./target/} and checks that
 * readable {@code nifi.properties} and {@code flow.xml.gz} files result.
 */
@Test
public void doesTransformOnMultipleProcessors() throws Exception {
    File inputFile = new File("./src/test/resources/config-multiple-processors.yml");
    // Close the source stream deterministically instead of leaking the file handle.
    try (FileInputStream configStream = new FileInputStream(inputFile)) {
        ConfigTransformer.transformConfigFile(configStream, "./target/", null);
    }
    File nifiPropertiesFile = new File("./target/nifi.properties");
    assertTrue(nifiPropertiesFile.exists());
    assertTrue(nifiPropertiesFile.canRead());
    nifiPropertiesFile.deleteOnExit();
    File flowXml = new File("./target/flow.xml.gz");
    assertTrue(flowXml.exists());
    assertTrue(flowXml.canRead());
    flowXml.deleteOnExit();
}
/**
 * Transforms {@code config-multiple-RPGs.yml} into {@code ./target/} and checks that readable
 * {@code nifi.properties} and {@code flow.xml.gz} files result.
 */
@Test
public void doesTransformOnMultipleRemoteProcessGroups() throws Exception {
    File inputFile = new File("./src/test/resources/config-multiple-RPGs.yml");
    // Close the source stream deterministically instead of leaking the file handle.
    try (FileInputStream configStream = new FileInputStream(inputFile)) {
        ConfigTransformer.transformConfigFile(configStream, "./target/", null);
    }
    File nifiPropertiesFile = new File("./target/nifi.properties");
    assertTrue(nifiPropertiesFile.exists());
    assertTrue(nifiPropertiesFile.canRead());
    nifiPropertiesFile.deleteOnExit();
    File flowXml = new File("./target/flow.xml.gz");
    assertTrue(flowXml.exists());
    assertTrue(flowXml.canRead());
    flowXml.deleteOnExit();
}
/**
 * Transforms {@code config-multiple-input-ports.yml} into {@code ./target/} and checks that
 * readable {@code nifi.properties} and {@code flow.xml.gz} files result.
 */
@Test
public void doesTransformOnMultipleInputPorts() throws Exception {
    File inputFile = new File("./src/test/resources/config-multiple-input-ports.yml");
    // Close the source stream deterministically instead of leaking the file handle.
    try (FileInputStream configStream = new FileInputStream(inputFile)) {
        ConfigTransformer.transformConfigFile(configStream, "./target/", null);
    }
    File nifiPropertiesFile = new File("./target/nifi.properties");
    assertTrue(nifiPropertiesFile.exists());
    assertTrue(nifiPropertiesFile.canRead());
    nifiPropertiesFile.deleteOnExit();
    File flowXml = new File("./target/flow.xml.gz");
    assertTrue(flowXml.exists());
    assertTrue(flowXml.canRead());
    flowXml.deleteOnExit();
}
/**
 * Transforms {@code config-minimal.yml} into {@code ./target/} and checks that readable
 * {@code nifi.properties} and {@code flow.xml.gz} files result.
 */
@Test
public void doesTransformOnMinimal() throws Exception {
    File inputFile = new File("./src/test/resources/config-minimal.yml");
    // Close the source stream deterministically instead of leaking the file handle.
    try (FileInputStream configStream = new FileInputStream(inputFile)) {
        ConfigTransformer.transformConfigFile(configStream, "./target/", null);
    }
    File nifiPropertiesFile = new File("./target/nifi.properties");
    assertTrue(nifiPropertiesFile.exists());
    assertTrue(nifiPropertiesFile.canRead());
    nifiPropertiesFile.deleteOnExit();
    File flowXml = new File("./target/flow.xml.gz");
    assertTrue(flowXml.exists());
    assertTrue(flowXml.canRead());
    flowXml.deleteOnExit();
}
/**
 * Transforms {@code config-provenance-repository.yml} and checks that the generated
 * {@code nifi.properties} selects {@code MiNiFiPersistentProvenanceRepository}, and that a
 * readable {@code flow.xml.gz} is produced.
 */
@Test
public void doesTransformOnProvenanceRepository() throws Exception {
    File inputFile = new File("./src/test/resources/config-provenance-repository.yml");
    // Close the source stream deterministically instead of leaking the file handle.
    try (FileInputStream configStream = new FileInputStream(inputFile)) {
        ConfigTransformer.transformConfigFile(configStream, "./target/", null);
    }
    File nifiPropertiesFile = new File("./target/nifi.properties");
    assertTrue(nifiPropertiesFile.exists());
    assertTrue(nifiPropertiesFile.canRead());
    String nifi = FileUtils.readFileToString(nifiPropertiesFile, Charset.defaultCharset());
    assertTrue(nifi.contains("nifi.provenance.repository.implementation=org.apache.nifi.provenance.MiNiFiPersistentProvenanceRepository"));
    nifiPropertiesFile.deleteOnExit();
    File flowXml = new File("./target/flow.xml.gz");
    assertTrue(flowXml.exists());
    assertTrue(flowXml.canRead());
    flowXml.deleteOnExit();
}
/**
 * Transforms {@code config-provenance-custom-repository.yml} and checks that the generated
 * {@code nifi.properties} selects {@code CustomProvenanceRepository}, and that a readable
 * {@code flow.xml.gz} is produced.
 */
@Test
public void doesTransformOnCustomProvenanceRepository() throws Exception {
    File inputFile = new File("./src/test/resources/config-provenance-custom-repository.yml");
    // Close the source stream deterministically instead of leaking the file handle.
    try (FileInputStream configStream = new FileInputStream(inputFile)) {
        ConfigTransformer.transformConfigFile(configStream, "./target/", null);
    }
    File nifiPropertiesFile = new File("./target/nifi.properties");
    assertTrue(nifiPropertiesFile.exists());
    assertTrue(nifiPropertiesFile.canRead());
    String nifi = FileUtils.readFileToString(nifiPropertiesFile, Charset.defaultCharset());
    assertTrue(nifi.contains("nifi.provenance.repository.implementation=org.apache.nifi.provenance.CustomProvenanceRepository"));
    nifiPropertiesFile.deleteOnExit();
    File flowXml = new File("./target/flow.xml.gz");
    assertTrue(flowXml.exists());
    assertTrue(flowXml.canRead());
    flowXml.deleteOnExit();
}
/**
 * Transforming {@code config-invalid.yml} must fail with a {@link SchemaLoaderException}
 * whose message reports that the YAML is not a Map.
 */
@Test
public void handleTransformInvalidFile() throws Exception {
    File inputFile = new File("./src/test/resources/config-invalid.yml");
    // try-with-resources closes the stream on both the failure path and the expected-exception path.
    try (FileInputStream configStream = new FileInputStream(inputFile)) {
        ConfigTransformer.transformConfigFile(configStream, "./target/", null);
        fail("Invalid configuration file was not detected.");
    } catch (SchemaLoaderException e) {
        assertEquals("Provided YAML configuration is not a Map", e.getMessage());
    }
}
/**
 * Transforming {@code config-malformed-field.yml} must fail with an
 * {@link InvalidConfigurationException} naming the unparseable Swap threshold.
 */
@Test
public void handleTransformMalformedField() throws Exception {
    File inputFile = new File("./src/test/resources/config-malformed-field.yml");
    // try-with-resources closes the stream on both the failure path and the expected-exception path.
    try (FileInputStream configStream = new FileInputStream(inputFile)) {
        ConfigTransformer.transformConfigFile(configStream, "./target/", null);
        fail("Invalid configuration file was not detected.");
    } catch (InvalidConfigurationException e) {
        assertEquals("Failed to transform config file due to:['threshold' in section 'Swap' because it is found but could not be parsed as a Number]", e.getMessage());
    }
}
/**
 * Transforming {@code config-empty.yml} must fail with a {@link SchemaLoaderException}
 * whose message reports that the YAML is not a Map.
 */
@Test
public void handleTransformEmptyFile() throws Exception {
    File inputFile = new File("./src/test/resources/config-empty.yml");
    // try-with-resources closes the stream on both the failure path and the expected-exception path.
    try (FileInputStream configStream = new FileInputStream(inputFile)) {
        ConfigTransformer.transformConfigFile(configStream, "./target/", null);
        fail("Invalid configuration file was not detected.");
    } catch (SchemaLoaderException e) {
        assertEquals("Provided YAML configuration is not a Map", e.getMessage());
    }
}
/**
 * Transforming {@code config-missing-required-field.yml} must fail with an
 * {@link InvalidConfigurationException} naming the missing processor {@code class} field.
 */
@Test
public void handleTransformFileMissingRequiredField() throws Exception {
    File inputFile = new File("./src/test/resources/config-missing-required-field.yml");
    // try-with-resources closes the stream on both the failure path and the expected-exception path.
    try (FileInputStream configStream = new FileInputStream(inputFile)) {
        ConfigTransformer.transformConfigFile(configStream, "./target/", null);
        fail("Invalid configuration file was not detected.");
    } catch (InvalidConfigurationException e) {
        assertEquals("Failed to transform config file due to:['class' in section 'Processors' because it was not found and it is required]", e.getMessage());
    }
}
/**
 * Transforming {@code config-multiple-problems.yml} must fail with an
 * {@link InvalidConfigurationException} whose message lists all three validation problems.
 */
@Test
public void handleTransformFileMultipleProblems() throws Exception {
    File inputFile = new File("./src/test/resources/config-multiple-problems.yml");
    // try-with-resources closes the stream on both the failure path and the expected-exception path.
    try (FileInputStream configStream = new FileInputStream(inputFile)) {
        ConfigTransformer.transformConfigFile(configStream, "./target/", null);
        fail("Invalid configuration file was not detected.");
    } catch (InvalidConfigurationException e) {
        assertEquals("Failed to transform config file due to:['class' in section 'Processors' because it was not found and it is required], " +
                "['scheduling strategy' in section 'Provenance Reporting' because it is not a valid scheduling strategy], " +
                "['source name' in section 'Connections' because it was not found and it is required]", e.getMessage());
    }
}
/**
 * Transforms {@code MINIFI-516/config.yml} with the bootstrap properties from
 * {@code MINIFI-516/bootstrap.conf} and checks that the processor's
 * {@code SSL Context Service} property in the generated flow is {@code SSL-Context-Service}.
 */
@Test
public void checkSSLOverrides() throws Exception {
    File inputFile = new File("./src/test/resources/MINIFI-516/config.yml");
    final Properties bootstrapProperties = getTestBootstrapProperties("MINIFI-516/bootstrap.conf");
    // Close the source stream deterministically instead of leaking the file handle.
    try (FileInputStream configStream = new FileInputStream(inputFile)) {
        ConfigTransformer.transformConfigFile(configStream, "./target/", bootstrapProperties);
    }

    // nifi.properties testing
    File nifiPropertiesFile = new File("./target/nifi.properties");
    assertTrue(nifiPropertiesFile.exists());
    assertTrue(nifiPropertiesFile.canRead());
    nifiPropertiesFile.deleteOnExit();

    // flow.xml.gz testing
    File flowXml = new File("./target/flow.xml.gz");
    assertTrue(flowXml.exists());
    assertTrue(flowXml.canRead());
    // Register cleanup before parsing so the file is removed even when a later assertion fails.
    flowXml.deleteOnExit();

    final String flow;
    try (FileInputStream flowStream = new FileInputStream(flowXml)) {
        flow = loadFlowXML(flowStream);
    }

    // Parse the decoded characters directly. The deprecated StringBufferInputStream kept only the
    // low eight bits of each char, which corrupts any non-Latin-1 content.
    Document xml = documentBuilder.parse(new InputSource(new StringReader(flow)));
    XPath xPath = xPathFactory.newXPath();
    String result = xPath.evaluate("/flowController/rootGroup/processor/property[name = \"SSL Context Service\"]/value/text()", xml);
    // JUnit takes (expected, actual); the reversed order produced misleading failure messages.
    assertEquals("SSL-Context-Service", result);
}
/**
 * Loads a YAML config from the test classpath, writes it as flow XML into memory, parses that
 * XML back and verifies the root process group and the reporting tasks against the schema.
 *
 * @param configFile classpath-relative name of the YAML resource to transform
 * @throws Exception if loading, transforming or parsing fails
 */
public void testConfigFileTransform(String configFile) throws Exception {
    final ConfigSchema configSchema;
    // The resource stream was previously never closed.
    try (InputStream configStream = ConfigTransformerTest.class.getClassLoader().getResourceAsStream(configFile)) {
        // getResourceAsStream returns null for a missing resource; fail with a clear message.
        assertTrue("Test resource not found on classpath: " + configFile, configStream != null);
        configSchema = SchemaLoader.loadConfigSchemaFromYaml(configStream);
    }

    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    ConfigTransformer.writeFlowXmlFile(configSchema, outputStream);

    // Named flowDocument so it does not shadow the 'document' field.
    Document flowDocument = documentBuilder.parse(new ByteArrayInputStream(outputStream.toByteArray()));
    testProcessGroup((Element) xPathFactory.newXPath().evaluate("flowController/rootGroup", flowDocument, XPathConstants.NODE), configSchema.getProcessGroupSchema());
    testReportingTasks((Element) xPathFactory.newXPath().evaluate("flowController/reportingTasks", flowDocument, XPathConstants.NODE), configSchema.getReportingTasksSchema());
}
/**
 * Recursively verifies a generated process group element against its schema: scalar fields,
 * child ordering, then each kind of child, matched to the schema lists by position.
 */
private void testProcessGroup(Element element, ProcessGroupSchema processGroupSchema) throws XPathExpressionException {
    assertEquals(processGroupSchema.getId(), getText(element, "id"));
    assertEquals(processGroupSchema.getName(), getText(element, "name"));
    // A null comment and an empty comment are treated as equal on both sides.
    assertEquals(nullToEmpty(processGroupSchema.getComment()), nullToEmpty(getText(element, "comment")));
    checkOrderOfChildren(element, PG_ELEMENT_ORDER_MAP);

    final XPath xpath = xPathFactory.newXPath();

    final NodeList processors = (NodeList) xpath.evaluate("processor", element, XPathConstants.NODESET);
    assertEquals(processGroupSchema.getProcessors().size(), processors.getLength());
    for (int idx = 0; idx < processors.getLength(); idx++) {
        testProcessor((Element) processors.item(idx), processGroupSchema.getProcessors().get(idx));
    }

    final NodeList controllerServices = (NodeList) xpath.evaluate("controllerService", element, XPathConstants.NODESET);
    assertEquals(processGroupSchema.getControllerServices().size(), controllerServices.getLength());
    for (int idx = 0; idx < controllerServices.getLength(); idx++) {
        testControllerService((Element) controllerServices.item(idx), processGroupSchema.getControllerServices().get(idx));
    }

    final NodeList remoteGroups = (NodeList) xpath.evaluate("remoteProcessGroup", element, XPathConstants.NODESET);
    assertEquals(processGroupSchema.getRemoteProcessGroups().size(), remoteGroups.getLength());
    for (int idx = 0; idx < remoteGroups.getLength(); idx++) {
        testRemoteProcessGroups((Element) remoteGroups.item(idx), processGroupSchema.getRemoteProcessGroups().get(idx));
    }

    final NodeList funnels = (NodeList) xpath.evaluate("funnel", element, XPathConstants.NODESET);
    assertEquals(processGroupSchema.getFunnels().size(), funnels.getLength());
    for (int idx = 0; idx < funnels.getLength(); idx++) {
        testFunnel((Element) funnels.item(idx), processGroupSchema.getFunnels().get(idx));
    }

    final NodeList inputPorts = (NodeList) xpath.evaluate("inputPort", element, XPathConstants.NODESET);
    assertEquals(processGroupSchema.getInputPortSchemas().size(), inputPorts.getLength());
    for (int idx = 0; idx < inputPorts.getLength(); idx++) {
        testPort((Element) inputPorts.item(idx), processGroupSchema.getInputPortSchemas().get(idx));
    }

    final NodeList outputPorts = (NodeList) xpath.evaluate("outputPort", element, XPathConstants.NODESET);
    assertEquals(processGroupSchema.getOutputPortSchemas().size(), outputPorts.getLength());
    for (int idx = 0; idx < outputPorts.getLength(); idx++) {
        testPort((Element) outputPorts.item(idx), processGroupSchema.getOutputPortSchemas().get(idx));
    }

    // Nested groups are verified with the same rules.
    final NodeList childGroups = (NodeList) xpath.evaluate("processGroup", element, XPathConstants.NODESET);
    assertEquals(processGroupSchema.getProcessGroupSchemas().size(), childGroups.getLength());
    for (int idx = 0; idx < childGroups.getLength(); idx++) {
        testProcessGroup((Element) childGroups.item(idx), processGroupSchema.getProcessGroupSchemas().get(idx));
    }

    final NodeList connections = (NodeList) xpath.evaluate("connection", element, XPathConstants.NODESET);
    assertEquals(processGroupSchema.getConnections().size(), connections.getLength());
    for (int idx = 0; idx < connections.getLength(); idx++) {
        testConnection((Element) connections.item(idx), processGroupSchema.getConnections().get(idx));
    }
}
/**
 * Asserts that a generated {@code processor} element matches its schema: id, name, class,
 * scheduling and timing fields, annotation data, and finally its properties.
 *
 * @param element the processor element read from the generated flow XML
 * @param processorSchema the schema the element was generated from
 * @throws XPathExpressionException if one of the child lookups cannot be evaluated
 */
private void testProcessor(Element element, ProcessorSchema processorSchema) throws XPathExpressionException {
// Assertions run in a fixed order, so the first mismatching field is the one reported.
assertEquals(processorSchema.getId(), getText(element, "id"));
assertEquals(processorSchema.getName(), getText(element, "name"));
assertEquals(processorSchema.getProcessorClass(), getText(element, "class"));
// Numeric schema values are compared through their string form because getText returns text.
assertEquals(processorSchema.getMaxConcurrentTasks().toString(), getText(element, "maxConcurrentTasks"));
assertEquals(processorSchema.getSchedulingPeriod(), getText(element, "schedulingPeriod"));
assertEquals(processorSchema.getPenalizationPeriod(), getText(element, "penalizationPeriod"));
assertEquals(processorSchema.getYieldPeriod(), getText(element, "yieldPeriod"));
assertEquals(processorSchema.getSchedulingStrategy(), getText(element, "schedulingStrategy"));
assertEquals(processorSchema.getRunDurationNanos().toString(), getText(element, "runDurationNanos"));
assertEquals(processorSchema.getAnnotationData(), getText(element, "annotationData"));
testProperties(element, processorSchema.getProperties());
}
/**
 * Verifies that the {@code reportingTask} children of the given element match the expected
 * schemas in number and, position by position, in content.
 */
private void testReportingTasks(Element element, List<ReportingSchema> reportingSchemas) throws XPathExpressionException {
    final XPath xpath = xPathFactory.newXPath();
    final NodeList reportingTasks = (NodeList) xpath.evaluate("reportingTask", element, XPathConstants.NODESET);
    assertEquals(reportingSchemas.size(), reportingTasks.getLength());
    for (int idx = 0; idx < reportingTasks.getLength(); idx++) {
        final Element task = (Element) reportingTasks.item(idx);
        testReportingTask(task, reportingSchemas.get(idx));
    }
}
/**
 * Asserts that a generated {@code reportingTask} element matches its schema: id, name, comment,
 * class, scheduling period and strategy, and finally its properties.
 *
 * @param element the reporting task element read from the generated flow XML
 * @param reportingSchema the schema the element was generated from
 * @throws XPathExpressionException if one of the child lookups cannot be evaluated
 */
private void testReportingTask(Element element, ReportingSchema reportingSchema) throws XPathExpressionException {
// Assertions run in a fixed order, so the first mismatching field is the one reported.
assertEquals(reportingSchema.getId(), getText(element, "id"));
assertEquals(reportingSchema.getName(), getText(element, "name"));
assertEquals(reportingSchema.getComment(), getText(element, "comment"));
assertEquals(reportingSchema.getReportingClass(), getText(element, "class"));
assertEquals(reportingSchema.getSchedulingPeriod(), getText(element, "schedulingPeriod"));
assertEquals(reportingSchema.getSchedulingStrategy(), getText(element, "schedulingStrategy"));
testProperties(element, reportingSchema.getProperties());
}
/**
 * Verifies a generated {@code controllerService} element against its schema: id, name, class,
 * annotation data and properties.
 */
private void testControllerService(Element element, ControllerServiceSchema controllerServiceSchema) throws XPathExpressionException {
    final String actualId = getText(element, "id");
    assertEquals(controllerServiceSchema.getId(), actualId);

    final String actualName = getText(element, "name");
    assertEquals(controllerServiceSchema.getName(), actualName);

    final String actualClass = getText(element, "class");
    assertEquals(controllerServiceSchema.getServiceClass(), actualClass);

    final String actualAnnotationData = getText(element, "annotationData");
    assertEquals(controllerServiceSchema.getAnnotationData(), actualAnnotationData);

    testProperties(element, controllerServiceSchema.getProperties());
}
/**
 * Verifies a generated {@code remoteProcessGroup} element against its schema: scalar fields,
 * proxy settings, then its input and output ports by position.
 */
private void testRemoteProcessGroups(Element element, RemoteProcessGroupSchema remoteProcessingGroupSchema) throws XPathExpressionException {
    assertEquals(remoteProcessingGroupSchema.getId(), getText(element, "id"));
    assertEquals(remoteProcessingGroupSchema.getName(), getText(element, "name"));
    assertEquals(remoteProcessingGroupSchema.getComment(), getText(element, "comment"));
    assertEquals(remoteProcessingGroupSchema.getUrls(), getText(element, "url"));
    assertEquals(remoteProcessingGroupSchema.getTimeout(), getText(element, "timeout"));
    assertEquals(remoteProcessingGroupSchema.getYieldPeriod(), getText(element, "yieldPeriod"));
    assertEquals(remoteProcessingGroupSchema.getTransportProtocol(), getText(element, "transportProtocol"));
    assertEquals(remoteProcessingGroupSchema.getProxyHost(), getText(element, "proxyHost"));

    // An absent or empty proxyPort element corresponds to a null port in the schema.
    final String proxyPortText = getText(element, "proxyPort");
    final Object expectedProxyPort = remoteProcessingGroupSchema.getProxyPort();
    Integer actualProxyPort = null;
    if (!StringUtil.isNullOrEmpty(proxyPortText)) {
        actualProxyPort = Integer.parseInt(proxyPortText);
    }
    assertEquals(expectedProxyPort, actualProxyPort);

    assertEquals(remoteProcessingGroupSchema.getProxyUser(), getText(element, "proxyUser"));
    assertEquals(remoteProcessingGroupSchema.getProxyPassword(), getText(element, "proxyPassword"));

    final XPath xpath = xPathFactory.newXPath();

    final NodeList inputPorts = (NodeList) xpath.evaluate("inputPort", element, XPathConstants.NODESET);
    assertEquals(remoteProcessingGroupSchema.getInputPorts().size(), inputPorts.getLength());
    for (int idx = 0; idx < inputPorts.getLength(); idx++) {
        testRemotePort((Element) inputPorts.item(idx), remoteProcessingGroupSchema.getInputPorts().get(idx));
    }

    final NodeList outputPorts = (NodeList) xpath.evaluate("outputPort", element, XPathConstants.NODESET);
    assertEquals(remoteProcessingGroupSchema.getOutputPorts().size(), outputPorts.getLength());
    for (int idx = 0; idx < outputPorts.getLength(); idx++) {
        testRemotePort((Element) outputPorts.item(idx), remoteProcessingGroupSchema.getOutputPorts().get(idx));
    }
}
/**
 * Verifies a generated remote port element against its schema: id, name, comment,
 * max concurrent tasks and the compression flag.
 */
private void testRemotePort(Element element, RemotePortSchema remoteInputPortSchema) throws XPathExpressionException {
    final String actualId = getText(element, "id");
    assertEquals(remoteInputPortSchema.getId(), actualId);

    final String actualName = getText(element, "name");
    assertEquals(remoteInputPortSchema.getName(), actualName);

    final String actualComment = getText(element, "comment");
    assertEquals(remoteInputPortSchema.getComment(), actualComment);

    // The schema value is compared through its string form because the XML side is text.
    final String expectedMaxTasks = remoteInputPortSchema.getMax_concurrent_tasks().toString();
    assertEquals(expectedMaxTasks, getText(element, "maxConcurrentTasks"));

    assertEquals(remoteInputPortSchema.getUseCompression(), Boolean.parseBoolean(getText(element, "useCompression")));
}
/**
 * Verifies a generated local port element: id and name must match the schema and the
 * scheduled state must be {@code RUNNING}.
 */
private void testPort(Element element, PortSchema portSchema) throws XPathExpressionException {
    final String actualId = getText(element, "id");
    assertEquals(portSchema.getId(), actualId);

    final String actualName = getText(element, "name");
    assertEquals(portSchema.getName(), actualName);

    final String scheduledState = getText(element, "scheduledState");
    assertEquals("RUNNING", scheduledState);
}
/** Verifies that a generated {@code funnel} element carries the id from its schema. */
private void testFunnel(Element element, FunnelSchema funnelSchema) throws XPathExpressionException {
    final String actualId = getText(element, "id");
    assertEquals(funnelSchema.getId(), actualId);
}
/**
 * Verifies a generated {@code connection} element against its schema: identity and endpoints,
 * the set of non-empty relationship names, and the queue settings.
 */
private void testConnection(Element element, ConnectionSchema connectionSchema) throws XPathExpressionException {
    assertEquals(connectionSchema.getId(), getText(element, "id"));
    assertEquals(connectionSchema.getName(), getText(element, "name"));
    assertEquals(connectionSchema.getSourceId(), getText(element, "sourceId"));
    assertEquals(connectionSchema.getDestinationId(), getText(element, "destinationId"));

    // Relationships are compared as sets; elements with empty text are ignored.
    final NodeList relationships = (NodeList) xPathFactory.newXPath().evaluate("relationship", element, XPathConstants.NODESET);
    final Set<String> actualRelationships = new HashSet<>();
    for (int idx = 0; idx < relationships.getLength(); idx++) {
        final String relationship = relationships.item(idx).getTextContent();
        if (StringUtil.isNullOrEmpty(relationship)) {
            continue;
        }
        actualRelationships.add(relationship);
    }
    assertEquals(new HashSet<>(connectionSchema.getSourceRelationshipNames()), actualRelationships);

    assertEquals(connectionSchema.getMaxWorkQueueSize().toString(), getText(element, "maxWorkQueueSize"));
    assertEquals(connectionSchema.getMaxWorkQueueDataSize(), getText(element, "maxWorkQueueDataSize"));
    assertEquals(connectionSchema.getFlowfileExpiration(), getText(element, "flowFileExpiration"));
    assertEquals(connectionSchema.getQueuePrioritizerClass(), getText(element, "queuePrioritizerClass"));
}
/**
 * Collects the {@code property} children of an element into a name-to-value map and compares
 * it with the expected properties, whose values are stringified with null becoming "".
 */
private void testProperties(Element element, Map<String, Object> expected) throws XPathExpressionException {
    final NodeList propertyNodes = (NodeList) xPathFactory.newXPath().evaluate("property", element, XPathConstants.NODESET);
    final Map<String, String> actual = new HashMap<>();
    for (int idx = 0; idx < propertyNodes.getLength(); idx++) {
        final Element property = (Element) propertyNodes.item(idx);
        final String name = getText(property, "name");
        final String value = getText(property, "value");
        actual.put(name, value);
    }

    final Map<String, String> expectedAsText = new HashMap<>();
    for (Map.Entry<String, Object> entry : expected.entrySet()) {
        expectedAsText.put(entry.getKey(), nullToEmpty(entry.getValue()));
    }

    assertEquals(expectedAsText, actual);
}
/**
 * Writes nifi.properties for {@code MINIFI-245/config.yml} and checks that only
 * {@code nifi.content.repository.implementation} differs from the reference file
 * {@code MINIFI-245/nifi.properties.before}; every other reference key must match.
 */
@Test
public void testContentRepoOverride() throws IOException, ConfigurationChangeException, SchemaLoaderException {
    final ClassLoader resources = ConfigTransformerTest.class.getClassLoader();

    final Properties expected = new Properties();
    try (InputStream referenceStream = resources.getResourceAsStream("MINIFI-245/nifi.properties.before")) {
        expected.load(referenceStream);
    }
    expected.setProperty(ConfigTransformer.NIFI_VERSION_KEY, ConfigTransformer.NIFI_VERSION);

    final ByteArrayOutputStream written = new ByteArrayOutputStream();
    try (InputStream yamlStream = resources.getResourceAsStream("MINIFI-245/config.yml")) {
        final ConfigSchema schema = SchemaLoader.loadConfigSchemaFromYaml(yamlStream);
        ConfigTransformer.writeNiFiProperties(schema, written);
    }

    final Properties actual = new Properties();
    actual.load(new ByteArrayInputStream(written.toByteArray()));

    for (String name : expected.stringPropertyNames()) {
        final String before = expected.getProperty(name);
        final String after = actual.getProperty(name);
        if (!"nifi.content.repository.implementation".equals(name)) {
            assertEquals("Property key " + name + " doesn't match.", before, after);
            continue;
        }
        // Verify the Content Repo property was overridden
        assertNotEquals("Property key " + name + " was not overridden.", before, after);
    }
}
/**
 * Evaluates {@code path + "/text()"} relative to the given element and returns the result
 * as a string.
 */
private String getText(Element element, String path) throws XPathExpressionException {
    final XPath xpath = xPathFactory.newXPath();
    final String textExpression = path + "/text()";
    final Object text = xpath.evaluate(textExpression, element, XPathConstants.STRING);
    return (String) text;
}
/** Returns {@code ""} for {@code null}, otherwise the value's {@code toString()}. */
private String nullToEmpty(Object val) {
    if (val == null) {
        return "";
    }
    return val.toString();
}
/**
 * Builds an unmodifiable map from each element name to its position in the list.
 * If a name occurs more than once, its last position wins.
 */
private static Map<String, Integer> generateOrderMap(List<String> elements) {
    final Map<String, Integer> positions = new HashMap<>();
    int position = 0;
    for (String name : elements) {
        positions.put(name, position++);
    }
    return Collections.unmodifiableMap(positions);
}
/**
 * Fails the test if a child element with a rank in {@code orderMap} appears after a child with
 * a higher rank. Children whose names are not in the map are ignored.
 */
private static void checkOrderOfChildren(Element element, Map<String, Integer> orderMap) {
    final NodeList children = element.getChildNodes();
    int lastRank = 0;
    String lastRankedName = null;
    for (int pos = 0; pos < children.getLength(); pos++) {
        final String childName = children.item(pos).getNodeName();
        final Integer rank = orderMap.get(childName);
        if (rank == null) {
            continue;
        }
        if (rank < lastRank) {
            // List the element names that should only appear after this one, in rank order.
            final String laterElements = orderMap.entrySet().stream()
                    .filter(entry -> entry.getValue() > rank)
                    .sorted(Map.Entry.comparingByValue())
                    .map(Map.Entry::getKey)
                    .collect(Collectors.joining(", "));
            fail("Found " + childName + " after " + lastRankedName + "; expected all " + childName + " elements to come before the following elements: " + laterElements);
        }
        lastRankedName = childName;
        lastRank = rank;
    }
}
/**
 * Loads a properties file from the test classpath.
 *
 * @param fileName classpath-relative resource name
 * @return the loaded properties
 * @throws IOException if the resource cannot be read
 */
public static Properties getTestBootstrapProperties(final String fileName) throws IOException {
    final ClassLoader loader = ConfigTransformerTest.class.getClassLoader();
    final Properties loaded = new Properties();
    try (final InputStream resource = loader.getResourceAsStream(fileName)) {
        loaded.load(resource);
    }
    return loaded;
}
/**
 * Decompresses a gzip stream fully and returns its content as text.
 *
 * <p>The supplied stream is closed before this method returns, including when it does not hold
 * valid gzip data. The bytes are decoded as UTF-8 rather than the platform default charset, so
 * the result does not vary by machine (assumes the flow XML is written as UTF-8).
 *
 * @param compressedData gzip-compressed input; closed by this method
 * @return the decompressed content decoded as UTF-8
 * @throws IOException if the data cannot be read or is not valid gzip
 */
public static String loadFlowXML(InputStream compressedData) throws IOException {
    final ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
    // Declaring the source as its own resource closes it even if the GZIP header check throws.
    try (InputStream source = compressedData;
         GZIPInputStream gzipInputStream = new GZIPInputStream(source)) {
        final byte[] buffer = new byte[1024];
        int len;
        while ((len = gzipInputStream.read(buffer)) != -1) {
            decompressed.write(buffer, 0, len);
        }
    }
    return decompressed.toString("UTF-8");
}
}