/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.tests.indexer;

import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.tests.TestNGGroup;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
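
/**
 * Integration tests for the compaction task. Each test ingests the Wikipedia sample data (4 segments across
 * 2 days), runs a compaction task against the resulting datasource, and verifies the segment count, query
 * results, query granularity, and segment intervals after compaction.
 */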
@Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITCompactionTaskTest extends AbstractIndexerTest
{
private static final Logger LOG = new Logger(ITCompactionTaskTest.class);
private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static final String INDEX_QUERIES_YEAR_RESOURCE = "/indexer/wikipedia_index_queries_year_query_granularity.json";
private static final String INDEX_QUERIES_HOUR_RESOURCE = "/indexer/wikipedia_index_queries_hour_query_granularity.json";
private static final String INDEX_DATASOURCE = "wikipedia_index_test";
private static final String SEGMENT_METADATA_QUERY_RESOURCE = "/indexer/segment_metadata_query.json";
private static final String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json";
private static final String PARALLEL_COMPACTION_TASK = "/indexer/wikipedia_compaction_task_parallel.json";
private static final String COMPACTION_TASK_WITH_SEGMENT_GRANULARITY = "/indexer/wikipedia_compaction_task_with_segment_granularity.json";
private static final String COMPACTION_TASK_WITH_GRANULARITY_SPEC = "/indexer/wikipedia_compaction_task_with_granularity_spec.json";
private static final String INDEX_TASK_WITH_TIMESTAMP = "/indexer/wikipedia_with_timestamp_index_task.json";

@Inject
private IntegrationTestingConfig config;
private String fullDatasourceName;
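
/**
 * Appends the test method name to the datasource name so that each test works on its own datasource.
 */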
@BeforeMethod
public void setFullDatasourceName(Method method)
{
fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix() + "-" + method.getName();
}

@Test
public void testCompaction() throws Exception
{
loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE, COMPACTION_TASK, null);
}

@Test
public void testCompactionWithSegmentGranularity() throws Exception
{
loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE, COMPACTION_TASK_WITH_SEGMENT_GRANULARITY, GranularityType.MONTH);
}

@Test
public void testCompactionWithSegmentGranularityInGranularitySpec() throws Exception
{
loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE, COMPACTION_TASK_WITH_GRANULARITY_SPEC, GranularityType.MONTH);
}

@Test
public void testCompactionWithQueryGranularityInGranularitySpec() throws Exception
{
try (final Closeable ignored = unloader(fullDatasourceName)) {
loadData(INDEX_TASK, fullDatasourceName);
// 4 segments across 2 days
checkNumberOfSegments(4);
List<String> expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
expectedIntervalAfterCompaction.sort(null);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 4);
String queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_RESOURCE);
queryHelper.testQueriesFromString(queryResponseTemplate);
// queryGranularity was SECOND; now change it to HOUR (a coarser granularity)
compactData(COMPACTION_TASK_WITH_GRANULARITY_SPEC, null, GranularityType.HOUR);
// The original 4 segments should be compacted into 2 new segments, since the data spans only 2 days and the
// compaction segmentGranularity is DAY
checkNumberOfSegments(2);
queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_HOUR_RESOURCE);
queryHelper.testQueriesFromString(queryResponseTemplate);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.HOUR.name(), 2);
checkCompactionIntervals(expectedIntervalAfterCompaction);
// queryGranularity was HOUR; now change it to MINUTE (a finer granularity)
compactData(COMPACTION_TASK_WITH_GRANULARITY_SPEC, null, GranularityType.MINUTE);
// The number of segments will not change, since the compaction segmentGranularity and the data interval are
// unchanged. Because queryGranularity only becomes finer, the data also remains the same: rows are just
// bucketed into a finer granularity, but the rollup does not change
// (i.e. 2020-10-29T05:00 is simply bucketed to 2020-10-29T05:00:00).
checkNumberOfSegments(2);
queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_HOUR_RESOURCE);
queryHelper.testQueriesFromString(queryResponseTemplate);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.MINUTE.name(), 2);
checkCompactionIntervals(expectedIntervalAfterCompaction);
}
}

@Test
public void testParallelHashedCompaction() throws Exception
{
try (final Closeable ignored = unloader(fullDatasourceName)) {
loadData(INDEX_TASK, fullDatasourceName);
// 4 segments across 2 days
checkNumberOfSegments(4);
List<String> expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
expectedIntervalAfterCompaction.sort(null);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 4);
String queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_RESOURCE);
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
jsonMapper.writeValueAsString("0")
);
queryHelper.testQueriesFromString(queryResponseTemplate);
String taskId = compactData(PARALLEL_COMPACTION_TASK, null, null);
// The original 4 segments should be compacted into 2 new segments
checkNumberOfSegments(2);
queryHelper.testQueriesFromString(queryResponseTemplate);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 2);
checkCompactionIntervals(expectedIntervalAfterCompaction);
Map<String, TaskReport> reports = indexer.getTaskReport(taskId);
Assert.assertNotNull(reports);
Assert.assertFalse(reports.isEmpty());
Assert.assertEquals(
2,
reports.values()
.stream()
.filter(r -> r instanceof IngestionStatsAndErrorsTaskReport)
.mapToLong(r -> ((IngestionStatsAndErrors) r.getPayload()).getSegmentsPublished())
.sum()
);
Assert.assertEquals(
4,
reports.values()
.stream()
.filter(r -> r instanceof IngestionStatsAndErrorsTaskReport)
.mapToLong(r -> ((IngestionStatsAndErrors) r.getPayload()).getSegmentsRead())
.sum()
);
}
}

@Test
public void testCompactionWithSegmentGranularityAndQueryGranularityInGranularitySpec() throws Exception
{
try (final Closeable ignored = unloader(fullDatasourceName)) {
loadData(INDEX_TASK, fullDatasourceName);
// 4 segments across 2 days
checkNumberOfSegments(4);
List<String> expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
expectedIntervalAfterCompaction.sort(null);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 4);
String queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_RESOURCE);
queryHelper.testQueriesFromString(queryResponseTemplate);
compactData(COMPACTION_TASK_WITH_GRANULARITY_SPEC, GranularityType.YEAR, GranularityType.YEAR);
// The original 4 segments should be compacted into 1 new segment
checkNumberOfSegments(1);
queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_YEAR_RESOURCE);
queryHelper.testQueriesFromString(queryResponseTemplate);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.YEAR.name(), 1);
List<String> newIntervals = new ArrayList<>();
for (String interval : expectedIntervalAfterCompaction) {
for (Interval newInterval : GranularityType.YEAR.getDefaultGranularity().getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
newIntervals.add(newInterval.toString());
}
}
expectedIntervalAfterCompaction = newIntervals;
checkCompactionIntervals(expectedIntervalAfterCompaction);
}
}

@Test
public void testCompactionWithTimestampDimension() throws Exception
{
loadDataAndCompact(INDEX_TASK_WITH_TIMESTAMP, INDEX_QUERIES_RESOURCE, COMPACTION_TASK, null);
}
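
/**
 * Ingests data with the given index task, verifies the initial segments and query results, runs the given
 * compaction task, and verifies the segment count, query results, query granularity, and segment intervals
 * after compaction.
 */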
private void loadDataAndCompact(
String indexTask,
String queriesResource,
String compactionResource,
GranularityType newSegmentGranularity
) throws Exception
{
try (final Closeable ignored = unloader(fullDatasourceName)) {
loadData(indexTask, fullDatasourceName);
// 4 segments across 2 days
checkNumberOfSegments(4);
List<String> expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
expectedIntervalAfterCompaction.sort(null);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 4);
String queryResponseTemplate = getQueryResponseTemplate(queriesResource);
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
jsonMapper.writeValueAsString("0")
);
queryHelper.testQueriesFromString(queryResponseTemplate);
String taskId = compactData(compactionResource, newSegmentGranularity, null);
// The original 4 segments should be compacted into 2 new segments
checkNumberOfSegments(2);
queryHelper.testQueriesFromString(queryResponseTemplate);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 2);
if (newSegmentGranularity != null) {
List<String> newIntervals = new ArrayList<>();
for (String interval : expectedIntervalAfterCompaction) {
for (Interval newInterval : newSegmentGranularity.getDefaultGranularity().getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
newIntervals.add(newInterval.toString());
}
}
expectedIntervalAfterCompaction = newIntervals;
}
checkCompactionIntervals(expectedIntervalAfterCompaction);
Map<String, TaskReport> reports = indexer.getTaskReport(taskId);
Assert.assertNotNull(reports);
Assert.assertFalse(reports.isEmpty());
}
}
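
/**
 * Fills in the %%DATASOURCE%%, %%GRANULARITY_SPEC%%, and %%SEGMENT_GRANULARITY%% placeholders of the given
 * compaction task spec, submits the task, waits for it to complete and for the compacted segments to load,
 * and returns the task ID.
 */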
private String compactData(
String compactionResource,
GranularityType newSegmentGranularity,
GranularityType newQueryGranularity
) throws Exception
{
String template = getResourceAsString(compactionResource);
template = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName);
// For the new granularitySpec map
Map<String, String> granularityMap = new HashMap<>();
if (newSegmentGranularity != null) {
granularityMap.put("segmentGranularity", newSegmentGranularity.name());
}
if (newQueryGranularity != null) {
granularityMap.put("queryGranularity", newQueryGranularity.name());
}
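// For example, with newSegmentGranularity = YEAR and newQueryGranularity = YEAR, the %%GRANULARITY_SPEC%%
// placeholder below is replaced with {"segmentGranularity":"YEAR","queryGranularity":"YEAR"}
// (map key order may vary).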
template = StringUtils.replace(
template,
"%%GRANULARITY_SPEC%%",
jsonMapper.writeValueAsString(granularityMap)
);
// For the deprecated segment granularity field
if (newSegmentGranularity != null) {
template = StringUtils.replace(
template,
"%%SEGMENT_GRANULARITY%%",
newSegmentGranularity.name()
);
}
final String taskID = indexer.submitTask(template);
LOG.info("TaskID for compaction task %s", taskID);
indexer.waitUntilTaskCompletes(taskID);
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
"Segment Compaction"
);
return taskID;
}
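
/**
 * Runs a segmentMetadata query with the "queryGranularity" analysis type and verifies that each of the
 * expected number of segments reports the expected query granularity.
 */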
private void checkQueryGranularity(String queryResource, String expectedQueryGranularity, int segmentCount) throws Exception
{
String queryResponseTemplate;
try (InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryResource)) {
queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8);
}
catch (IOException e) {
throw new ISE(e, "could not read query file: %s", queryResource);
}
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%DATASOURCE%%",
fullDatasourceName
);
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%ANALYSIS_TYPE%%",
"queryGranularity"
);
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%INTERVALS%%",
"2013-08-31/2013-09-02"
);
List<Map<String, String>> expectedResults = new ArrayList<>();
for (int i = 0; i < segmentCount; i++) {
Map<String, String> result = new HashMap<>();
result.put("queryGranularity", expectedQueryGranularity);
expectedResults.add(result);
}
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%EXPECTED_QUERY_GRANULARITY%%",
jsonMapper.writeValueAsString(expectedResults)
);
queryHelper.testQueriesFromString(queryResponseTemplate);
}
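
/**
 * Retries until the coordinator's metadata reports exactly the expected number of segments for the datasource.
 */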
private void checkNumberOfSegments(int numExpectedSegments)
{
ITRetryUtil.retryUntilTrue(
() -> {
int metadataSegmentCount = coordinator.getSegments(fullDatasourceName).size();
LOG.info("Current metadata segment count: %d, expected: %d", metadataSegmentCount, numExpectedSegments);
return metadataSegmentCount == numExpectedSegments;
},
"Segment count check"
);
}
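
/**
 * Retries until the set of segment intervals reported by the coordinator matches the expected intervals.
 */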
private void checkCompactionIntervals(List<String> expectedIntervals)
{
Set<String> expectedIntervalsSet = new HashSet<>(expectedIntervals);
ITRetryUtil.retryUntilTrue(
() -> {
final Set<String> intervalsAfterCompaction = new HashSet<>(coordinator.getSegmentIntervals(fullDatasourceName));
System.out.println("ACTUAL: " + intervalsAfterCompaction);
System.out.println("EXPECTED: " + expectedIntervalsSet);
return intervalsAfterCompaction.equals(expectedIntervalsSet);
},
"Compaction interval check"
);
}
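
/**
 * Reads the given query resource from the classpath and fills in the %%DATASOURCE%% placeholder.
 */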
private String getQueryResponseTemplate(String queryResourcePath)
{
String queryResponseTemplate;
try (InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryResourcePath)) {
queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8);
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%DATASOURCE%%",
fullDatasourceName
);
}
catch (IOException e) {
throw new ISE(e, "could not read query file: %s", queryResourcePath);
}
return queryResponseTemplate;
}
}