NIFIREG-373 Making command timeout configurable in ShellUserGroupProvider
- Changing ShellRunner to use a separate thread for reading the output of the process
- Removing unused member variable
- Staying in-sync with review feedback from NiFi side
This closes #268.
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authorization/shell/ShellRunner.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authorization/shell/ShellRunner.java
index ee7ef41..de38b63 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authorization/shell/ShellRunner.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authorization/shell/ShellRunner.java
@@ -25,6 +25,9 @@
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
public class ShellRunner {
@@ -32,50 +35,93 @@
static String SHELL = "sh";
static String OPTS = "-c";
- static Integer TIMEOUT = 60;
- public static List<String> runShell(String command) throws IOException {
+ private final int timeoutSeconds;
+ private final ExecutorService executor;
+
+ public ShellRunner(final int timeoutSeconds) {
+ this.timeoutSeconds = timeoutSeconds;
+ this.executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
+ @Override
+ public Thread newThread(final Runnable r) {
+ final Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setName("ShellRunner");
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ }
+
+ public List<String> runShell(String command) throws IOException {
return runShell(command, "<unknown>");
}
- public static List<String> runShell(String command, String description) throws IOException {
+ public List<String> runShell(String command, String description) throws IOException {
final ProcessBuilder builder = new ProcessBuilder(SHELL, OPTS, command);
- final List<String> builderCommand = builder.command();
+ builder.redirectErrorStream(true);
- logger.debug("Run Command '" + description + "': " + builderCommand);
+ final List<String> builderCommand = builder.command();
+ logger.debug("Run Command '{}': {}", new Object[]{description, builderCommand});
+
final Process proc = builder.start();
+ final List<String> lines = new ArrayList<>();
+ executor.submit(() -> {
+ try {
+ try (final Reader stdin = new InputStreamReader(proc.getInputStream());
+ final BufferedReader reader = new BufferedReader(stdin)) {
+ logger.trace("Reading process input stream...");
+
+ String line;
+ int lineCount = 0;
+ while ((line = reader.readLine()) != null) {
+ if (logger.isTraceEnabled()) {
+ logger.trace((++lineCount) + " - " + line);
+ }
+ lines.add(line.trim());
+ }
+
+ logger.trace("Finished reading process input stream");
+ }
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ }
+ });
+
boolean completed;
try {
- completed = proc.waitFor(TIMEOUT, TimeUnit.SECONDS);
+ completed = proc.waitFor(timeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException irexc) {
throw new IOException(irexc.getMessage(), irexc.getCause());
}
if (!completed) {
+ logger.debug("Process did not complete in allotted time, attempting to forcibly destroy process...");
+ try {
+ proc.destroyForcibly();
+ } catch (Exception e) {
+ logger.debug("Process failed to destroy: " + e.getMessage(), e);
+ }
throw new IllegalStateException("Shell command '" + command + "' did not complete during the allotted time period");
}
if (proc.exitValue() != 0) {
- try (final Reader stderr = new InputStreamReader(proc.getErrorStream());
- final BufferedReader reader = new BufferedReader(stderr)) {
- String line;
- while ((line = reader.readLine()) != null) {
- logger.warn(line.trim());
- }
- }
- throw new IOException("Command exit non-zero: " + proc.exitValue());
- }
-
- final List<String> lines = new ArrayList<>();
- try (final Reader stdin = new InputStreamReader(proc.getInputStream());
- final BufferedReader reader = new BufferedReader(stdin)) {
- String line;
- while ((line = reader.readLine()) != null) {
- lines.add(line.trim());
- }
+ throw new IOException("Process exited with non-zero value: " + proc.exitValue());
}
return lines;
}
+
+ public void shutdown() {
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(5000L, TimeUnit.MILLISECONDS)) {
+ logger.info("Failed to stop ShellRunner executor in 5 seconds. Terminating");
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException ie) {
+ executor.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authorization/shell/ShellUserGroupProvider.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authorization/shell/ShellUserGroupProvider.java
index 1d709ac..4e201d2 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authorization/shell/ShellUserGroupProvider.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authorization/shell/ShellUserGroupProvider.java
@@ -63,20 +63,23 @@
public static final String EXCLUDE_USER_PROPERTY = "Exclude Users";
public static final String EXCLUDE_GROUP_PROPERTY = "Exclude Groups";
+ public static final String COMMAND_TIMEOUT_PROPERTY = "Command Timeout";
+
+ private static final String DEFAULT_COMMAND_TIMEOUT = "60 seconds";
+
private long fixedDelay;
private Pattern excludeUsers;
private Pattern excludeGroups;
+ private int timeoutSeconds;
// Our scheduler has one thread for users, one for groups:
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
- // Our shell timeout, in seconds:
- @SuppressWarnings("FieldCanBeLocal")
- private final Integer shellTimeout = 10;
-
// Commands selected during initialization:
private ShellCommandsProvider selectedShellCommands;
+ private ShellRunner shellRunner;
+
// Start of the UserGroupProvider implementation. Javadoc strings
// copied from the interface definition for reference.
@@ -243,7 +246,13 @@
*/
@Override
public void onConfigured(AuthorizerConfigurationContext configurationContext) throws SecurityProviderCreationException {
+ logger.info("Configuring ShellUserGroupProvider");
+
fixedDelay = getDelayProperty(configurationContext, REFRESH_DELAY_PROPERTY, "5 mins");
+ timeoutSeconds = getTimeoutProperty(configurationContext, COMMAND_TIMEOUT_PROPERTY, DEFAULT_COMMAND_TIMEOUT);
+ shellRunner = new ShellRunner(timeoutSeconds);
+ logger.debug("Configured ShellRunner with command timeout of '{}' seconds", new Object[]{timeoutSeconds});
+
// Our next init step is to select the command set based on the operating system name:
ShellCommandsProvider commands = getCommandsProvider();
@@ -256,7 +265,7 @@
// Our next init step is to run the system check from that command set to determine if the other commands
// will work on this host or not.
try {
- ShellRunner.runShell(commands.getSystemCheck());
+ shellRunner.runShell(commands.getSystemCheck());
} catch (final Exception e) {
logger.error("initialize exception: " + e + " system check command: " + commands.getSystemCheck());
throw new SecurityProviderCreationException(SYS_CHECK_ERROR, e);
@@ -282,6 +291,7 @@
}
}, fixedDelay, fixedDelay, TimeUnit.MILLISECONDS);
+ logger.info("Completed configuration of ShellUserGroupProvider");
}
private static ShellCommandsProvider getCommandsProviderFromName(String osName) {
@@ -338,6 +348,26 @@
return syncInterval;
}
+ private int getTimeoutProperty(AuthorizerConfigurationContext authContext, String propertyName, String defaultValue) {
+ final PropertyValue timeoutProperty = authContext.getProperty(propertyName);
+
+ final String propertyValue;
+ if (timeoutProperty.isSet()) {
+ propertyValue = timeoutProperty.getValue();
+ } else {
+ propertyValue = defaultValue;
+ }
+
+ final long timeoutValue;
+ try {
+ timeoutValue = Math.round(FormatUtils.getPreciseTimeDuration(propertyValue, TimeUnit.SECONDS));
+ } catch (final IllegalArgumentException ignored) {
+ throw new SecurityProviderCreationException(String.format("The %s '%s' is not a valid time interval.", propertyName, propertyValue));
+ }
+
+ return Math.toIntExact(timeoutValue);
+ }
+
/**
* Called immediately before instance destruction for implementers to release resources.
*
@@ -347,7 +377,13 @@
public void preDestruction() throws SecurityProviderDestructionException {
try {
scheduler.shutdownNow();
- } catch (final Exception ignored) {
+ } catch (final Exception e) {
+ logger.warn("Error shutting down refresh scheduler: " + e.getMessage(), e);
+ }
+ try {
+ shellRunner.shutdown();
+ } catch (final Exception e) {
+ logger.warn("Error shutting down ShellRunner: " + e.getMessage(), e);
}
}
@@ -373,7 +409,7 @@
List<String> userLines;
try {
- userLines = ShellRunner.runShell(command, description);
+ userLines = shellRunner.runShell(command, description);
rebuildUsers(userLines, idToUser, usernameToUser, gidToUser);
} catch (final IOException ioexc) {
logger.error("refreshOneUser shell exception: " + ioexc);
@@ -407,7 +443,7 @@
List<String> groupLines;
try {
- groupLines = ShellRunner.runShell(command, description);
+ groupLines = shellRunner.runShell(command, description);
rebuildGroups(groupLines, gidToGroup);
} catch (final IOException ioexc) {
logger.error("refreshOneGroup shell exception: " + ioexc);
@@ -429,6 +465,8 @@
* other methods for record parse, extract, and object construction.
*/
private void refreshUsersAndGroups() {
+ final long startTime = System.currentTimeMillis();
+
Map<String, User> uidToUser = new HashMap<>();
Map<String, User> usernameToUser = new HashMap<>();
Map<String, User> gidToUser = new HashMap<>();
@@ -438,8 +476,8 @@
List<String> groupLines;
try {
- userLines = ShellRunner.runShell(selectedShellCommands.getUsersList(), "Get Users List");
- groupLines = ShellRunner.runShell(selectedShellCommands.getGroupsList(), "Get Groups List");
+ userLines = shellRunner.runShell(selectedShellCommands.getUsersList(), "Get Users List");
+ groupLines = shellRunner.runShell(selectedShellCommands.getGroupsList(), "Get Groups List");
} catch (final IOException ioexc) {
logger.error("refreshUsersAndGroups shell exception: " + ioexc);
return;
@@ -479,6 +517,9 @@
sortedGroups.forEach(g -> logger.trace("=== " + g.toString()));
}
}
+
+ final long endTime = System.currentTimeMillis();
+ logger.info("Refreshed users and groups, took {} seconds", (endTime - startTime) / 1000);
}
/**
@@ -547,7 +588,7 @@
try {
String groupMembersCommand = selectedShellCommands.getGroupMembers(groupName);
- List<String> memberLines = ShellRunner.runShell(groupMembersCommand);
+ List<String> memberLines = shellRunner.runShell(groupMembersCommand);
// Use the first line only, and log if the line count isn't exactly one:
if (!memberLines.isEmpty()) {
String memberLine = memberLines.get(0);
diff --git a/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/authorizers.xml b/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/authorizers.xml
index 38a6ee8..607814c 100644
--- a/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/authorizers.xml
+++ b/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/authorizers.xml
@@ -173,6 +173,7 @@
'Refresh Delay' - duration to wait between subsequent refreshes. Default is '5 mins'.
'Exclude Groups' - regular expression used to exclude groups. Default is '', which means no groups are excluded.
'Exclude Users' - regular expression used to exclude users. Default is '', which means no users are excluded.
+ 'Command Timeout' - amount of time to wait while executing a command before timing out
-->
<!-- To enable the shell-user-group-provider remove 2 lines. This is 1 of 2.
<userGroupProvider>
@@ -181,6 +182,7 @@
<property name="Refresh Delay">5 mins</property>
<property name="Exclude Groups"></property>
<property name="Exclude Users"></property>
+ <property name="Command Timeout">60 seconds</property>
</userGroupProvider>
To enable the shell-user-group-provider remove 2 lines. This is 2 of 2. -->