DRILL-8359: Add mount and unmount command support to the filesystem plugin (#2713)

* Remove duplicated call to StoragePlugin.start() in ClassicConnectorLocator.
* Handle an additional error message format in serverMessage.js.
* Add a boot option that disables mount commands by default for security.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 740f899..c73168a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -212,6 +212,7 @@
 
   public static final String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size";
   public static final String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";
+  public static final String FILE_PLUGIN_MOUNT_COMMANDS = "drill.exec.storage.file.enable_mount_commands";
   public static final String HAZELCAST_SUBNETS = "drill.exec.cache.hazel.subnets";
   public static final String HTTP_ENABLE = "drill.exec.http.enabled";
   public static final String HTTP_MAX_PROFILES = "drill.exec.http.max_profiles";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index 4613693..03b43c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -93,6 +93,12 @@
   public void start() throws IOException { }
 
   @Override
+  public void onEnabled() throws Exception { }
+
+  @Override
+  public void onDisabled() throws Exception { }
+
+  @Override
   public void close() throws Exception { }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassicConnectorLocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassicConnectorLocator.java
index 5b36207..8d99d1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassicConnectorLocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassicConnectorLocator.java
@@ -271,9 +271,8 @@
     }
     try {
       plugin = constructor.newInstance(pluginConfig, context.drillbitContext(), name);
-      plugin.start();
       return plugin;
-    } catch (ReflectiveOperationException | IOException e) {
+    } catch (ReflectiveOperationException e) {
       Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException) e).getTargetException() : e;
       if (t instanceof ExecutionSetupException) {
         throw ((ExecutionSetupException) t);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
index 96b75f6..9a51ddc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
@@ -48,6 +48,20 @@
   void start() throws IOException;
 
   /**
+   * Lifecycle method allowing the plugin to perform operations when it has been enabled.
+   * @throws Exception in the event of an error. The exception will be propagated
+   * but the enabling of the plugin will _not_ be rolled back.
+   */
+  void onEnabled() throws Exception;
+
+  /**
+   * Lifecycle method allowing the plugin to perform operations when it has been disabled.
+   * @throws Exception in the event of an error. The exception will be propagated
+   * but the disabling of the plugin will _not_ be rolled back.
+   */
+  void onDisabled() throws Exception;
+
+  /**
    * Indicates if Drill can read the table from this format.
   */
   boolean supportsRead();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
index 64d87f3..c636612 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -415,9 +415,10 @@
   @Override
   public void validatedPut(String name, StoragePluginConfig config)
       throws PluginException {
-
+    Exception lifecycleException = null;
     name = validateName(name);
     PluginHandle oldEntry;
+
     if (config.isEnabled()) {
       PluginHandle entry = restoreFromEphemeral(name, config);
       try {
@@ -431,11 +432,40 @@
           + "Please switch to Logs panel from the UI then check the log.", name), e);
       }
       oldEntry = pluginCache.put(entry);
+      try {
+        if (oldEntry == null || !oldEntry.config().isEnabled()) {
+          // entry has the new plugin config attached to it so is appropriate for onEnabled
+          entry.plugin().onEnabled();
+        }
+      } catch (Exception e) {
+        // Store the exception to be thrown only once we complete the validatePut logic
+        lifecycleException = e;
+      }
     } else {
       oldEntry = pluginCache.remove(name);
+      try {
+        if (oldEntry != null && oldEntry.config().isEnabled()) {
+          // oldEntry has the old plugin config attached to it so is appropriate for onDisabled
+          oldEntry.plugin().onDisabled();
+        }
+      } catch (Exception e) {
+        // Store the exception to be thrown only once we complete the validatePut logic
+        lifecycleException = e;
+      }
     }
     moveToEphemeral(oldEntry);
     pluginStore.put(name, config);
+
+    if (lifecycleException != null) {
+      throw new PluginException(
+        String.format(
+          "A lifecycle method in plugin %s failed. The initiating plugin " +
+            "config update has not been rolled back.",
+          name
+        ),
+        lifecycleException
+      );
+    }
   }
 
   @Override
@@ -881,7 +911,7 @@
     ephemeralPlugins.invalidateAll();
     pluginCache.close();
     pluginStore.close();
