/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.io.CharSource;
import com.google.common.io.LineProcessor;
import com.google.common.io.Resources;
import org.apache.druid.data.input.impl.DelimitedParseSpec;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FloatMaxAggregatorFactory;
import org.apache.druid.query.aggregation.FloatMinAggregatorFactory;
import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Factory for the shared test indexes used throughout the segment tests. Each
 * index is built from the druid.sample.numeric.tsv classpath resources and is
 * exposed both as an on-heap {@link IncrementalIndex} and as a memory-mapped
 * {@link QueryableIndex}, in rollup/no-rollup and bitmap/no-bitmap variants.
 */
public class TestIndex
{
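  // Column order must match the field order in the druid.sample.numeric.tsv test resources.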
public static final String[] COLUMNS = new String[]{
"ts",
"market",
"quality",
"qualityLong",
"qualityFloat",
"qualityDouble",
"qualityNumericString",
"longNumericNull",
"floatNumericNull",
"doubleNumericNull",
"placement",
"placementish",
"index",
"partial_null_column",
"null_column",
"quality_uniques",
"indexMin",
"indexMaxPlusTen"
};
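  // Default dimension schemas: string dimensions keep bitmap indexes, numeric columns are typed.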
public static final List<DimensionSchema> DIMENSION_SCHEMAS = Arrays.asList(
new StringDimensionSchema("market"),
new StringDimensionSchema("quality"),
new LongDimensionSchema("qualityLong"),
new FloatDimensionSchema("qualityFloat"),
new DoubleDimensionSchema("qualityDouble"),
new StringDimensionSchema("qualityNumericString"),
new LongDimensionSchema("longNumericNull"),
new FloatDimensionSchema("floatNumericNull"),
new DoubleDimensionSchema("doubleNumericNull"),
new StringDimensionSchema("placement"),
new StringDimensionSchema("placementish"),
new StringDimensionSchema("partial_null_column"),
new StringDimensionSchema("null_column")
);
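  // Same dimensions, but with bitmap indexing disabled on the string columns
  // (the third constructor argument).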
public static final List<DimensionSchema> DIMENSION_SCHEMAS_NO_BITMAP = Arrays.asList(
new StringDimensionSchema("market", null, false),
new StringDimensionSchema("quality", null, false),
new LongDimensionSchema("qualityLong"),
new FloatDimensionSchema("qualityFloat"),
new DoubleDimensionSchema("qualityDouble"),
new StringDimensionSchema("qualityNumericString", null, false),
new LongDimensionSchema("longNumericNull"),
new FloatDimensionSchema("floatNumericNull"),
new DoubleDimensionSchema("doubleNumericNull"),
new StringDimensionSchema("placement", null, false),
new StringDimensionSchema("placementish", null, false),
new StringDimensionSchema("partial_null_column", null, false),
new StringDimensionSchema("null_column", null, false)
);
public static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
DIMENSION_SCHEMAS,
null,
null
);
public static final DimensionsSpec DIMENSIONS_SPEC_NO_BITMAPS = new DimensionsSpec(
DIMENSION_SCHEMAS_NO_BITMAP,
null,
null
);
public static final String[] DOUBLE_METRICS = new String[]{"index", "indexMin", "indexMaxPlusTen"};
public static final String[] FLOAT_METRICS = new String[]{"indexFloat", "indexMinFloat", "indexMaxFloat"};
public static final Interval DATA_INTERVAL = Intervals.of("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z");
private static final Logger log = new Logger(TestIndex.class);
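  // Virtual column "expr" computes index + 10; it feeds the indexMaxPlusTen metric below.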
private static final VirtualColumns VIRTUAL_COLUMNS = VirtualColumns.create(
Collections.singletonList(
new ExpressionVirtualColumn("expr", "index + 10", ValueType.FLOAT, TestExprMacroTable.INSTANCE)
)
);
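  // Ingest-time aggregators: sums, mins, and maxes over "index" (the double max reads the
  // "expr" virtual column), plus a hyperUnique sketch of "quality".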
public static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
new DoubleSumAggregatorFactory(DOUBLE_METRICS[0], "index"),
new FloatSumAggregatorFactory(FLOAT_METRICS[0], "index"),
new DoubleMinAggregatorFactory(DOUBLE_METRICS[1], "index"),
new FloatMinAggregatorFactory(FLOAT_METRICS[1], "index"),
new FloatMaxAggregatorFactory(FLOAT_METRICS[2], "index"),
new DoubleMaxAggregatorFactory(DOUBLE_METRICS[2], VIRTUAL_COLUMNS.getVirtualColumns()[0].getOutputName()),
new HyperUniquesAggregatorFactory("quality_uniques", "quality")
};
private static final IndexSpec INDEX_SPEC = new IndexSpec();
private static final IndexMerger INDEX_MERGER =
TestHelper.getTestIndexMergerV9(OffHeapMemorySegmentWriteOutMediumFactory.instance());
private static final IndexIO INDEX_IO = TestHelper.getTestIndexIO();
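  // Register the "hyperUnique" complex type so the HyperUniques aggregator can be (de)serialized.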
static {
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
}
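  // Memoized suppliers: each test index variant is built lazily and at most once per JVM.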
private static Supplier<IncrementalIndex> realtimeIndex = Suppliers.memoize(
() -> makeRealtimeIndex("druid.sample.numeric.tsv")
);
private static Supplier<IncrementalIndex> noRollupRealtimeIndex = Suppliers.memoize(
() -> makeRealtimeIndex("druid.sample.numeric.tsv", false)
);
private static Supplier<IncrementalIndex> noBitmapRealtimeIndex = Suppliers.memoize(
() -> makeRealtimeIndex("druid.sample.numeric.tsv", false, false)
);
private static Supplier<QueryableIndex> mmappedIndex = Suppliers.memoize(
() -> persistRealtimeAndLoadMMapped(realtimeIndex.get())
);
private static Supplier<QueryableIndex> noRollupMmappedIndex = Suppliers.memoize(
() -> persistRealtimeAndLoadMMapped(noRollupRealtimeIndex.get())
);
private static Supplier<QueryableIndex> noBitmapMmappedIndex = Suppliers.memoize(
() -> persistRealtimeAndLoadMMapped(noBitmapRealtimeIndex.get())
);
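  // Merged index: the sample data is split into "top" and "bottom" halves, each persisted
  // to its own temp directory, then merged and reloaded as a single QueryableIndex.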
private static Supplier<QueryableIndex> mergedRealtime = Suppliers.memoize(() -> {
try {
IncrementalIndex top = makeRealtimeIndex("druid.sample.numeric.tsv.top");
IncrementalIndex bottom = makeRealtimeIndex("druid.sample.numeric.tsv.bottom");
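      // Create a unique temp path, delete the placeholder file, and reuse the path
      // as the parent directory for the persisted segments.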
File tmpFile = File.createTempFile("yay", "who");
tmpFile.delete();
File topFile = new File(tmpFile, "top");
File bottomFile = new File(tmpFile, "bottom");
File mergedFile = new File(tmpFile, "merged");
topFile.mkdirs();
topFile.deleteOnExit();
bottomFile.mkdirs();
bottomFile.deleteOnExit();
mergedFile.mkdirs();
mergedFile.deleteOnExit();
INDEX_MERGER.persist(top, DATA_INTERVAL, topFile, INDEX_SPEC, null);
INDEX_MERGER.persist(bottom, DATA_INTERVAL, bottomFile, INDEX_SPEC, null);
return INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(INDEX_IO.loadIndex(topFile), INDEX_IO.loadIndex(bottomFile)),
true,
METRIC_AGGS,
mergedFile,
INDEX_SPEC,
null
)
);
}
catch (IOException e) {
throw new RuntimeException(e);
}
});
public static IncrementalIndex getIncrementalTestIndex()
{
return realtimeIndex.get();
}
public static IncrementalIndex getNoRollupIncrementalTestIndex()
{
return noRollupRealtimeIndex.get();
}
public static IncrementalIndex getNoBitmapIncrementalTestIndex()
{
return noBitmapRealtimeIndex.get();
}
public static QueryableIndex getMMappedTestIndex()
{
return mmappedIndex.get();
}
public static QueryableIndex getNoRollupMMappedTestIndex()
{
return noRollupMmappedIndex.get();
}
public static QueryableIndex getNoBitmapMMappedTestIndex()
{
return noBitmapMmappedIndex.get();
}
public static QueryableIndex mergedRealtimeIndex()
{
return mergedRealtime.get();
}
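  // Convenience overloads for building an on-heap index from a classpath TSV resource;
  // rollup and bitmap indexing are enabled unless a variant disables them.
  //
  // Illustrative usage from a test (the calling test class is hypothetical):
  //   QueryableIndex mmapped = TestIndex.getMMappedTestIndex();
  //   IncrementalIndex incremental = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv");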
public static IncrementalIndex makeRealtimeIndex(final String resourceFilename)
{
return makeRealtimeIndex(resourceFilename, true);
}
public static IncrementalIndex makeRealtimeIndex(final String resourceFilename, boolean rollup)
{
return makeRealtimeIndex(resourceFilename, rollup, true);
}
public static IncrementalIndex makeRealtimeIndex(final String resourceFilename, boolean rollup, boolean bitmap)
{
CharSource stream = getResourceCharSource(resourceFilename);
return makeRealtimeIndex(stream, rollup, bitmap);
}
public static CharSource getResourceCharSource(final String resourceFilename)
{
final URL resource = TestIndex.class.getClassLoader().getResource(resourceFilename);
if (resource == null) {
throw new IllegalArgumentException("cannot find resource " + resourceFilename);
}
log.info("Realtime loading index file[%s]", resource);
return Resources.asByteSource(resource).asCharSource(StandardCharsets.UTF_8);
}
public static IncrementalIndex makeRealtimeIndex(final CharSource source)
{
return makeRealtimeIndex(source, true, true);
}
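  // Builds the schema (min timestamp, dimensions, the "expr" virtual column, metric
  // aggregators, rollup flag) and streams the rows of the source into an on-heap index.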
public static IncrementalIndex makeRealtimeIndex(final CharSource source, boolean rollup, boolean bitmap)
{
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(DateTimes.of("2011-01-12T00:00:00.000Z").getMillis())
.withTimestampSpec(new TimestampSpec("ds", "auto", null))
.withDimensionsSpec(bitmap ? DIMENSIONS_SPEC : DIMENSIONS_SPEC_NO_BITMAPS)
.withVirtualColumns(VIRTUAL_COLUMNS)
.withMetrics(METRIC_AGGS)
.withRollup(rollup)
.build();
final IncrementalIndex retVal = new IncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(10000)
.buildOnheap();
try {
return loadIncrementalIndex(retVal, source);
}
catch (Exception e) {
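      // On failure, drop the memoized supplier for the affected variant before rethrowing.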
if (rollup) {
realtimeIndex = null;
} else {
noRollupRealtimeIndex = null;
}
throw new RuntimeException(e);
}
}
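  // Parses each line as tab-delimited text with "\u0001" as the list delimiter,
  // using the COLUMNS ordering declared above.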
public static IncrementalIndex loadIncrementalIndex(
final IncrementalIndex retVal,
final CharSource source
) throws IOException
{
final StringInputRowParser parser = new StringInputRowParser(
new DelimitedParseSpec(
new TimestampSpec("ts", "iso", null),
new DimensionsSpec(DIMENSION_SCHEMAS, null, null),
"\t",
"\u0001",
Arrays.asList(COLUMNS),
false,
0
),
"utf8"
);
return loadIncrementalIndex(() -> retVal, source, parser);
}
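  // Streams the source line by line through a LineProcessor, adding each parsed
  // row to the index and logging the total row count and load time.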
public static IncrementalIndex loadIncrementalIndex(
final Supplier<IncrementalIndex> indexSupplier,
final CharSource source,
final StringInputRowParser parser
) throws IOException
{
final IncrementalIndex retVal = indexSupplier.get();
final AtomicLong startTime = new AtomicLong();
int lineCount = source.readLines(
new LineProcessor<Integer>()
{
boolean runOnce = false;
int lineCount = 0;
@Override
public boolean processLine(String line) throws IOException
{
if (!runOnce) {
startTime.set(System.currentTimeMillis());
runOnce = true;
}
retVal.add(parser.parse(line));
++lineCount;
return true;
}
@Override
public Integer getResult()
{
return lineCount;
}
}
);
log.info("Loaded %,d lines in %,d millis.", lineCount, System.currentTimeMillis() - startTime.get());
return retVal;
}
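  // Persists the in-memory index to a temp directory and reloads it as a
  // memory-mapped QueryableIndex.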
public static QueryableIndex persistRealtimeAndLoadMMapped(IncrementalIndex index)
{
try {
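      // Same temp-path trick as in mergedRealtime: create a unique file, delete it,
      // and recreate the path as a directory.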
File someTmpFile = File.createTempFile("billy", "yay");
someTmpFile.delete();
someTmpFile.mkdirs();
someTmpFile.deleteOnExit();
INDEX_MERGER.persist(index, someTmpFile, INDEX_SPEC, null);
return INDEX_IO.loadIndex(someTmpFile);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}