| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.util; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.nio.file.StandardOpenOption; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.zip.GZIPInputStream; |
| import java.util.zip.GZIPOutputStream; |
| import javax.xml.XMLConstants; |
| import javax.xml.parsers.DocumentBuilder; |
| import javax.xml.parsers.ParserConfigurationException; |
| import javax.xml.transform.Result; |
| import javax.xml.transform.Source; |
| import javax.xml.transform.TransformerException; |
| import javax.xml.transform.TransformerFactory; |
| import javax.xml.transform.dom.DOMSource; |
| import javax.xml.transform.stream.StreamResult; |
| import javax.xml.validation.Schema; |
| import javax.xml.validation.SchemaFactory; |
| import org.apache.commons.io.IOUtils; |
| import org.apache.nifi.controller.serialization.FlowFromDOMFactory; |
| import org.apache.nifi.security.xml.XmlUtils; |
| import org.apache.nifi.util.file.FileUtils; |
| import org.apache.nifi.web.api.dto.PortDTO; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.w3c.dom.Document; |
| import org.w3c.dom.Element; |
| import org.w3c.dom.Node; |
| import org.w3c.dom.NodeList; |
| import org.xml.sax.SAXException; |
| |
| /** |
| * Parses a flow and returns the root group id and root group ports. |
| */ |
| public class FlowParser { |
| |
| private static final Logger logger = LoggerFactory.getLogger(FlowParser.class); |
| |
| private static final String FLOW_XSD = "/FlowConfiguration.xsd"; |
| |
| private final Schema flowSchema; |
| |
| public FlowParser() throws SAXException { |
| SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); |
| flowSchema = schemaFactory.newSchema(FlowParser.class.getResource(FLOW_XSD)); |
| } |
| |
| /** |
| * Extracts the root group id from the flow configuration file provided in nifi.properties, and extracts |
| * the root group input ports and output ports, and their access controls. |
| * |
| */ |
| public FlowInfo parse(final File flowConfigurationFile) { |
| if (flowConfigurationFile == null) { |
| logger.debug("Flow Configuration file was null"); |
| return null; |
| } |
| |
| // if the flow doesn't exist or is 0 bytes, then return null |
| final Path flowPath = flowConfigurationFile.toPath(); |
| try { |
| if (!Files.exists(flowPath) || Files.size(flowPath) == 0) { |
| logger.warn("Flow Configuration does not exist or was empty"); |
| return null; |
| } |
| } catch (IOException e) { |
| logger.error("An error occurred determining the size of the Flow Configuration file"); |
| return null; |
| } |
| |
| // otherwise create the appropriate input streams to read the file |
| try (final InputStream in = Files.newInputStream(flowPath, StandardOpenOption.READ); |
| final InputStream gzipIn = new GZIPInputStream(in)) { |
| |
| final byte[] flowBytes = IOUtils.toByteArray(gzipIn); |
| if (flowBytes == null || flowBytes.length == 0) { |
| logger.warn("Could not extract root group id because Flow Configuration File was empty"); |
| return null; |
| } |
| |
| // create validating document builder |
| final DocumentBuilder docBuilder = XmlUtils.createSafeDocumentBuilder(flowSchema); |
| docBuilder.setErrorHandler(new LoggingXmlParserErrorHandler("Flow Configuration", logger)); |
| |
| // parse the flow |
| final Document document = docBuilder.parse(new ByteArrayInputStream(flowBytes)); |
| |
| // extract the root group id |
| final Element rootElement = document.getDocumentElement(); |
| |
| final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0); |
| if (rootGroupElement == null) { |
| logger.warn("rootGroup element not found in Flow Configuration file"); |
| return null; |
| } |
| |
| final Element rootGroupIdElement = (Element) rootGroupElement.getElementsByTagName("id").item(0); |
| if (rootGroupIdElement == null) { |
| logger.warn("id element not found under rootGroup in Flow Configuration file"); |
| return null; |
| } |
| |
| final String rootGroupId = rootGroupIdElement.getTextContent(); |
| |
| final List<PortDTO> ports = new ArrayList<>(); |
| ports.addAll(getPorts(rootGroupElement, "inputPort")); |
| ports.addAll(getPorts(rootGroupElement, "outputPort")); |
| |
| return new FlowInfo(rootGroupId, ports); |
| |
| } catch (final SAXException | ParserConfigurationException | IOException ex) { |
| logger.error("Unable to parse flow {} due to {}", new Object[] { flowPath.toAbsolutePath(), ex }); |
| return null; |
| } |
| } |
| |
| /** |
| * Generates a {@link Document} from the flow configuration file provided |
| */ |
| public Document parseDocument(final File flowConfigurationFile) { |
| if (flowConfigurationFile == null) { |
| logger.debug("Flow Configuration file was null"); |
| return null; |
| } |
| |
| // if the flow doesn't exist or is 0 bytes, then return null |
| final Path flowPath = flowConfigurationFile.toPath(); |
| try { |
| if (!Files.exists(flowPath) || Files.size(flowPath) == 0) { |
| logger.warn("Flow Configuration does not exist or was empty"); |
| return null; |
| } |
| } catch (IOException e) { |
| logger.error("An error occurred determining the size of the Flow Configuration file"); |
| return null; |
| } |
| |
| // otherwise create the appropriate input streams to read the file |
| try (final InputStream in = Files.newInputStream(flowPath, StandardOpenOption.READ); |
| final InputStream gzipIn = new GZIPInputStream(in)) { |
| |
| final byte[] flowBytes = IOUtils.toByteArray(gzipIn); |
| if (flowBytes == null || flowBytes.length == 0) { |
| logger.warn("Could not extract root group id because Flow Configuration File was empty"); |
| return null; |
| } |
| |
| // create validating document builder |
| final DocumentBuilder docBuilder = XmlUtils.createSafeDocumentBuilder(flowSchema); |
| docBuilder.setErrorHandler(new LoggingXmlParserErrorHandler("Flow Configuration", logger)); |
| return docBuilder.parse(new ByteArrayInputStream(flowBytes)); |
| |
| } catch (final SAXException | ParserConfigurationException | IOException ex) { |
| logger.error("Unable to parse flow {} due to {}", new Object[]{flowPath.toAbsolutePath(), ex}); |
| return null; |
| } |
| } |
| |
| /** |
| * Gets the ports that are direct children of the given element. |
| * |
| * @param element the element containing ports |
| * @param type the type of port to find (inputPort or outputPort) |
| * @return a list of PortDTOs representing the found ports |
| */ |
| private List<PortDTO> getPorts(final Element element, final String type) { |
| final List<PortDTO> ports = new ArrayList<>(); |
| |
| // add input ports |
| final List<Element> portNodeList = getChildrenByTagName(element, type); |
| for (final Element portElement : portNodeList) { |
| final PortDTO portDTO = FlowFromDOMFactory.getPort(portElement); |
| portDTO.setType(type); |
| ports.add(portDTO); |
| } |
| |
| return ports; |
| } |
| |
| /** |
| * Writes a given XML Flow out to the specified path. |
| * |
| * @param flowDocument flowDocument of the associated XML content to write to disk |
| * @param flowXmlPath path on disk to write the flow |
| * @throws IOException if there are issues in accessing the target destination for the flow |
| * @throws TransformerException if there are issues in the xml transformation process |
| */ |
| public void writeFlow(final Document flowDocument, final Path flowXmlPath) throws IOException, TransformerException { |
| final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); |
| final Source xmlSource = new DOMSource(flowDocument); |
| final Result outputTarget = new StreamResult(outputStream); |
| TransformerFactory.newInstance().newTransformer().transform(xmlSource, outputTarget); |
| final InputStream is = new ByteArrayInputStream(outputStream.toByteArray()); |
| |
| try (final OutputStream output = Files.newOutputStream(flowXmlPath, StandardOpenOption.WRITE, StandardOpenOption.CREATE); |
| final OutputStream gzipOut = new GZIPOutputStream(output)) { |
| FileUtils.copy(is, gzipOut); |
| } |
| } |
| |
| /** |
| * Finds child elements with the given tagName. |
| * |
| * @param element the parent element |
| * @param tagName the child element name to find |
| * @return a list of matching child elements |
| */ |
| public static List<Element> getChildrenByTagName(final Element element, final String tagName) { |
| final List<Element> matches = new ArrayList<>(); |
| final NodeList nodeList = element.getChildNodes(); |
| for (int i = 0; i < nodeList.getLength(); i++) { |
| final Node node = nodeList.item(i); |
| if (!(node instanceof Element)) { |
| continue; |
| } |
| |
| final Element child = (Element) nodeList.item(i); |
| if (child.getNodeName().equals(tagName)) { |
| matches.add(child); |
| } |
| } |
| return matches; |
| } |
| } |