-    locators.stream().forEach(loc -> loc.close());
+    locators.forEach(loc -> loc.close());
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
index d9cc120..eb80abe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
@@ -54,6 +54,7 @@
   public static final String NAME = "file";
 
   private final String connection;
+  private final List<String> mountCommand, unmountCommand;
   private final Map<String, String> config;
   private final Map<String, WorkspaceConfig> workspaces;
   private final Map<String, FormatPluginConfig> formats;
@@ -65,7 +66,7 @@
     Map<String, FormatPluginConfig> formats,
     OAuthConfig oAuthConfig,
     CredentialsProvider credentialsProvider) {
-    this(connection, config, workspaces, formats, oAuthConfig, null,
+    this(connection, null, null, config, workspaces, formats, oAuthConfig, null,
       credentialsProvider);
   }
 
@@ -74,12 +75,14 @@
     Map<String, WorkspaceConfig> workspaces,
     Map<String, FormatPluginConfig> formats,
     CredentialsProvider credentialsProvider) {
-    this(connection, config, workspaces, formats, null, null,
+    this(connection, null, null, config, workspaces, formats, null, null,
       credentialsProvider);
   }
 
   @JsonCreator
   public FileSystemConfig(@JsonProperty("connection") String connection,
+                          @JsonProperty("mountCommand") List<String> mountCommand,
+                          @JsonProperty("unmountCommand") List<String> unmountCommand,
                           @JsonProperty("config") Map<String, String> config,
                           @JsonProperty("workspaces") Map<String, WorkspaceConfig> workspaces,
                           @JsonProperty("formats") Map<String, FormatPluginConfig> formats,
@@ -88,6 +91,8 @@
                           @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) {
     super(credentialsProvider, credentialsProvider == null, AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER), oAuthConfig);
     this.connection = connection;
+    this.mountCommand = mountCommand;
+    this.unmountCommand = unmountCommand;
 
     // Force creation of an empty map so that configs compare equal
     Builder<String, String> builder = ImmutableMap.builder();
@@ -108,6 +113,16 @@
   }
 
   @JsonProperty
+  public List<String> getMountCommand() {
+    return mountCommand;
+  }
+
+  @JsonProperty
+  public List<String> getUnmountCommand() {
+    return unmountCommand;
+  }
+
+  @JsonProperty
   public Map<String, String> getConfig() {
     return config;
   }
@@ -142,6 +157,8 @@
     }
     FileSystemConfig other = (FileSystemConfig) obj;
     return Objects.equals(connection, other.connection) &&
+           Objects.equals(mountCommand, other.mountCommand) &&
+           Objects.equals(unmountCommand, other.unmountCommand) &&
            Objects.equals(config, other.config) &&
            Objects.equals(formats, other.formats) &&
            Objects.equals(workspaces, other.workspaces);
@@ -151,6 +168,8 @@
   public String toString() {
     return new PlanStringBuilder(this)
         .field("connection", connection)
+        .field("mountCommand", mountCommand)
+        .field("unmountCommand", unmountCommand)
         .field("config", config)
         .field("formats", formats)
         .field("workspaces", workspaces)
@@ -184,9 +203,18 @@
       formatsCopy = formatsCopy == null ? new LinkedHashMap<>() : formatsCopy;
       formatsCopy.putAll(newFormats);
     }
