Framework changes and tests for RM queue selection (#571)

* Framework changes and tests for RM queue selection

- Added a new class for RM queue selection tests.
- Added new methods in ConnectionPool to create connections based on user, password, tags and groups.
- Refactored restartDrill out of TestDriver, since it is a utility.
- Create connection with schema, group and queryTags properties.
- Changes in JavaTestBase to get all Drillbits' hostnames.
- Fixed faulty tests in TestSSLProperties

* Cosmetic changes. Add comments for the tests.

* Add package to manage DrillCluster from SSH

- Added classes to manage SSH sessions from a user to a DrillCluster.
- Most of the code is re-used from mapr/ycsb-driver implementations.
- The DrillCluster instance should be used when the test framework has to talk to the cluster for copying files or running certain commands.
- Modified the test cases accordingly.
- Added ThrowingConsumer to support lambda for functions with checked exceptions.

* Cosmetic changes, add javadoc for new utility methods

* Existing TestDriver continue using older method

* Small change, missed synchronized

* Add a method for cleaning up cluster before every test

- Remove any existing RM config in DrillCluster.

* Add cleanup after class as well, other cosmetic changes

* Minor change, add description to testng methods

* Run cleanup once for a class

* Cleanup and add test case for unknown tag
diff --git a/framework/pom.xml b/framework/pom.xml
index aa876fe..5dc9633 100644
--- a/framework/pom.xml
+++ b/framework/pom.xml
@@ -130,6 +130,11 @@
       <artifactId>config</artifactId>
       <version>1.3.2</version>
     </dependency>
+    <dependency>
+      <groupId>com.jcraft</groupId>
+      <artifactId>jsch</artifactId>
+      <version>0.1.53</version>
+    </dependency>
   </dependencies>
   <repositories>
     <repository>
diff --git a/framework/src/main/java/org/apache/drill/test/framework/ConnectionPool.java b/framework/src/main/java/org/apache/drill/test/framework/ConnectionPool.java
index 736b5b2..6eca059 100644
--- a/framework/src/main/java/org/apache/drill/test/framework/ConnectionPool.java
+++ b/framework/src/main/java/org/apache/drill/test/framework/ConnectionPool.java
@@ -90,8 +90,32 @@
   }
 
   @VisibleForTesting
