Fix compaction integration test CI timeout (#10517)
* fix flaky IT Compaction test
* fix flaky IT Compaction test
* test
* test
* test
* test
* Fix compaction integration test CI timeout
* address comments
* test
* test
* Add print logs
* add error msg
* add taskId to logging
diff --git a/.travis.yml b/.travis.yml
index 878bd51..9c6fd07 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -178,7 +178,7 @@
| node_modules/.bin/diff-test-coverage
--coverage "**/target/site/jacoco/jacoco.xml"
--type jacoco
- --line-coverage 50
+ --line-coverage 50
--branch-coverage 50
--function-coverage 0
--log-template "coverage-lines-complete"
diff --git a/integration-tests/README.md b/integration-tests/README.md
index 4c96a48..bf9baa8 100644
--- a/integration-tests/README.md
+++ b/integration-tests/README.md
@@ -370,6 +370,7 @@
Refer ITIndexerTest as an example on how to use dependency Injection
### Running test methods in parallel
+
By default, test methods in a test class will be run in sequential order one at a time. Test methods for a given test
class can be set to run in parallel (multiple test methods of each class running at the same time) by excluding
the given class/package from the "AllSerializedTests" test tag section and including it in the "AllParallelizedTests"
@@ -385,3 +386,13 @@
other tests from the same class at the same time. i.e. test does not modify/restart/stop the druid cluster or other dependency containers,
test does not use excessive memory starving other concurent task, test does not modify and/or use other task,
supervisor, datasource it did not create.
+
+### Limitation of Druid cluster in Travis environment
+
+By default, integration tests are run in Travis environment on commits made to open PRs. These integration test jobs are
+required to pass for a PR to be eligible to be merged. Here are known issues and limitations to the Druid docker cluster
+running in Travis machine that may cause the tests to fail:
+- Number of concurrent running tasks. Although the default Druid cluster config sets the maximum number of tasks (druid.worker.capacity) to 10,
+the actual maximum can be lower depending on the type of the tasks. For example, running 2 range partitioning compaction tasks with 2 subtasks each
(for a total of 6 tasks) concurrently can cause the cluster to intermittently fail. This can cause the Travis job to become stuck until it times out (50 minutes)
+and/or terminates after 10 minutes of not receiving new output.
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index 2b06ba5..f2ca899 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -26,6 +26,8 @@
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
@@ -185,6 +187,40 @@
}
}
+ public String getTaskLog(String taskId)
+ {
+ return getTaskLog(taskId, -88000);
+ }
+
+ public String getTaskLog(String taskId, long offsetValue)
+ {
+ try {
+ StatusResponseHolder response = makeRequest(
+ HttpMethod.GET,
+ StringUtils.format("%s%s", getIndexerURL(), StringUtils.format("task/%s/log?offset=%s", StringUtils.urlEncode(taskId), offsetValue))
+ );
+ return response.getContent();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getTaskErrorMessage(String taskId)
+ {
+ try {
+ StatusResponseHolder response = makeRequest(
+ HttpMethod.GET,
+ StringUtils.format("%s%s", getIndexerURL(), StringUtils.format("task/%s/reports", StringUtils.urlEncode(taskId)))
+ );
+ Map<String, IngestionStatsAndErrorsTaskReport> x = jsonMapper.readValue(response.getContent(), new TypeReference<Map<String, IngestionStatsAndErrorsTaskReport>>() {});
+ return ((IngestionStatsAndErrorsTaskReportData) x.get("ingestionStatsAndErrors").getPayload()).getErrorMsg();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public void waitUntilTaskCompletes(final String taskID)
{
waitUntilTaskCompletes(taskID, 10000, 60);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 7138f24..ceaa124 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -77,7 +77,7 @@
@BeforeMethod
public void setup() throws Exception
{
- // Set comapction slot to 10
+ // Set compaction slot to 5
updateCompactionTaskSlot(0.5, 10);
fullDatasourceName = "wikipedia_index_test_" + UUID.randomUUID() + config.getExtraDatasourceNameSuffix();
}
@@ -175,7 +175,7 @@
"city",
false
);
- submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 2);
+ submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1);
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(rangePartitionsSpec, 2);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
index afe5bbf..3f39be5 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
@@ -26,9 +26,11 @@
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.clients.OverlordResourceTestClient;
+import org.apache.druid.testing.clients.TaskResponseObject;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.joda.time.Interval;
@@ -46,6 +48,8 @@
public abstract class AbstractIndexerTest
{
+ private static final Logger LOG = new Logger(AbstractIndexerTest.class);
+
@Inject
protected CoordinatorResourceTestClient coordinator;
@Inject
@@ -69,6 +73,21 @@
protected void unloadAndKillData(final String dataSource)
{
+ // Get all failed task logs
+ List<TaskResponseObject> allTasks = indexer.getCompleteTasksForDataSource(dataSource);
+ for (TaskResponseObject task : allTasks) {
+ if (task.getStatus().isFailure()) {
+ LOG.info("------- START Found failed task logging for taskId=" + task.getId() + " -------");
+ LOG.info("Start failed task log:");
+ LOG.info(indexer.getTaskLog(task.getId()));
+ LOG.info("End failed task log.");
+ LOG.info("Start failed task errorMsg:");
+ LOG.info(indexer.getTaskErrorMessage(task.getId()));
+ LOG.info("End failed task errorMsg.");
+ LOG.info("------- END Found failed task logging for taskId=" + task.getId() + " -------");
+ }
+ }
+
List<String> intervals = coordinator.getSegmentIntervals(dataSource);
// each element in intervals has this form: