Fix compaction integration test CI timeout (#10517)

* fix flaky IT Compaction test

* test

* Fix compaction integration test CI timeout

* address comments

* Add print logs

* add error msg

* add taskId to logging
diff --git a/.travis.yml b/.travis.yml
index 878bd51..9c6fd07 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -178,7 +178,7 @@
           | node_modules/.bin/diff-test-coverage
           --coverage "**/target/site/jacoco/jacoco.xml"
           --type jacoco
-          --line-coverage 50 
+          --line-coverage 50
           --branch-coverage 50
           --function-coverage 0
           --log-template "coverage-lines-complete"
diff --git a/integration-tests/README.md b/integration-tests/README.md
index 4c96a48..bf9baa8 100644
--- a/integration-tests/README.md
+++ b/integration-tests/README.md
@@ -370,6 +370,7 @@
 Refer ITIndexerTest as an example on how to use dependency Injection
 
 ### Running test methods in parallel
+
 By default, test methods in a test class will be run in sequential order one at a time. Test methods for a given test 
 class can be set to run in parallel (multiple test methods of each class running at the same time) by excluding
 the given class/package from the "AllSerializedTests" test tag section and including it in the "AllParallelizedTests" 
@@ -385,3 +386,13 @@
 other tests from the same class at the same time. i.e. test does not modify/restart/stop the druid cluster or other dependency containers,
 test does not use excessive memory starving other concurent task, test does not modify and/or use other task, 
 supervisor, datasource it did not create. 
+
+### Limitations of the Druid cluster in the Travis environment
+
+By default, integration tests run in the Travis environment on commits made to an open PR, and these jobs must pass
+for the PR to be eligible to be merged. The following known issues and limitations of the Druid docker cluster running
+on the Travis machine may cause tests to fail:
+- Number of concurrently running tasks. Although the default Druid cluster config sets the maximum number of task slots (druid.worker.capacity) to 10,
+the actual maximum can be lower depending on the type of the tasks. For example, running 2 range partitioning compaction tasks with 2 subtasks each
+(each parent task and each subtask occupies a slot, for a total of 6 tasks) concurrently can cause the cluster to fail intermittently. This can leave the Travis job stuck until it times out (50 minutes)
+or is terminated after 10 minutes of not receiving new output.
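
To make the slot arithmetic above concrete, here is a minimal, self-contained sketch (illustrative names only, not
Druid APIs) of how the example's 6 occupied slots are counted against druid.worker.capacity:

    // Hypothetical illustration of the task slot arithmetic described in the README change above.
    public class TaskSlotArithmetic
    {
      public static void main(String[] args)
      {
        final int workerCapacity = 10;        // druid.worker.capacity in the default IT cluster config
        final int parallelCompactions = 2;    // concurrent range partitioning compaction tasks
        final int subtasksPerCompaction = 2;  // subtasks spawned by each compaction task

        // Each parallel compaction occupies one slot itself, plus one slot per subtask.
        final int slotsUsed = parallelCompactions * (1 + subtasksPerCompaction);

        System.out.printf("Using %d of %d task slots%n", slotsUsed, workerCapacity); // Using 6 of 10 task slots
      }
    }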
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index 2b06ba5..f2ca899 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -26,6 +26,8 @@
 import org.apache.druid.client.indexing.TaskStatusResponse;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.RetryUtils;
@@ -185,6 +187,40 @@
     }
   }
 
+  public String getTaskLog(String taskId)
+  {
+    return getTaskLog(taskId, -88000); // negative offset reads from the end of the log (here, roughly the last 88 KB)
+  }
+
+  public String getTaskLog(String taskId, long offsetValue)
+  {
+    try {
+      StatusResponseHolder response = makeRequest(
+          HttpMethod.GET,
+          StringUtils.format("%s%s", getIndexerURL(), StringUtils.format("task/%s/log?offset=%s", StringUtils.urlEncode(taskId), offsetValue))
+      );
+      return response.getContent();
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public String getTaskErrorMessage(String taskId)
+  {
+    try {
+      StatusResponseHolder response = makeRequest(
+          HttpMethod.GET,
+          StringUtils.format("%s%s", getIndexerURL(), StringUtils.format("task/%s/reports", StringUtils.urlEncode(taskId)))
+      );
+      Map<String, IngestionStatsAndErrorsTaskReport> taskReports = jsonMapper.readValue(response.getContent(), new TypeReference<Map<String, IngestionStatsAndErrorsTaskReport>>() {});
+      return ((IngestionStatsAndErrorsTaskReportData) taskReports.get("ingestionStatsAndErrors").getPayload()).getErrorMsg();
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   public void waitUntilTaskCompletes(final String taskID)
   {
     waitUntilTaskCompletes(taskID, 10000, 60);
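
As a usage sketch for the two new client helpers: the no-arg getTaskLog(taskId) delegates with offset -88000, and a
negative offset reads from the end of the task log, so it should return roughly the last 88 KB. The taskId below is
hypothetical; "indexer" stands for an injected OverlordResourceTestClient and LOG for a Druid Logger:

    // Hypothetical usage of the new helpers to dump diagnostics for a failed task.
    String taskId = "example_compaction_task_id";            // hypothetical task id
    String logTail = indexer.getTaskLog(taskId);             // tail of the task log (offset -88000)
    String errorMsg = indexer.getTaskErrorMessage(taskId);   // errorMsg from the ingestion task report
    LOG.info("Task[%s] failed with errorMsg[%s]", taskId, errorMsg);
    LOG.info("Task[%s] log tail: %s", taskId, logTail);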
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 7138f24..ceaa124 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -77,7 +77,7 @@
   @BeforeMethod
   public void setup() throws Exception
   {
-    // Set comapction slot to 10
+    // Set compaction slots to 5 (ratio 0.5 with a cap of 10)
     updateCompactionTaskSlot(0.5, 10);
     fullDatasourceName = "wikipedia_index_test_" + UUID.randomUUID() + config.getExtraDatasourceNameSuffix();
   }
@@ -175,7 +175,7 @@
           "city",
           false
       );
-      submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 2);
+      submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1);
       forceTriggerAutoCompaction(2);
       verifyQuery(INDEX_QUERIES_RESOURCE);
       verifySegmentsCompacted(rangePartitionsSpec, 2);
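
For context on the two changes above, here is a hedged sketch of how the effective compaction slot count may be derived
from the arguments of updateCompactionTaskSlot(0.5, 10), assuming the ratio is applied to the cluster's total worker
capacity and then capped by the second argument, which is what makes the setup() comment read "5":

    // Hedged sketch of the compaction slot arithmetic; assumes ratio * capacity, then a cap.
    double compactionTaskSlotRatio = 0.5;  // first argument of updateCompactionTaskSlot
    int maxCompactionTaskSlots = 10;       // second argument of updateCompactionTaskSlot
    int totalWorkerCapacity = 10;          // druid.worker.capacity in the IT cluster

    int effectiveSlots = Math.min(maxCompactionTaskSlots, (int) (compactionTaskSlotRatio * totalWorkerCapacity));
    // effectiveSlots == 5, matching the updated comment in setup()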
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
index afe5bbf..3f39be5 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
@@ -26,9 +26,11 @@
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
 import org.apache.druid.testing.clients.OverlordResourceTestClient;
+import org.apache.druid.testing.clients.TaskResponseObject;
 import org.apache.druid.testing.utils.ITRetryUtil;
 import org.apache.druid.testing.utils.TestQueryHelper;
 import org.joda.time.Interval;
@@ -46,6 +48,8 @@
 
 public abstract class AbstractIndexerTest
 {
+  private static final Logger LOG = new Logger(AbstractIndexerTest.class);
+
   @Inject
   protected CoordinatorResourceTestClient coordinator;
   @Inject
@@ -69,6 +73,21 @@
 
   protected void unloadAndKillData(final String dataSource)
   {
+    // Dump the logs and error messages of any failed tasks before the datasource is unloaded
+    List<TaskResponseObject> allTasks = indexer.getCompleteTasksForDataSource(dataSource);
+    for (TaskResponseObject task : allTasks) {
+      if (task.getStatus().isFailure()) {
+        LOG.info("------- START failed task diagnostics for taskId[%s] -------", task.getId());
+        LOG.info("Start failed task log:");
+        LOG.info("%s", indexer.getTaskLog(task.getId()));
+        LOG.info("End failed task log.");
+        LOG.info("Start failed task errorMsg:");
+        LOG.info("%s", indexer.getTaskErrorMessage(task.getId()));
+        LOG.info("End failed task errorMsg.");
+        LOG.info("------- END failed task diagnostics for taskId[%s] -------", task.getId());
+      }
+    }
+
     List<String> intervals = coordinator.getSegmentIntervals(dataSource);
 
     // each element in intervals has this form: