/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.firehose;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.filter.TrueDimFilter;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

@RunWith(Parameterized.class)
public class IngestSegmentFirehoseFactoryTimelineTest
{
private static final String DATA_SOURCE = "foo";
private static final String TIME_COLUMN = "t";
private static final String[] DIMENSIONS = new String[]{"d1"};
private static final String[] METRICS = new String[]{"m1"};
// Must decorate the parser, since IngestSegmentFirehoseFactory will undecorate it.
private static final InputRowParser<Map<String, Object>> ROW_PARSER = TransformSpec.NONE.decorate(
new MapInputRowParser(
new JSONParseSpec(
new TimestampSpec(TIME_COLUMN, "auto", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList(DIMENSIONS)),
null,
null
),
null,
null,
null
)
)
);
private final IngestSegmentFirehoseFactory factory;
private final File tmpDir;
private final int expectedCount;
private final long expectedSum;
private final int segmentCount;
private static final ObjectMapper MAPPER;
private static final IndexIO INDEX_IO;
private static final IndexMergerV9 INDEX_MERGER_V9;
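
// Shared across all parameterized runs: an ObjectMapper wired with the test
// injectables, plus IndexIO / IndexMergerV9 for writing and reading segments.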
static {
TestUtils testUtils = new TestUtils();
MAPPER = IngestSegmentFirehoseFactoryTest.setupInjectablesInObjectMapper(testUtils.getTestObjectMapper());
INDEX_IO = testUtils.getTestIndexIO();
INDEX_MERGER_V9 = testUtils.getTestIndexMergerV9();
}

public IngestSegmentFirehoseFactoryTimelineTest(
String name,
IngestSegmentFirehoseFactory factory,
File tmpDir,
int expectedCount,
long expectedSum,
int segmentCount
)
{
this.factory = factory;
this.tmpDir = tmpDir;
this.expectedCount = expectedCount;
this.expectedSum = expectedSum;
this.segmentCount = segmentCount;
}

@Test
public void test() throws Exception
{
// JUnit 4.12 doesn't have a good way to run tearDown after multiple tests in a Parameterized
// class run. (JUnit 4.13 adds @AfterParam but isn't released yet.) Fake it by just running
// "tests" in series inside one @Test.
testSimple();
testSplit();
}

private void testSimple() throws Exception
{
int count = 0;
long sum = 0;
try (final Firehose firehose = factory.connect(ROW_PARSER, tmpDir)) {
while (firehose.hasMore()) {
final InputRow row = firehose.nextRow();
count++;
sum += row.getMetric(METRICS[0]).longValue();
}
}
Assert.assertEquals("count", expectedCount, count);
Assert.assertEquals("sum", expectedSum, sum);
}

private void testSplit() throws Exception
{
Assert.assertTrue(factory.isSplittable());
final int numSplits = factory.getNumSplits(null);
// We set maxInputSegmentBytesPerSplit to 1, so each segment should become its own split.
Assert.assertEquals(segmentCount, numSplits);
final List<InputSplit<List<WindowedSegmentId>>> splits =
factory.getSplits(null).collect(Collectors.toList());
Assert.assertEquals(numSplits, splits.size());
int count = 0;
long sum = 0;
for (InputSplit<List<WindowedSegmentId>> split : splits) {
final FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>> splitFactory =
factory.withSplit(split);
try (final Firehose firehose = splitFactory.connect(ROW_PARSER, tmpDir)) {
while (firehose.hasMore()) {
final InputRow row = firehose.nextRow();
count++;
sum += row.getMetric(METRICS[0]).longValue();
}
}
}
Assert.assertEquals("count", expectedCount, count);
Assert.assertEquals("sum", expectedSum, sum);
}

@After
public void tearDown() throws Exception
{
FileUtils.deleteDirectory(tmpDir);
}
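
// Builds a TestCase: persists each described segment under a fresh temp dir and
// records the read interval plus the expected row count and metric sum.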
private static TestCase tc(
String intervalString,
int expectedCount,
long expectedSum,
DataSegmentMaker... segmentMakers
)
{
final File tmpDir = FileUtils.createTempDir();
final Set<DataSegment> segments = new HashSet<>();
for (DataSegmentMaker segmentMaker : segmentMakers) {
segments.add(segmentMaker.make(tmpDir));
}
return new TestCase(
tmpDir,
Intervals.of(intervalString),
expectedCount,
expectedSum,
segments
);
}
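
// Shorthand for a DataSegmentMaker covering the given interval, version, and partition.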
private static DataSegmentMaker ds(
String intervalString,
String version,
int partitionNum,
InputRow... rows
)
{
return new DataSegmentMaker(Intervals.of(intervalString), version, partitionNum, Arrays.asList(rows));
}
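
// Builds one input row at the given timestamp, with dimension "d1" set to "bar"
// and metric "m1" set to metricValue.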
private static InputRow ir(String timeString, long metricValue)
{
return new MapBasedInputRow(
DateTimes.of(timeString).getMillis(),
Arrays.asList(DIMENSIONS),
ImmutableMap.of(
TIME_COLUMN, DateTimes.of(timeString).toString(),
DIMENSIONS[0], "bar",
METRICS[0], metricValue
)
);
}
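
// Persists the given rows as a segment under tmpDir and returns a "local" loadSpec
// pointing at the persisted directory.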
private static Map<String, Object> persist(File tmpDir, InputRow... rows)
{
final File persistDir = new File(tmpDir, UUID.randomUUID().toString());
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(JodaUtils.MIN_INSTANT)
.withDimensionsSpec(ROW_PARSER)
.withMetrics(new LongSumAggregatorFactory(METRICS[0], METRICS[0]))
.build();
final IncrementalIndex index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(rows.length)
.build();
for (InputRow row : rows) {
try {
index.add(row);
}
catch (IndexSizeExceededException e) {
throw new RuntimeException(e);
}
}
try {
INDEX_MERGER_V9.persist(index, persistDir, new IndexSpec(), null);
}
catch (IOException e) {
throw new RuntimeException(e);
}
return ImmutableMap.of(
"type", "local",
"path", persistDir.getAbsolutePath()
);
}

@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> constructorFeeder()
{
final List<TestCase> testCases = ImmutableList.of(
tc(
"2000/2000T02", 3, 7,
ds("2000/2000T01", "v1", 0, ir("2000", 1), ir("2000T00:01", 2)),
ds("2000T01/2000T02", "v1", 0, ir("2000T01", 4))
) /* Adjacent segments */,
tc(
"2000/2000T02", 3, 7,
ds("2000/2000T02", "v1", 0, ir("2000", 1), ir("2000T00:01", 2), ir("2000T01", 8)),
ds("2000T01/2000T02", "v2", 0, ir("2000T01:01", 4))
) /* 1H segment overlaid on top of 2H segment */,
tc(
"2000/2000-01-02", 4, 23,
ds("2000/2000-01-02", "v1", 0, ir("2000", 1), ir("2000T00:01", 2), ir("2000T01", 8), ir("2000T02", 16)),
ds("2000T01/2000T02", "v2", 0, ir("2000T01:01", 4))
) /* 1H segment overlaid on top of 1D segment */,
tc(
"2000/2000T02", 4, 15,
ds("2000/2000T02", "v1", 0, ir("2000", 1), ir("2000T00:01", 2), ir("2000T01", 8)),
ds("2000/2000T02", "v1", 1, ir("2000T01:01", 4))
) /* Segment set with two segments for the same interval */,
tc(
"2000T01/2000T02", 1, 2,
ds("2000/2000T03", "v1", 0, ir("2000", 1), ir("2000T01", 2), ir("2000T02", 4))
) /* Segment wider than desired interval */,
tc(
"2000T02/2000T04", 2, 12,
ds("2000/2000T03", "v1", 0, ir("2000", 1), ir("2000T01", 2), ir("2000T02", 4)),
ds("2000T03/2000T04", "v1", 0, ir("2000T03", 8))
) /* Segment intersecting desired interval */
);
final List<Object[]> constructors = new ArrayList<>();
for (final TestCase testCase : testCases) {
SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
EasyMock.replay(notifierFactory);
final SegmentCacheManagerFactory slf = new SegmentCacheManagerFactory(MAPPER);
final RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig());
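// Stub coordinator that serves this test case's segments from memory rather than a real cluster.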
final CoordinatorClient cc = new CoordinatorClient(null, null)
{
@Override
public Collection<DataSegment> fetchUsedSegmentsInDataSourceForIntervals(
String dataSource,
List<Interval> intervals
)
{
// Expect the interval we asked for
if (intervals.equals(ImmutableList.of(testCase.interval))) {
return ImmutableSet.copyOf(testCase.segments);
} else {
throw new IllegalArgumentException("BAD");
}
}
@Override
public DataSegment fetchUsedSegment(String dataSource, String segmentId)
{
return testCase.segments
.stream()
.filter(s -> s.getId().toString().equals(segmentId))
.findAny()
.get(); // throwing if not found is exactly what the real code does
}
};
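// The factory under test, wired to the stub coordinator and a local segment cache.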
final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory(
DATA_SOURCE,
testCase.interval,
null,
TrueDimFilter.instance(),
Arrays.asList(DIMENSIONS),
Arrays.asList(METRICS),
// maxInputSegmentBytesPerSplit = 1: split as much as possible
1L,
INDEX_IO,
cc,
slf,
retryPolicyFactory
);
constructors.add(
new Object[]{
testCase.toString(),
factory,
testCase.tmpDir,
testCase.expectedCount,
testCase.expectedSum,
testCase.segments.size()
}
);
}
return constructors;
}
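
// One parameterized scenario: where the segments live, what interval to read, and
// the row count and metric sum the firehose is expected to produce.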
private static class TestCase
{
final File tmpDir;
final Interval interval;
final int expectedCount;
final long expectedSum;
final Set<DataSegment> segments;
public TestCase(
File tmpDir,
Interval interval,
int expectedCount,
long expectedSum,
Set<DataSegment> segments
)
{
this.tmpDir = tmpDir;
this.interval = interval;
this.expectedCount = expectedCount;
this.expectedSum = expectedSum;
this.segments = segments;
}

@Override
public String toString()
{
final List<SegmentId> segmentIds = new ArrayList<>();
for (DataSegment segment : segments) {
segmentIds.add(segment.getId());
}
return "TestCase{" +
"interval=" + interval +
", expectedCount=" + expectedCount +
", expectedSum=" + expectedSum +
", segments=" + segmentIds +
'}';
}
}
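
// Persists in-memory rows to disk and wraps them in DataSegment metadata.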
private static class DataSegmentMaker
{
final Interval interval;
final String version;
final int partitionNum;
final List<InputRow> rows;
public DataSegmentMaker(
Interval interval,
String version,
int partitionNum,
List<InputRow> rows
)
{
this.interval = interval;
this.version = version;
this.partitionNum = partitionNum;
this.rows = rows;
}

public DataSegment make(File tmpDir)
{
final Map<String, Object> loadSpec = persist(tmpDir, Iterables.toArray(rows, InputRow.class));
return new DataSegment(
DATA_SOURCE,
interval,
version,
loadSpec,
Arrays.asList(DIMENSIONS),
Arrays.asList(METRICS),
new LinearShardSpec(partitionNum),
-1, // binaryVersion, unused by this test
2L  // size in bytes; with maxInputSegmentBytesPerSplit = 1, each segment gets its own split
);
}
}
}