/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.timeline.Overshadowable;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Test helper that builds incremental, persisted, merged, and appended indexes over the schemaless
 * sample data in {@code druid.sample.json} (and its "top"/"bottom" halves) for use by schemaless query tests.
 */
public class SchemalessIndexTest
{
private static final Logger log = new Logger(SchemalessIndexTest.class);
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final String TEST_FILE = "druid.sample.json";
private static final String TIMESTAMP = "timestamp";
private static final List<String> METRICS = Collections.singletonList("index");
private static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
new DoubleSumAggregatorFactory("index", "index"),
new CountAggregatorFactory("count"),
new HyperUniquesAggregatorFactory("quality_uniques", "quality")
};
private static final AggregatorFactory[] METRIC_AGGS_NO_UNIQ = new AggregatorFactory[]{
new DoubleSumAggregatorFactory("index", "index"),
new CountAggregatorFactory("count")
};
private static final IndexSpec INDEX_SPEC = new IndexSpec();
private static final List<Map<String, Object>> EVENTS = new ArrayList<>();
private static final Map<Integer, Map<Integer, QueryableIndex>> INCREMENTAL_INDEXES = new HashMap<>();
private static final Map<Integer, Map<Integer, QueryableIndex>> MERGED_INDEXES = new HashMap<>();
private static final List<QueryableIndex> ROW_PERSISTED_INDEXES = new ArrayList<>();
private static IncrementalIndex index = null;
private static QueryableIndex mergedIndex = null;
static {
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
}
private final IndexMerger indexMerger;
private final IndexIO indexIO;
public SchemalessIndexTest(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
{
indexMerger = TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory);
indexIO = TestHelper.getTestIndexIO();
}
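  /**
   * Lazily builds and caches an on-heap {@link IncrementalIndex} over every event in {@code druid.sample.json}.
   */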
public static IncrementalIndex getIncrementalIndex()
{
synchronized (log) {
if (index != null) {
return index;
}
index = makeIncrementalIndex(TEST_FILE, METRIC_AGGS);
return index;
}
}
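  /**
   * Builds a memory-mapped {@link QueryableIndex} containing only the events at positions {@code index1} and
   * {@code index2} of the sample file, treating every non-timestamp, non-metric key as a dimension.
   * Results are cached per (index1, index2) pair.
   */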
public static QueryableIndex getIncrementalIndex(int index1, int index2)
{
synchronized (log) {
if (EVENTS.isEmpty()) {
makeEvents();
}
Map<Integer, QueryableIndex> entry = INCREMENTAL_INDEXES.get(index1);
if (entry != null) {
QueryableIndex index = entry.get(index2);
if (index != null) {
return index;
}
} else {
entry = new HashMap<>();
INCREMENTAL_INDEXES.put(index1, entry);
}
IncrementalIndex theIndex = null;
int count = 0;
for (final Map<String, Object> event : EVENTS) {
if (count != index1 && count != index2) {
count++;
continue;
}
final long timestamp = new DateTime(event.get(TIMESTAMP), ISOChronology.getInstanceUTC()).getMillis();
if (theIndex == null) {
theIndex = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(timestamp)
.withQueryGranularity(Granularities.MINUTE)
.withMetrics(METRIC_AGGS)
.build()
)
.setMaxRowCount(1000)
.buildOnheap();
}
final List<String> dims = new ArrayList<>();
for (final Map.Entry<String, Object> val : event.entrySet()) {
if (!val.getKey().equalsIgnoreCase(TIMESTAMP) && !METRICS.contains(val.getKey())) {
dims.add(val.getKey());
}
}
try {
theIndex.add(new MapBasedInputRow(timestamp, dims, event));
}
catch (IndexSizeExceededException e) {
throw new RuntimeException(e);
}
count++;
}
QueryableIndex retVal = TestIndex.persistRealtimeAndLoadMMapped(theIndex);
entry.put(index2, retVal);
return retVal;
}
}
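  /**
   * Persists the "top" and "bottom" halves of the sample data separately, merges them, and caches the resulting
   * memory-mapped {@link QueryableIndex}.
   */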
public QueryableIndex getMergedIncrementalIndex()
{
synchronized (log) {
if (mergedIndex != null) {
return mergedIndex;
}
try {
IncrementalIndex top = makeIncrementalIndex("druid.sample.json.top", METRIC_AGGS);
IncrementalIndex bottom = makeIncrementalIndex("druid.sample.json.bottom", METRIC_AGGS);
File tmpFile = File.createTempFile("yay", "who");
tmpFile.delete();
File topFile = new File(tmpFile, "top");
File bottomFile = new File(tmpFile, "bottom");
File mergedFile = new File(tmpFile, "merged");
topFile.mkdirs();
topFile.deleteOnExit();
bottomFile.mkdirs();
bottomFile.deleteOnExit();
mergedFile.mkdirs();
mergedFile.deleteOnExit();
indexMerger.persist(top, topFile, INDEX_SPEC, null);
indexMerger.persist(bottom, bottomFile, INDEX_SPEC, null);
mergedIndex = indexIO.loadIndex(
indexMerger.mergeQueryableIndex(
Arrays.asList(indexIO.loadIndex(topFile), indexIO.loadIndex(bottomFile)),
true,
METRIC_AGGS,
mergedFile,
INDEX_SPEC,
null
)
);
return mergedIndex;
}
catch (IOException e) {
mergedIndex = null;
throw new RuntimeException(e);
}
}
}
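  /**
   * Merges the single-row persisted indexes for events {@code index1} and {@code index2} into one memory-mapped
   * {@link QueryableIndex}, caching the result per (index1, index2) pair.
   */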
public QueryableIndex getMergedIncrementalIndex(int index1, int index2)
{
synchronized (log) {
if (ROW_PERSISTED_INDEXES.isEmpty()) {
makeRowPersistedIndexes();
}
Map<Integer, QueryableIndex> entry = MERGED_INDEXES.get(index1);
if (entry != null) {
QueryableIndex index = entry.get(index2);
if (index != null) {
return index;
}
} else {
entry = new HashMap<>();
MERGED_INDEXES.put(index1, entry);
}
try {
File tmpFile = File.createTempFile("yay", "who");
tmpFile.delete();
File mergedFile = new File(tmpFile, "merged");
mergedFile.mkdirs();
mergedFile.deleteOnExit();
QueryableIndex index = indexIO.loadIndex(
indexMerger.mergeQueryableIndex(
Arrays.asList(ROW_PERSISTED_INDEXES.get(index1), ROW_PERSISTED_INDEXES.get(index2)),
true,
METRIC_AGGS,
mergedFile,
INDEX_SPEC,
null
)
);
entry.put(index2, index);
return index;
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
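  /**
   * Merges the single-row persisted indexes for the given event positions into one memory-mapped
   * {@link QueryableIndex}. The result is not cached.
   */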
public QueryableIndex getMergedIncrementalIndex(int[] indexes)
{
synchronized (log) {
if (ROW_PERSISTED_INDEXES.isEmpty()) {
makeRowPersistedIndexes();
}
try {
File tmpFile = File.createTempFile("yay", "who");
tmpFile.delete();
File mergedFile = new File(tmpFile, "merged");
mergedFile.mkdirs();
mergedFile.deleteOnExit();
List<QueryableIndex> indexesToMerge = new ArrayList<>();
for (int index : indexes) {
indexesToMerge.add(ROW_PERSISTED_INDEXES.get(index));
}
return indexIO.loadIndex(
indexMerger.mergeQueryableIndex(indexesToMerge, true, METRIC_AGGS, mergedFile, INDEX_SPEC, null)
);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
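  /**
   * Appends the given (resource file, aggregators) pairs into a single memory-mapped index, placing each file at
   * the corresponding interval on a timeline and filtering out rows that fall outside it.
   */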
public QueryableIndex getAppendedIncrementalIndex(
Iterable<Pair<String, AggregatorFactory[]>> files,
List<Interval> intervals
)
{
return makeAppendedMMappedIndex(files, intervals);
}
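  /**
   * Merges the "top" half of the sample data (built without the hyperUnique metric) with the "bottom" half
   * (built with it), exercising a merge of indexes that carry different metric sets.
   */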
public QueryableIndex getMergedIncrementalIndexDiffMetrics()
{
return getMergedIncrementalIndex(
Arrays.asList(
new Pair<String, AggregatorFactory[]>("druid.sample.json.top", METRIC_AGGS_NO_UNIQ),
new Pair<String, AggregatorFactory[]>("druid.sample.json.bottom", METRIC_AGGS)
)
);
}
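  /**
   * Merges the given (resource file, aggregators) pairs into a single memory-mapped {@link QueryableIndex}.
   */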
public QueryableIndex getMergedIncrementalIndex(Iterable<Pair<String, AggregatorFactory[]>> files)
{
return makeMergedMMappedIndex(files);
}
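  /**
   * Loads {@code druid.sample.json} from the classpath and populates {@link #EVENTS} with its raw event maps.
   */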
private static void makeEvents()
{
URL resource = TestIndex.class.getClassLoader().getResource(TEST_FILE);
String filename = resource.getFile();
log.info("Realtime loading index file[%s]", filename);
try {
for (Object obj : JSON_MAPPER.readValue(new File(filename), List.class)) {
final Map<String, Object> event = JSON_MAPPER.convertValue(obj, Map.class);
EVENTS.add(event);
}
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
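  /**
   * Persists each sample event as its own single-row index and loads the results into
   * {@link #ROW_PERSISTED_INDEXES}.
   */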
private void makeRowPersistedIndexes()
{
synchronized (log) {
try {
if (EVENTS.isEmpty()) {
makeEvents();
}
for (final Map<String, Object> event : EVENTS) {
final long timestamp = new DateTime(event.get(TIMESTAMP), ISOChronology.getInstanceUTC()).getMillis();
final List<String> dims = new ArrayList<>();
for (Map.Entry<String, Object> entry : event.entrySet()) {
if (!entry.getKey().equalsIgnoreCase(TIMESTAMP) && !METRICS.contains(entry.getKey())) {
dims.add(entry.getKey());
}
}
final IncrementalIndex rowIndex = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(timestamp)
.withQueryGranularity(Granularities.MINUTE)
.withMetrics(METRIC_AGGS)
.build()
)
.setMaxRowCount(1000)
.buildOnheap();
rowIndex.add(
new MapBasedInputRow(timestamp, dims, event)
);
File tmpFile = File.createTempFile("billy", "yay");
tmpFile.delete();
tmpFile.mkdirs();
tmpFile.deleteOnExit();
indexMerger.persist(rowIndex, tmpFile, INDEX_SPEC, null);
ROW_PERSISTED_INDEXES.add(indexIO.loadIndex(tmpFile));
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
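  /**
   * Builds an on-heap {@link IncrementalIndex} from the given classpath JSON resource, treating every key except
   * the timestamp and known metrics as a dimension.
   */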
public static IncrementalIndex makeIncrementalIndex(final String resourceFilename, AggregatorFactory[] aggs)
{
URL resource = TestIndex.class.getClassLoader().getResource(resourceFilename);
log.info("Realtime loading resource[%s]", resource);
String filename = resource.getFile();
log.info("Realtime loading index file[%s]", filename);
final IncrementalIndex retVal = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DateTimes.of("2011-01-12T00:00:00.000Z").getMillis())
.withQueryGranularity(Granularities.MINUTE)
.withMetrics(aggs)
.build()
)
.setMaxRowCount(1000)
.buildOnheap();
try {
final List<Object> events = JSON_MAPPER.readValue(new File(filename), List.class);
for (Object obj : events) {
final Map<String, Object> event = JSON_MAPPER.convertValue(obj, Map.class);
final List<String> dims = new ArrayList<>();
for (Map.Entry<String, Object> entry : event.entrySet()) {
if (!entry.getKey().equalsIgnoreCase(TIMESTAMP) && !METRICS.contains(entry.getKey())) {
dims.add(entry.getKey());
}
}
retVal.add(
new MapBasedInputRow(
new DateTime(event.get(TIMESTAMP), ISOChronology.getInstanceUTC()).getMillis(),
dims,
event
)
);
}
}
catch (IOException e) {
index = null;
throw new RuntimeException(e);
}
return retVal;
}
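  /**
   * Builds and persists an incremental index for each (resource file, aggregators) pair under {@code tmpFile},
   * returning the persisted segment directories.
   */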
private List<File> makeFilesToMap(File tmpFile, Iterable<Pair<String, AggregatorFactory[]>> files) throws IOException
{
List<File> filesToMap = new ArrayList<>();
for (Pair<String, AggregatorFactory[]> file : files) {
IncrementalIndex index = makeIncrementalIndex(file.lhs, file.rhs);
File theFile = new File(tmpFile, file.lhs);
theFile.mkdirs();
theFile.deleteOnExit();
filesToMap.add(theFile);
indexMerger.persist(index, theFile, INDEX_SPEC, null);
}
return filesToMap;
}
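  /**
   * Persists the given files, arranges them on a {@link VersionedIntervalTimeline} at the given intervals, wraps
   * each resulting chunk in a {@link RowFilteringIndexAdapter} that drops rows outside its interval, and appends
   * the adapters into a single memory-mapped index.
   */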
private QueryableIndex makeAppendedMMappedIndex(
Iterable<Pair<String, AggregatorFactory[]>> files,
final List<Interval> intervals
)
{
try {
File tmpFile = File.createTempFile("yay", "boo");
tmpFile.delete();
File mergedFile = new File(tmpFile, "merged");
mergedFile.mkdirs();
mergedFile.deleteOnExit();
List<File> filesToMap = makeFilesToMap(tmpFile, files);
VersionedIntervalTimeline<Integer, OvershadowableFile> timeline = new VersionedIntervalTimeline<>(
Comparators.naturalNullsFirst()
);
ShardSpec noneShardSpec = NoneShardSpec.instance();
for (int i = 0; i < intervals.size(); i++) {
timeline.add(intervals.get(i), i, noneShardSpec.createChunk(new OvershadowableFile(i, filesToMap.get(i))));
}
final List<IndexableAdapter> adapters = Lists.newArrayList(
Iterables.concat(
// Each TimelineObjectHolder holds an iterable of partition chunks, each of which is turned into an IndexableAdapter below
Iterables.transform(
timeline.lookup(Intervals.of("1000-01-01/3000-01-01")),
new Function<TimelineObjectHolder<Integer, OvershadowableFile>, Iterable<IndexableAdapter>>()
{
@Override
public Iterable<IndexableAdapter> apply(final TimelineObjectHolder<Integer, OvershadowableFile> timelineObjectHolder)
{
return Iterables.transform(
timelineObjectHolder.getObject(),
// Each chunk can be used to build the actual IndexableAdapter
new Function<PartitionChunk<OvershadowableFile>, IndexableAdapter>()
{
@Override
public IndexableAdapter apply(PartitionChunk<OvershadowableFile> chunk)
{
try {
return new RowFilteringIndexAdapter(
new QueryableIndexIndexableAdapter(indexIO.loadIndex(chunk.getObject().file)),
rowPointer -> timelineObjectHolder.getInterval().contains(rowPointer.getTimestamp())
);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
);
}
}
)
)
);
return indexIO.loadIndex(indexMerger.append(adapters, null, mergedFile, INDEX_SPEC, null));
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
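  /**
   * Persists the given files and merges the resulting segments into a single memory-mapped
   * {@link QueryableIndex} with rollup enabled.
   */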
private QueryableIndex makeMergedMMappedIndex(Iterable<Pair<String, AggregatorFactory[]>> files)
{
try {
File tmpFile = File.createTempFile("yay", "who");
tmpFile.delete();
File mergedFile = new File(tmpFile, "merged");
mergedFile.mkdirs();
mergedFile.deleteOnExit();
List<File> filesToMap = makeFilesToMap(tmpFile, files);
return indexIO.loadIndex(
indexMerger.mergeQueryableIndex(
Lists.newArrayList(
Iterables.transform(
filesToMap,
new Function<File, QueryableIndex>()
{
@Override
public QueryableIndex apply(@Nullable File input)
{
try {
return indexIO.loadIndex(input);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
)
),
true,
METRIC_AGGS,
mergedFile,
INDEX_SPEC,
null
)
);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
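  /**
   * Minimal {@link Overshadowable} wrapper around a persisted segment directory so it can be placed in a
   * {@link VersionedIntervalTimeline}; it never overshadows other entries.
   */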
private static class OvershadowableFile implements Overshadowable<OvershadowableFile>
{
private final String majorVersion;
private final File file;
OvershadowableFile(int majorVersion, File file)
{
this.majorVersion = Integer.toString(majorVersion);
this.file = file;
}
@Override
public boolean overshadows(OvershadowableFile other)
{
return false;
}
@Override
public int getStartRootPartitionId()
{
return 0;
}
@Override
public int getEndRootPartitionId()
{
return 0;
}
@Override
public String getVersion()
{
return majorVersion;
}
@Override
public short getMinorVersion()
{
return 0;
}
@Override
public short getAtomicUpdateGroupSize()
{
return 0;
}
}
}