/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.input;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.druid.common.config.NullHandlingTest;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntity.CleanableFile;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.hll.HyperLogLogHash;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.BaseSequence.IteratorMaker;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class DruidSegmentReaderTest extends NullHandlingTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

private File segmentDirectory;
private final IndexIO indexIO = TestHelper.getTestIndexIO();

@Before
public void setUp() throws IOException
{
// Write a segment with two rows in it, with columns: s (string), d (double), cnt (long), met_s (complex).
final IncrementalIndex<?> incrementalIndex =
IndexBuilder.create()
.schema(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(
new DimensionsSpec(
ImmutableList.of(
StringDimensionSchema.create("s"),
new DoubleDimensionSchema("d")
)
)
)
.withMetrics(
new CountAggregatorFactory("cnt"),
new HyperUniquesAggregatorFactory("met_s", "s")
)
.withRollup(false)
.build()
)
.rows(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("2000"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("s", "foo")
.put("d", 1.23)
.build()
),
new MapBasedInputRow(
DateTimes.of("2000T01"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("s", "bar")
.put("d", 4.56)
.build()
)
)
)
.buildIncrementalIndex();
segmentDirectory = temporaryFolder.newFolder();
try {
TestHelper.getTestIndexMergerV9(
OnHeapMemorySegmentWriteOutMediumFactory.instance()
).persist(
incrementalIndex,
segmentDirectory,
new IndexSpec(),
null
);
}
finally {
incrementalIndex.close();
}
}
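
// Reads the full segment using a "millis" timestamp spec on __time; both rows come
// back with every stored column (__time, s, d, cnt, met_s) intact.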
@Test
public void testReader() throws IOException
{
final DruidSegmentReader reader = new DruidSegmentReader(
makeInputEntity(Intervals.of("2000/P1D")),
indexIO,
new TimestampSpec("__time", "millis", DateTimes.of("1971")),
new DimensionsSpec(
ImmutableList.of(
StringDimensionSchema.create("s"),
new DoubleDimensionSchema("d")
)
),
ColumnsFilter.all(),
null,
temporaryFolder.newFolder()
);
Assert.assertEquals(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("2000"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T").getMillis())
.put("s", "foo")
.put("d", 1.23d)
.put("cnt", 1L)
.put("met_s", makeHLLC("foo"))
.build()
),
new MapBasedInputRow(
DateTimes.of("2000T01"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T01").getMillis())
.put("s", "bar")
.put("d", 4.56d)
.put("cnt", 1L)
.put("met_s", makeHLLC("bar"))
.build()
)
),
readRows(reader)
);
}
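
// Same as testReader, but with the "auto" timestamp format, which should detect the
// millis-since-epoch __time values and produce identical rows.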
@Test
public void testReaderAutoTimestampFormat() throws IOException
{
final DruidSegmentReader reader = new DruidSegmentReader(
makeInputEntity(Intervals.of("2000/P1D")),
indexIO,
new TimestampSpec("__time", "auto", DateTimes.of("1971")),
new DimensionsSpec(
ImmutableList.of(
StringDimensionSchema.create("s"),
new DoubleDimensionSchema("d")
)
),
ColumnsFilter.all(),
null,
temporaryFolder.newFolder()
);
Assert.assertEquals(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("2000"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T").getMillis())
.put("s", "foo")
.put("d", 1.23d)
.put("cnt", 1L)
.put("met_s", makeHLLC("foo"))
.build()
),
new MapBasedInputRow(
DateTimes.of("2000T01"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T01").getMillis())
.put("s", "bar")
.put("d", 4.56d)
.put("cnt", 1L)
.put("met_s", makeHLLC("bar"))
.build()
)
),
readRows(reader)
);
}
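
// Excluding __time, s, cnt, and met_s from the DimensionsSpec leaves "d" as the only
// dimension, but the excluded columns are still present as plain fields in each row.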
@Test
public void testReaderWithDimensionExclusions() throws IOException
{
final DruidSegmentReader reader = new DruidSegmentReader(
makeInputEntity(Intervals.of("2000/P1D")),
indexIO,
new TimestampSpec("__time", "millis", DateTimes.of("1971")),
new DimensionsSpec(
ImmutableList.of(),
ImmutableList.of("__time", "s", "cnt", "met_s"),
ImmutableList.of()
),
ColumnsFilter.all(),
null,
temporaryFolder.newFolder()
);
Assert.assertEquals(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("2000"),
ImmutableList.of("d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T").getMillis())
.put("s", "foo")
.put("d", 1.23d)
.put("cnt", 1L)
.put("met_s", makeHLLC("foo"))
.build()
),
new MapBasedInputRow(
DateTimes.of("2000T01"),
ImmutableList.of("d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T01").getMillis())
.put("s", "bar")
.put("d", 4.56d)
.put("cnt", 1L)
.put("met_s", makeHLLC("bar"))
.build()
)
),
readRows(reader)
);
}
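
// An inclusion-based ColumnsFilter on (__time, s, d) drops the metric columns
// entirely; cnt and met_s no longer appear in the intermediate rows.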
@Test
public void testReaderWithInclusiveColumnsFilter() throws IOException
{
final DruidSegmentReader reader = new DruidSegmentReader(
makeInputEntity(Intervals.of("2000/P1D")),
indexIO,
new TimestampSpec("__time", "millis", DateTimes.of("1971")),
new DimensionsSpec(
ImmutableList.of(
StringDimensionSchema.create("s"),
new DoubleDimensionSchema("d")
)
),
ColumnsFilter.inclusionBased(ImmutableSet.of("__time", "s", "d")),
null,
temporaryFolder.newFolder()
);
Assert.assertEquals(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("2000"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T").getMillis())
.put("s", "foo")
.put("d", 1.23d)
.build()
),
new MapBasedInputRow(
DateTimes.of("2000T01"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T01").getMillis())
.put("s", "bar")
.put("d", 4.56d)
.build()
)
),
readRows(reader)
);
}
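
// If __time is left out of the inclusion filter, the timestamp column is not read,
// so every row falls back to the TimestampSpec's missing-value default of 1971.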
@Test
public void testReaderWithInclusiveColumnsFilterNoTimestamp() throws IOException
{
final DruidSegmentReader reader = new DruidSegmentReader(
makeInputEntity(Intervals.of("2000/P1D")),
indexIO,
new TimestampSpec("__time", "millis", DateTimes.of("1971")),
new DimensionsSpec(
ImmutableList.of(
StringDimensionSchema.create("s"),
new DoubleDimensionSchema("d")
)
),
ColumnsFilter.inclusionBased(ImmutableSet.of("s", "d")),
null,
temporaryFolder.newFolder()
);
Assert.assertEquals(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("1971"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("s", "foo")
.put("d", 1.23d)
.build()
),
new MapBasedInputRow(
DateTimes.of("1971"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("s", "bar")
.put("d", 4.56d)
.build()
)
),
readRows(reader)
);
}
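
// A SelectorDimFilter on d = 1.23 is applied while reading the segment, so only the
// first row survives.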
@Test
public void testReaderWithFilter() throws IOException
{
final DruidSegmentReader reader = new DruidSegmentReader(
makeInputEntity(Intervals.of("2000/P1D")),
indexIO,
new TimestampSpec("__time", "millis", DateTimes.of("1971")),
new DimensionsSpec(
ImmutableList.of(
StringDimensionSchema.create("s"),
new DoubleDimensionSchema("d")
)
),
ColumnsFilter.all(),
new SelectorDimFilter("d", "1.23", null),
temporaryFolder.newFolder()
);
Assert.assertEquals(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("2000"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T").getMillis())
.put("s", "foo")
.put("d", 1.23d)
.put("cnt", 1L)
.put("met_s", makeHLLC("foo"))
.build()
)
),
readRows(reader)
);
}
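
// Uses the double column "d" as a "posix" timestamp: 1.23 and 4.56 are truncated to
// whole seconds, producing 1970-01-01T00:00:01Z and 1970-01-01T00:00:04Z.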
@Test
public void testReaderTimestampFromDouble() throws IOException
{
final DruidSegmentReader reader = new DruidSegmentReader(
makeInputEntity(Intervals.of("2000/P1D")),
indexIO,
new TimestampSpec("d", "posix", null),
new DimensionsSpec(
ImmutableList.of(
StringDimensionSchema.create("s"),
new DoubleDimensionSchema("d")
)
),
ColumnsFilter.all(),
null,
temporaryFolder.newFolder()
);
Assert.assertEquals(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("1970-01-01T00:00:01.000Z"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T").getMillis())
.put("s", "foo")
.put("d", 1.23d)
.put("cnt", 1L)
.put("met_s", makeHLLC("foo"))
.build()
),
new MapBasedInputRow(
DateTimes.of("1970-01-01T00:00:04.000Z"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T01").getMillis())
.put("s", "bar")
.put("d", 4.56d)
.put("cnt", 1L)
.put("met_s", makeHLLC("bar"))
.build()
)
),
readRows(reader)
);
}
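
// Deliberately misreads __time (millis since epoch) as "posix" (seconds since
// epoch); the values are inflated 1000x, landing in the year 31969.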
@Test
public void testReaderTimestampAsPosixIncorrectly() throws IOException
{
final DruidSegmentReader reader = new DruidSegmentReader(
makeInputEntity(Intervals.of("2000/P1D")),
indexIO,
new TimestampSpec("__time", "posix", null),
new DimensionsSpec(
ImmutableList.of(
StringDimensionSchema.create("s"),
new DoubleDimensionSchema("d")
)
),
ColumnsFilter.all(),
null,
temporaryFolder.newFolder()
);
Assert.assertEquals(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("31969-04-01T00:00:00.000Z"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T").getMillis())
.put("s", "foo")
.put("d", 1.23d)
.put("cnt", 1L)
.put("met_s", makeHLLC("foo"))
.build()
),
new MapBasedInputRow(
DateTimes.of("31969-05-12T16:00:00.000Z"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T01").getMillis())
.put("s", "bar")
.put("d", 4.56d)
.put("cnt", 1L)
.put("met_s", makeHLLC("bar"))
.build()
)
),
readRows(reader)
);
}
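
// With a null timestamp column and format, every row gets the TimestampSpec's
// missing-value default of 1971, while __time remains available as a plain field.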
@Test
public void testReaderTimestampSpecDefault() throws IOException
{
final DruidSegmentReader reader = new DruidSegmentReader(
makeInputEntity(Intervals.of("2000/P1D")),
indexIO,
new TimestampSpec(null, null, DateTimes.of("1971")),
new DimensionsSpec(
ImmutableList.of(
StringDimensionSchema.create("s"),
new DoubleDimensionSchema("d")
)
),
ColumnsFilter.all(),
null,
temporaryFolder.newFolder()
);
Assert.assertEquals(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("1971"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T").getMillis())
.put("s", "foo")
.put("d", 1.23d)
.put("cnt", 1L)
.put("met_s", makeHLLC("foo"))
.build()
),
new MapBasedInputRow(
DateTimes.of("1971"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T01").getMillis())
.put("s", "bar")
.put("d", 4.56d)
.put("cnt", 1L)
.put("met_s", makeHLLC("bar"))
.build()
)
),
readRows(reader)
);
}
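
// Verifies that closing the iterator returned by
// makeCloseableIteratorFromSequenceAndSegmentFile closes both the underlying
// sequence and the segment file it was created from.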
@Test
public void testMakeCloseableIteratorFromSequenceAndSegmentFileCloseYielderOnClose() throws IOException
{
MutableBoolean isSequenceClosed = new MutableBoolean(false);
MutableBoolean isFileClosed = new MutableBoolean(false);
Sequence<Map<String, Object>> sequence = new BaseSequence<>(
new IteratorMaker<Map<String, Object>, Iterator<Map<String, Object>>>()
{
@Override
public Iterator<Map<String, Object>> make()
{
return Collections.emptyIterator();
}
@Override
public void cleanup(Iterator<Map<String, Object>> iterFromMake)
{
isSequenceClosed.setValue(true);
}
}
);
CleanableFile cleanableFile = new CleanableFile()
{
@Override
public File file()
{
return null;
}
@Override
public void close()
{
isFileClosed.setValue(true);
}
};
try (CloseableIterator<Map<String, Object>> iterator =
DruidSegmentReader.makeCloseableIteratorFromSequenceAndSegmentFile(sequence, cleanableFile)) {
while (iterator.hasNext()) {
iterator.next();
}
}
Assert.assertTrue("File is not closed", isFileClosed.booleanValue());
Assert.assertTrue("Sequence is not closed", isSequenceClosed.booleanValue());
}
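
// Wraps the segment written in setUp() in a DruidSegmentInputEntity, using a stub
// SegmentCacheManager that only supports fetching the segment files.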
private DruidSegmentInputEntity makeInputEntity(final Interval interval)
{
return new DruidSegmentInputEntity(
new SegmentCacheManager()
{
@Override
public boolean isSegmentCached(DataSegment segment)
{
throw new UnsupportedOperationException("unused");
}
@Override
public File getSegmentFiles(DataSegment segment)
{
return segmentDirectory;
}
@Override
public void cleanup(DataSegment segment)
{
throw new UnsupportedOperationException("unused");
}
},
DataSegment.builder()
.dataSource("ds")
.dimensions(ImmutableList.of("s", "d"))
.metrics(ImmutableList.of("cnt", "met_s"))
.interval(Intervals.of("2000/P1D"))
.version("1")
.size(0)
.build(),
interval
);
}
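
// Drains the reader's intermediate rows and parses each into InputRows, roughly
// mirroring how DruidSegmentReader is driven during ingestion.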
private List<InputRow> readRows(final DruidSegmentReader reader) throws IOException
{
final List<InputRow> rows = new ArrayList<>();
try (final CloseableIterator<Map<String, Object>> iterator = reader.intermediateRowIterator()) {
while (iterator.hasNext()) {
rows.addAll(reader.parseInputRows(iterator.next()));
}
}
return rows;
}
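
// Builds the HyperLogLogCollector that HyperUniquesAggregatorFactory is expected to
// store for the given values, for use in the row equality assertions above.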
private static HyperLogLogCollector makeHLLC(final String... values)
{
final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
for (String value : values) {
collector.add(HyperLogLogHash.getDefault().hash(value));
}
return collector;
}
}