/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.benchmark;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.generator.GeneratorColumnSchema;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
import org.apache.druid.segment.join.HashJoinSegment;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.JoinableClause;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysisKey;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
import org.apache.druid.segment.join.table.BroadcastSegmentIndexedTable;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.joda.time.Interval;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
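
/**
 * Benchmark for reading rows through the cursors of a {@link HashJoinSegment} whose right-hand
 * side is an {@link IndexedTable} built from a broadcast segment. Compares generic object-valued
 * column value selectors against dimension selectors across a range of left/right column
 * projections.
 */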
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
public class IndexedTableJoinCursorBenchmark
{
  static {
    NullHandling.initializeForTests();
  }

  private static final List<Set<String>> PROJECTIONS = ImmutableList.of(
      // 0 string key rhs
      ImmutableSet.of("j0.stringKey"),
      // 1 string key lhs
      ImmutableSet.of("stringKey"),
      // 2 numeric key rhs
      ImmutableSet.of("j0.longKey"),
      // 3 numeric key lhs
      ImmutableSet.of("longKey"),
      // 4 string rhs
      ImmutableSet.of("j0.string5"),
      // 5 string lhs
      ImmutableSet.of("string5"),
      // 6 numeric rhs
      ImmutableSet.of("j0.long4"),
      // 7 numeric lhs
      ImmutableSet.of("long4"),
      // 8 multi column projection, all rhs
      ImmutableSet.of("j0.stringKey", "j0.longKey", "j0.string1"),
      // 9 multi column projection, all lhs
      ImmutableSet.of("stringKey", "longKey", "string1"),
      // 10 big projection, all rhs
      ImmutableSet.of("j0.string1", "j0.string2", "j0.string3", "j0.string4", "j0.string5", "j0.long1", "j0.float1", "j0.double1"),
      // 11 big projection, all lhs
      ImmutableSet.of("string1", "string2", "string3", "string4", "string5", "long1", "float1", "double1"),
      // 12 big projection, mix of lhs and rhs
      ImmutableSet.of("j0.string1", "string2", "j0.string3", "string4", "j0.string5", "long1", "j0.float1", "j0.double1")
  );

@Param({"50000"})
int rowsPerSegment;
@Param({"5000000"})
int rowsPerTableSegment;
@Param({"segment"})
String indexedTableType;
@Param({"0", "1", "2", "3", "6", "7", "8", "9", "10", "11", "12"})
int projection;
@Param({"string1,stringKey", "stringKey,stringKey", "long3,longKey", "longKey,longKey"})
String joinColumns;
private Set<String> keyColumns = ImmutableSet.of("stringKey", "longKey");
boolean enableFilterPushdown = false;
boolean enableFilterRewrite = false;
boolean enableFilterRewriteValueFilters = false;
private Set<String> projectionColumns = null;
private IndexedTable table = null;
private QueryableIndexSegment baseSegment = null;
private QueryableIndexSegment joinSegment = null;
private Segment hashJoinSegment = null;
private Closer closer = Closer.create();
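  /**
   * Generates the fact segment and the (larger) table segment, wraps the latter in an
   * {@link IndexedTable}, and assembles the {@link HashJoinSegment} that the benchmark methods
   * read from.
   */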
  @Setup(Level.Trial)
  public void setup()
  {
    baseSegment = makeQueryableIndexSegment(closer, "regular", rowsPerSegment);
    joinSegment = makeQueryableIndexSegment(closer, "join", rowsPerTableSegment);
    table = closer.register(makeTable(indexedTableType, keyColumns, joinSegment));

    final String prefix = "j0.";
    projectionColumns = PROJECTIONS.get(projection);

    // joinColumns is a "lhsColumn,rhsColumn" pair; build an equality condition between them.
    final String[] split = joinColumns.split(",");
    final String lhsJoinColumn = split[0];
    final String rhsJoinColumn = split[1];

    final List<JoinableClause> clauses = ImmutableList.of(
        new JoinableClause(
            prefix,
            new IndexedTableJoinable(table),
            JoinType.LEFT,
            JoinConditionAnalysis.forExpression(
                StringUtils.format("%s == \"%s%s\"", lhsJoinColumn, prefix, rhsJoinColumn),
                prefix,
                ExprMacroTable.nil()
            )
        )
    );

    final JoinFilterPreAnalysis preAnalysis =
        JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
            new JoinFilterPreAnalysisKey(
                new JoinFilterRewriteConfig(
                    enableFilterPushdown,
                    enableFilterRewrite,
                    enableFilterRewriteValueFilters,
                    QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
                ),
                clauses,
                VirtualColumns.EMPTY,
                null
            )
        );
    hashJoinSegment = closer.register(
        new HashJoinSegment(
            ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
            null,
            clauses,
            preAnalysis
        )
    );
  }

  @TearDown
  public void tearDown() throws IOException
  {
    closer.close();
  }

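  /**
   * Reads every projected column through generic object-valued selectors; see
   * {@link #processRowsValueSelector}.
   */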
  @Benchmark
  @BenchmarkMode(Mode.AverageTime)
  @OutputTimeUnit(TimeUnit.MILLISECONDS)
  public void hashJoinCursorColumnValueSelectors(Blackhole blackhole)
  {
    final Sequence<Cursor> cursors = makeCursors();
    int rowCount = processRowsValueSelector(blackhole, cursors, projectionColumns);
    blackhole.consume(rowCount);
  }

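  /**
   * Reads every projected column through {@link DimensionSelector}s; see
   * {@link #processRowsDimensionSelectors}.
   */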
  @Benchmark
  @BenchmarkMode(Mode.AverageTime)
  @OutputTimeUnit(TimeUnit.MILLISECONDS)
  public void hashJoinCursorDimensionSelectors(Blackhole blackhole)
  {
    final Sequence<Cursor> cursors = makeCursors();
    int rowCount = processRowsDimensionSelectors(blackhole, cursors, projectionColumns);
    blackhole.consume(rowCount);
  }

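  /**
   * Opens cursors over the join segment with no filter or query metrics, the full eternity
   * interval, and ALL granularity.
   */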
  private Sequence<Cursor> makeCursors()
  {
    return hashJoinSegment.asStorageAdapter().makeCursors(
        null,
        Intervals.ETERNITY,
        VirtualColumns.EMPTY,
        Granularities.ALL,
        false,
        null
    );
  }

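  /**
   * Builds the right-hand side {@link IndexedTable} over the given segment. Only the "segment"
   * (broadcast) table type is currently supported.
   */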
  public static IndexedTable makeTable(
      final String indexedTableType,
      Set<String> keyColumns,
      QueryableIndexSegment tableSegment
  )
  {
    IndexedTable table;
    switch (indexedTableType) {
      case "segment":
        table = new BroadcastSegmentIndexedTable(tableSegment, keyColumns, tableSegment.getId().getVersion());
        break;
      default:
        throw new IAE("Unknown table type %s", indexedTableType);
    }
    return table;
  }

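  /**
   * Generates an on-disk segment with sequential "stringKey" and "longKey" columns plus a set of
   * lazily generated, Zipf-distributed string, long, float, and double columns.
   */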
  public static QueryableIndexSegment makeQueryableIndexSegment(Closer closer, String dataSource, int rowsPerSegment)
  {
    final List<GeneratorColumnSchema> schemaColumnsInfo = ImmutableList.of(
        GeneratorColumnSchema.makeSequential("stringKey", ValueType.STRING, false, 1, null, 0, rowsPerSegment),
        GeneratorColumnSchema.makeSequential("longKey", ValueType.LONG, false, 1, null, 0, rowsPerSegment),
        GeneratorColumnSchema.makeLazyZipf("string1", ValueType.STRING, false, 1, 0.1, 0, rowsPerSegment, 2.0),
        GeneratorColumnSchema.makeLazyZipf("string2", ValueType.STRING, false, 1, 0.3, 0, 1000000, 1.5),
        GeneratorColumnSchema.makeLazyZipf("string3", ValueType.STRING, false, 1, 0.12, 0, 1000, 1.25),
        GeneratorColumnSchema.makeLazyZipf("string4", ValueType.STRING, false, 1, 0.22, 0, 12000, 3.0),
        GeneratorColumnSchema.makeLazyZipf("string5", ValueType.STRING, false, 1, 0.05, 0, 33333, 1.8),
        GeneratorColumnSchema.makeLazyZipf("long1", ValueType.LONG, false, 1, 0.1, 0, 1001, 2.0),
        GeneratorColumnSchema.makeLazyZipf("long2", ValueType.LONG, false, 1, 0.01, 0, 666666, 2.2),
        GeneratorColumnSchema.makeLazyZipf("long3", ValueType.LONG, false, 1, 0.12, 0, 1000000, 2.5),
        GeneratorColumnSchema.makeLazyZipf("long4", ValueType.LONG, false, 1, 0.4, 0, 23, 1.2),
        GeneratorColumnSchema.makeLazyZipf("long5", ValueType.LONG, false, 1, 0.33, 0, 9999, 1.5),
        GeneratorColumnSchema.makeLazyZipf("double1", ValueType.DOUBLE, false, 1, 0.1, 0, 333, 2.2),
        GeneratorColumnSchema.makeLazyZipf("double2", ValueType.DOUBLE, false, 1, 0.01, 0, 4021, 2.5),
        GeneratorColumnSchema.makeLazyZipf("double3", ValueType.DOUBLE, false, 1, 0.41, 0, 90210, 4.0),
        GeneratorColumnSchema.makeLazyZipf("double4", ValueType.DOUBLE, false, 1, 0.5, 0, 5555555, 1.2),
        GeneratorColumnSchema.makeLazyZipf("double5", ValueType.DOUBLE, false, 1, 0.23, 0, 80, 1.8),
        GeneratorColumnSchema.makeLazyZipf("float1", ValueType.FLOAT, false, 1, 0.11, 0, 1000000, 1.7),
        GeneratorColumnSchema.makeLazyZipf("float2", ValueType.FLOAT, false, 1, 0.4, 0, 10, 1.5),
        GeneratorColumnSchema.makeLazyZipf("float3", ValueType.FLOAT, false, 1, 0.8, 0, 5000, 2.3),
        GeneratorColumnSchema.makeLazyZipf("float4", ValueType.FLOAT, false, 1, 0.999, 0, 14440, 2.0),
        GeneratorColumnSchema.makeLazyZipf("float5", ValueType.FLOAT, false, 1, 0.001, 0, 1029, 1.5)
    );
    final List<AggregatorFactory> aggs = new ArrayList<>();
    aggs.add(new CountAggregatorFactory("rows"));
    final Interval interval = Intervals.of("2000-01-01/P1D");
    final GeneratorSchemaInfo schema = new GeneratorSchemaInfo(
        schemaColumnsInfo,
        aggs,
        interval,
        false
    );
    final DataSegment dataSegment = DataSegment.builder()
        .dataSource(dataSource)
        .interval(schema.getDataInterval())
        .version("1")
        .shardSpec(new LinearShardSpec(0))
        .size(0)
        .build();
    final QueryableIndex index = closer.register(new SegmentGenerator())
        .generate(dataSegment, schema, Granularities.NONE, rowsPerSegment);
    return closer.register(new QueryableIndexSegment(index, SegmentId.dummy(dataSource)));
  }

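  /**
   * Drains every cursor through per-column {@link DimensionSelector}s, looking up string values
   * when the cardinality is unknown and consuming raw dictionary ids otherwise. Returns the total
   * number of rows read.
   */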
  private static int processRowsDimensionSelectors(
      final Blackhole blackhole,
      final Sequence<Cursor> cursors,
      final Set<String> columns
  )
  {
    if (columns.size() == 1) {
      return processRowsSingleDimensionSelector(blackhole, cursors, Iterables.getOnlyElement(columns));
    }
    return cursors.map(
        cursor -> {
          final ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
          final List<DimensionSelector> selectors = columns
              .stream()
              .map(column -> factory.makeDimensionSelector(DefaultDimensionSpec.of(column)))
              .collect(Collectors.toList());
          int rowCount = 0;
          while (!cursor.isDone()) {
            for (DimensionSelector selector : selectors) {
              if (selector.getValueCardinality() < 0) {
                // Cardinality unknown: resolve each dictionary id to its string value.
                final IndexedInts row = selector.getRow();
                final int sz = row.size();
                for (int i = 0; i < sz; i++) {
                  blackhole.consume(selector.lookupName(row.get(i)));
                }
              } else {
                // Dictionary-encoded: consuming the raw ids is enough.
                final IndexedInts row = selector.getRow();
                final int sz = row.size();
                for (int i = 0; i < sz; i++) {
                  blackhole.consume(row.get(i));
                }
              }
            }
            rowCount++;
            cursor.advance();
          }
          return rowCount;
        }
    ).accumulate(0, (acc, in) -> acc + in);
  }

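  /**
   * Single-column variant of {@link #processRowsDimensionSelectors} that hoists the cardinality
   * check out of the row loop.
   */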
  private static int processRowsSingleDimensionSelector(
      final Blackhole blackhole,
      final Sequence<Cursor> cursors,
      final String dimension
  )
  {
    return cursors.map(
        cursor -> {
          final DimensionSelector selector = cursor.getColumnSelectorFactory()
              .makeDimensionSelector(DefaultDimensionSpec.of(dimension));
          int rowCount = 0;
          if (selector.getValueCardinality() < 0) {
            String lastValue;
            while (!cursor.isDone()) {
              final IndexedInts row = selector.getRow();
              final int sz = row.size();
              for (int i = 0; i < sz; i++) {
                lastValue = selector.lookupName(row.get(i));
                blackhole.consume(lastValue);
              }
              rowCount++;
              cursor.advance();
            }
            return rowCount;
          } else {
            int lastValue;
            while (!cursor.isDone()) {
              final IndexedInts row = selector.getRow();
              final int sz = row.size();
              for (int i = 0; i < sz; i++) {
                lastValue = row.get(i);
                blackhole.consume(lastValue);
              }
              rowCount++;
              cursor.advance();
            }
            return rowCount;
          }
        }
    ).accumulate(0, (acc, in) -> acc + in);
  }

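  /**
   * Drains every cursor through generic {@link BaseObjectColumnValueSelector}s and returns the
   * total number of rows read.
   */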
  private static int processRowsValueSelector(
      final Blackhole blackhole,
      final Sequence<Cursor> cursors,
      final Set<String> columns
  )
  {
    return cursors.map(
        cursor -> {
          final ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
          final List<BaseObjectColumnValueSelector<?>> selectors =
              columns.stream().map(factory::makeColumnValueSelector).collect(Collectors.toList());
          int rowCount = 0;
          while (!cursor.isDone()) {
            for (BaseObjectColumnValueSelector<?> selector : selectors) {
              blackhole.consume(selector.getObject());
            }
            rowCount++;
            cursor.advance();
          }
          return rowCount;
        }
    ).accumulate(0, (acc, in) -> acc + in);
  }
}