DRILL-7651: Increase timeout for TestLargeFileCompilation to avoid GitHub Action failures and fix concurrent issue in TestTpchDistributedConcurrent
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
index ba3bd44..c144c53 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
@@ -26,18 +26,25 @@
import org.apache.drill.categories.SlowTest;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.BaseTestQuery.SilentListener;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.TestTools;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.rpc.user.UserResultsListener;
-import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.test.QueryTestUtil;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -49,17 +56,18 @@
* any particular order of execution. We ignore the results.
*/
@Category({SlowTest.class})
-public class TestTpchDistributedConcurrent extends BaseTestQuery {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTpchDistributedConcurrent.class);
+public class TestTpchDistributedConcurrent extends ClusterTest {
+ private static final Logger logger = LoggerFactory.getLogger(TestTpchDistributedConcurrent.class);
- @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(360000); // Longer timeout than usual.
+ @Rule
+ public final TestRule TIMEOUT = TestTools.getTimeoutRule(400_000); // 400 secs
/*
* Valid test names taken from TestTpchDistributed. Fuller path prefixes are
* used so that tests may also be taken from other locations -- more variety
* is better as far as this test goes.
*/
- private final static String queryFile[] = {
+ private static final String[] queryFile = {
"queries/tpch/01.sql",
"queries/tpch/03.sql",
"queries/tpch/04.sql",
@@ -80,44 +88,91 @@
"queries/tpch/20.sql",
};
- private final static int TOTAL_QUERIES = 115;
- private final static int CONCURRENT_QUERIES = 15;
-
- private final static Random random = new Random(0xdeadbeef);
- private final static String alterSession = "alter session set `planner.slice_target` = 10";
+ private static final int TOTAL_QUERIES = 115;
+ private static final int CONCURRENT_QUERIES = 15;
+ private static final Random random = new Random(0xdeadbeef);
private int remainingQueries = TOTAL_QUERIES - CONCURRENT_QUERIES;
private final Semaphore completionSemaphore = new Semaphore(0);
private final Semaphore submissionSemaphore = new Semaphore(0);
private final Set<UserResultsListener> listeners = Sets.newIdentityHashSet();
+ private final List<FailedQuery> failedQueries = new LinkedList<>();
private Thread testThread = null; // used to interrupt semaphore wait in case of error
+ @BeforeClass
+ public static void setUp() throws Exception {
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .configProperty(ExecConstants.USER_RPC_TIMEOUT, 5_000);
+ startCluster(builder);
+ }
+
+ @Test
+ public void testConcurrentQueries() {
+ client.alterSession(ExecConstants.SLICE_TARGET, 10);
+
+ testThread = Thread.currentThread();
+ final QuerySubmitter querySubmitter = new QuerySubmitter();
+ querySubmitter.start();
+
+ // Kick off the initial queries. As they complete, they will submit more.
+ submissionSemaphore.release(CONCURRENT_QUERIES);
+
+ // Wait for all the queries to complete.
+ InterruptedException interruptedException = null;
+ try {
+ completionSemaphore.acquire(TOTAL_QUERIES);
+ } catch (InterruptedException e) {
+ interruptedException = e;
+
+ // List the failed queries.
+ for (FailedQuery fq : failedQueries) {
+ logger.error("{} failed with {}", fq.queryFile, fq.userEx);
+ }
+ }
+
+ // Stop the querySubmitter thread.
+ querySubmitter.interrupt();
+
+ if (interruptedException != null) {
+ logger.error("Interruped Exception ", interruptedException);
+ }
+
+ assertNull("Query error caused interruption", interruptedException);
+
+ int nListeners = listeners.size();
+ assertEquals(nListeners + " listeners still exist", 0, nListeners);
+
+ assertEquals("Didn't submit all queries", 0, remainingQueries);
+ assertEquals("Queries failed", 0, failedQueries.size());
+ }
+
+ private void submitRandomQuery() {
+ String filename = queryFile[random.nextInt(queryFile.length)];
+ String query;
+ try {
+ query = QueryTestUtil.normalizeQuery(getFile(filename)).replace(';', ' ');
+ } catch (IOException e) {
+ throw new RuntimeException("Caught exception", e);
+ }
+ UserResultsListener listener = new ChainingSilentListener(query);
+ queryBuilder()
+ .query(UserBitShared.QueryType.SQL, query)
+ .withListener(listener);
+ synchronized (this) {
+ listeners.add(listener);
+ }
+ }
+
private static class FailedQuery {
final String queryFile;
final UserException userEx;
- public FailedQuery(final String queryFile, final UserException userEx) {
+ public FailedQuery(String queryFile, UserException userEx) {
this.queryFile = queryFile;
this.userEx = userEx;
}
}
- private final List<FailedQuery> failedQueries = new LinkedList<>();
-
- private void submitRandomQuery() {
- final String filename = queryFile[random.nextInt(queryFile.length)];
- final String query;
- try {
- query = QueryTestUtil.normalizeQuery(getFile(filename)).replace(';', ' ');
- } catch(IOException e) {
- throw new RuntimeException("Caught exception", e);
- }
- final UserResultsListener listener = new ChainingSilentListener(query);
- client.runQuery(UserBitShared.QueryType.SQL, query, listener);
- synchronized(this) {
- listeners.add(listener);
- }
- }
private class ChainingSilentListener extends SilentListener {
private final String query;
@@ -130,8 +185,7 @@
public void queryCompleted(QueryState state) {
super.queryCompleted(state);
- completionSemaphore.release();
- synchronized(TestTpchDistributedConcurrent.this) {
+ synchronized (TestTpchDistributedConcurrent.this) {
final Object object = listeners.remove(this);
assertNotNull("listener not found", object);
@@ -146,6 +200,7 @@
--remainingQueries;
}
}
+ completionSemaphore.release();
}
@Override
@@ -154,7 +209,7 @@
completionSemaphore.release();
logger.error("submissionFailed for {} \nwith:", query, uex);
- synchronized(TestTpchDistributedConcurrent.this) {
+ synchronized (TestTpchDistributedConcurrent.this) {
final Object object = listeners.remove(this);
assertNotNull("listener not found", object);
failedQueries.add(new FailedQuery(query, uex));
@@ -166,10 +221,10 @@
private class QuerySubmitter extends Thread {
@Override
public void run() {
- while(true) {
+ while (true) {
try {
submissionSemaphore.acquire();
- } catch(InterruptedException e) {
+ } catch (InterruptedException e) {
logger.error("QuerySubmitter quitting.");
return;
}
@@ -178,50 +233,4 @@
}
}
}
-
- @Test
- public void testConcurrentQueries() throws Exception {
- QueryTestUtil.testRunAndLog(client, UserBitShared.QueryType.SQL, alterSession);
-
- testThread = Thread.currentThread();
- final QuerySubmitter querySubmitter = new QuerySubmitter();
- querySubmitter.start();
-
- // Kick off the initial queries. As they complete, they will submit more.
- submissionSemaphore.release(CONCURRENT_QUERIES);
-
- // Wait for all the queries to complete.
- InterruptedException interruptedException = null;
- try {
- completionSemaphore.acquire(TOTAL_QUERIES);
- } catch(InterruptedException e) {
- interruptedException = e;
-
- // List the failed queries.
- for(final FailedQuery fq : failedQueries) {
- logger.error(String.format("%s failed with %s", fq.queryFile, fq.userEx));
- }
- }
-
- // Stop the querySubmitter thread.
- querySubmitter.interrupt();
-
- if (interruptedException != null) {
- final StackTraceElement[] ste = interruptedException.getStackTrace();
- final StringBuilder sb = new StringBuilder();
- for(StackTraceElement s : ste) {
- sb.append(s.toString());
- sb.append('\n');
- }
- logger.error("Interruped Exception ", interruptedException);
- }
-
- assertNull("Query error caused interruption", interruptedException);
-
- final int nListeners = listeners.size();
- assertEquals(nListeners + " listeners still exist", 0, nListeners);
-
- assertEquals("Didn't submit all queries", 0, remainingQueries);
- assertEquals("Queries failed", 0, failedQueries.size());
- }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
index 35e58c1..7bd08ec 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
@@ -21,8 +21,14 @@
import org.apache.drill.categories.SlowTest;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.TestTools;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@@ -30,8 +36,8 @@
import org.junit.rules.TestRule;
@Category({SlowTest.class})
-public class TestLargeFileCompilation extends BaseTestQuery {
- @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(200000); // 200 secs
+public class TestLargeFileCompilation extends ClusterTest {
+ @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(200_000); // 200 secs
private static final String LARGE_QUERY_GROUP_BY;
@@ -156,77 +162,107 @@
return sb.append("full_name\nfrom cp.`employee.json` limit 1)").toString();
}
- @Ignore // TODO DRILL-5997
+ @BeforeClass
+ public static void setUp() throws Exception {
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .configProperty(ExecConstants.USER_RPC_TIMEOUT, 5_000);
+ startCluster(builder);
+ }
+
+ @Before
+ public void setJDK() {
+ client.alterSession(ClassCompilerSelector.JAVA_COMPILER_OPTION, "JDK");
+ }
+
+ @After
+ public void resetJDK() {
+ client.resetSession(ClassCompilerSelector.JAVA_COMPILER_OPTION);
+ }
+
@Test
public void testTEXT_WRITER() throws Exception {
- testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
- testNoResult("use dfs.tmp");
- testNoResult("alter session set `%s`='csv'", ExecConstants.OUTPUT_FORMAT_OPTION);
- testNoResult(LARGE_QUERY_WRITER, "wide_table_csv");
+ try {
+ client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv");
+
+ run("use dfs.tmp");
+ run(LARGE_QUERY_WRITER, "wide_table_csv");
+ } finally {
+ client.resetSession(ExecConstants.OUTPUT_FORMAT_OPTION);
+ }
}
@Test
public void testPARQUET_WRITER() throws Exception {
- testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
- testNoResult("use dfs.tmp");
- testNoResult("alter session set `%s`='parquet'", ExecConstants.OUTPUT_FORMAT_OPTION);
- testNoResult(ITERATION_COUNT, LARGE_QUERY_WRITER, "wide_table_parquet");
+ try {
+ client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "parquet");
+ run("use dfs.tmp");
+ for (int i = 0; i < ITERATION_COUNT; i++) {
+ run(LARGE_QUERY_WRITER, "wide_table_parquet");
+ }
+ } finally {
+ client.resetSession(ExecConstants.OUTPUT_FORMAT_OPTION);
+ }
}
- @Ignore // TODO DRILL-5997
@Test
public void testGROUP_BY() throws Exception {
- testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
- testNoResult(ITERATION_COUNT, LARGE_QUERY_GROUP_BY);
+ for (int i = 0; i < ITERATION_COUNT; i++) {
+ run(LARGE_QUERY_GROUP_BY);
+ }
}
@Test
public void testEXTERNAL_SORT() throws Exception {
- testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
- testNoResult(ITERATION_COUNT, LARGE_QUERY_ORDER_BY);
+ for (int i = 0; i < ITERATION_COUNT; i++) {
+ run(LARGE_QUERY_ORDER_BY);
+ }
}
@Test
public void testTOP_N_SORT() throws Exception {
- testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
- testNoResult(ITERATION_COUNT, LARGE_QUERY_ORDER_BY_WITH_LIMIT);
+ for (int i = 0; i < ITERATION_COUNT; i++) {
+ run(LARGE_QUERY_ORDER_BY_WITH_LIMIT);
+ }
}
@Ignore // TODO DRILL-5997
@Test
public void testFILTER() throws Exception {
- testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
- testNoResult(ITERATION_COUNT, LARGE_QUERY_FILTER);
+ for (int i = 0; i < ITERATION_COUNT; i++) {
+ run(LARGE_QUERY_FILTER);
+ }
}
@Test
public void testClassTransformationOOM() throws Exception {
- testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
- testNoResult(ITERATION_COUNT, QUERY_FILTER);
+ for (int i = 0; i < ITERATION_COUNT; i++) {
+ run(QUERY_FILTER);
+ }
}
@Test
public void testProject() throws Exception {
- testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
- testNoResult(ITERATION_COUNT, LARGE_QUERY_SELECT_LIST);
+ for (int i = 0; i < ITERATION_COUNT; i++) {
+ run(LARGE_QUERY_SELECT_LIST);
+ }
}
@Test
public void testHashJoin() throws Exception {
String tableName = "wide_table_hash_join";
try {
- setSessionOption("drill.exec.hashjoin.fallback.enabled", true);
- testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
- testNoResult("alter session set `planner.enable_mergejoin` = false");
- testNoResult("alter session set `planner.enable_nestedloopjoin` = false");
- testNoResult("use dfs.tmp");
- testNoResult(LARGE_TABLE_WRITER, tableName);
- testNoResult(QUERY_WITH_JOIN, tableName);
+ client.alterSession(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY, true);
+ client.alterSession(PlannerSettings.MERGEJOIN.getOptionName(), false);
+ client.alterSession(PlannerSettings.NESTEDLOOPJOIN.getOptionName(), false);
+
+ run("use dfs.tmp");
+ run(LARGE_TABLE_WRITER, tableName);
+ run(QUERY_WITH_JOIN, tableName);
} finally {
- resetSessionOption("planner.enable_mergejoin");
- resetSessionOption("planner.enable_nestedloopjoin");
- resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION);
- testNoResult("drop table if exists %s", tableName);
+ client.resetSession(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY);
+ client.resetSession(PlannerSettings.MERGEJOIN.getOptionName());
+ client.resetSession(PlannerSettings.NESTEDLOOPJOIN.getOptionName());
+ run("drop table if exists %s", tableName);
}
}
@@ -234,17 +270,15 @@
public void testMergeJoin() throws Exception {
String tableName = "wide_table_merge_join";
try {
- testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
- testNoResult("alter session set `planner.enable_hashjoin` = false");
- testNoResult("alter session set `planner.enable_nestedloopjoin` = false");
- testNoResult("use dfs.tmp");
- testNoResult(LARGE_TABLE_WRITER, tableName);
- testNoResult(QUERY_WITH_JOIN, tableName);
+ client.alterSession(PlannerSettings.HASHJOIN.getOptionName(), false);
+ client.alterSession(PlannerSettings.NESTEDLOOPJOIN.getOptionName(), false);
+ run("use dfs.tmp");
+ run(LARGE_TABLE_WRITER, tableName);
+ run(QUERY_WITH_JOIN, tableName);
} finally {
- resetSessionOption("planner.enable_hashjoin");
- resetSessionOption("planner.enable_nestedloopjoin");
- resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION);
- testNoResult("drop table if exists %s", tableName);
+ client.resetSession(PlannerSettings.HASHJOIN.getOptionName());
+ client.resetSession(PlannerSettings.NESTEDLOOPJOIN.getOptionName());
+ run("drop table if exists %s", tableName);
}
}
@@ -252,39 +286,36 @@
public void testNestedLoopJoin() throws Exception {
String tableName = "wide_table_loop_join";
try {
- testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
- testNoResult("alter session set `planner.enable_nljoin_for_scalar_only` = false");
- testNoResult("alter session set `planner.enable_hashjoin` = false");
- testNoResult("alter session set `planner.enable_mergejoin` = false");
- testNoResult("use dfs.tmp");
- testNoResult(LARGE_TABLE_WRITER, tableName);
- testNoResult(QUERY_WITH_JOIN, tableName);
+ client.alterSession(PlannerSettings.HASHJOIN.getOptionName(), false);
+ client.alterSession(PlannerSettings.MERGEJOIN.getOptionName(), false);
+ client.alterSession(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName(), false);
+ run("use dfs.tmp");
+ run(LARGE_TABLE_WRITER, tableName);
+ run(QUERY_WITH_JOIN, tableName);
} finally {
- resetSessionOption("planner.enable_nljoin_for_scalar_only");
- resetSessionOption("planner.enable_hashjoin");
- resetSessionOption("planner.enable_mergejoin");
- resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION);
- testNoResult("drop table if exists %s", tableName);
+ client.resetSession(PlannerSettings.HASHJOIN.getOptionName());
+ client.resetSession(PlannerSettings.MERGEJOIN.getOptionName());
+ client.resetSession(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName());
+ run("drop table if exists %s", tableName);
}
}
@Test
public void testJDKHugeStringConstantCompilation() throws Exception {
- try {
- setSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION, "JDK");
- testNoResult(ITERATION_COUNT, HUGE_STRING_CONST_QUERY);
- } finally {
- resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION);
+ for (int i = 0; i < ITERATION_COUNT; i++) {
+ run(HUGE_STRING_CONST_QUERY);
}
}
@Test
public void testJaninoHugeStringConstantCompilation() throws Exception {
try {
- setSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION, "JANINO");
- testNoResult(ITERATION_COUNT, HUGE_STRING_CONST_QUERY);
+ client.alterSession(ClassCompilerSelector.JAVA_COMPILER_OPTION, "JANINO");
+ for (int i = 0; i < ITERATION_COUNT; i++) {
+ run(HUGE_STRING_CONST_QUERY);
+ }
} finally {
- resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION);
+ client.resetSession(ClassCompilerSelector.JAVA_COMPILER_OPTION);
}
}
}