blob: a332e053f64ece8ae89d093804c603386c7eaf25 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import static java.util.Objects.requireNonNull;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.persistence.TemplateDeserializer;
import org.apache.nifi.persistence.TemplateSerializer;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.DataOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TemplateManager {
private static final Logger logger = LoggerFactory.getLogger(TemplateManager.class);
private final Path directory;
private final Map<String, Template> templateMap = new HashMap<>();
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private final FileFilter templateFileFilter = new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.getName().toLowerCase().endsWith(".template");
}
};
public TemplateManager(final Path storageLocation) throws IOException {
directory = storageLocation;
if (!Files.exists(directory)) {
Files.createDirectories(directory);
} else {
if (!Files.isDirectory(directory)) {
throw new IllegalArgumentException(directory.toString() + " is not a directory");
}
// use toFile().canXXX, rather than Files.is... because on Windows 7, we sometimes get the wrong result for Files.is... (running Java 7 update 9)
if (!directory.toFile().canExecute() || !directory.toFile().canWrite()) {
throw new IOException("Invalid permissions for directory " + directory.toString());
}
}
}
/**
* Adds a template to this manager. The contents of this template must be part of the current flow. This is going create a template based on a snippet of this flow. Any sensitive properties in the
* TemplateDTO will be removed.
*
* @param dto dto
* @return a copy of the given DTO
* @throws IOException if an I/O error occurs when persisting the Template
* @throws NullPointerException if the DTO is null
* @throws IllegalArgumentException if does not contain all required information, such as the template name or a processor's configuration element
*/
public Template addTemplate(final TemplateDTO dto) throws IOException {
scrubTemplate(dto.getSnippet());
return importTemplate(dto);
}
private void verifyCanImport(final TemplateDTO dto) {
// ensure the template is specified
if (dto == null || dto.getSnippet() == null) {
throw new IllegalArgumentException("Template details not specified.");
}
// ensure the name is specified
if (StringUtils.isBlank(dto.getName())) {
throw new IllegalArgumentException("Template name cannot be blank.");
}
readLock.lock();
try {
for (final Template template : templateMap.values()) {
final TemplateDTO existingDto = template.getDetails();
// ensure a template with this name doesnt already exist
if (dto.getName().equals(existingDto.getName())) {
throw new IllegalStateException(String.format("A template named '%s' already exists.", dto.getName()));
}
}
} finally {
readLock.unlock();
}
}
/**
* Clears all Templates from the TemplateManager
*
* @throws java.io.IOException ioe
*/
public void clear() throws IOException {
writeLock.lock();
try {
templateMap.clear();
final File[] files = directory.toFile().listFiles(templateFileFilter);
if (files == null) {
return;
}
for (final File file : files) {
boolean successful = false;
for (int i = 0; i < 10; i++) {
if (file.delete()) {
successful = true;
break;
}
}
if (!successful && file.exists()) {
throw new IOException("Failed to delete template file " + file.getAbsolutePath());
}
}
} finally {
writeLock.unlock();
}
}
/**
* @param id template id
* @return the template with the given id, if it exists; else, returns null
*/
public Template getTemplate(final String id) {
readLock.lock();
try {
return templateMap.get(id);
} finally {
readLock.unlock();
}
}
/**
* Loads the templates from disk
*
* @throws IOException ioe
*/
public void loadTemplates() throws IOException {
writeLock.lock();
try {
final File[] files = directory.toFile().listFiles(templateFileFilter);
if (files == null) {
return;
}
for (final File file : files) {
try (final FileInputStream fis = new FileInputStream(file);
final BufferedInputStream bis = new BufferedInputStream(fis)) {
final TemplateDTO templateDto = TemplateDeserializer.deserialize(bis);
templateMap.put(templateDto.getId(), new Template(templateDto));
}
}
} finally {
writeLock.unlock();
}
}
public Template importTemplate(final TemplateDTO dto) throws IOException {
// ensure we can add this template
verifyCanImport(dto);
writeLock.lock();
try {
if (requireNonNull(dto).getId() == null) {
dto.setId(UUID.randomUUID().toString());
}
final Template template = new Template(dto);
persistTemplate(template);
templateMap.put(dto.getId(), template);
return template;
} finally {
writeLock.unlock();
}
}
/**
* Persists the given template to disk
*
* @param template template
* @throws IOException ioe
*/
private void persistTemplate(final Template template) throws IOException {
final Path path = directory.resolve(template.getDetails().getId() + ".template");
Files.write(path, TemplateSerializer.serialize(template.getDetails()), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
}
/**
* Scrubs the template prior to persisting in order to remove fields that shouldn't be included or are unnecessary.
*
* @param snippet snippet
*/
private void scrubTemplate(final FlowSnippetDTO snippet) {
// ensure that contents have been specified
if (snippet != null) {
// go through each processor if specified
if (snippet.getProcessors() != null) {
scrubProcessors(snippet.getProcessors());
}
// go through each connection if specified
if (snippet.getConnections() != null) {
scrubConnections(snippet.getConnections());
}
// go through each remote process group if specified
if (snippet.getRemoteProcessGroups() != null) {
scrubRemoteProcessGroups(snippet.getRemoteProcessGroups());
}
// go through each process group if specified
if (snippet.getProcessGroups() != null) {
scrubProcessGroups(snippet.getProcessGroups());
}
// go through each controller service if specified
if (snippet.getControllerServices() != null) {
scrubControllerServices(snippet.getControllerServices());
}
}
}
/**
* Scrubs process groups prior to saving.
*
* @param processGroups groups
*/
private void scrubProcessGroups(final Set<ProcessGroupDTO> processGroups) {
// go through each process group
for (final ProcessGroupDTO processGroupDTO : processGroups) {
scrubTemplate(processGroupDTO.getContents());
}
}
/**
* Scrubs processors prior to saving. This includes removing sensitive properties, validation errors, property descriptors, etc.
*
* @param processors procs
*/
private void scrubProcessors(final Set<ProcessorDTO> processors) {
// go through each processor
for (final ProcessorDTO processorDTO : processors) {
final ProcessorConfigDTO processorConfig = processorDTO.getConfig();
// ensure that some property configuration have been specified
if (processorConfig != null) {
// if properties have been specified, remove sensitive ones
if (processorConfig.getProperties() != null) {
Map<String, String> processorProperties = processorConfig.getProperties();
// look for sensitive properties and remove them
if (processorConfig.getDescriptors() != null) {
final Collection<PropertyDescriptorDTO> descriptors = processorConfig.getDescriptors().values();
for (PropertyDescriptorDTO descriptor : descriptors) {
if (descriptor.isSensitive()) {
processorProperties.put(descriptor.getName(), null);
}
}
}
}
processorConfig.setCustomUiUrl(null);
}
// remove validation errors
processorDTO.setValidationErrors(null);
processorDTO.setInputRequirement(null);
}
}
private void scrubControllerServices(final Set<ControllerServiceDTO> controllerServices) {
for (final ControllerServiceDTO serviceDTO : controllerServices) {
final Map<String, String> properties = serviceDTO.getProperties();
final Map<String, PropertyDescriptorDTO> descriptors = serviceDTO.getDescriptors();
if (properties != null && descriptors != null) {
for (final PropertyDescriptorDTO descriptor : descriptors.values()) {
if (descriptor.isSensitive()) {
properties.put(descriptor.getName(), null);
}
}
}
serviceDTO.setCustomUiUrl(null);
serviceDTO.setValidationErrors(null);
}
}
/**
* Scrubs connections prior to saving. This includes removing available relationships.
*
* @param connections conns
*/
private void scrubConnections(final Set<ConnectionDTO> connections) {
// go through each connection
for (final ConnectionDTO connectionDTO : connections) {
connectionDTO.setAvailableRelationships(null);
scrubConnectable(connectionDTO.getSource());
scrubConnectable(connectionDTO.getDestination());
}
}
/**
* Remove unnecessary fields in connectables prior to saving.
*
* @param connectable connectable
*/
private void scrubConnectable(final ConnectableDTO connectable) {
if (connectable != null) {
connectable.setComments(null);
connectable.setExists(null);
connectable.setRunning(null);
connectable.setTransmitting(null);
connectable.setName(null);
}
}
/**
* Remove unnecessary fields in remote groups prior to saving.
*
* @param remoteGroups groups
*/
private void scrubRemoteProcessGroups(final Set<RemoteProcessGroupDTO> remoteGroups) {
// go through each remote process group
for (final RemoteProcessGroupDTO remoteProcessGroupDTO : remoteGroups) {
remoteProcessGroupDTO.setFlowRefreshed(null);
remoteProcessGroupDTO.setInputPortCount(null);
remoteProcessGroupDTO.setOutputPortCount(null);
remoteProcessGroupDTO.setTransmitting(null);
// if this remote process group has contents
if (remoteProcessGroupDTO.getContents() != null) {
RemoteProcessGroupContentsDTO contents = remoteProcessGroupDTO.getContents();
// scrub any remote input ports
if (contents.getInputPorts() != null) {
scrubRemotePorts(contents.getInputPorts());
}
// scrub and remote output ports
if (contents.getOutputPorts() != null) {
scrubRemotePorts(contents.getOutputPorts());
}
}
}
}
/**
* Remove unnecessary fields in remote ports prior to saving.
*
* @param remotePorts ports
*/
private void scrubRemotePorts(final Set<RemoteProcessGroupPortDTO> remotePorts) {
for (final Iterator<RemoteProcessGroupPortDTO> remotePortIter = remotePorts.iterator(); remotePortIter.hasNext();) {
final RemoteProcessGroupPortDTO remotePortDTO = remotePortIter.next();
// if the flow is not connected to this remote port, remove it
if (remotePortDTO.isConnected() == null || !remotePortDTO.isConnected().booleanValue()) {
remotePortIter.remove();
continue;
}
remotePortDTO.setExists(null);
remotePortDTO.setTargetRunning(null);
}
}
/**
* Removes the template with the given ID.
*
* @param id the ID of the template to remove
* @throws NullPointerException if the argument is null
* @throws IllegalStateException if no template exists with the given ID
* @throws IOException if template could not be removed
*/
public void removeTemplate(final String id) throws IOException, IllegalStateException {
writeLock.lock();
try {
final Template removed = templateMap.remove(requireNonNull(id));
// ensure the template exists
if (removed == null) {
throw new IllegalStateException("No template with ID " + id + " exists");
} else {
try {
// remove the template from the archive directory
final Path path = directory.resolve(removed.getDetails().getId() + ".template");
Files.delete(path);
} catch (final NoSuchFileException e) {
logger.warn(String.format("Template file for template %s not found when attempting to remove. Continuing...", id));
} catch (final IOException e) {
logger.error(String.format("Unable to remove template file for template %s.", id));
// since the template file existed and we were unable to remove it, rollback
// by returning it to the template map
templateMap.put(id, removed);
// rethrow
throw e;
}
}
} finally {
writeLock.unlock();
}
}
public Set<Template> getTemplates() {
readLock.lock();
try {
return new HashSet<>(templateMap.values());
} finally {
readLock.unlock();
}
}
public static List<Template> parseBytes(final byte[] bytes) {
final List<Template> templates = new ArrayList<>();
try (final InputStream rawIn = new ByteArrayInputStream(bytes);
final DataInputStream in = new DataInputStream(rawIn)) {
while (isMoreData(in)) {
final int length = in.readInt();
final byte[] buffer = new byte[length];
StreamUtils.fillBuffer(in, buffer, true);
final TemplateDTO dto = TemplateDeserializer.deserialize(new ByteArrayInputStream(buffer));
templates.add(new Template(dto));
}
} catch (final IOException e) {
throw new RuntimeException("Could not parse bytes", e); // won't happen because of the types of streams being used
}
return templates;
}
private static boolean isMoreData(final InputStream in) throws IOException {
in.mark(1);
final int nextByte = in.read();
if (nextByte == -1) {
return false;
}
in.reset();
return true;
}
public byte[] export() {
readLock.lock();
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream dos = new DataOutputStream(baos)) {
for (final Template template : templateMap.values()) {
final TemplateDTO dto = template.getDetails();
final byte[] bytes = TemplateSerializer.serialize(dto);
dos.writeInt(bytes.length);
dos.write(bytes);
}
return baos.toByteArray();
} catch (final IOException e) {
// won't happen
return null;
} finally {
readLock.unlock();
}
}
}