-    FileSystemConfig newConfig =
-        new FileSystemConfig(connection, configCopy, workspaces, formatsCopy, oAuthConfig,
-          authMode.name(), credentialsProvider);
+
+    FileSystemConfig newConfig = new FileSystemConfig(
+      connection,
+      mountCommand,
+      unmountCommand,
+      configCopy,
+      workspaces,
+      formatsCopy,
+      oAuthConfig,
+      authMode.name(),
+      credentialsProvider
+    );
     newConfig.setEnabled(isEnabled());
     return newConfig;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index 4daf30d..0f0f353 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -33,8 +34,10 @@
 
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.commons.io.IOUtils;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
@@ -61,6 +64,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.drill.exec.ExecConstants.FILE_PLUGIN_MOUNT_COMMANDS;
+
 /**
  * A Storage engine associated with a Hadoop FileSystem Implementation. Examples
  * include HDFS, MapRFS, QuantacastFileSystem, LocalFileSystem, as well Apache
@@ -86,6 +91,7 @@
   private final FileSystemConfig config;
   private final Configuration fsConf;
   private TokenRegistry tokenRegistry;
+  private final boolean mountCommandsEnabled;
 
   public FileSystemPlugin(FileSystemConfig config, DrillbitContext context, String name) throws ExecutionSetupException {
     super(context, name);
@@ -142,6 +148,7 @@
       }
 
       this.schemaFactory = new FileSystemSchemaFactory(name, factories);
+      this.mountCommandsEnabled = context.getConfig().getBoolean(FILE_PLUGIN_MOUNT_COMMANDS);
     } catch (IOException e) {
       throw new ExecutionSetupException("Failure setting up file system plugin.", e);
     }
@@ -332,4 +339,100 @@
     OAuthTokenProvider tokenProvider = context.getOauthTokenProvider();
     tokenRegistry = tokenProvider.getOauthTokenRegistry(null);
   }
+
+  /**
+   * Runs the configured mount command if mount commands are enabled
+   * and the command is not empty.
+   * @return true if the configured mount command was executed
+   */
+  private synchronized boolean mount() {
+    List<String> mountCmd = config.getMountCommand();
+    if (mountCmd == null || mountCmd.isEmpty()) {
+      return false;
+    }
+    if (!mountCommandsEnabled) {
+      throw UserException.permissionError()
+        .message(
+          "A mount command has been configured but mount commands are disabled, see %s",
+          FILE_PLUGIN_MOUNT_COMMANDS
+        )
+        .build(logger);
+    }
+
+    try {
+      Process proc = Runtime.getRuntime().exec(mountCmd.toArray(new String[0]));
+      if (proc.waitFor() != 0) {
+        String stderrOutput = IOUtils.toString(proc.getErrorStream(), StandardCharsets.UTF_8);
+        throw new IOException(stderrOutput);
+      }
+      logger.info("The mount command for plugin {} succeeded.", getName());
+      return true;
+    } catch (IOException | InterruptedException e) {
+      logger.error("The mount command for plugin {} failed.", getName(), e);
+      throw UserException.pluginError(e)
+        .message("The mount command for plugin %s failed.", getName())
+        .build(logger);
+    }
+  }
+
+  /**
+   * Runs the configured unmount command if mount commands are enabled
+   * and the command is not empty.
+   * @return true if the configured unmount command was executed
+   */
+  private synchronized boolean unmount() {
+    List<String> unmountCmd = config.getUnmountCommand();
+    if (unmountCmd == null || unmountCmd.isEmpty()) {
+      return false;
+    }
+    if (!mountCommandsEnabled) {
+      throw UserException.permissionError()
+        .message(
+          "A mount command has been configured but mount commands are disabled, see %s",
+          FILE_PLUGIN_MOUNT_COMMANDS
+        )
+        .build(logger);
+    }
+    try {
+      Process proc = Runtime.getRuntime().exec(unmountCmd.toArray(new String[0]));
+      if (proc.waitFor() != 0) {
+        String stderrOutput = IOUtils.toString(proc.getErrorStream(), StandardCharsets.UTF_8);
+        throw new IOException(stderrOutput);
+      }
+      logger.info("The unmount command for plugin {} succeeded.", getName());
+      return true;
+    } catch (IOException | InterruptedException e) {
+      logger.error("The unmount command for plugin {} failed.", getName(), e);
+      throw UserException.pluginError(e)
+        .message("The unmount command for plugin %s failed.", getName())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void start() {
+    if (config.isEnabled()) {
+      mount();
+    }
+  }
+
+  @Override
+  public void onEnabled() {
+    mount();
+  }
+
+  @Override
+  public void onDisabled() {
+    unmount();
+  }
+
+  @Override
+  public void close() {
+    // config.isEnabled() is not a reliable way to tell if we're still enabled
+    // at this stage
+    boolean isEnabled = getContext().getStorage().getDefinedConfig(getName()) != null;
+    if (isEnabled) {
+      unmount();
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 4dc45ec..4e7ef0a 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -115,7 +115,8 @@
       text: {
         buffer.size: 262144,
         batch.size: 4000
-      }
+      },
+      enable_mount_commands: false
     },
     # The name of the file to scan for "classic" storage plugins
     # Configured here for ease of testing. Users should NEVER change
diff --git a/exec/java-exec/src/main/resources/rest/static/js/serverMessage.js b/exec/java-exec/src/main/resources/rest/static/js/serverMessage.js
index 1defcc9..e325191 100644
--- a/exec/java-exec/src/main/resources/rest/static/js/serverMessage.js
+++ b/exec/java-exec/src/main/resources/rest/static/js/serverMessage.js
@@ -22,7 +22,7 @@
         setTimeout(function() { window.location.href = "/storage"; }, 800);
         return true;
     } else {
-	    const errorMessage = data.errorMessage || data.responseJSON["result"];
+	    const errorMessage = data.errorMessage || data.responseJSON["result"] || data.responseJSON["message"];
 
         messageEl.addClass("d-none");
         // Wait a fraction of a second before showing the message again. This
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/BoxFileSystemTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/BoxFileSystemTest.java
index f0100dc..f9d608d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/BoxFileSystemTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/BoxFileSystemTest.java
@@ -103,7 +103,7 @@
 
     CredentialsProvider credentialsProvider = new PlainCredentialsProvider(credentials);
 
-    StoragePluginConfig boxConfig = new FileSystemConfig("box:///", boxConfigVars,
+    StoragePluginConfig boxConfig = new FileSystemConfig("box:///", null, null, boxConfigVars,
       workspaces, formats, oAuthConfig, AuthMode.SHARED_USER.name(), credentialsProvider);
     boxConfig.setEnabled(true);
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestMountCommand.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestMountCommand.java
new file mode 100644
index 0000000..79aa0fb
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestMountCommand.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.File;
+import java.util.Arrays;
+
+import static org.apache.drill.exec.ExecConstants.FILE_PLUGIN_MOUNT_COMMANDS;
+import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_PLUGIN_NAME;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category(UnlikelyTest.class)
+public class TestMountCommand extends ClusterTest {
+
+  private static File testFile;
+  private static String touchCmd, rmCmd;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+      .configProperty(FILE_PLUGIN_MOUNT_COMMANDS, true);
+
+    startCluster(builder);
+
+    // A file that will be created by the filesystem plugin's mount command
+    // and deleted by its unmount command.
+    testFile = new File(String.format(
+      "%s/drill-mount-test",
+      cluster.getDrillTempDir().getAbsolutePath()
+    ));
+
+   if (SystemUtils.IS_OS_WINDOWS) {
+     touchCmd = "type nul > %s";
+     rmCmd = "del %s";
+   } else {
+     touchCmd = "touch %s";
+     rmCmd = "rm %s";
+   }
+  }
+
+  @Test
+  public void testMountUnmount() throws Exception {
+    StoragePluginRegistry pluginRegistry = cluster.storageRegistry();
+    FileSystemConfig dfsConfig = (FileSystemConfig) pluginRegistry
+      .getDefinedConfig(DFS_PLUGIN_NAME);
+
+    FileSystemConfig dfsConfigNew = new FileSystemConfig(
+      dfsConfig.getConnection(),
+      Arrays.asList(String.format(touchCmd, testFile.getAbsolutePath()).split(" ")),
+      Arrays.asList(String.format(rmCmd, testFile.getAbsolutePath()).split(" ")),
+      dfsConfig.getConfig(),
+      dfsConfig.getWorkspaces(),
+      dfsConfig.getFormats(),
+      null, null,
+      dfsConfig.getCredentialsProvider()
+    );
+    dfsConfigNew.setEnabled(true);
+    pluginRegistry.put(DFS_PLUGIN_NAME, dfsConfigNew);
+
+    // Run a query to trigger the mount command because plugins are lazily initialised.
+    run("show files in %s", DFS_PLUGIN_NAME);
+    assertTrue(testFile.exists());
+
+    cluster.storageRegistry().setEnabled(DFS_PLUGIN_NAME, false);
+    assertFalse(testFile.exists());
+  }
+}