/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.segment;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Helps tests make segments.
 */
public class IndexBuilder
{
  // Number of rows written into each intermediate index before merging in buildMMappedMergedIndex.
  private static final int ROWS_PER_INDEX_FOR_MERGING = 1;
  private static final int DEFAULT_MAX_ROWS = Integer.MAX_VALUE;

  private IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
      .withMetrics(new CountAggregatorFactory("count"))
      .build();
  private SegmentWriteOutMediumFactory segmentWriteOutMediumFactory = OffHeapMemorySegmentWriteOutMediumFactory.instance();
  private IndexMerger indexMerger = TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory);
  private File tmpDir;
  private IndexSpec indexSpec = new IndexSpec();
  private int maxRows = DEFAULT_MAX_ROWS;

  private final List<InputRow> rows = new ArrayList<>();

  private IndexBuilder()
  {
    // Callers must use "create".
  }
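
  /**
   * Creates a new builder. A minimal usage sketch, where {@code temporaryFolder} and
   * {@code inputRows} are hypothetical stand-ins for a test-managed temp directory and a list of
   * {@link InputRow}s:
   *
   * <pre>{@code
   * QueryableIndex index = IndexBuilder.create()
   *     .tmpDir(temporaryFolder)
   *     .rows(inputRows)
   *     .buildMMappedIndex();
   * }</pre>
   */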
  public static IndexBuilder create()
  {
    return new IndexBuilder();
  }

  public IndexBuilder schema(IncrementalIndexSchema schema)
  {
    this.schema = schema;
    return this;
  }

  public IndexBuilder segmentWriteOutMediumFactory(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
  {
    this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
    this.indexMerger = TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory);
    return this;
  }

  public IndexBuilder indexSpec(IndexSpec indexSpec)
  {
    this.indexSpec = indexSpec;
    return this;
  }

  public IndexBuilder tmpDir(File tmpDir)
  {
    this.tmpDir = tmpDir;
    return this;
  }

  public IndexBuilder rows(Iterable<InputRow> rows)
  {
    this.rows.clear();
    Iterables.addAll(this.rows, rows);
    return this;
  }
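
  /**
   * Builds an on-heap {@link IncrementalIndex} from the configured schema and rows.
   */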
  public IncrementalIndex buildIncrementalIndex()
  {
    return buildIncrementalIndexWithRows(schema, maxRows, rows);
  }
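
  /**
   * Builds a memory-mapped {@link QueryableIndex} using a no-cache {@link ColumnConfig}.
   */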
  public QueryableIndex buildMMappedIndex()
  {
    ColumnConfig noCacheColumnConfig = () -> 0;
    return buildMMappedIndex(noCacheColumnConfig);
  }
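
  /**
   * Persists the rows as an incremental index into a randomly named subdirectory of {@code tmpDir}
   * and loads the result back as a memory-mapped {@link QueryableIndex}.
   */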
  public QueryableIndex buildMMappedIndex(ColumnConfig columnConfig)
  {
    Preconditions.checkNotNull(indexMerger, "indexMerger");
    Preconditions.checkNotNull(tmpDir, "tmpDir");
    try (final IncrementalIndex incrementalIndex = buildIncrementalIndex()) {
      return TestHelper.getTestIndexIO(columnConfig).loadIndex(
          indexMerger.persist(
              incrementalIndex,
              new File(
                  tmpDir,
                  StringUtils.format("testIndex-%s", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE))
              ),
              indexSpec,
              null
          )
      );
    }
    catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
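
  /**
   * Persists the rows in groups of {@code ROWS_PER_INDEX_FOR_MERGING} as separate indexes under
   * {@code tmpDir}, merges the persisted indexes into a single segment, and loads the merged
   * result as a memory-mapped {@link QueryableIndex}. The intermediate indexes are closed before
   * returning.
   */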
  public QueryableIndex buildMMappedMergedIndex()
  {
    IndexMerger indexMerger = TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory);
    Preconditions.checkNotNull(tmpDir, "tmpDir");
    final List<QueryableIndex> persisted = new ArrayList<>();
    try {
      for (int i = 0; i < rows.size(); i += ROWS_PER_INDEX_FOR_MERGING) {
        persisted.add(
            TestHelper.getTestIndexIO().loadIndex(
                indexMerger.persist(
                    buildIncrementalIndexWithRows(
                        schema,
                        maxRows,
                        rows.subList(i, Math.min(rows.size(), i + ROWS_PER_INDEX_FOR_MERGING))
                    ),
                    new File(tmpDir, StringUtils.format("testIndex-%s", UUID.randomUUID().toString())),
                    indexSpec,
                    null
                )
            )
        );
      }
      final QueryableIndex merged = TestHelper.getTestIndexIO().loadIndex(
          indexMerger.merge(
              Lists.transform(
                  persisted,
                  QueryableIndexIndexableAdapter::new
              ),
              true,
              Iterables.toArray(
                  Iterables.transform(
                      Arrays.asList(schema.getMetrics()),
                      AggregatorFactory::getCombiningFactory
                  ),
                  AggregatorFactory.class
              ),
              new File(tmpDir, StringUtils.format("testIndex-%s", UUID.randomUUID())),
              indexSpec
          )
      );
      for (QueryableIndex index : persisted) {
        index.close();
      }
      return merged;
    }
    catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
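
  /**
   * Builds a {@link RowBasedSegment} directly over the configured rows, using an empty
   * {@link RowSignature}.
   */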
  public RowBasedSegment<InputRow> buildRowBasedSegmentWithoutTypeSignature()
  {
    return new RowBasedSegment<>(
        SegmentId.dummy("IndexBuilder"),
        rows,
        RowAdapters.standardRow(),
        RowSignature.empty()
    );
  }
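
  /**
   * Builds a {@link RowBasedSegment} over the configured rows, deriving the {@link RowSignature}
   * from a memory-mapped copy of the same data.
   */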
  public RowBasedSegment<InputRow> buildRowBasedSegmentWithTypeSignature()
  {
    // Determine row signature by building an mmapped index first.
    try (final QueryableIndex index = buildMMappedIndex()) {
      final RowSignature.Builder rowSignatureBuilder = RowSignature.builder();
      for (final String columnName : index.getColumnNames()) {
        final ColumnCapabilities capabilities = index.getColumnHolder(columnName).getCapabilities();
        rowSignatureBuilder.add(columnName, capabilities.getType());
      }
      return new RowBasedSegment<>(
          SegmentId.dummy("IndexBuilder"),
          rows,
          RowAdapters.standardRow(),
          rowSignatureBuilder.build()
      );
    }
  }

  private static IncrementalIndex buildIncrementalIndexWithRows(
      IncrementalIndexSchema schema,
      int maxRows,
      Iterable<InputRow> rows
  )
  {
    Preconditions.checkNotNull(schema, "schema");
    final IncrementalIndex incrementalIndex = new IncrementalIndex.Builder()
        .setIndexSchema(schema)
        .setMaxRowCount(maxRows)
        .buildOnheap();
    for (InputRow row : rows) {
      try {
        incrementalIndex.add(row);
      }
      catch (IndexSizeExceededException e) {
        throw new RuntimeException(e);
      }
    }
    return incrementalIndex;
  }
}