/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.ArgumentUtils;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"command", "process", "source", "external", "invoke", "script"})
@CapabilityDescription("Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected "
+ "to be long-running, the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual "
+ "format, as it typically does not make sense to split binary data on arbitrary time-based intervals.")
@DynamicProperty(name = "An environment variable name", value = "An environment variable value", description = "These environment variables are passed to the process spawned by this Processor")
@Restricted(
restrictions = {
@Restriction(
requiredPermission = RequiredPermission.EXECUTE_CODE,
explanation = "Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
}
)
@WritesAttributes({
@WritesAttribute(attribute = "command", description = "Executed command"),
@WritesAttribute(attribute = "command.arguments", description = "Arguments of the command")
})
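// Illustrative configuration (a sketch; the property values below are hypothetical examples, not defaults):
// run "tail -F /var/log/syslog" and emit a FlowFile with the captured output every 5 seconds, while a
// dynamic property supplies an environment variable to the spawned process.
//
//   Command               = tail
//   Command Arguments     = -F /var/log/syslog
//   Batch Duration        = 5 sec
//   Redirect Error Stream = false
//   LOG_LEVEL (dynamic)   = debug   (passed to the spawned process as an environment variable)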
public class ExecuteProcess extends AbstractProcessor {
static final String ATTRIBUTE_COMMAND = "command";
static final String ATTRIBUTE_COMMAND_ARGS = "command.arguments";
public static final PropertyDescriptor COMMAND = new PropertyDescriptor.Builder()
.name("Command")
.description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor COMMAND_ARGUMENTS = new PropertyDescriptor.Builder()
.name("Command Arguments")
.description("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder()
.name("Working Directory")
.description("The directory to use as the current working directory when executing the command")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.createDirectoryExistsValidator(false, true))
.required(false)
.build();
public static final PropertyDescriptor BATCH_DURATION = new PropertyDescriptor.Builder()
.name("Batch Duration")
.description("If the process is expected to be long-running and produce textual output, a batch duration can be specified so "
+ "that the output will be captured for this amount of time and a FlowFile will then be sent out with the results "
+ "and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor REDIRECT_ERROR_STREAM = new PropertyDescriptor.Builder()
.name("Redirect Error Stream")
.description("If true will redirect any error stream output of the process to the output stream. "
+ "This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.")
.required(false)
.allowableValues("true", "false")
.defaultValue("false")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
private static final Validator characterValidator = new StandardValidators.StringLengthValidator(1, 1);
static final PropertyDescriptor ARG_DELIMITER = new PropertyDescriptor.Builder()
.name("Argument Delimiter")
.description("Delimiter to use to separate arguments for a command [default: space]. Must be a single character.")
.addValidator(Validator.VALID)
.addValidator(characterValidator)
.required(true)
.defaultValue(" ")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All created FlowFiles are routed to this relationship")
.build();
private volatile Process externalProcess;
private volatile ExecutorService executor;
private Future<?> longRunningProcess;
private AtomicBoolean failure = new AtomicBoolean(false);
private volatile ProxyOutputStream proxyOut;
@Override
public Set<Relationship> getRelationships() {
return Collections.singleton(REL_SUCCESS);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(COMMAND);
properties.add(COMMAND_ARGUMENTS);
properties.add(BATCH_DURATION);
properties.add(REDIRECT_ERROR_STREAM);
properties.add(WORKING_DIR);
properties.add(ARG_DELIMITER);
return properties;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment")
.dynamic(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
}
@OnScheduled
public void setupExecutor(final ProcessContext context) {
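// The pool is sized at twice the maximum concurrent tasks, presumably so that each task can use
// one thread to copy the process's standard output and another to drain its error stream.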
executor = Executors.newFixedThreadPool(context.getMaxConcurrentTasks() * 2, new ThreadFactory() {
private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
@Override
public Thread newThread(final Runnable r) {
final Thread t = defaultFactory.newThread(r);
t.setName("ExecuteProcess " + getIdentifier() + " Task");
return t;
}
});
}
@OnUnscheduled
public void shutdownExecutor() {
try {
executor.shutdown();
} finally {
if (this.externalProcess != null && this.externalProcess.isAlive()) {
this.getLogger().info("Process has not terminated yet; destroying it forcibly");
this.externalProcess.destroyForcibly();
}
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
if (proxyOut == null) {
proxyOut = new ProxyOutputStream(getLogger());
}
final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
final String command = context.getProperty(COMMAND).evaluateAttributeExpressions().getValue();
final String arguments = context.getProperty(COMMAND_ARGUMENTS).isSet()
? context.getProperty(COMMAND_ARGUMENTS).evaluateAttributeExpressions().getValue()
: null;
final List<String> commandStrings = createCommandStrings(context, command, arguments);
final String commandString = StringUtils.join(commandStrings, " ");
if (longRunningProcess == null || longRunningProcess.isDone()) {
try {
longRunningProcess = launchProcess(context, commandStrings, batchNanos, proxyOut);
} catch (final IOException ioe) {
getLogger().error("Failed to create process due to {}", new Object[] { ioe });
context.yield();
return;
}
} else {
getLogger().info("Read from long running process");
}
if (!isScheduled()) {
getLogger().info("User stopped processor; will terminate process immediately");
longRunningProcess.cancel(true);
return;
}
// Create a FlowFile that we can write to and set the OutputStream for the FlowFile
// as the delegate for the ProxyOutputStream, then wait until the process finishes
// or until the specified amount of time has elapsed
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream flowFileOut) throws IOException {
try (final OutputStream out = new BufferedOutputStream(flowFileOut)) {
proxyOut.setDelegate(out);
if (batchNanos == null) {
// we are not creating batches; wait until process terminates.
// NOTE: get(long timeout, TimeUnit unit) might be preferable here
// to avoid waiting forever.
try {
longRunningProcess.get();
} catch (final InterruptedException ie) {
// Interrupted while waiting for the process; stop waiting and finish the current FlowFile
} catch (final ExecutionException ee) {
getLogger().error("Process execution failed due to {}", new Object[] { ee.getCause() });
}
} else {
// wait the allotted amount of time.
try {
TimeUnit.NANOSECONDS.sleep(batchNanos);
} catch (final InterruptedException ie) {
// Interrupted during the batch wait; close out the current batch early
}
}
proxyOut.setDelegate(null); // prevent further writes to this stream
}
}
});
if (flowFile.getSize() == 0L) {
// If no data was written to the file, remove it
session.remove(flowFile);
} else if (failure.get()) {
// If there was a failure processing the output of the Process, remove the FlowFile
session.remove(flowFile);
getLogger().error("Failed to read data from Process, so will not generate FlowFile");
} else {
// add command and arguments as attribute
flowFile = session.putAttribute(flowFile, ATTRIBUTE_COMMAND, command);
if(arguments != null) {
flowFile = session.putAttribute(flowFile, ATTRIBUTE_COMMAND_ARGS, arguments);
}
// All was good. Generate event and transfer FlowFile.
session.getProvenanceReporter().create(flowFile, "Created from command: " + commandString);
getLogger().info("Created {} and routed to success", new Object[] { flowFile });
session.transfer(flowFile, REL_SUCCESS);
}
}
protected List<String> createCommandStrings(final ProcessContext context, final String command, final String arguments) {
final List<String> args = ArgumentUtils.splitArgs(arguments, context.getProperty(ARG_DELIMITER).getValue().charAt(0));
final List<String> commandStrings = new ArrayList<>(args.size() + 1);
commandStrings.add(command);
commandStrings.addAll(args);
return commandStrings;
}
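// Illustrative example of the splitting performed by createCommandStrings above (hypothetical values),
// assuming ArgumentUtils.splitArgs splits on the single-character delimiter while honoring
// double-quoted segments:
//
//   final List<String> args = ArgumentUtils.splitArgs("/tmp;-name;*.log", ';');
//   // args is expected to be ["/tmp", "-name", "*.log"], so with Command = "find"
//   // the resulting command strings are ["find", "/tmp", "-name", "*.log"].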
protected Future<?> launchProcess(final ProcessContext context, final List<String> commandStrings, final Long batchNanos,
final ProxyOutputStream proxyOut) throws IOException {
final Boolean redirectErrorStream = context.getProperty(REDIRECT_ERROR_STREAM).asBoolean();
final ProcessBuilder builder = new ProcessBuilder(commandStrings);
final String workingDirName = context.getProperty(WORKING_DIR).evaluateAttributeExpressions().getValue();
if (workingDirName != null) {
builder.directory(new File(workingDirName));
}
final Map<String, String> environment = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
if (entry.getKey().isDynamic()) {
environment.put(entry.getKey().getName(), entry.getValue());
}
}
if (!environment.isEmpty()) {
builder.environment().putAll(environment);
}
getLogger().info("Start creating new Process > {} ", new Object[] { commandStrings });
this.externalProcess = builder.redirectErrorStream(redirectErrorStream).start();
// Submit task to read error stream from process
if (!redirectErrorStream) {
executor.submit(new Runnable() {
@Override
public void run() {
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(externalProcess.getErrorStream()))) {
reader.lines().filter(line -> !line.isEmpty()).forEach(getLogger()::warn);
} catch (final IOException ioe) {
// Reading the error stream is best-effort; ignore read failures here
}
}
});
}
// Submit task to read output of Process and write to FlowFile.
failure = new AtomicBoolean(false);
final Future<?> future = executor.submit(new Callable<Object>() {
@Override
public Object call() throws IOException {
try {
if (batchNanos == null) {
// if we aren't batching, just copy the stream from the
// process to the flowfile.
try (final BufferedInputStream bufferedIn = new BufferedInputStream(externalProcess.getInputStream())) {
final byte[] buffer = new byte[4096];
int len;
while ((len = bufferedIn.read(buffer)) > 0) {
// NOTE: Should we keep draining the input stream when !isScheduled(),
// simply not writing the data to proxyOut, to avoid deadlocking the
// subprocess on a full pipe? Or, since the subprocess is no longer
// used anyway, is stopping here acceptable?
if (!isScheduled()) {
return null;
}
proxyOut.write(buffer, 0, len);
}
}
} else {
// we are batching, which means that the output of the
// process is text. It doesn't make sense to grab
// arbitrary batches of bytes from some process and send
// them along as a piece of data, so we assume that
// setting a batch duration means text.
// Also, we don't want that text to get split up in the
// middle of a line, so we use BufferedReader
// to read lines of text and write them as lines of text.
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(externalProcess.getInputStream()))) {
String line;
while ((line = reader.readLine()) != null) {
if (!isScheduled()) {
return null;
}
proxyOut.write((line + "\n").getBytes(StandardCharsets.UTF_8));
}
}
}
} catch (final IOException ioe) {
failure.set(true);
throw ioe;
} finally {
try {
// Since we are going to exit anyway, one second gives the process an extra chance to exit gracefully.
// In the future, consider exposing this timeout via configuration.
boolean terminated = externalProcess.waitFor(1000, TimeUnit.MILLISECONDS);
int exitCode = terminated ? externalProcess.exitValue() : -9999;
getLogger().info("Process finished with exit code {} ", new Object[] { exitCode });
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
}
return null;
}
});
return future;
}
/**
* Output stream that is used to wrap another output stream in a way that the underlying output stream can be swapped out for a different one when needed
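* <p>
* Illustrative usage within this processor (a sketch, not a public contract): writes block until a
* delegate is present, so swapping the delegate controls where (and whether) process output lands.
* <pre>
*   final ProxyOutputStream proxy = new ProxyOutputStream(logger);
*   proxy.setDelegate(flowFileOutputStream); // writes now go to the current FlowFile's stream
*   proxy.write(data);
*   proxy.setDelegate(null);                 // subsequent writes wait until a new delegate is set
* </pre>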
*/
private static class ProxyOutputStream extends OutputStream {
private final ComponentLog logger;
private final Lock lock = new ReentrantLock();
private OutputStream delegate;
public ProxyOutputStream(final ComponentLog logger) {
this.logger = logger;
}
public void setDelegate(final OutputStream delegate) {
lock.lock();
try {
logger.trace("Switching delegate from {} to {}", new Object[]{this.delegate, delegate});
this.delegate = delegate;
} finally {
lock.unlock();
}
}
private void sleep(final long millis) {
try {
Thread.sleep(millis);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
@Override
public void write(final int b) throws IOException {
lock.lock();
try {
while (true) {
if (delegate != null) {
logger.trace("Writing to {}", new Object[]{delegate});
delegate.write(b);
return;
} else {
lock.unlock();
sleep(1L);
lock.lock();
}
}
} finally {
lock.unlock();
}
}
@Override
public void write(final byte[] b, final int off, final int len) throws IOException {
lock.lock();
try {
while (true) {
if (delegate != null) {
logger.trace("Writing to {}", new Object[]{delegate});
delegate.write(b, off, len);
return;
} else {
lock.unlock();
sleep(1L);
lock.lock();
}
}
} finally {
lock.unlock();
}
}
@Override
public void write(final byte[] b) throws IOException {
write(b, 0, b.length);
}
@Override
public void close() throws IOException {
// Intentionally a no-op: the delegate's lifecycle is managed by the code that sets it
}
@Override
public void flush() throws IOException {
lock.lock();
try {
while (true) {
if (delegate != null) {
delegate.flush();
return;
} else {
lock.unlock();
sleep(1L);
lock.lock();
}
}
} finally {
lock.unlock();
}
}
}
}