DRILL-7651: Increase timeout for TestLargeFileCompilation to avoid GitHub Actions failures and fix a race condition in TestTpchDistributedConcurrent
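
Both test classes now extend ClusterTest and start their cluster with
ExecConstants.USER_RPC_TIMEOUT raised to 5_000, so slow GitHub Actions
runners no longer hit client RPC timeouts on long-running queries. The
test-rule timeout of TestTpchDistributedConcurrent also grows from 360
to 400 seconds.

The race: ChainingSilentListener.queryCompleted() released
completionSemaphore before updating the shared bookkeeping, so the main
thread could acquire its final permit and run its assertions while a
callback was still mutating listeners and remainingQueries. The fix moves
the release after the synchronized block; in sketch:

    // before (racy): the main thread may pass acquire() here and assert
    // while this callback still has bookkeeping to do
    completionSemaphore.release();
    synchronized (TestTpchDistributedConcurrent.this) { /* bookkeeping */ }

    // after: bookkeeping completes before the permit becomes visible
    synchronized (TestTpchDistributedConcurrent.this) { /* bookkeeping */ }
    completionSemaphore.release();
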
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
index ba3bd44..c144c53 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
@@ -26,18 +26,25 @@
 
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.BaseTestQuery.SilentListener;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.TestTools;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
-import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.test.QueryTestUtil;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -49,17 +56,18 @@
  * any particular order of execution. We ignore the results.
  */
 @Category({SlowTest.class})
-public class TestTpchDistributedConcurrent extends BaseTestQuery {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTpchDistributedConcurrent.class);
+public class TestTpchDistributedConcurrent extends ClusterTest {
+  private static final Logger logger = LoggerFactory.getLogger(TestTpchDistributedConcurrent.class);
 
-  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(360000); // Longer timeout than usual.
+  @Rule
+  public final TestRule TIMEOUT = TestTools.getTimeoutRule(400_000); // 400 secs; longer than the usual test timeout
 
   /*
    * Valid test names taken from TestTpchDistributed. Fuller path prefixes are
    * used so that tests may also be taken from other locations -- more variety
    * is better as far as this test goes.
    */
-  private final static String queryFile[] = {
+  private static final String[] queryFile = {
     "queries/tpch/01.sql",
     "queries/tpch/03.sql",
     "queries/tpch/04.sql",
@@ -80,44 +88,93 @@
     "queries/tpch/20.sql",
   };
 
-  private final static int TOTAL_QUERIES = 115;
-  private final static int CONCURRENT_QUERIES = 15;
-
-  private final static Random random = new Random(0xdeadbeef);
-  private final static String alterSession = "alter session set `planner.slice_target` = 10";
+  private static final int TOTAL_QUERIES = 115;
+  private static final int CONCURRENT_QUERIES = 15;
+  private static final Random random = new Random(0xdeadbeef);
 
   private int remainingQueries = TOTAL_QUERIES - CONCURRENT_QUERIES;
   private final Semaphore completionSemaphore = new Semaphore(0);
   private final Semaphore submissionSemaphore = new Semaphore(0);
   private final Set<UserResultsListener> listeners = Sets.newIdentityHashSet();
+  private final List<FailedQuery> failedQueries = new LinkedList<>();
   private Thread testThread = null; // used to interrupt semaphore wait in case of error
 
+  @BeforeClass
+  public static void setUp() throws Exception {
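+    // Generous user RPC timeout so slow CI machines do not drop the client connection mid-query.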
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .configProperty(ExecConstants.USER_RPC_TIMEOUT, 5_000);
+    startCluster(builder);
+  }
+
+  @Test
+  public void testConcurrentQueries() {
+    client.alterSession(ExecConstants.SLICE_TARGET, 10);
+
+    testThread = Thread.currentThread();
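+    // The submitter thread blocks on submissionSemaphore and submits one random query per permit.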
+    final QuerySubmitter querySubmitter = new QuerySubmitter();
+    querySubmitter.start();
+
+    // Kick off the initial queries. As they complete, they will submit more.
+    submissionSemaphore.release(CONCURRENT_QUERIES);
+
+    // Wait for all the queries to complete.
+    InterruptedException interruptedException = null;
+    try {
+      completionSemaphore.acquire(TOTAL_QUERIES);
+    } catch (InterruptedException e) {
+      interruptedException = e;
+
+      // List the failed queries.
+      for (FailedQuery fq : failedQueries) {
+        logger.error("{} failed with {}", fq.queryFile, fq.userEx);
+      }
+    }
+
+    // Stop the querySubmitter thread.
+    querySubmitter.interrupt();
+
+    if (interruptedException != null) {
+      logger.error("Interruped Exception ", interruptedException);
+    }
+
+    assertNull("Query error caused interruption", interruptedException);
+
+    int nListeners = listeners.size();
+    assertEquals(nListeners + " listeners still exist", 0, nListeners);
+
+    assertEquals("Didn't submit all queries", 0, remainingQueries);
+    assertEquals("Queries failed", 0, failedQueries.size());
+  }
+
+  private void submitRandomQuery() {
+    String filename = queryFile[random.nextInt(queryFile.length)];
+    String query;
+    try {
+      query = QueryTestUtil.normalizeQuery(getFile(filename)).replace(';', ' ');
+    } catch (IOException e) {
+      throw new RuntimeException("Caught exception", e);
+    }
+    UserResultsListener listener = new ChainingSilentListener(query);
+    queryBuilder()
+        .query(UserBitShared.QueryType.SQL, query)
+        .withListener(listener);
+    synchronized (this) {
+      listeners.add(listener);
+    }
+  }
+
   private static class FailedQuery {
     final String queryFile;
     final UserException userEx;
 
-    public FailedQuery(final String queryFile, final UserException userEx) {
+    public FailedQuery(String queryFile, UserException userEx) {
       this.queryFile = queryFile;
       this.userEx = userEx;
     }
   }
 
-  private final List<FailedQuery> failedQueries = new LinkedList<>();
-
-  private void submitRandomQuery() {
-    final String filename = queryFile[random.nextInt(queryFile.length)];
-    final String query;
-    try {
-      query = QueryTestUtil.normalizeQuery(getFile(filename)).replace(';', ' ');
-    } catch(IOException e) {
-      throw new RuntimeException("Caught exception", e);
-    }
-    final UserResultsListener listener = new ChainingSilentListener(query);
-    client.runQuery(UserBitShared.QueryType.SQL, query, listener);
-    synchronized(this) {
-      listeners.add(listener);
-    }
-  }
 
   private class ChainingSilentListener extends SilentListener {
     private final String query;
@@ -130,8 +187,7 @@
     public void queryCompleted(QueryState state) {
       super.queryCompleted(state);
 
-      completionSemaphore.release();
-      synchronized(TestTpchDistributedConcurrent.this) {
+      synchronized (TestTpchDistributedConcurrent.this) {
         final Object object = listeners.remove(this);
         assertNotNull("listener not found", object);
 
@@ -146,6 +202,8 @@
           --remainingQueries;
         }
       }
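+      // Release only after the bookkeeping above; the main thread must never pass acquire() mid-update.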
+      completionSemaphore.release();
     }
 
     @Override
@@ -154,7 +212,7 @@
 
       completionSemaphore.release();
       logger.error("submissionFailed for {} \nwith:", query, uex);
-      synchronized(TestTpchDistributedConcurrent.this) {
+      synchronized (TestTpchDistributedConcurrent.this) {
         final Object object = listeners.remove(this);
         assertNotNull("listener not found", object);
         failedQueries.add(new FailedQuery(query, uex));
@@ -166,10 +224,10 @@
   private class QuerySubmitter extends Thread {
     @Override
     public void run() {
-      while(true) {
+      while (true) {
         try {
           submissionSemaphore.acquire();
-        } catch(InterruptedException e) {
+        } catch (InterruptedException e) {
           logger.error("QuerySubmitter quitting.");
           return;
         }
@@ -178,50 +236,4 @@
       }
     }
   }
-
-  @Test
-  public void testConcurrentQueries() throws Exception {
-    QueryTestUtil.testRunAndLog(client, UserBitShared.QueryType.SQL, alterSession);
-
-    testThread = Thread.currentThread();
-    final QuerySubmitter querySubmitter = new QuerySubmitter();
-    querySubmitter.start();
-
-    // Kick off the initial queries. As they complete, they will submit more.
-    submissionSemaphore.release(CONCURRENT_QUERIES);
-
-    // Wait for all the queries to complete.
-    InterruptedException interruptedException = null;
-    try {
-      completionSemaphore.acquire(TOTAL_QUERIES);
-    } catch(InterruptedException e) {
-      interruptedException = e;
-
-      // List the failed queries.
-      for(final FailedQuery fq : failedQueries) {
-        logger.error(String.format("%s failed with %s", fq.queryFile, fq.userEx));
-      }
-    }
-
-    // Stop the querySubmitter thread.
-    querySubmitter.interrupt();
-
-    if (interruptedException != null) {
-      final StackTraceElement[] ste = interruptedException.getStackTrace();
-      final StringBuilder sb = new StringBuilder();
-      for(StackTraceElement s : ste) {
-        sb.append(s.toString());
-        sb.append('\n');
-      }
-      logger.error("Interruped Exception ", interruptedException);
-    }
-
-    assertNull("Query error caused interruption", interruptedException);
-
-    final int nListeners = listeners.size();
-    assertEquals(nListeners + " listeners still exist", 0, nListeners);
-
-    assertEquals("Didn't submit all queries", 0, remainingQueries);
-    assertEquals("Queries failed", 0, failedQueries.size());
-  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
index 35e58c1..7bd08ec 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
@@ -21,8 +21,14 @@
 
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.TestTools;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -30,8 +36,8 @@
 import org.junit.rules.TestRule;
 
 @Category({SlowTest.class})
-public class TestLargeFileCompilation extends BaseTestQuery {
-  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(200000); // 200 secs
+public class TestLargeFileCompilation extends ClusterTest {
+  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(200_000); // 200 secs
 
   private static final String LARGE_QUERY_GROUP_BY;
 
@@ -156,77 +162,108 @@
     return sb.append("full_name\nfrom cp.`employee.json` limit 1)").toString();
   }
 
-  @Ignore // TODO DRILL-5997
+  @BeforeClass
+  public static void setUp() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .configProperty(ExecConstants.USER_RPC_TIMEOUT, 5_000);
+    startCluster(builder);
+  }
+
+  @Before
+  public void setJDK() {
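+    // Runs before each test; replaces the repeated per-test "alter session" calls that set the JDK compiler.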
+    client.alterSession(ClassCompilerSelector.JAVA_COMPILER_OPTION, "JDK");
+  }
+
+  @After
+  public void resetJDK() {
+    client.resetSession(ClassCompilerSelector.JAVA_COMPILER_OPTION);
+  }
+
   @Test
   public void testTEXT_WRITER() throws Exception {
-    testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
-    testNoResult("use dfs.tmp");
-    testNoResult("alter session set `%s`='csv'", ExecConstants.OUTPUT_FORMAT_OPTION);
-    testNoResult(LARGE_QUERY_WRITER, "wide_table_csv");
+    try {
+      client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv");
+
+      run("use dfs.tmp");
+      run(LARGE_QUERY_WRITER, "wide_table_csv");
+    } finally {
+      client.resetSession(ExecConstants.OUTPUT_FORMAT_OPTION);
+    }
   }
 
   @Test
   public void testPARQUET_WRITER() throws Exception {
-    testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
-    testNoResult("use dfs.tmp");
-    testNoResult("alter session set `%s`='parquet'", ExecConstants.OUTPUT_FORMAT_OPTION);
-    testNoResult(ITERATION_COUNT, LARGE_QUERY_WRITER, "wide_table_parquet");
+    try {
+      client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "parquet");
+      run("use dfs.tmp");
+      for (int i = 0; i < ITERATION_COUNT; i++) {
+        run(LARGE_QUERY_WRITER, "wide_table_parquet");
+      }
+    } finally {
+      client.resetSession(ExecConstants.OUTPUT_FORMAT_OPTION);
+    }
   }
 
-  @Ignore // TODO DRILL-5997
   @Test
   public void testGROUP_BY() throws Exception {
-    testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
-    testNoResult(ITERATION_COUNT, LARGE_QUERY_GROUP_BY);
+    for (int i = 0; i < ITERATION_COUNT; i++) {
+      run(LARGE_QUERY_GROUP_BY);
+    }
   }
 
   @Test
   public void testEXTERNAL_SORT() throws Exception {
-    testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
-    testNoResult(ITERATION_COUNT, LARGE_QUERY_ORDER_BY);
+    for (int i = 0; i < ITERATION_COUNT; i++) {
+      run(LARGE_QUERY_ORDER_BY);
+    }
   }
 
   @Test
   public void testTOP_N_SORT() throws Exception {
-    testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
-    testNoResult(ITERATION_COUNT, LARGE_QUERY_ORDER_BY_WITH_LIMIT);
+    for (int i = 0; i < ITERATION_COUNT; i++) {
+      run(LARGE_QUERY_ORDER_BY_WITH_LIMIT);
+    }
   }
 
   @Ignore // TODO DRILL-5997
   @Test
   public void testFILTER() throws Exception {
-    testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
-    testNoResult(ITERATION_COUNT, LARGE_QUERY_FILTER);
+    for (int i = 0; i < ITERATION_COUNT; i++) {
+      run(LARGE_QUERY_FILTER);
+    }
   }
 
   @Test
   public void testClassTransformationOOM() throws Exception {
-    testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
-    testNoResult(ITERATION_COUNT, QUERY_FILTER);
+    for (int i = 0; i < ITERATION_COUNT; i++) {
+      run(QUERY_FILTER);
+    }
   }
 
   @Test
   public void testProject() throws Exception {
-    testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
-    testNoResult(ITERATION_COUNT, LARGE_QUERY_SELECT_LIST);
+    for (int i = 0; i < ITERATION_COUNT; i++) {
+      run(LARGE_QUERY_SELECT_LIST);
+    }
   }
 
   @Test
   public void testHashJoin() throws Exception {
     String tableName = "wide_table_hash_join";
     try {
-      setSessionOption("drill.exec.hashjoin.fallback.enabled", true);
-      testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
-      testNoResult("alter session set `planner.enable_mergejoin` = false");
-      testNoResult("alter session set `planner.enable_nestedloopjoin` = false");
-      testNoResult("use dfs.tmp");
-      testNoResult(LARGE_TABLE_WRITER, tableName);
-      testNoResult(QUERY_WITH_JOIN, tableName);
+      client.alterSession(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY, true);
+      client.alterSession(PlannerSettings.MERGEJOIN.getOptionName(), false);
+      client.alterSession(PlannerSettings.NESTEDLOOPJOIN.getOptionName(), false);
+
+      run("use dfs.tmp");
+      run(LARGE_TABLE_WRITER, tableName);
+      run(QUERY_WITH_JOIN, tableName);
     } finally {
-      resetSessionOption("planner.enable_mergejoin");
-      resetSessionOption("planner.enable_nestedloopjoin");
-      resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION);
-      testNoResult("drop table if exists %s", tableName);
+      client.resetSession(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY);
+      client.resetSession(PlannerSettings.MERGEJOIN.getOptionName());
+      client.resetSession(PlannerSettings.NESTEDLOOPJOIN.getOptionName());
+      run("drop table if exists %s", tableName);
     }
   }
 
@@ -234,17 +271,15 @@
   public void testMergeJoin() throws Exception {
     String tableName = "wide_table_merge_join";
     try {
-      testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
-      testNoResult("alter session set `planner.enable_hashjoin` = false");
-      testNoResult("alter session set `planner.enable_nestedloopjoin` = false");
-      testNoResult("use dfs.tmp");
-      testNoResult(LARGE_TABLE_WRITER, tableName);
-      testNoResult(QUERY_WITH_JOIN, tableName);
+      client.alterSession(PlannerSettings.HASHJOIN.getOptionName(), false);
+      client.alterSession(PlannerSettings.NESTEDLOOPJOIN.getOptionName(), false);
+      run("use dfs.tmp");
+      run(LARGE_TABLE_WRITER, tableName);
+      run(QUERY_WITH_JOIN, tableName);
     } finally {
-      resetSessionOption("planner.enable_hashjoin");
-      resetSessionOption("planner.enable_nestedloopjoin");
-      resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION);
-      testNoResult("drop table if exists %s", tableName);
+      client.resetSession(PlannerSettings.HASHJOIN.getOptionName());
+      client.resetSession(PlannerSettings.NESTEDLOOPJOIN.getOptionName());
+      run("drop table if exists %s", tableName);
     }
   }
 
@@ -252,39 +287,36 @@
   public void testNestedLoopJoin() throws Exception {
     String tableName = "wide_table_loop_join";
     try {
-      testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
-      testNoResult("alter session set `planner.enable_nljoin_for_scalar_only` = false");
-      testNoResult("alter session set `planner.enable_hashjoin` = false");
-      testNoResult("alter session set `planner.enable_mergejoin` = false");
-      testNoResult("use dfs.tmp");
-      testNoResult(LARGE_TABLE_WRITER, tableName);
-      testNoResult(QUERY_WITH_JOIN, tableName);
+      client.alterSession(PlannerSettings.HASHJOIN.getOptionName(), false);
+      client.alterSession(PlannerSettings.MERGEJOIN.getOptionName(), false);
+      client.alterSession(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName(), false);
+      run("use dfs.tmp");
+      run(LARGE_TABLE_WRITER, tableName);
+      run(QUERY_WITH_JOIN, tableName);
     } finally {
-      resetSessionOption("planner.enable_nljoin_for_scalar_only");
-      resetSessionOption("planner.enable_hashjoin");
-      resetSessionOption("planner.enable_mergejoin");
-      resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION);
-      testNoResult("drop table if exists %s", tableName);
+      client.resetSession(PlannerSettings.HASHJOIN.getOptionName());
+      client.resetSession(PlannerSettings.MERGEJOIN.getOptionName());
+      client.resetSession(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName());
+      run("drop table if exists %s", tableName);
     }
   }
 
   @Test
   public void testJDKHugeStringConstantCompilation() throws Exception {
-    try {
-      setSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION, "JDK");
-      testNoResult(ITERATION_COUNT, HUGE_STRING_CONST_QUERY);
-    } finally {
-      resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION);
+    for (int i = 0; i < ITERATION_COUNT; i++) {
+      run(HUGE_STRING_CONST_QUERY);
     }
   }
 
   @Test
   public void testJaninoHugeStringConstantCompilation() throws Exception {
     try {
-      setSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION, "JANINO");
-      testNoResult(ITERATION_COUNT, HUGE_STRING_CONST_QUERY);
+      client.alterSession(ClassCompilerSelector.JAVA_COMPILER_OPTION, "JANINO");
+      for (int i = 0; i < ITERATION_COUNT; i++) {
+        run(HUGE_STRING_CONST_QUERY);
+      }
     } finally {
-      resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION);
+      client.resetSession(ClassCompilerSelector.JAVA_COMPILER_OPTION);
     }
   }
 }