/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.testing.junit.LoggerCaptureRule;
import org.apache.logging.log4j.core.LogEvent;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
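
/**
 * Tests for {@link PartialDimensionCardinalityTask}, the parallel-indexing subtask that
 * estimates per-interval dimension cardinalities with HLL sketches and reports them to the
 * supervisor task for hash partitioning.
 */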
@RunWith(Enclosed.class)
public class PartialDimensionCardinalityTaskTest
{
private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper();
private static final HashedPartitionsSpec HASHED_PARTITIONS_SPEC = HashedPartitionsSpec.defaultSpec();
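
/**
 * Constructor validation: the task requires forced guaranteed rollup, a hashed
 * partitionsSpec, and explicit input intervals in the granularitySpec.
 */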
public static class ConstructorTest
{
@Rule
public ExpectedException exception = ExpectedException.none();
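
// Hash partitioning requires perfect rollup; a tuningConfig with forceGuaranteedRollup=false
// (here paired with a DynamicPartitionsSpec) must be rejected at construction time.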
@Test
public void requiresForceGuaranteedRollup()
{
exception.expect(IllegalArgumentException.class);
exception.expectMessage("forceGuaranteedRollup must be set");
ParallelIndexTuningConfig tuningConfig = new ParallelIndexTestingFactory.TuningConfigBuilder()
.forceGuaranteedRollup(false)
.partitionsSpec(new DynamicPartitionsSpec(null, null))
.build();
new PartialDimensionCardinalityTaskBuilder()
.tuningConfig(tuningConfig)
.build();
}

@Test
public void requiresHashedPartitions()
{
exception.expect(IllegalArgumentException.class);
exception.expectMessage("hashed partitionsSpec required");
PartitionsSpec partitionsSpec = new SingleDimensionPartitionsSpec(null, 1, "a", false);
ParallelIndexTuningConfig tuningConfig =
new ParallelIndexTestingFactory.TuningConfigBuilder().partitionsSpec(partitionsSpec).build();
new PartialDimensionCardinalityTaskBuilder()
.tuningConfig(tuningConfig)
.build();
}

@Test
public void requiresGranularitySpecInputIntervals()
{
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Missing intervals in granularitySpec");
DataSchema dataSchema = ParallelIndexTestingFactory.createDataSchema(Collections.emptyList());
new PartialDimensionCardinalityTaskBuilder()
.dataSchema(dataSchema)
.build();
}

@Test
public void serializesDeserializes()
{
PartialDimensionCardinalityTask task = new PartialDimensionCardinalityTaskBuilder()
.build();
TestHelper.testSerializesDeserializes(OBJECT_MAPPER, task);
}

@Test
public void hasCorrectPrefixForAutomaticId()
{
PartialDimensionCardinalityTask task = new PartialDimensionCardinalityTaskBuilder()
.id(ParallelIndexTestingFactory.AUTOMATIC_ID)
.build();
Assert.assertThat(task.getId(), Matchers.startsWith(PartialDimensionCardinalityTask.TYPE));
}
}
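
/**
 * Runtime behavior: parse-exception handling and the {@link DimensionCardinalityReport}
 * that the task sends to the supervisor via the mocked toolbox.
 */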
public static class RunTaskTest
{
@Rule
public ExpectedException exception = ExpectedException.none();

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

@Rule
public LoggerCaptureRule logger = new LoggerCaptureRule(ParseExceptionHandler.class);

private Capture<SubTaskReport> reportCapture;
private TaskToolbox taskToolbox;
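
// Mocks a toolbox whose supervisor-task client captures the SubTaskReport sent by the task,
// letting individual tests inspect the reported interval-to-cardinality map.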
@Before
public void setup()
{
reportCapture = Capture.newInstance();
ParallelIndexSupervisorTaskClient taskClient = EasyMock.mock(ParallelIndexSupervisorTaskClient.class);
taskClient.report(EasyMock.eq(ParallelIndexTestingFactory.SUPERVISOR_TASK_ID), EasyMock.capture(reportCapture));
EasyMock.replay(taskClient);
taskToolbox = EasyMock.mock(TaskToolbox.class);
EasyMock.expect(taskToolbox.getIndexingTmpDir()).andStubReturn(temporaryFolder.getRoot());
EasyMock.expect(taskToolbox.getSupervisorTaskClientFactory()).andReturn(
new IndexTaskClientFactory<ParallelIndexSupervisorTaskClient>()
{
@Override
public ParallelIndexSupervisorTaskClient build(
TaskInfoProvider taskInfoProvider,
String callerId,
int numThreads,
Duration httpTimeout,
long numRetries
)
{
return taskClient;
}
}
);
EasyMock.expect(taskToolbox.getIndexingServiceClient()).andReturn(new NoopIndexingServiceClient());
EasyMock.expect(taskToolbox.getRowIngestionMetersFactory()).andReturn(new DropwizardRowIngestionMetersFactory());
EasyMock.replay(taskToolbox);
}

@Test
public void requiresPartitionDimension() throws Exception
{
exception.expect(IllegalArgumentException.class);
exception.expectMessage("partitionDimension must be specified");
ParallelIndexTuningConfig tuningConfig = new ParallelIndexTestingFactory.TuningConfigBuilder()
.partitionsSpec(
new ParallelIndexTestingFactory.SingleDimensionPartitionsSpecBuilder().partitionDimension(null).build()
)
.build();
PartialDimensionCardinalityTask task = new PartialDimensionCardinalityTaskBuilder()
.tuningConfig(tuningConfig)
.build();
task.runTask(taskToolbox);
}
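
// Long.MAX_VALUE is not a representable timestamp, so the single input row below is
// guaranteed to fail parsing and produce exactly one logged parse exception.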
@Test
public void logsParseExceptionsIfEnabled() throws Exception
{
long invalidTimestamp = Long.MAX_VALUE;
InputSource inlineInputSource = new InlineInputSource(
ParallelIndexTestingFactory.createRow(invalidTimestamp, "a")
);
ParallelIndexTuningConfig tuningConfig = new ParallelIndexTestingFactory.TuningConfigBuilder()
.partitionsSpec(HASHED_PARTITIONS_SPEC)
.logParseExceptions(true)
.build();
PartialDimensionCardinalityTask task = new PartialDimensionCardinalityTaskBuilder()
.inputSource(inlineInputSource)
.tuningConfig(tuningConfig)
.build();
task.runTask(taskToolbox);
List<LogEvent> logEvents = logger.getLogEvents();
Assert.assertEquals(1, logEvents.size());
String logMessage = logEvents.get(0).getMessage().getFormattedMessage();
Assert.assertThat(logMessage, Matchers.containsString("Encountered parse exception"));
}

@Test
public void doesNotLogParseExceptionsIfDisabled() throws Exception
{
ParallelIndexTuningConfig tuningConfig = new ParallelIndexTestingFactory.TuningConfigBuilder()
.partitionsSpec(HASHED_PARTITIONS_SPEC)
.logParseExceptions(false)
.build();
PartialDimensionCardinalityTask task = new PartialDimensionCardinalityTaskBuilder()
.tuningConfig(tuningConfig)
.build();
task.runTask(taskToolbox);
Assert.assertEquals(Collections.emptyList(), logger.getLogEvents());
}

@Test
public void failsWhenTooManyParseExceptions() throws Exception
{
ParallelIndexTuningConfig tuningConfig = new ParallelIndexTestingFactory.TuningConfigBuilder()
.partitionsSpec(HASHED_PARTITIONS_SPEC)
.maxParseExceptions(0)
.build();
PartialDimensionCardinalityTask task = new PartialDimensionCardinalityTaskBuilder()
.tuningConfig(tuningConfig)
.build();
exception.expect(RuntimeException.class);
exception.expectMessage("Max parse exceptions[0] exceeded");
task.runTask(taskToolbox);
}
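
// A row with multiple values in the partition dimension is still a single row, so the
// sketch for its interval should estimate a cardinality of 1.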
@Test
public void sendsCorrectReportWhenRowHasMultipleDimensionValues()
{
InputSource inlineInputSource = new InlineInputSource(
ParallelIndexTestingFactory.createRow(0, Arrays.asList("a", "b"))
);
PartialDimensionCardinalityTaskBuilder taskBuilder = new PartialDimensionCardinalityTaskBuilder()
.inputSource(inlineInputSource);
DimensionCardinalityReport report = runTask(taskBuilder);
Assert.assertEquals(ParallelIndexTestingFactory.ID, report.getTaskId());
Map<Interval, byte[]> intervalToCardinalities = report.getIntervalToCardinalities();
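// Each interval's cardinality estimate travels as a serialized HLL sketch; unwrap it to verify.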
byte[] hllSketchBytes = Iterables.getOnlyElement(intervalToCardinalities.values());
HllSketch hllSketch = HllSketch.wrap(Memory.wrap(hllSketchBytes));
Assert.assertNotNull(hllSketch);
Assert.assertEquals(1L, (long) hllSketch.getEstimate());
}

@Test
public void sendsCorrectReportWithMultipleIntervalsInData()
{
// Segment granularity is DAY, query granularity is HOUR
InputSource inlineInputSource = new InlineInputSource(
ParallelIndexTestingFactory.createRow(DateTimes.of("1970-01-01T00:00:00.001Z").getMillis(), "a") + "\n" +
ParallelIndexTestingFactory.createRow(DateTimes.of("1970-01-02T03:46:40.000Z").getMillis(), "b") + "\n" +
ParallelIndexTestingFactory.createRow(DateTimes.of("1970-01-02T03:46:40.000Z").getMillis(), "c") + "\n" +
ParallelIndexTestingFactory.createRow(DateTimes.of("1970-01-02T04:02:40.000Z").getMillis(), "b") + "\n" +
ParallelIndexTestingFactory.createRow(DateTimes.of("1970-01-02T05:19:10.000Z").getMillis(), "b")
);
PartialDimensionCardinalityTaskBuilder taskBuilder = new PartialDimensionCardinalityTaskBuilder()
.inputSource(inlineInputSource);
DimensionCardinalityReport report = runTask(taskBuilder);
Assert.assertEquals(ParallelIndexTestingFactory.ID, report.getTaskId());
Map<Interval, byte[]> intervalToCardinalities = report.getIntervalToCardinalities();
Assert.assertEquals(2, intervalToCardinalities.size());
byte[] hllSketchBytes;
HllSketch hllSketch;
hllSketchBytes = intervalToCardinalities.get(Intervals.of("1970-01-01T00:00:00.000Z/1970-01-02T00:00:00.000Z"));
hllSketch = HllSketch.wrap(Memory.wrap(hllSketchBytes));
Assert.assertNotNull(hllSketch);
Assert.assertEquals(1L, (long) hllSketch.getEstimate());
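
// Day 2 holds rows (b, c, b, b) at hours 03, 04, and 05. With HOUR query granularity each
// distinct (hour bucket, dimension value) pair contributes to the sketch, so the estimate
// is 4 rather than 2.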
hllSketchBytes = intervalToCardinalities.get(Intervals.of("1970-01-02T00:00:00.000Z/1970-01-03T00:00:00.000Z"));
hllSketch = HllSketch.wrap(Memory.wrap(hllSketchBytes));
Assert.assertNotNull(hllSketch);
Assert.assertEquals(4L, (long) hllSketch.getEstimate());
}

@Test
public void returnsSuccessIfNoExceptions() throws Exception
{
PartialDimensionCardinalityTask task = new PartialDimensionCardinalityTaskBuilder()
.build();
TaskStatus taskStatus = task.runTask(taskToolbox);
Assert.assertEquals(ParallelIndexTestingFactory.ID, taskStatus.getId());
Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
}

private DimensionCardinalityReport runTask(PartialDimensionCardinalityTaskBuilder taskBuilder)
{
try {
taskBuilder.build()
.runTask(taskToolbox);
}
catch (Exception e) {
throw new RuntimeException(e);
}
return (DimensionCardinalityReport) reportCapture.getValue();
}
}
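
/**
 * Fluent builder for the task under test, defaulting to a hashed partitionsSpec, DAY/HOUR
 * segment/query granularities, and a single inline input row.
 */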
private static class PartialDimensionCardinalityTaskBuilder
{
private static final InputFormat INPUT_FORMAT = ParallelIndexTestingFactory.getInputFormat();
private String id = ParallelIndexTestingFactory.ID;
private InputSource inputSource = new InlineInputSource("row-with-invalid-timestamp");
private ParallelIndexTuningConfig tuningConfig = new ParallelIndexTestingFactory.TuningConfigBuilder()
.partitionsSpec(HASHED_PARTITIONS_SPEC)
.build();
private DataSchema dataSchema =
ParallelIndexTestingFactory
.createDataSchema(ParallelIndexTestingFactory.INPUT_INTERVALS)
.withGranularitySpec(
new UniformGranularitySpec(
Granularities.DAY,
Granularities.HOUR,
ImmutableList.of(Intervals.of("1970-01-01T00:00:00Z/P10D"))
)
);

@SuppressWarnings("SameParameterValue")
PartialDimensionCardinalityTaskBuilder id(String id)
{
this.id = id;
return this;
}

PartialDimensionCardinalityTaskBuilder inputSource(InputSource inputSource)
{
this.inputSource = inputSource;
return this;
}

PartialDimensionCardinalityTaskBuilder tuningConfig(ParallelIndexTuningConfig tuningConfig)
{
this.tuningConfig = tuningConfig;
return this;
}

PartialDimensionCardinalityTaskBuilder dataSchema(DataSchema dataSchema)
{
this.dataSchema = dataSchema;
return this;
}

PartialDimensionCardinalityTask build()
{
ParallelIndexIngestionSpec ingestionSpec =
ParallelIndexTestingFactory.createIngestionSpec(inputSource, INPUT_FORMAT, tuningConfig, dataSchema);
return new PartialDimensionCardinalityTask(
id,
ParallelIndexTestingFactory.GROUP_ID,
ParallelIndexTestingFactory.TASK_RESOURCE,
ParallelIndexTestingFactory.SUPERVISOR_TASK_ID,
ParallelIndexTestingFactory.NUM_ATTEMPTS,
ingestionSpec,
ParallelIndexTestingFactory.CONTEXT,
OBJECT_MAPPER
);
}
}
}