DRILL-8359: Add mount and unmount command support to the filesystem plugin (#2713)
* Remove duplicated call to StoragePlugin.start() in ClassicConnectorLocator.
* Handle an additional error message format in serverMessage.js.
* Add a boot option that disables mount commands by default for security.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 740f899..c73168a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -212,6 +212,7 @@
public static final String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size";
public static final String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";
+ public static final String FILE_PLUGIN_MOUNT_COMMANDS = "drill.exec.storage.file.enable_mount_commands";
public static final String HAZELCAST_SUBNETS = "drill.exec.cache.hazel.subnets";
public static final String HTTP_ENABLE = "drill.exec.http.enabled";
public static final String HTTP_MAX_PROFILES = "drill.exec.http.max_profiles";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index 4613693..03b43c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -93,6 +93,12 @@
public void start() throws IOException { }
@Override
+ public void onEnabled() throws Exception { }
+
+ @Override
+ public void onDisabled() throws Exception { }
+
+ @Override
public void close() throws Exception { }
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassicConnectorLocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassicConnectorLocator.java
index 5b36207..8d99d1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassicConnectorLocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassicConnectorLocator.java
@@ -271,9 +271,8 @@
}
try {
plugin = constructor.newInstance(pluginConfig, context.drillbitContext(), name);
- plugin.start();
return plugin;
- } catch (ReflectiveOperationException | IOException e) {
+ } catch (ReflectiveOperationException e) {
Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException) e).getTargetException() : e;
if (t instanceof ExecutionSetupException) {
throw ((ExecutionSetupException) t);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
index 96b75f6..9a51ddc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
@@ -48,6 +48,20 @@
void start() throws IOException;
/**
+ * Lifecycle method allowing the plugin to perform operations when it has been enabled.
+ * @throws Exception in the event of an error. The exception will be propagated
+ * but the enabling of the plugin will _not_ be rolled back.
+ */
+ void onEnabled() throws Exception;
+
+ /**
+ * Lifecycle method allowing the plugin to perform operations when it has been disabled.
+ * @throws Exception in the event of an error. The exception will be propagated
+ * but the disabling of the plugin will _not_ be rolled back.
+ */
+ void onDisabled() throws Exception;
+
+ /**
* Indicates if Drill can read the table from this format.
*/
boolean supportsRead();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
index 64d87f3..c636612 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -415,9 +415,10 @@
@Override
public void validatedPut(String name, StoragePluginConfig config)
throws PluginException {
-
+ Exception lifecycleException = null;
name = validateName(name);
PluginHandle oldEntry;
+
if (config.isEnabled()) {
PluginHandle entry = restoreFromEphemeral(name, config);
try {
@@ -431,11 +432,40 @@
+ "Please switch to Logs panel from the UI then check the log.", name), e);
}
oldEntry = pluginCache.put(entry);
+ try {
+ if (oldEntry == null || !oldEntry.config().isEnabled()) {
+ // entry has the new plugin config attached to it so is appropriate for onEnabled
+ entry.plugin().onEnabled();
+ }
+ } catch (Exception e) {
+ // Store the exception to be thrown only once we complete the validatePut logic
+ lifecycleException = e;
+ }
} else {
oldEntry = pluginCache.remove(name);
+ try {
+ if (oldEntry != null && oldEntry.config().isEnabled()) {
+ // oldEntry has the old plugin config attached to it so is appropriate for onDisabled
+ oldEntry.plugin().onDisabled();
+ }
+ } catch (Exception e) {
+ // Store the exception to be thrown only once we complete the validatePut logic
+ lifecycleException = e;
+ }
}
moveToEphemeral(oldEntry);
pluginStore.put(name, config);
+
+ if (lifecycleException != null) {
+ throw new PluginException(
+ String.format(
+ "A lifecycle method in plugin %s failed. The initiating plugin " +
+ "config update has not been rolled back.",
+ name
+ ),
+ lifecycleException
+ );
+ }
}
@Override
@@ -881,7 +911,7 @@
ephemeralPlugins.invalidateAll();
pluginCache.close();
pluginStore.close();
- locators.stream().forEach(loc -> loc.close());
+ locators.forEach(loc -> loc.close());
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
index d9cc120..eb80abe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
@@ -54,6 +54,7 @@
public static final String NAME = "file";
private final String connection;
+ private final List<String> mountCommand, unmountCommand;
private final Map<String, String> config;
private final Map<String, WorkspaceConfig> workspaces;
private final Map<String, FormatPluginConfig> formats;
@@ -65,7 +66,7 @@
Map<String, FormatPluginConfig> formats,
OAuthConfig oAuthConfig,
CredentialsProvider credentialsProvider) {
- this(connection, config, workspaces, formats, oAuthConfig, null,
+ this(connection, null, null, config, workspaces, formats, oAuthConfig, null,
credentialsProvider);
}
@@ -74,12 +75,14 @@
Map<String, WorkspaceConfig> workspaces,
Map<String, FormatPluginConfig> formats,
CredentialsProvider credentialsProvider) {
- this(connection, config, workspaces, formats, null, null,
+ this(connection, null, null, config, workspaces, formats, null, null,
credentialsProvider);
}
@JsonCreator
public FileSystemConfig(@JsonProperty("connection") String connection,
+ @JsonProperty("mountCommand") List<String> mountCommand,
+ @JsonProperty("unmountCommand") List<String> unmountCommand,
@JsonProperty("config") Map<String, String> config,
@JsonProperty("workspaces") Map<String, WorkspaceConfig> workspaces,
@JsonProperty("formats") Map<String, FormatPluginConfig> formats,
@@ -88,6 +91,8 @@
@JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) {
super(credentialsProvider, credentialsProvider == null, AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER), oAuthConfig);
this.connection = connection;
+ this.mountCommand = mountCommand;
+ this.unmountCommand = unmountCommand;
// Force creation of an empty map so that configs compare equal
Builder<String, String> builder = ImmutableMap.builder();
@@ -108,6 +113,16 @@
}
@JsonProperty
+ public List<String> getMountCommand() {
+ return mountCommand;
+ }
+
+ @JsonProperty
+ public List<String> getUnmountCommand() {
+ return unmountCommand;
+ }
+
+ @JsonProperty
public Map<String, String> getConfig() {
return config;
}
@@ -142,6 +157,8 @@
}
FileSystemConfig other = (FileSystemConfig) obj;
return Objects.equals(connection, other.connection) &&
+ Objects.equals(mountCommand, other.mountCommand) &&
+ Objects.equals(unmountCommand, other.unmountCommand) &&
Objects.equals(config, other.config) &&
Objects.equals(formats, other.formats) &&
Objects.equals(workspaces, other.workspaces);
@@ -151,6 +168,8 @@
public String toString() {
return new PlanStringBuilder(this)
.field("connection", connection)
+ .field("mountCommand", mountCommand)
+ .field("unmountCommand", unmountCommand)
.field("config", config)
.field("formats", formats)
.field("workspaces", workspaces)
@@ -184,9 +203,18 @@
formatsCopy = formatsCopy == null ? new LinkedHashMap<>() : formatsCopy;
formatsCopy.putAll(newFormats);
}
- FileSystemConfig newConfig =
- new FileSystemConfig(connection, configCopy, workspaces, formatsCopy, oAuthConfig,
- authMode.name(), credentialsProvider);
+
+ FileSystemConfig newConfig = new FileSystemConfig(
+ connection,
+ mountCommand,
+ unmountCommand,
+ configCopy,
+ workspaces,
+ formatsCopy,
+ oAuthConfig,
+ authMode.name(),
+ credentialsProvider
+ );
newConfig.setEnabled(isEnabled());
return newConfig;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index 4daf30d..0f0f353 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.net.URI;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -33,8 +34,10 @@
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.commons.io.IOUtils;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
@@ -61,6 +64,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.drill.exec.ExecConstants.FILE_PLUGIN_MOUNT_COMMANDS;
+
/**
* A Storage engine associated with a Hadoop FileSystem Implementation. Examples
* include HDFS, MapRFS, QuantacastFileSystem, LocalFileSystem, as well Apache
@@ -86,6 +91,7 @@
private final FileSystemConfig config;
private final Configuration fsConf;
private TokenRegistry tokenRegistry;
+ private final boolean mountCommandsEnabled;
public FileSystemPlugin(FileSystemConfig config, DrillbitContext context, String name) throws ExecutionSetupException {
super(context, name);
@@ -142,6 +148,7 @@
}
this.schemaFactory = new FileSystemSchemaFactory(name, factories);
+ this.mountCommandsEnabled = context.getConfig().getBoolean(FILE_PLUGIN_MOUNT_COMMANDS);
} catch (IOException e) {
throw new ExecutionSetupException("Failure setting up file system plugin.", e);
}
@@ -332,4 +339,100 @@
OAuthTokenProvider tokenProvider = context.getOauthTokenProvider();
tokenRegistry = tokenProvider.getOauthTokenRegistry(null);
}
+
+ /**
+ * Runs the configured mount command if mount commands are enabled
+ * and the command is not empty.
+ * @return true if the configured mount command was executed
+ */
+ private synchronized boolean mount() {
+ List<String> mountCmd = config.getMountCommand();
+ if (mountCmd == null || mountCmd.isEmpty()) {
+ return false;
+ }
+ if (!mountCommandsEnabled) {
+ throw UserException.permissionError()
+ .message(
+ "A mount command has been configured but mount commands are disabled, see %s",
+ FILE_PLUGIN_MOUNT_COMMANDS
+ )
+ .build(logger);
+ }
+
+ try {
+ Process proc = Runtime.getRuntime().exec(mountCmd.toArray(new String[0]));
+ if (proc.waitFor() != 0) {
+ String stderrOutput = IOUtils.toString(proc.getErrorStream(), StandardCharsets.UTF_8);
+ throw new IOException(stderrOutput);
+ }
+ logger.info("The mount command for plugin {} succeeded.", getName());
+ return true;
+ } catch (IOException | InterruptedException e) {
+ logger.error("The mount command for plugin {} failed.", getName(), e);
+ throw UserException.pluginError(e)
+ .message("The mount command for plugin %s failed.", getName())
+ .build(logger);
+ }
+ }
+
+ /**
+ * Runs the configured unmount command if mount commands are enabled
+ * and the command is not empty.
+ * @return true if the configured unmount command was executed
+ */
+ private synchronized boolean unmount() {
+ List<String> unmountCmd = config.getUnmountCommand();
+ if (unmountCmd == null || unmountCmd.isEmpty()) {
+ return false;
+ }
+ if (!mountCommandsEnabled) {
+ throw UserException.permissionError()
+ .message(
+ "A mount command has been configured but mount commands are disabled, see %s",
+ FILE_PLUGIN_MOUNT_COMMANDS
+ )
+ .build(logger);
+ }
+ try {
+ Process proc = Runtime.getRuntime().exec(unmountCmd.toArray(new String[0]));
+ if (proc.waitFor() != 0) {
+ String stderrOutput = IOUtils.toString(proc.getErrorStream(), StandardCharsets.UTF_8);
+ throw new IOException(stderrOutput);
+ }
+ logger.info("The unmount command for plugin {} succeeded.", getName());
+ return true;
+ } catch (IOException | InterruptedException e) {
+ logger.error("The unmount command for plugin {} failed.", getName(), e);
+ throw UserException.pluginError(e)
+ .message("The unmount command for plugin %s failed.", getName())
+ .build(logger);
+ }
+ }
+
+ @Override
+ public void start() {
+ if (config.isEnabled()) {
+ mount();
+ }
+ }
+
+ @Override
+ public void onEnabled() {
+ mount();
+ }
+
+ @Override
+ public void onDisabled() {
+ unmount();
+ }
+
+ @Override
+ public void close() {
+ // config.isEnabled() is not a reliable way to tell if we're still enabled
+ // at this stage
+ boolean isEnabled = getContext().getStorage().getDefinedConfig(getName()) != null;
+ if (isEnabled) {
+ unmount();
+ }
+ }
}
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 4dc45ec..4e7ef0a 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -115,7 +115,8 @@
text: {
buffer.size: 262144,
batch.size: 4000
- }
+ },
+ enable_mount_commands: false
},
# The name of the file to scan for "classic" storage plugins
# Configured here for ease of testing. Users should NEVER change
diff --git a/exec/java-exec/src/main/resources/rest/static/js/serverMessage.js b/exec/java-exec/src/main/resources/rest/static/js/serverMessage.js
index 1defcc9..e325191 100644
--- a/exec/java-exec/src/main/resources/rest/static/js/serverMessage.js
+++ b/exec/java-exec/src/main/resources/rest/static/js/serverMessage.js
@@ -22,7 +22,7 @@
setTimeout(function() { window.location.href = "/storage"; }, 800);
return true;
} else {
- const errorMessage = data.errorMessage || data.responseJSON["result"];
+ const errorMessage = data.errorMessage || data.responseJSON["result"] || data.responseJSON["message"];
messageEl.addClass("d-none");
// Wait a fraction of a second before showing the message again. This
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/BoxFileSystemTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/BoxFileSystemTest.java
index f0100dc..f9d608d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/BoxFileSystemTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/BoxFileSystemTest.java
@@ -103,7 +103,7 @@
CredentialsProvider credentialsProvider = new PlainCredentialsProvider(credentials);
- StoragePluginConfig boxConfig = new FileSystemConfig("box:///", boxConfigVars,
+ StoragePluginConfig boxConfig = new FileSystemConfig("box:///", null, null, boxConfigVars,
workspaces, formats, oAuthConfig, AuthMode.SHARED_USER.name(), credentialsProvider);
boxConfig.setEnabled(true);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestMountCommand.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestMountCommand.java
new file mode 100644
index 0000000..79aa0fb
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestMountCommand.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.File;
+import java.util.Arrays;
+
+import static org.apache.drill.exec.ExecConstants.FILE_PLUGIN_MOUNT_COMMANDS;
+import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_PLUGIN_NAME;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category(UnlikelyTest.class)
+public class TestMountCommand extends ClusterTest {
+
+ private static File testFile;
+ private static String touchCmd, rmCmd;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .configProperty(FILE_PLUGIN_MOUNT_COMMANDS, true);
+
+ startCluster(builder);
+
+ // A file that will be created by the filesystem plugin's mount command
+ // and deleted by its unmount command.
+ testFile = new File(String.format(
+ "%s/drill-mount-test",
+ cluster.getDrillTempDir().getAbsolutePath()
+ ));
+
+ if (SystemUtils.IS_OS_WINDOWS) {
+ touchCmd = "type nul > %s";
+ rmCmd = "del %s";
+ } else {
+ touchCmd = "touch %s";
+ rmCmd = "rm %s";
+ }
+ }
+
+ @Test
+ public void testMountUnmount() throws Exception {
+ StoragePluginRegistry pluginRegistry = cluster.storageRegistry();
+ FileSystemConfig dfsConfig = (FileSystemConfig) pluginRegistry
+ .getDefinedConfig(DFS_PLUGIN_NAME);
+
+ FileSystemConfig dfsConfigNew = new FileSystemConfig(
+ dfsConfig.getConnection(),
+ Arrays.asList(String.format(touchCmd, testFile.getAbsolutePath()).split(" ")),
+ Arrays.asList(String.format(rmCmd, testFile.getAbsolutePath()).split(" ")),
+ dfsConfig.getConfig(),
+ dfsConfig.getWorkspaces(),
+ dfsConfig.getFormats(),
+ null, null,
+ dfsConfig.getCredentialsProvider()
+ );
+ dfsConfigNew.setEnabled(true);
+ pluginRegistry.put(DFS_PLUGIN_NAME, dfsConfigNew);
+
+ // Run a query to trigger the mount command because plugins are lazily initialised.
+ run("show files in %s", DFS_PLUGIN_NAME);
+ assertTrue(testFile.exists());
+
+ cluster.storageRegistry().setEnabled(DFS_PLUGIN_NAME, false);
+ assertFalse(testFile.exists());
+ }
+}