-  public Connection createConnection(Properties connectionProperties) throws SQLException {
-    return DriverManager.getConnection(DrillTestDefaults.CONNECTION_STRING, connectionProperties);
+  public static Connection createConnection(Properties connectionProperties) throws SQLException {
+    return createConnection(DrillTestDefaults.CONNECTION_STRING, connectionProperties);
+  }
+
+  /**
+   * Create a connection with a custom URL and properties.
+   *
+   * @param url
+   * @param props
+   * @return Connection instance to drill cluster.
+   * @throws SQLException
+   */
+  @VisibleForTesting
+  public static Connection createConnection(final String url,
+                                            final Properties props) throws SQLException {
+    return createConnection(url, DrillTestDefaults.USERNAME, DrillTestDefaults.PASSWORD, props);
+  }
+
+  @VisibleForTesting
+  public static synchronized Connection createConnection(final String url,
+                                                         final String username,
+                                                         final String password,
+                                                         final Properties props) throws SQLException {
+    props.put("user", username == null ? DrillTestDefaults.USERNAME : username);
+    props.put("password", password == null ? DrillTestDefaults.PASSWORD : password);
+    return DriverManager.getConnection(url, props);
   }
 
   /**
diff --git a/framework/src/main/java/org/apache/drill/test/framework/DrillTestDefaults.java b/framework/src/main/java/org/apache/drill/test/framework/DrillTestDefaults.java
index 9c86008..71b21f1 100644
--- a/framework/src/main/java/org/apache/drill/test/framework/DrillTestDefaults.java
+++ b/framework/src/main/java/org/apache/drill/test/framework/DrillTestDefaults.java
@@ -113,6 +113,8 @@
 
   public static final String DRILL_RM_OVERRIDE_CONF_FILENAME = "drill-rm-override.conf";
 
+  public static final long DEFAULT_SLEEP_IN_MILLIS = 20000; //default sleep
+
   // Adding classifications for Execution Failures
   public static enum DRILL_EXCEPTION{
        VALIDATION_ERROR_INVALID_SCHEMA,
diff --git a/framework/src/main/java/org/apache/drill/test/framework/TestDriver.java b/framework/src/main/java/org/apache/drill/test/framework/TestDriver.java
index 1194f9f..079e89a 100644
--- a/framework/src/main/java/org/apache/drill/test/framework/TestDriver.java
+++ b/framework/src/main/java/org/apache/drill/test/framework/TestDriver.java
@@ -618,7 +618,7 @@
 
     executor.close();
     connectionPool.close();
-    restartDrill();
+    Utils.restartDrill();
     return totalExecutionFailures + totalDataVerificationFailures + totalPlanVerificationFailures + totalTimeoutFailures + totalRandomFailures;
   }
 
@@ -784,7 +784,7 @@
     LOG.info("\n>> Generation duration: " + stopwatch + "\n");
 
     if (restartDrillbits) {
-      restartDrill();
+      Utils.restartDrill();
     }
   }
 
@@ -973,19 +973,4 @@
       e.printStackTrace();
     }
   }
-  
-  private int restartDrill() {
-    int exitCode = 0;
-    String command = DrillTestDefaults.TEST_ROOT_DIR + "/" + DrillTestDefaults.RESTART_DRILL_SCRIPT;
-    File commandFile = new File(command);
-    if (commandFile.exists() && commandFile.canExecute()) {
-      LOG.info("\n> Executing Post Build Script");
-      LOG.info("\n>> Path: " + command);
-      exitCode = Utils.execCmd(command).exitCode;
-      if (exitCode != 0) {
-        LOG.error("\n>> Error restarting drillbits");
-      }
-    }
-    return exitCode;
-  }
 }
diff --git a/framework/src/main/java/org/apache/drill/test/framework/ThrowingConsumer.java b/framework/src/main/java/org/apache/drill/test/framework/ThrowingConsumer.java
new file mode 100644
index 0000000..88de0f0
--- /dev/null
+++ b/framework/src/main/java/org/apache/drill/test/framework/ThrowingConsumer.java
@@ -0,0 +1,19 @@
+package org.apache.drill.test.framework;
+
+import java.util.function.Consumer;
+
+@FunctionalInterface
+public interface ThrowingConsumer<T, E extends Throwable> {
+    void accept(T t) throws E;
+
+    static <T> Consumer<T> throwingConsumerWrapper(
+            ThrowingConsumer<T, Exception> throwingConsumer) {
+        return i -> {
+            try {
+                throwingConsumer.accept(i);
+            } catch (Exception ex) {
+                Utils.sneakyThrow(ex);
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/framework/src/main/java/org/apache/drill/test/framework/Utils.java b/framework/src/main/java/org/apache/drill/test/framework/Utils.java
index 0e9e00e..04f0fc2 100755
--- a/framework/src/main/java/org/apache/drill/test/framework/Utils.java
+++ b/framework/src/main/java/org/apache/drill/test/framework/Utils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.test.framework;
 
+import com.google.common.base.Preconditions;
 import oadd.org.apache.drill.exec.proto.UserBitShared;
 import org.apache.commons.io.FilenameUtils;
 
@@ -43,6 +44,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -55,6 +57,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.io.Resources;
 
+import org.apache.drill.test.framework.ssh.DrillCluster;
 import org.apache.http.HttpResponse;
 import org.apache.http.NameValuePair;
 import org.apache.http.client.HttpClient;
@@ -115,6 +118,24 @@
     return connectionProperties;
   }
 
+  public static Properties createConnectionProperties(final String schema,
+                                                      final String group,
+                                                      final String queryTags) {
+    Properties props = createConnectionProperties();
+    if(schema != null) {
+      props.put("schema", schema);
+    }
+
+    if(group != null) {
+      props.put("group", group);
+    }
+
+    if(queryTags != null) {
+      props.put("queryTags", queryTags);
+    }
+    return props;
+  }
+
   // Accept self-signed certificate
   public static class MyHostNameVerifier implements HostnameVerifier {
 
@@ -919,52 +940,35 @@
   }
 
   /**
-   * Apply RM config represented by DrillRMConfig to a specified Drillbit.
+   * Apply RM config represented by DrillRMConfig to all drillbits part of {@link DrillCluster}
    *
    * As a part of this method
    * - Write the config to a temporary file (remove if file exists previously.
-   * - Copy the file to specified Drillbit node.
+   * - Copy the file to all nodes part of the {@link DrillCluster}.
    *
    * @param config
-   * @param drillbitHost
+   * @param drillCluster
    * @throws IOException
    */
-  public static synchronized void applyRMConfigToDrillbit(final DrillRMConfig config,
-                                             final String drillbitHost) throws IOException {
+  public static synchronized void applyRMConfigToDrillCluster(final DrillRMConfig config,
+                                                              final DrillCluster drillCluster) throws IOException {
     final String drillRMConfFilePath = DrillTestDefaults.TEST_ROOT_DIR + "/conf/" + DRILL_RM_OVERRIDE_CONF_FILENAME;
-
     File drillRMConfFile = new File(drillRMConfFilePath);
 
     CmdConsOut out;
     if(drillRMConfFile.exists()) {
       LOG.warn(drillRMConfFilePath + " exists! Removing the file");
       if ((out = Utils.execCmd("rm -rf " + drillRMConfFilePath)).exitCode != 0) {
-        LOG.error("Could not remove config file " +
-                drillRMConfFilePath + "\n\n" +
-                out);
+        LOG.error("Could not remove config file " + drillRMConfFilePath + "\n\n" + out);
         throw new IOException(out.consoleErr);
       }
     }
-
     try (BufferedWriter writer = new BufferedWriter(new FileWriter(drillRMConfFilePath))) {
       writer.write(DRILL_EXEC_RM_CONFIG_KEY + ":" + config.render());
     }
-
-    final String scpCommand = new StringBuilder("scp ")
-            .append(drillRMConfFilePath)
-            .append(" ")
-            .append(USERNAME)
-            .append("@").append(drillbitHost)
-            .append(":").append(DRILL_HOME)
-            .append("/conf/")
-            .append(DRILL_RM_OVERRIDE_CONF_FILENAME)
-            .toString();
-
-    LOG.info("Copying config " + scpCommand);
-    if ((out = Utils.execCmd(scpCommand)).exitCode != 0) {
-      LOG.error("Copying config to drillbit failed!\n\n" + out);
-      throw new IOException(out.consoleErr);
-    }
+    //Remove if an override conf exists
+    drillCluster.runCommand("rm -rf " + DRILL_HOME + "/conf/" + DRILL_RM_OVERRIDE_CONF_FILENAME);
+    drillCluster.copyToRemote(drillRMConfFilePath, DRILL_HOME + "/conf/" + DRILL_RM_OVERRIDE_CONF_FILENAME);
   }
 
   public static boolean sanityTest(Connection connection) {
@@ -1072,6 +1076,66 @@
     return true;
   }
 
+  /**
+   * Restart drillbits, ignore IOExceptions, if any.
+   * This version of the utility uses the restart drillbit script configured.
+   *
+   * Refactored out of {@link TestDriver}, kept for backward compatibility.
+   * Use {@link #restartDrillbits(DrillCluster)} instead.
+   */
+  @Deprecated
+  public static synchronized int restartDrill() {
+    int exitCode = 0;
+    String command = DrillTestDefaults.TEST_ROOT_DIR + "/" + DrillTestDefaults.RESTART_DRILL_SCRIPT;
+    File commandFile = new File(command);
+    if (commandFile.exists() && commandFile.canExecute()) {
+      LOG.info("\n> Executing Post Build Script");
+      LOG.info("\n>> Path: " + command);
+      exitCode = Utils.execCmd(command).exitCode;
+      if (exitCode != 0) {
+        LOG.error("\n>> Error restarting drillbits");
+      }
+    }
+    return exitCode;
+  }
+
+    /**
+     * Restart drillbits available as a part of {@link DrillCluster} instance passed.
+     * @param drillCluster instance of a drill cluster.
+     */
+  public static synchronized void restartDrillbits(final DrillCluster drillCluster) {
+      Preconditions.checkNotNull(drillCluster, "drillCluster cannot be null!");
+      drillCluster.runCommand(DRILL_HOME + "/bin/drillbit.sh restart");
+      sleepForTimeInMillis(DEFAULT_SLEEP_IN_MILLIS);
+  }
+
+  /**
+   * Utility method to sleep for specified amount of time (in milliseconds).
+   *
+   * @param timeInMillis
+   */
+  public static void sleepForTimeInMillis(final long timeInMillis) {
+    try {
+      LOG.info("Waiting for " + timeInMillis + "ms .. ");
+      Thread.sleep(timeInMillis);
+    } catch (Exception e) {
+      //Ignore
+    }
+  }
+
+  /**
+   * Passed exception is hidden from compiler but re-thrown.
+   * Used by functional interfaces that allow checked exceptions.
+   *
+   * @param e
+   * @param <E>
+   *     @throws E
+   */
+  @SuppressWarnings("unchecked")
+  public static <E extends Throwable> void sneakyThrow(Throwable e) throws E {
+    throw (E) e;
+  }
+  
   public static String getFrameworkVersion() {
     String commitID = "";
     String commitAuthor = "";
diff --git a/framework/src/main/java/org/apache/drill/test/framework/ssh/CopyToRemote.java b/framework/src/main/java/org/apache/drill/test/framework/ssh/CopyToRemote.java
new file mode 100644
index 0000000..b8b3b18
--- /dev/null
+++ b/framework/src/main/java/org/apache/drill/test/framework/ssh/CopyToRemote.java
@@ -0,0 +1,107 @@
+package org.apache.drill.test.framework.ssh;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.BasicFileAttributes;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcraft.jsch.ChannelExec;
+
+/**
+ * Represent a scp or copy to remote host task.
+ *
+ * Code reused from implementation of CopyToRemote.java in
+ * {@link - https://repository.mapr.com/nexus/content/groups/mapr-public/com/mapr/db/ycsb-driver/}
+ */
+public class CopyToRemote extends SSHTask<CopyToRemote> {
+    private static final Logger LOG = LoggerFactory .getLogger(CopyToRemote.class);
+
+    final private File localFile;
+    private String remoteFilePath;
+
+    public CopyToRemote(final DrillCluster cluster,
+                        final String host,
+                        final String localFile,
+                        final String remoteFile) {
+        super(cluster, host);
+        this.localFile = new File(localFile);
+        this.remoteFilePath = remoteFile;
+    }
+
+    @Override
+    public CopyToRemote run(SSHSession session) throws Exception {
+        LOG.info("Copying {} to {}", localFile, remoteFilePath);
+
+        StringBuilder command = new StringBuilder("scp -p -t ").append(remoteFilePath);
+        ChannelExec channel = session.openChannelExec("exec");
+        channel.setCommand(command.toString());
+
+        OutputStream out = channel.getOutputStream();
+        InputStream in = channel.getInputStream();
+
+        channel.connect();
+        checkAck(in);
+
+        // send file lastModifiedTime and lastAccessTime
+        command.setLength(0);
+        Path localFileDir = Paths.get(localFile.getParent());
+        Path localFilePath = localFileDir.resolve(localFile.getName());
+        BasicFileAttributes attrs = Files.readAttributes(localFilePath,
+                BasicFileAttributes.class);
+        command.append("T").append(attrs.lastModifiedTime().toMillis() / 1000).append(" 0");
+        command.append(" ").append(attrs.lastAccessTime().toMillis() / 1000).append(" 0\n");
+        LOG.debug("Transmiting last modified and access time . . .");
+        out.write(command.toString().getBytes(UTF_8));
+        out.flush();
+        checkAck(in);
+
+        // send "C0644 filesize filename"
+        command.setLength(0);
+        long filesize = localFile.length();
+        command.append("C0644 ").append(filesize).append(" ").append(localFile.getName()).append('\n');
+        LOG.debug("Transmiting file size and name . . .");
+        out.write(command.toString().getBytes(UTF_8));
+        out.flush();
+        checkAck(in);
+
+        // send the content of lfile
+        FileInputStream fis = new FileInputStream(localFile);
+        LOG.debug("Transmiting file content . . .");
+        byte[] buf = new byte[1024];
+        long transmitted = 0;
+        long lastPercentTransmitted = 0;
+        while (true) {
+            int len = fis.read(buf, 0, buf.length);
+            if (len <= 0)
+                break;
+            out.write(buf, 0, len);
+            transmitted += len;
+            long percentTransmitted = transmitted*100/filesize;
+            if (percentTransmitted > lastPercentTransmitted
+                    && percentTransmitted % 10 == 0) {
+                lastPercentTransmitted = percentTransmitted;
+                LOG.debug("{}% complete", percentTransmitted);
+            }
+        }
+        fis.close();
+        buf[0] = 0; // send '\0'
+        out.write(buf, 0, 1);
+        out.flush();
+        checkAck(in);
+        out.close();
+
+        channel.disconnect();
+        LOG.debug("Transmission complete.");
+        return this;
+    }
+
+}
diff --git a/framework/src/main/java/org/apache/drill/test/framework/ssh/DrillCluster.java b/framework/src/main/java/org/apache/drill/test/framework/ssh/DrillCluster.java
new file mode 100644
index 0000000..a19f308
--- /dev/null
+++ b/framework/src/main/java/org/apache/drill/test/framework/ssh/DrillCluster.java
@@ -0,0 +1,89 @@
+package org.apache.drill.test.framework.ssh;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.drill.test.framework.DrillTestDefaults;
+import org.apache.drill.test.framework.ThrowingConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Represents a DrillCluster.
+ * Should be used whenever the test framework should talk to a drill cluster.
+ *
+ * Instantiated with hostnames of nodes running drillbits and an admin user.
+ * If admin user is not provided, "root" is considered as the admin user.
+ */
+public class DrillCluster {
+    final private static Logger LOG = LoggerFactory.getLogger(DrillCluster.class);
+    final private SessionManager manager;
+    final private ExecutorService executor = Executors.newCachedThreadPool();
+    final private Set<String> clusterNodes = Sets.newLinkedHashSet();
+
+    public DrillCluster(List<String> hosts) {
+        this(DrillTestDefaults.USERNAME, hosts);
+    }
+
+    public DrillCluster(String adminUser, List<String> hosts) {
+        Preconditions.checkNotNull(hosts, "\"hosts\" cannot be null");
+        if(adminUser == null) {
+            adminUser = DrillTestDefaults.USERNAME;
+        }
+
+        manager = new SessionManager(adminUser);
+        clusterNodes.addAll(hosts);
+        LOG.info("DrillCluster initialized with {} nodes", clusterNodes);
+    }
+
+    public SessionManager getSessionManager() {
+        return manager;
+    }
+
+    public void copyToRemote(final String localFile, final String remoteFile) {
+        List<Future<CopyToRemote>> futures = Lists.newArrayList();
+
+        clusterNodes.forEach(ThrowingConsumer.throwingConsumerWrapper(n -> {
+            CopyToRemote cmd = new CopyToRemote(this, n, localFile, remoteFile);
+            futures.add(executor.submit(cmd));
+        }));
+
+        futures.forEach(ThrowingConsumer.throwingConsumerWrapper(Future::get));
+    }
+
+    public void runCommand(final String command) {
+        List<Future<RunCommand>> futures = Lists.newArrayList();
+
+        clusterNodes.forEach(ThrowingConsumer.throwingConsumerWrapper(n -> {
+            RunCommand cmd = new RunCommand(this, n, command);
+            futures.add(executor.submit(cmd));
+        }));
+        futures.forEach(ThrowingConsumer.throwingConsumerWrapper(Future::get));
+    }
+
+    public void shutdown() {
+        manager.shutdown();
+        executor.shutdown();
+    }
+
+    public void addHost(String host) {
+        clusterNodes.add(host);
+    }
+
+    public void addHosts(List<String> hosts) {
+        clusterNodes.addAll(hosts);
+    }
+
+    public List<String> getHosts() {
+        return new LinkedList<>(clusterNodes);
+    }
+}
diff --git a/framework/src/main/java/org/apache/drill/test/framework/ssh/RunCommand.java b/framework/src/main/java/org/apache/drill/test/framework/ssh/RunCommand.java
new file mode 100644
index 0000000..086dc06
--- /dev/null
+++ b/framework/src/main/java/org/apache/drill/test/framework/ssh/RunCommand.java
@@ -0,0 +1,99 @@
+package org.apache.drill.test.framework.ssh;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CodingErrorAction;
+import java.nio.charset.StandardCharsets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcraft.jsch.ChannelExec;
+
+/**
+ * Task to run specified command on a remote host.
+ * Code reused from implementation of RunCommand.java in
+ * {@link - https://repository.mapr.com/nexus/content/groups/mapr-public/com/mapr/db/ycsb-driver/}
+ */
+public class RunCommand extends SSHTask<RunCommand> {
+    private static final Logger LOG = LoggerFactory.getLogger(RunCommand.class);
+    private static final CharsetDecoder DECODER = StandardCharsets.UTF_8.newDecoder()
+            .onMalformedInput(CodingErrorAction.REPLACE)
+            .onUnmappableCharacter(CodingErrorAction.REPLACE);
+
+    private String cmd;
+    private String output;
+    private int exitCode;
+
+    public RunCommand(final DrillCluster cluster, final String host, final String cmd) {
+        super(cluster, host);
+        this.cmd = cmd;
+    }
+
+    @Override
+    public RunCommand run(SSHSession session) throws Exception {
+        ChannelExec channel = session.openChannelExec("exec");
+        channel.setCommand(cmd);
+        channel.setInputStream(null);
+
+        LOG.info("Executing command '{}'.", cmd);
+        channel.connect();
+
+        BufferedReader stdErrReader = new BufferedReader(
+                new InputStreamReader(channel.getErrStream(), DECODER), 1024);
+        BufferedReader stdOutReader = new BufferedReader(
+                new InputStreamReader(channel.getInputStream(), DECODER), 1024);
+
+        LOG.debug("Reading from stream.");
+        final StringBuilder out = new StringBuilder();
+        out.append("Error:\n"); // must read err before out
+        if (!consumeReader(channel, stdErrReader, out)) {
+            out.setLength(0);
+        }
+        int bufLen = out.length();
+        out.append("Output:\n");
+        if (!consumeReader(channel, stdOutReader, out)) {
+            out.setLength(bufLen);
+        }
+
+        if (out.length() > 0) {
+            output = out.toString();
+            LOG.debug("\n{}", output);
+        }
+
+        LOG.debug("Command completed, exit-status: {}.", exitCode = channel.getExitStatus());
+        channel.disconnect();
+        LOG.debug("Channel disconnected.");
+        return this;
+    }
+
+    private boolean consumeReader(ChannelExec channel,
+                                  BufferedReader reader, StringBuilder out) throws IOException {
+        boolean hadData = false;
+        while (true) {
+            int ch;
+            while ((ch = reader.read()) > 0) {
+                hadData = true;
+                out.append((char)ch);
+            }
+            if (channel.isClosed()) {
+                break;
+            }
+            try {
+                Thread.sleep(1000);
+            } catch (Exception ee) {}
+        }
+        return hadData;
+    }
+
+    public String getOutput() throws Exception {
+        return output;
+    }
+
+    public int getExitCode() {
+        return exitCode;
+    }
+
+}
diff --git a/framework/src/main/java/org/apache/drill/test/framework/ssh/SSHSession.java b/framework/src/main/java/org/apache/drill/test/framework/ssh/SSHSession.java
new file mode 100644
index 0000000..4081804
--- /dev/null
+++ b/framework/src/main/java/org/apache/drill/test/framework/ssh/SSHSession.java
@@ -0,0 +1,128 @@
+package org.apache.drill.test.framework.ssh;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import com.jcraft.jsch.UserInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Represents a SSH session object that can be used to talk to a host as a particular user.
+ * These SSH sessions are managed by {@link SessionManager}.
+ *
+ * Code reused from implementation of SSHSession.java in
+ * {@link - https://repository.mapr.com/nexus/content/groups/mapr-public/com/mapr/db/ycsb-driver/}
+ */
+public class SSHSession {
+    static Logger logger = LoggerFactory.getLogger(SSHSession.class);
+
+    final private static String userHome = System.getProperty("user.home");
+    final private static File sshDir = new File(userHome, ".ssh");
+    private static JSch jsch = null;
+    final static private AtomicInteger idTracker = new AtomicInteger();
+
+    private final Session session;
+    private final String id;
+
+    public SSHSession(final String user, final String host)
+            throws JSchException {
+        id = String.valueOf(idTracker.incrementAndGet());
+        session = getJSch().getSession(user, host, 22);
+        session.setConfig("HashKnownHosts", "yes");
+        session.setConfig("StrictHostKeyChecking", "no");
+        session.setUserInfo(new UserInfo() {
+            @Override
+            public String getPassphrase() {
+                //Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public String getPassword() {
+                //Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public boolean promptPassword(String message) {
+                //Auto-generated method stub
+                return false;
+            }
+
+            @Override
+            public boolean promptPassphrase(String message) {
+                //Auto-generated method stub
+                return false;
+            }
+
+            @Override
+            public boolean promptYesNo(String message) {
+                //Auto-generated method stub
+                return false;
+            }
+
+            @Override
+            public void showMessage(String message) {
+                //Auto-generated method stub
+            }
+        });
+        logger.info("Connecting to '{}' as '{}'.", host, user);
+        session.connect();
+        logger.info("Success.");
+    }
+
+//    public String substituteId(String orginal) {
+//        return orginal.replaceAll("\\$\\{node_id}", id).replaceAll("\\$\\{node_host}", session.getHost());
+//    }
+
+    public Session getSession() {
+        return session;
+    }
+
+    private static synchronized JSch getJSch() throws JSchException {
+        if (jsch == null) {
+            jsch = new JSch();
+            addIdFile(jsch, "id_rsa");
+            addIdFile(jsch, "id_dsa");
+            addIdFile(jsch, "id_rsa.ppk");
+            jsch.setKnownHosts(new File(sshDir, "known_hosts").getAbsolutePath());
+        }
+        return jsch;
+    }
+
+    private static void addIdFile(JSch jsch, String fileName) throws JSchException {
+        File idFile = new File(sshDir, fileName);
+        if (idFile.exists()) {
+            jsch.addIdentity(idFile.getAbsolutePath());
+        }
+    }
+
+    public boolean isConnected() {
+        return session.isConnected();
+    }
+
+    public void disconnect() {
+        session.disconnect();
+    }
+
+    public String getUserName() {
+        return session.getUserName();
+    }
+
+    public String getHost() {
+        return session.getHost();
+    }
+
+    public ChannelExec openChannelExec(String exec)
+            throws JSchException {
+        return (ChannelExec) session.openChannel(exec);
+    }
+
+}
+
diff --git a/framework/src/main/java/org/apache/drill/test/framework/ssh/SSHTask.java b/framework/src/main/java/org/apache/drill/test/framework/ssh/SSHTask.java
new file mode 100644
index 0000000..84833e6
--- /dev/null
+++ b/framework/src/main/java/org/apache/drill/test/framework/ssh/SSHTask.java
@@ -0,0 +1,56 @@
+package org.apache.drill.test.framework.ssh;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+
+/**
+ * Class represents an SSH task ro be run on a host.
+ *
+ * Code reused from implementation of NodeTask.java in
+ * {@link - https://repository.mapr.com/nexus/content/groups/mapr-public/com/mapr/db/ycsb-driver/}
+ */
+public abstract class SSHTask<T> implements Callable<T> {
+    final protected DrillCluster cluster;
+    private final String host;
+
+    protected SSHTask(final DrillCluster cluster, final String host) {
+        this.cluster = cluster;
+        this.host = host;
+    }
+
+    @Override
+    public T call() throws Exception {
+        final SSHSession session = cluster.getSessionManager().getSession(getHost());
+        Thread.currentThread().setName(session.getUserName() + "@" + session.getHost());
+        return run(session);
+    }
+
+    protected void ack(OutputStream out) throws IOException {
+        out.write(0); out.flush();
+    }
+
+    protected int checkAck(InputStream in) throws IOException {
+        int b = in.read();
+        switch (b) {
+            case 1: // error
+            case 2: // fatal error
+                int c;
+                StringBuilder sb = new StringBuilder();
+                sb.append("Error: " + b + ". ");
+                while ((c = in.read()) != '\n') {
+                    sb.append((char) c);
+                }
+                throw new IOException(sb.toString());
+            case 0: // success
+            default:
+                return b;
+        }
+    }
+    protected abstract T run(SSHSession session) throws Exception;
+
+    public String getHost() {
+        return host;
+    }
+}
diff --git a/framework/src/main/java/org/apache/drill/test/framework/ssh/SessionManager.java b/framework/src/main/java/org/apache/drill/test/framework/ssh/SessionManager.java
new file mode 100644
index 0000000..34651e5
--- /dev/null
+++ b/framework/src/main/java/org/apache/drill/test/framework/ssh/SessionManager.java
@@ -0,0 +1,46 @@
+package org.apache.drill.test.framework.ssh;
+
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import com.jcraft.jsch.JSchException;
+
+/**
+ * Represents a SSH session manager.
+ * Every user and host combination has an entry in the in-memory map.
+ *
+ * Code reused from implementation of SessionManager.java in
+ * {@link - https://repository.mapr.com/nexus/content/groups/mapr-public/com/mapr/db/ycsb-driver/}
+ */
+public class SessionManager {
+    final Map<String, SSHSession> sessionMap = Maps.newHashMap();
+
+    final private String user;
+
+    public SessionManager(String user) {
+        this.user = user;
+    }
+
+    public synchronized SSHSession getSession(final String node)
+            throws JSchException {
+        String host = node;
+        String hostString = user + "@" + host;
+        if(host.contains("_")){
+            host=host.substring(0,host.indexOf("_"));
+        }
+        SSHSession session = sessionMap.get(hostString);
+        if (session == null || !session.isConnected()) {
+            session = new SSHSession(user, host);
+            sessionMap.put(hostString, session);
+        }
+        return session;
+    }
+
+    public void shutdown() {
+        sessionMap.forEach((key, session) -> {
+            if (session.isConnected()) {
+                session.disconnect();
+            }
+        });
+    }
+}
diff --git a/framework/src/test/java/org/apache/drill/test/framework/DrillTestFrameworkUnitTests.java b/framework/src/test/java/org/apache/drill/test/framework/DrillTestFrameworkUnitTests.java
index ed554ce..d7c3994 100644
--- a/framework/src/test/java/org/apache/drill/test/framework/DrillTestFrameworkUnitTests.java
+++ b/framework/src/test/java/org/apache/drill/test/framework/DrillTestFrameworkUnitTests.java
@@ -22,12 +22,12 @@
 
 import static org.apache.drill.test.framework.DrillTestDefaults.DRILL_EXEC_RM_CONFIG_KEY;
 import static org.apache.drill.test.framework.common.DrillTestNGDefaults.UNIT_GROUP;
+import static org.apache.drill.test.framework.common.DrillTestNGDefaults.SAMPLE_RM_CONFIG_NAME;
 
 @Test(groups = UNIT_GROUP)
 public class DrillTestFrameworkUnitTests extends DrillJavaTestBase {
     private static final Logger LOG = Logger.getLogger(DrillTestFrameworkUnitTests.class);
-    private static final String SAMPLE_RM_CONFIG_NAME =
-            DrillTestDefaults.CWD + "/src/test/resources/sample-drill-rm-override.conf";
+
 
     @BeforeTest(alwaysRun = true)
     public void runBeforeTest() {
@@ -44,11 +44,9 @@
      */
     @Test(groups = UNIT_GROUP)
     public void testGetQueryProfile(Method method) {
-        final Properties props = Utils.createConnectionProperties();
-        final ConnectionPool pool = new ConnectionPool(props);
         final String sqlStatement = "select name, val, status from sys.options where name like \'%runtime%\'";
 
-        try (Connection connection = pool.getOrCreateConnection()) {
+        try (Connection connection = connectionPool.getOrCreateConnection()) {
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery(sqlStatement);
 
diff --git a/framework/src/test/java/org/apache/drill/test/framework/TestSSLProperties.java b/framework/src/test/java/org/apache/drill/test/framework/TestSSLProperties.java
index c1b2426..19916b7 100644
--- a/framework/src/test/java/org/apache/drill/test/framework/TestSSLProperties.java
+++ b/framework/src/test/java/org/apache/drill/test/framework/TestSSLProperties.java
@@ -12,7 +12,7 @@
 public class TestSSLProperties {
 
   @Test
-  public void testSSLDisabled() throws Exception {
+  public void testSSLDisabled() {
     Properties connectionProperties = new Properties();
 
     connectionProperties.put("auth", DrillTestDefaults.AUTHENTICATION_MECHANISM);
@@ -20,9 +20,8 @@
     connectionProperties.put("user", DrillTestDefaults.USERNAME);
     connectionProperties.put("password", DrillTestDefaults.PASSWORD);
 
-    ConnectionPool connectionPool = new ConnectionPool(connectionProperties);
-    try {
-      connectionPool.createConnection(connectionProperties);
+    try (Connection conn = ConnectionPool.createConnection(connectionProperties)){
+      fail("Establishing connection succeeded, but should have failed!");
     } catch (SQLException ex) {
       String message = "Error setting/closing connection. Details: HANDSHAKE_COMMUNICATION";
       assertTrue(ex.getMessage().contains(message));
@@ -30,7 +29,7 @@
   }
 
   @Test
-  public void testSSLEnabled() throws Exception {
+  public void testSSLEnabled() {
     Properties connectionProperties = new Properties();
 
     connectionProperties.put("auth", DrillTestDefaults.AUTHENTICATION_MECHANISM);
@@ -40,17 +39,11 @@
     connectionProperties.put("user", DrillTestDefaults.USERNAME);
     connectionProperties.put("password", DrillTestDefaults.PASSWORD);
 
-    ConnectionPool connectionPool = new ConnectionPool(connectionProperties);
-    try {
-      connectionPool.createConnection(connectionProperties);
+    try (Connection conn = ConnectionPool.createConnection(connectionProperties)){
+      assertTrue(Utils.sanityTest(conn));
     } catch (SQLException ex) {
       ex.printStackTrace();
       fail("Establishing connection failed.");
-    } finally {
-      Connection connection = connectionPool.createConnection(connectionProperties);
-      assertTrue(Utils.sanityTest(connection));
-      connection.close();
-      connectionPool.close();
     }
   }
 
diff --git a/framework/src/test/java/org/apache/drill/test/framework/common/DrillJavaTestBase.java b/framework/src/test/java/org/apache/drill/test/framework/common/DrillJavaTestBase.java
index 0fa102c..3bb7b1b 100644
--- a/framework/src/test/java/org/apache/drill/test/framework/common/DrillJavaTestBase.java
+++ b/framework/src/test/java/org/apache/drill/test/framework/common/DrillJavaTestBase.java
@@ -1,5 +1,9 @@
 package org.apache.drill.test.framework.common;
 
+import com.google.common.base.Preconditions;
+import org.apache.drill.test.framework.ConnectionPool;
+import org.apache.drill.test.framework.Utils;
+import org.apache.drill.test.framework.ssh.DrillCluster;
 import org.apache.log4j.Logger;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -11,9 +15,14 @@
 import org.testng.annotations.BeforeTest;
 
 import java.lang.reflect.Method;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
 
 public class DrillJavaTestBase {
     private static final Logger LOG = Logger.getLogger(DrillJavaTestBase.class);
+    protected ConnectionPool connectionPool;
+    protected DrillCluster drillCluster;
 
     @BeforeSuite(alwaysRun = true, description = "Invoked at the beginning of the Test Suite.")
     public void baseBeforeSuite() {
@@ -26,12 +35,16 @@
     }
 
     @BeforeClass(alwaysRun = true, description = "Invoked at the beginning of every Test Class.")
-    public void baseBeforeClass() {
+    public void baseBeforeClass() throws IllegalStateException {
         LOG.debug("Running Base Before Class");
+        final Properties props = Utils.createConnectionProperties();
+        connectionPool = new ConnectionPool(props);
+        drillCluster = createDrillCluster(connectionPool);
     }
 
     @BeforeMethod(alwaysRun = true, description = "Invoked before every Test Method.")
     public void baseBeforeMethod(Method method) {
+
         LOG.info("\n\n---------- Test " + method.getName() + " started ----------\n\n");
     }
 
@@ -43,6 +56,13 @@
     @AfterClass(alwaysRun = true, description = "Invoked after all tests in a Test Class finish.")
     public void baseAfterClass() {
         LOG.debug("Running Base After Class");
+        if (connectionPool != null) {
+            connectionPool.close();
+        }
+
+        if(drillCluster != null) {
+            drillCluster.shutdown();
+        }
     }
 
     @AfterTest(alwaysRun = true, description = "Invoked once tests in all classes in the Test Module finish.")
@@ -54,4 +74,25 @@
     public void baseAfterSuite() {
         LOG.debug("Running Base After Suite");
     }
+
+    /**
+     * Utility method to create a {@link DrillCluster} instance.
+     * @param pool
+     * @return
+     * @throws SQLException
+     */
+    protected DrillCluster createDrillCluster(ConnectionPool pool) throws IllegalStateException {
+        Preconditions.checkNotNull(pool, "Connection pool is not created!");
+        List<String> drillbitHosts;
+        try {
+             drillbitHosts = Utils.getDrillbitHosts(pool.getOrCreateConnection());
+            LOG.info("Size of Drill cluster " + drillbitHosts.size());
+            drillbitHosts.forEach(LOG::info);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new IllegalStateException("Could not query drillbit hosts.\n" +
+                    "[Hint: Ensure that the Drill cluster is up and ready to take take queries.]");
+        }
+        return new DrillCluster(drillbitHosts);
+    }
 }
diff --git a/framework/src/test/java/org/apache/drill/test/framework/common/DrillTestNGDefaults.java b/framework/src/test/java/org/apache/drill/test/framework/common/DrillTestNGDefaults.java
index 3bc1374..fbc4753 100644
--- a/framework/src/test/java/org/apache/drill/test/framework/common/DrillTestNGDefaults.java
+++ b/framework/src/test/java/org/apache/drill/test/framework/common/DrillTestNGDefaults.java
@@ -1,6 +1,17 @@
 package org.apache.drill.test.framework.common;
 
+import org.apache.drill.test.framework.DrillTestDefaults;
+
 public final class DrillTestNGDefaults {
     public static final String FUNCTIONAL_GROUP = "functional";
     public static final String UNIT_GROUP = "unit";
+    public static final String SAMPLE_RM_CONFIG_NAME =
+            DrillTestDefaults.CWD + "/src/test/resources/sample-drill-rm-override.conf";
+    public static final String BASIC_RM_CONFIG_NAME =
+            DrillTestDefaults.CWD + "/src/test/resources/basic-drill-rm-override.conf";
+
+    public static String CONNECTION_URL_FOR_DRILLBIT(final String hostnameOrIp) {
+        return String.format("jdbc:drill:drillbit=%s", hostnameOrIp);
+    }
+    public static final String NO_RESOURCE_POOL_ERROR = "No resource pools to choose from for the query";
 }
diff --git a/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/QueueSelectionTests.java b/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/QueueSelectionTests.java
new file mode 100644
index 0000000..169e469
--- /dev/null
+++ b/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/QueueSelectionTests.java
@@ -0,0 +1,291 @@
+package org.apache.drill.test.framework.resourcemanagement;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.test.framework.*;
+import org.apache.drill.test.framework.common.DrillJavaTestBase;
+import org.apache.drill.test.framework.common.DrillTestNGDefaults;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.*;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Properties;
+
+import static org.apache.drill.test.framework.DrillTestDefaults.DRILL_HOME;
+import static org.apache.drill.test.framework.DrillTestDefaults.DRILL_RM_OVERRIDE_CONF_FILENAME;
+import static org.apache.drill.test.framework.common.DrillTestNGDefaults.FUNCTIONAL_GROUP;
+import static org.apache.drill.test.framework.common.DrillTestNGDefaults.NO_RESOURCE_POOL_ERROR;
+import static org.apache.drill.test.framework.common.DrillTestNGDefaults.BASIC_RM_CONFIG_NAME;
+
+@SuppressWarnings("Duplicates")
+@Test(groups = FUNCTIONAL_GROUP)
+public class QueueSelectionTests extends DrillJavaTestBase {
+    private static final Logger LOG = Logger.getLogger(QueueSelectionTests.class);
+
+    @BeforeClass(alwaysRun = true, description = "Invoked before all tests in the class")
+    private void setup() throws IOException {
+        cleanup(false);
+        DrillRMConfig config = DrillRMConfig.load(BASIC_RM_CONFIG_NAME);
+        Utils.applyRMConfigToDrillCluster(config, drillCluster);
+        Utils.restartDrillbits(drillCluster);
+    }
+
+    @AfterClass(alwaysRun = true, description = "Invoked after all tests in the class are executed")
+    private void cleanup() {
+        cleanup(true);
+    }
+
+    private void cleanup(final boolean restart) {
+        Preconditions.checkNotNull(connectionPool,
+                "Cleanup failed! Connection pool has not be instantiated");
+        Preconditions.checkNotNull(drillCluster,
+                "Cleanup failed! Drill cluster information is unavailable");
+        drillCluster.runCommand("rm -rf " + DRILL_HOME + "/conf/" + DRILL_RM_OVERRIDE_CONF_FILENAME);
+        if(restart) {
+            Utils.restartDrillbits(drillCluster);
+        }
+    }
+
+    /**
+     * Test validates that the tag is evaluated and the right queue is picked based on the tag.
+     *
+     * @throws IOException
+     */
+    @Test(groups = FUNCTIONAL_GROUP)
+    public void testTagSelectorBasic() throws IOException {
+        final String query = "SELECT o_orderkey " +
+                "FROM orders " +
+                "ORDER BY o_orderkey " +
+                "DESC limit 1";
+
+        //Set expectations
+        final long expectedOrderId = 60000;
+        final String queryTag = "dev";
+        final String expectedPoolName = "DevPool";
+        final int expectedRowCount = 1;
+
+        //Build a connection with queryTag
+        final Properties props = Utils.createConnectionProperties(
+                "dfs.drilltestdirtpch01parquet", null, queryTag);
+
+        try(Connection conn = ConnectionPool
+                .createConnection(
+                        DrillTestNGDefaults.CONNECTION_URL_FOR_DRILLBIT(drillCluster.getHosts().get(0)),
+                        props);
+            Statement stmt = conn.createStatement();
+            ResultSet res = stmt.executeQuery(query)) {
+            final String queryId = Utils.getQueryID(res); //Get query id
+            LOG.info("QueryID: " + queryId + " - Query: " + query);
+
+            final DrillQueryProfile queryProfile = Utils.getQueryProfile(queryId); //Get query profile
+
+            //Validate that the query was allowed into the queue
+            Assert.assertEquals(queryProfile.queueName, expectedPoolName,
+                    "QueryID: " + queryId + " - The pool names do not match!");
+            LOG.info("QueryID: " + queryId + ", Queue: " + queryProfile.queueName);
+
+            //Validate the data returned.
+            long rowCount = 0;
+            while(res.next()) {
+                rowCount++;
+                Assert.assertEquals(res.getLong("o_orderkey"), expectedOrderId,
+                        "QueryID: " + queryId + " - OrderId expected did not match");
+
+            }
+            Assert.assertEquals(rowCount, expectedRowCount,
+                    "QueryID: " + queryId + " - Number of rows returned did not match!");
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    /**
+     * Test validates that user in Acl is evaluated and the right queue is picked based on the Acl.
+     *
+     * @throws IOException
+     */
+    @Test(groups = FUNCTIONAL_GROUP)
+    public void testAclSelectorForUser() throws IOException {
+        final String query = "SELECT o_orderkey " +
+                "FROM orders " +
+                "ORDER BY o_orderkey " +
+                "DESC limit 1";
+
+        //Set expectations
+        final long expectedOrderId = 60000;
+        final String expectedPoolName = "TestPool";
+        final int expectedRowCount = 1;
+        final String expectedUser = "bob";
+
+        //Build a connection with only schema
+        final Properties props = Utils.createConnectionProperties(
+                "dfs.drilltestdirtpch01parquet", null, null);
+
+        try(Connection conn = ConnectionPool
+                .createConnection(
+                        DrillTestNGDefaults.CONNECTION_URL_FOR_DRILLBIT(drillCluster.getHosts().get(0)),
+                        expectedUser, //Provide username for the connection
+                        null,
+                        props);
+
+            Statement stmt = conn.createStatement();
+            ResultSet res = stmt.executeQuery(query)) {
+            final String queryId = Utils.getQueryID(res); //Get query id
+            LOG.info("Query ID: " + queryId + ", Query: " + query);
+
+            final DrillQueryProfile queryProfile = Utils.getQueryProfile(queryId); //Get query profile
+
+            //Validate that the query was allowed into the queue
+            Assert.assertEquals(queryProfile.queueName, expectedPoolName, "The pool names do not match!");
+            LOG.info("QueryID: " + queryId + ", Queue: " + queryProfile.queueName);
+
+            long rowCount = 0;
+            while(res.next()) {
+                rowCount++;
+                Assert.assertEquals(res.getLong("o_orderkey"), expectedOrderId,
+                        "OrderId expected did not match");
+
+            }
+            Assert.assertEquals(rowCount, expectedRowCount, "Number of rows returned did not match!");
+            Assert.assertEquals(queryProfile.user, expectedUser);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    /**
+     * Test validates where multiple queues are eligible to admit a query, based on tag,
+     * but the bestfit queue is selected.
+     *
+     * @throws IOException
+     */
+    @Test(groups = FUNCTIONAL_GROUP)
+    public void testTagSelectBestFitPool() throws IOException {
+        final String query = "SELECT o_orderkey " +
+                "FROM orders " +
+                "ORDER BY o_orderkey " +
+                "DESC limit 1";
+        final String queryTag = "dev,test"; //Set tag such that both dev and test are eligible
+
+        //Set expectations
+        final long expectedOrderId = 60000;
+        final String expectedPoolName = "DevPool"; //As per "bestfit" policy
+        final int expectedRowCount = 1;
+        final String user = "bob";
+
+        final Properties props = Utils.createConnectionProperties(
+                "dfs.drilltestdirtpch01parquet", null, queryTag);
+
+        try(Connection conn = ConnectionPool
+                .createConnection(DrillTestNGDefaults.CONNECTION_URL_FOR_DRILLBIT(
+                        drillCluster.getHosts().get(0)),
+                        user,
+                        null,
+                        props); //Create a connection based on hostname and properties
+
+            Statement stmt = conn.createStatement();
+            ResultSet res = stmt.executeQuery(query)) {
+            final String queryId = Utils.getQueryID(res); //Get query id
+            LOG.info("Query ID: " + queryId + ", Query: " + query);
+
+            final DrillQueryProfile queryProfile = Utils.getQueryProfile(queryId); //Get query profile
+
+            Assert.assertEquals(queryProfile.user, user, "Query user did not match for queryID: " +
+                    queryId + " !");
+
+            //Validate that the query was allowed into the queue
+            Assert.assertEquals(queryProfile.queueName, expectedPoolName, "The pool names do not match!");
+            LOG.info("QueryID: " + queryId + ", Queue: " + queryProfile.queueName);
+
+            long rowCount = 0;
+            while(res.next()) {
+                rowCount++;
+                Assert.assertEquals(res.getLong("o_orderkey"), expectedOrderId,
+                        "OrderId expected did not match");
+
+            }
+            Assert.assertEquals(rowCount, expectedRowCount, "Number of rows returned did not match!");
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    /**
+     * Test to validate that the query is not admitted since there are no tags and Acls do not match
+     * any of the queues.
+     * Also validates the error message.
+     *
+     * @throws IOException
+     */
+    @Test(groups = FUNCTIONAL_GROUP)
+    public void testUnknownUserDoesNotAllowQuery() throws IOException {
+        final String query = "SELECT o_orderkey " +
+                "FROM orders " +
+                "ORDER BY o_orderkey " +
+                "DESC limit 1";
+        final Properties props = Utils.createConnectionProperties("dfs.drilltestdirtpch01parquet",
+                null, null); //NO Query Tags
+
+        try(Connection conn = ConnectionPool
+                .createConnection(DrillTestNGDefaults.CONNECTION_URL_FOR_DRILLBIT(drillCluster.getHosts().get(0)),
+                        "anonymous",
+                        null,
+                        props);
+            Statement stmt = conn.createStatement();
+            ResultSet res = stmt.executeQuery(query)) {
+            final String queryId = Utils.getQueryID(res); //Get query id
+            LOG.info("Query ID: " + queryId + ", Query: " + query);
+
+            Utils.getQueryProfile(queryId); //Get query profile
+            Assert.fail("Did not receive expected exception: " + NO_RESOURCE_POOL_ERROR);
+        } catch (Exception e) {
+            if (e.getMessage().contains(NO_RESOURCE_POOL_ERROR)) {
+                LOG.info("Received expected exception: " + e.getMessage());
+            } else {
+                e.printStackTrace();
+                Assert.fail(e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Validates that an unknown tag is not wrongly admitted into the queue.
+     *
+     * @throws IOException
+     */
+    @Test(groups = FUNCTIONAL_GROUP)
+    public void testUnknownTagDoesNotAllowQuery() {
+        final String query = "SELECT o_orderkey " +
+                "FROM orders " +
+                "ORDER BY o_orderkey " +
+                "DESC limit 1";
+        final String queryTag = "marketing"; //This tag is not configured for basic RM template
+        final Properties props = Utils.createConnectionProperties("dfs.drilltestdirtpch01parquet",
+                null, queryTag); //NO Query Tags
+
+        try(Connection conn = ConnectionPool
+                .createConnection(DrillTestNGDefaults.CONNECTION_URL_FOR_DRILLBIT(drillCluster.getHosts().get(0)),
+                        props);
+            Statement stmt = conn.createStatement();
+            ResultSet res = stmt.executeQuery(query)) {
+            final String queryId = Utils.getQueryID(res); //Get query id
+            LOG.info("Query ID: " + queryId + ", Query: " + query);
+
+            Utils.getQueryProfile(queryId); //Get query profile
+            Assert.fail("Did not receive expected exception: " + NO_RESOURCE_POOL_ERROR);
+        } catch (Exception e) {
+            if (e.getMessage().contains(NO_RESOURCE_POOL_ERROR)) {
+                LOG.info("Received expected exception: " + e.getMessage());
+            } else {
+                e.printStackTrace();
+                Assert.fail(e.getMessage());
+            }
+        }
+    }
+}
diff --git a/framework/src/test/resources/basic-drill-rm-override.conf b/framework/src/test/resources/basic-drill-rm-override.conf
new file mode 100644
index 0000000..36653e0
--- /dev/null
+++ b/framework/src/test/resources/basic-drill-rm-override.conf
@@ -0,0 +1,38 @@
+drill.exec.rm: {
+  pool_name: "root",
+  memory: 1.0, // 90% of total direct memory allocated to Drill
+  queue_selection_policy: "bestfit", // policy to select queue for a query when multiple queues are eligible
+  child_pools: [
+    {
+      pool_name: "DevPool",
+      memory: 0.5,
+      selector: {
+        tag: "dev",
+      },
+      queue: {
+        max_query_memory_per_node: 1073741824 // supported format regex [0-9]*[kKmMgG]?
+        max_waiting: 10, // default
+        max_admissible: 10, // default
+        max_wait_timeout: 30000, // default in ms
+        wait_for_preferred_nodes: true // default
+      }
+    },
+    {
+      pool_name: "TestPool",
+      memory: 0.5,
+      selector: {
+        acl: {
+          users: ["bob"],
+          group: ["test"]
+        }
+      },
+      queue: {
+        max_query_memory_per_node: 2147483648 // supported format regex [0-9]*[kKmMgG]?
+        max_waiting: 10, // default
+        max_admissible: 10, // default
+        max_wait_timeout: 30000, // default in ms
+        wait_for_preferred_nodes: true // default
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/framework/src/test/resources/sample-drill-rm-override.conf b/framework/src/test/resources/sample-drill-rm-override.conf
index e11d9e5..6af836a 100644
--- a/framework/src/test/resources/sample-drill-rm-override.conf
+++ b/framework/src/test/resources/sample-drill-rm-override.conf
@@ -4,16 +4,17 @@
   queue_selection_policy:"bestfit",
   child_pools:[
     {
-      pool_name:"Devs Resource Pool",
+      pool_name:"DevResourcePool",
       memory:0.75,
       selector:{
         acl:{
-          groups:["dev"]
+          groups:["dev","test","sales:-"]
+          users:["alice","john"]
         }
       },
       child_pools:[
         {
-          pool_name:"Small Query Resource Pool",
+          pool_name:"SmallQueryResourcePool",
           memory:0.20,
           selector:{
             tag:"small"
@@ -25,7 +26,7 @@
           }
         },
         {
-          pool_name:"Large Query Resource Pool",
+          pool_name:"LargeQueryResourcePool",
           memory:0.60,
           selector:{
             tag:"large"
@@ -36,7 +37,7 @@
           }
         },
         {
-          pool_name:"Experimental Query Resource Pool",
+          pool_name:"ExperimentalQueryResourcePool",
           memory:0.20,
           selector:{
             tag :"experimental"
@@ -49,7 +50,7 @@
       ]
     },
     {
-      pool_name:"Marketing Resource Pool",
+      pool_name:"MarketingResourcePool",
       memory:0.25,
       selector:{
         acl :{
diff --git a/framework/testng-java.xml b/framework/testng-java.xml
index 0afc56c..3890701 100644
--- a/framework/testng-java.xml
+++ b/framework/testng-java.xml
@@ -3,10 +3,12 @@
     <test name="Resource Management Tests">
         <classes>
             <class name="org.apache.drill.test.framework.DrillTestFrameworkUnitTests" />
+            <class name="org.apache.drill.test.framework.resourcemanagement.QueueSelectionTests" />
         </classes>
         <groups>
             <run>
                 <include name="unit" />
+                <include name="functional" />
                 <exclude name="broken" />
             </run>
         </groups>