/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
import org.apache.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
import org.apache.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
import org.apache.druid.query.metadata.metadata.ListColumnIncluderator;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanQueryConfig;
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanQueryQueryToolChest;
import org.apache.druid.query.scan.ScanQueryRunnerFactory;
import org.apache.druid.query.scan.ScanQueryRunnerTest;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.LegacySegmentSpec;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
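
/**
 * Verifies that a double-valued metric column survives indexing, persistence, and querying both when doubles are
 * stored as 64-bit doubles and when they are stored as 32-bit floats, toggled via the system property named by
 * {@link ColumnHolder#DOUBLE_STORAGE_TYPE_PROPERTY}. Each parameterization runs a segment-metadata query and a scan
 * query against a small segment built by {@code buildIndex}.
 */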
@RunWith(Parameterized.class)
public class DoubleStorageTest
{
private static final SegmentMetadataQueryRunnerFactory METADATA_QR_FACTORY = new SegmentMetadataQueryRunnerFactory(
new SegmentMetadataQueryQueryToolChest(new SegmentMetadataQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
private static final ScanQueryQueryToolChest SCAN_QUERY_QUERY_TOOL_CHEST = new ScanQueryQueryToolChest(
new ScanQueryConfig(),
DefaultGenericQueryMetricsFactory.instance()
);
private static final ScanQueryRunnerFactory SCAN_QUERY_RUNNER_FACTORY = new ScanQueryRunnerFactory(
SCAN_QUERY_QUERY_TOOL_CHEST,
new ScanQueryEngine(),
new ScanQueryConfig()
);
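
// Base scan query over the full test interval; testSelectValues narrows it to INTERVAL before running.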
private Druids.ScanQueryBuilder newTestQuery()
{
return Druids.newScanQueryBuilder()
.dataSource(new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE))
.columns(Collections.emptyList())
.intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
.limit(Integer.MAX_VALUE)
.legacy(false);
}

private static final IndexMergerV9 INDEX_MERGER_V9 =
TestHelper.getTestIndexMergerV9(OffHeapMemorySegmentWriteOutMediumFactory.instance());
private static final IndexIO INDEX_IO = TestHelper.getTestIndexIO();
private static final Integer MAX_ROWS = 10;
private static final String TIME_COLUMN = "__time";
private static final String DIM_NAME = "testDimName";
private static final String DIM_VALUE = "testDimValue";
private static final String DIM_FLOAT_NAME = "testDimFloatName";
private static final SegmentId SEGMENT_ID = SegmentId.dummy("segmentId");
private static final Interval INTERVAL = Intervals.of("2011-01-13T00:00:00.000Z/2011-01-22T00:00:00.001Z");
private static final InputRowParser<Map<String, Object>> ROW_PARSER = new MapInputRowParser(
new JSONParseSpec(
new TimestampSpec(TIME_COLUMN, "auto", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(ImmutableList.of(DIM_NAME)),
ImmutableList.of(DIM_FLOAT_NAME),
ImmutableList.of()
),
null,
null
)
);
private QueryableIndex index;
private final SegmentAnalysis expectedSegmentAnalysis;
private final String storeDoubleAs;

public DoubleStorageTest(
String storeDoubleAs,
SegmentAnalysis expectedSegmentAnalysis
)
{
this.storeDoubleAs = storeDoubleAs;
this.expectedSegmentAnalysis = expectedSegmentAnalysis;
}
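
// Each parameter set pairs a storage mode ("double" or "float") with the segment analysis expected for it; the two
// expectations differ only in the value type reported for the double column.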
@Parameterized.Parameters
public static Collection<?> dataFeeder()
{
SegmentAnalysis expectedSegmentAnalysisDouble = new SegmentAnalysis(
SEGMENT_ID.toString(),
ImmutableList.of(INTERVAL),
ImmutableMap.of(
TIME_COLUMN,
new ColumnAnalysis(
ValueType.LONG.toString(),
false,
100,
null,
null,
null,
null
),
DIM_NAME,
new ColumnAnalysis(
ValueType.STRING.toString(),
false,
120,
1,
DIM_VALUE,
DIM_VALUE,
null
),
DIM_FLOAT_NAME,
new ColumnAnalysis(
ValueType.DOUBLE.toString(),
false,
80,
null,
null,
null,
null
)
), 330,
MAX_ROWS,
null,
null,
null,
null
);
SegmentAnalysis expectedSegmentAnalysisFloat = new SegmentAnalysis(
SEGMENT_ID.toString(),
ImmutableList.of(INTERVAL),
ImmutableMap.of(
TIME_COLUMN,
new ColumnAnalysis(
ValueType.LONG.toString(),
false,
100,
null,
null,
null,
null
),
DIM_NAME,
new ColumnAnalysis(
ValueType.STRING.toString(),
false,
120,
1,
DIM_VALUE,
DIM_VALUE,
null
),
DIM_FLOAT_NAME,
new ColumnAnalysis(
ValueType.FLOAT.toString(),
false,
80,
null,
null,
null,
null
)
), 330,
MAX_ROWS,
null,
null,
null,
null
);
return ImmutableList.of(
new Object[]{"double", expectedSegmentAnalysisDouble},
new Object[]{"float", expectedSegmentAnalysisFloat}
);
}

@Before
public void setup() throws IOException
{
index = buildIndex(storeDoubleAs);
}
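
// Runs a merged segment-metadata query (cardinality, size, interval, min/max) over the segment and compares the
// result with the expected analysis for the configured storage mode.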
@Test
public void testMetaDataAnalysis()
{
QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
METADATA_QR_FACTORY,
SEGMENT_ID,
new QueryableIndexSegment(index, SEGMENT_ID),
null
);
SegmentMetadataQuery segmentMetadataQuery = Druids.newSegmentMetadataQueryBuilder()
.dataSource("testing")
.intervals(ImmutableList.of(INTERVAL))
.toInclude(new ListColumnIncluderator(Arrays.asList(
TIME_COLUMN,
DIM_NAME,
DIM_FLOAT_NAME
)))
.analysisTypes(
SegmentMetadataQuery.AnalysisType.CARDINALITY,
SegmentMetadataQuery.AnalysisType.SIZE,
SegmentMetadataQuery.AnalysisType.INTERVAL,
SegmentMetadataQuery.AnalysisType.MINMAX
)
.merge(true)
.build();
List<SegmentAnalysis> results = runner.run(QueryPlus.wrap(segmentMetadataQuery)).toList();
Assert.assertEquals(Collections.singletonList(expectedSegmentAnalysis), results);
}
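
// Scans all rows of the segment and verifies that the generated events come back unchanged.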
@Test
public void testSelectValues()
{
QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
SCAN_QUERY_RUNNER_FACTORY,
SEGMENT_ID,
new QueryableIndexSegment(index, SEGMENT_ID),
null
);
ScanQuery query = newTestQuery()
.intervals(new LegacySegmentSpec(INTERVAL))
.virtualColumns()
.build();
Iterable<ScanResultValue> results = runner.run(QueryPlus.wrap(query)).toList();
ScanResultValue expectedScanResult = new ScanResultValue(
SEGMENT_ID.toString(),
ImmutableList.of(TIME_COLUMN, DIM_NAME, DIM_FLOAT_NAME),
getStreamOfEvents().collect(Collectors.toList())
);
List<ScanResultValue> expectedResults = Collections.singletonList(expectedScanResult);
ScanQueryRunnerTest.verify(expectedResults, results);
}
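
/**
 * Builds an incremental index holding MAX_ROWS rows with a double metric, with the double-storage system property
 * temporarily set to the given mode ("double" or "float"), then persists it and loads it back as a QueryableIndex.
 */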
private static QueryableIndex buildIndex(String storeDoubleAs) throws IOException
{
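// Temporarily override the double-storage property; the previous value is restored once the rows have been added.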
String oldValue = System.getProperty(ColumnHolder.DOUBLE_STORAGE_TYPE_PROPERTY);
System.setProperty(ColumnHolder.DOUBLE_STORAGE_TYPE_PROPERTY, storeDoubleAs);
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(DateTimes.of("2011-01-13T00:00:00.000Z").getMillis())
.withDimensionsSpec(ROW_PARSER)
.withMetrics(
new DoubleSumAggregatorFactory(DIM_FLOAT_NAME, DIM_FLOAT_NAME)
)
.build();
final IncrementalIndex index = new IncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(MAX_ROWS)
.buildOnheap();
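// Feed the generated rows through the parser into the on-heap incremental index.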
getStreamOfEvents().forEach(o -> {
try {
index.add(ROW_PARSER.parseBatch(o).get(0));
}
catch (IndexSizeExceededException e) {
throw new RuntimeException(e);
}
});
if (oldValue == null) {
System.clearProperty(ColumnHolder.DOUBLE_STORAGE_TYPE_PROPERTY);
} else {
System.setProperty(ColumnHolder.DOUBLE_STORAGE_TYPE_PROPERTY, oldValue);
}
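// Reuse a temp-file path as a directory: persist the incremental index there and load it back. The delete() call
// after persist is a no-op because the directory is not empty.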
File someTmpFile = File.createTempFile("billy", "yay");
someTmpFile.delete();
someTmpFile.mkdirs();
INDEX_MERGER_V9.persist(index, someTmpFile, new IndexSpec(), null);
someTmpFile.delete();
return INDEX_IO.loadIndex(someTmpFile);
}

@After
public void cleanUp()
{
index.close();
}
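
// Generates MAX_ROWS events, one per day starting 2011-01-13, all sharing the same string dimension value and each
// carrying a distinct double value.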
private static Stream<Map<String, Object>> getStreamOfEvents()
{
return IntStream.range(0, MAX_ROWS).mapToObj(i -> ImmutableMap.of(
TIME_COLUMN, DateTimes.of("2011-01-13T00:00:00.000Z").plusDays(i).getMillis(),
DIM_NAME, DIM_VALUE,
DIM_FLOAT_NAME, i / 1.6179775280898876
));
}
}