blob: 176f1199e8694b8f8f5b106098f75e7a185a84bb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.common;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static java.nio.charset.StandardCharsets.UTF_8;
public class DataFetcherTest {
private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "DataFetcherTest");
private static final long RANDOM_SEED = System.currentTimeMillis();
private static final Random RANDOM = new Random(RANDOM_SEED);
private static final String ERROR_MESSAGE = "Random seed: " + RANDOM_SEED;
private static final int NUM_ROWS = 1000;
private static final int MAX_VALUE = 1000000;
private static final String INT_COLUMN = "int";
private static final String LONG_COLUMN = "long";
private static final String FLOAT_COLUMN = "float";
private static final String DOUBLE_COLUMN = "double";
private static final String BIG_DECIMAL_COLUMN = "bigDecimal";
private static final String STRING_COLUMN = "string";
private static final String BYTES_COLUMN = "bytes";
private static final String HEX_STRING_COLUMN = "hex_string";
private static final String NO_DICT_INT_COLUMN = "no_dict_int";
private static final String NO_DICT_LONG_COLUMN = "no_dict_long";
private static final String NO_DICT_FLOAT_COLUMN = "no_dict_float";
private static final String NO_DICT_DOUBLE_COLUMN = "no_dict_double";
private static final String NO_DICT_BIG_DECIMAL_COLUMN = "no_dict_big_decimal";
private static final String NO_DICT_STRING_COLUMN = "no_dict_string";
private static final String NO_DICT_BYTES_COLUMN = "no_dict_bytes";
private static final String NO_DICT_HEX_STRING_COLUMN = "no_dict_hex_string";
private static final int MAX_STEP_LENGTH = 5;
private final int[] _values = new int[NUM_ROWS];
private IndexSegment _indexSegment;
private DataFetcher _dataFetcher;
@BeforeClass
private void setup()
throws Exception {
FileUtils.deleteDirectory(TEMP_DIR);
// Generate random values
List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
for (int i = 0; i < NUM_ROWS; i++) {
int value = RANDOM.nextInt(MAX_VALUE);
_values[i] = value;
GenericRow row = new GenericRow();
row.putValue(INT_COLUMN, value);
row.putValue(NO_DICT_INT_COLUMN, value);
row.putValue(LONG_COLUMN, (long) value);
row.putValue(NO_DICT_LONG_COLUMN, (long) value);
row.putValue(FLOAT_COLUMN, (float) value);
row.putValue(NO_DICT_FLOAT_COLUMN, (float) value);
row.putValue(DOUBLE_COLUMN, (double) value);
row.putValue(NO_DICT_DOUBLE_COLUMN, (double) value);
BigDecimal bigDecimalValue = BigDecimal.valueOf(value);
row.putValue(BIG_DECIMAL_COLUMN, bigDecimalValue);
row.putValue(NO_DICT_BIG_DECIMAL_COLUMN, bigDecimalValue);
String stringValue = Integer.toString(value);
row.putValue(STRING_COLUMN, stringValue);
row.putValue(NO_DICT_STRING_COLUMN, stringValue);
byte[] bytesValue = stringValue.getBytes(UTF_8);
row.putValue(BYTES_COLUMN, bytesValue);
row.putValue(NO_DICT_BYTES_COLUMN, bytesValue);
String hexStringValue = BytesUtils.toHexString(bytesValue);
row.putValue(HEX_STRING_COLUMN, hexStringValue);
row.putValue(NO_DICT_HEX_STRING_COLUMN, hexStringValue);
rows.add(row);
}
// Create the segment
String tableName = "testTable";
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setNoDictionaryColumns(
Arrays.asList(NO_DICT_INT_COLUMN, NO_DICT_LONG_COLUMN, NO_DICT_FLOAT_COLUMN, NO_DICT_DOUBLE_COLUMN,
NO_DICT_STRING_COLUMN, NO_DICT_BYTES_COLUMN, NO_DICT_HEX_STRING_COLUMN)).build();
Schema schema =
new Schema.SchemaBuilder().setSchemaName(tableName).addSingleValueDimension(INT_COLUMN, DataType.INT)
.addSingleValueDimension(LONG_COLUMN, DataType.LONG).addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
.addSingleValueDimension(DOUBLE_COLUMN, DataType.DOUBLE)
.addMetric(BIG_DECIMAL_COLUMN, DataType.BIG_DECIMAL)
.addSingleValueDimension(STRING_COLUMN, DataType.STRING)
.addSingleValueDimension(BYTES_COLUMN, DataType.BYTES)
.addSingleValueDimension(HEX_STRING_COLUMN, DataType.STRING)
.addSingleValueDimension(NO_DICT_INT_COLUMN, DataType.INT)
.addSingleValueDimension(NO_DICT_LONG_COLUMN, DataType.LONG)
.addSingleValueDimension(NO_DICT_FLOAT_COLUMN, DataType.FLOAT)
.addSingleValueDimension(NO_DICT_DOUBLE_COLUMN, DataType.DOUBLE)
.addMetric(NO_DICT_BIG_DECIMAL_COLUMN, DataType.BIG_DECIMAL)
.addSingleValueDimension(NO_DICT_STRING_COLUMN, DataType.STRING)
.addSingleValueDimension(NO_DICT_BYTES_COLUMN, DataType.BYTES)
.addSingleValueDimension(NO_DICT_HEX_STRING_COLUMN, DataType.STRING).build();
String segmentName = "testSegment";
SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
config.setOutDir(TEMP_DIR.getPath());
config.setSegmentName(segmentName);
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(config, new GenericRowRecordReader(rows));
driver.build();
_indexSegment = ImmutableSegmentLoader.load(new File(TEMP_DIR, segmentName), ReadMode.mmap);
Map<String, DataSource> dataSourceMap = new HashMap<>();
for (String column : _indexSegment.getPhysicalColumnNames()) {
dataSourceMap.put(column, _indexSegment.getDataSource(column));
}
_dataFetcher = new DataFetcher(dataSourceMap);
}
@Test
public void testFetchIntValues() {
testFetchIntValues(INT_COLUMN);
testFetchIntValues(LONG_COLUMN);
testFetchIntValues(FLOAT_COLUMN);
testFetchIntValues(DOUBLE_COLUMN);
testFetchIntValues(BIG_DECIMAL_COLUMN);
testFetchIntValues(STRING_COLUMN);
testFetchIntValues(NO_DICT_INT_COLUMN);
testFetchIntValues(NO_DICT_LONG_COLUMN);
testFetchIntValues(NO_DICT_FLOAT_COLUMN);
testFetchIntValues(NO_DICT_DOUBLE_COLUMN);
testFetchIntValues(NO_DICT_BIG_DECIMAL_COLUMN);
testFetchIntValues(NO_DICT_STRING_COLUMN);
}
@Test
public void testFetchLongValues() {
testFetchLongValues(INT_COLUMN);
testFetchLongValues(LONG_COLUMN);
testFetchLongValues(FLOAT_COLUMN);
testFetchLongValues(DOUBLE_COLUMN);
testFetchLongValues(BIG_DECIMAL_COLUMN);
testFetchLongValues(STRING_COLUMN);
testFetchLongValues(NO_DICT_INT_COLUMN);
testFetchLongValues(NO_DICT_LONG_COLUMN);
testFetchLongValues(NO_DICT_FLOAT_COLUMN);
testFetchLongValues(NO_DICT_DOUBLE_COLUMN);
testFetchLongValues(NO_DICT_BIG_DECIMAL_COLUMN);
testFetchLongValues(NO_DICT_STRING_COLUMN);
}
@Test
public void testFetchFloatValues() {
testFetchFloatValues(INT_COLUMN);
testFetchFloatValues(LONG_COLUMN);
testFetchFloatValues(FLOAT_COLUMN);
testFetchFloatValues(DOUBLE_COLUMN);
testFetchFloatValues(BIG_DECIMAL_COLUMN);
testFetchFloatValues(STRING_COLUMN);
testFetchFloatValues(NO_DICT_INT_COLUMN);
testFetchFloatValues(NO_DICT_LONG_COLUMN);
testFetchFloatValues(NO_DICT_FLOAT_COLUMN);
testFetchFloatValues(NO_DICT_DOUBLE_COLUMN);
testFetchFloatValues(NO_DICT_BIG_DECIMAL_COLUMN);
testFetchFloatValues(NO_DICT_STRING_COLUMN);
}
@Test
public void testFetchDoubleValues() {
testFetchDoubleValues(INT_COLUMN);
testFetchDoubleValues(LONG_COLUMN);
testFetchDoubleValues(FLOAT_COLUMN);
testFetchDoubleValues(DOUBLE_COLUMN);
testFetchDoubleValues(BIG_DECIMAL_COLUMN);
testFetchDoubleValues(STRING_COLUMN);
testFetchDoubleValues(NO_DICT_INT_COLUMN);
testFetchDoubleValues(NO_DICT_LONG_COLUMN);
testFetchDoubleValues(NO_DICT_FLOAT_COLUMN);
testFetchDoubleValues(NO_DICT_DOUBLE_COLUMN);
testFetchDoubleValues(NO_DICT_BIG_DECIMAL_COLUMN);
testFetchDoubleValues(NO_DICT_STRING_COLUMN);
}
@Test
public void testFetchBigDecimalValues() {
testFetchBigDecimalValues(INT_COLUMN);
testFetchBigDecimalValues(LONG_COLUMN);
testFetchBigDecimalValues(FLOAT_COLUMN);
testFetchBigDecimalValues(DOUBLE_COLUMN);
testFetchBigDecimalValues(BIG_DECIMAL_COLUMN);
testFetchBigDecimalValues(STRING_COLUMN);
testFetchBigDecimalValues(NO_DICT_INT_COLUMN);
testFetchBigDecimalValues(NO_DICT_LONG_COLUMN);
testFetchBigDecimalValues(NO_DICT_FLOAT_COLUMN);
testFetchBigDecimalValues(NO_DICT_DOUBLE_COLUMN);
testFetchBigDecimalValues(NO_DICT_BIG_DECIMAL_COLUMN);
testFetchBigDecimalValues(NO_DICT_STRING_COLUMN);
}
@Test
public void testFetchStringValues() {
testFetchStringValues(INT_COLUMN);
testFetchStringValues(LONG_COLUMN);
testFetchStringValues(FLOAT_COLUMN);
testFetchStringValues(DOUBLE_COLUMN);
testFetchStringValues(BIG_DECIMAL_COLUMN);
testFetchStringValues(STRING_COLUMN);
testFetchStringValues(NO_DICT_INT_COLUMN);
testFetchStringValues(NO_DICT_LONG_COLUMN);
testFetchStringValues(NO_DICT_FLOAT_COLUMN);
testFetchStringValues(NO_DICT_DOUBLE_COLUMN);
testFetchStringValues(NO_DICT_BIG_DECIMAL_COLUMN);
testFetchStringValues(NO_DICT_STRING_COLUMN);
}
@Test
public void testFetchBytesValues() {
testFetchBytesValues(BYTES_COLUMN);
testFetchBytesValues(NO_DICT_BYTES_COLUMN);
testFetchBytesValues(STRING_COLUMN);
testFetchBytesValues(NO_DICT_STRING_COLUMN);
}
@Test
public void testFetchHexStringValues() {
testFetchHexStringValues(BYTES_COLUMN);
testFetchHexStringValues(HEX_STRING_COLUMN);
testFetchHexStringValues(NO_DICT_BYTES_COLUMN);
testFetchHexStringValues(NO_DICT_HEX_STRING_COLUMN);
}
public void testFetchIntValues(String column) {
int[] docIds = new int[NUM_ROWS];
int length = 0;
for (int i = RANDOM.nextInt(MAX_STEP_LENGTH); i < NUM_ROWS; i += RANDOM.nextInt(MAX_STEP_LENGTH) + 1) {
docIds[length++] = i;
}
int[] intValues = new int[length];
_dataFetcher.fetchIntValues(column, docIds, length, intValues);
for (int i = 0; i < length; i++) {
Assert.assertEquals(intValues[i], _values[docIds[i]], ERROR_MESSAGE);
}
}
public void testFetchLongValues(String column) {
int[] docIds = new int[NUM_ROWS];
int length = 0;
for (int i = RANDOM.nextInt(MAX_STEP_LENGTH); i < NUM_ROWS; i += RANDOM.nextInt(MAX_STEP_LENGTH) + 1) {
docIds[length++] = i;
}
long[] longValues = new long[length];
_dataFetcher.fetchLongValues(column, docIds, length, longValues);
for (int i = 0; i < length; i++) {
Assert.assertEquals((int) longValues[i], _values[docIds[i]], ERROR_MESSAGE);
}
}
public void testFetchFloatValues(String column) {
int[] docIds = new int[NUM_ROWS];
int length = 0;
for (int i = RANDOM.nextInt(MAX_STEP_LENGTH); i < NUM_ROWS; i += RANDOM.nextInt(MAX_STEP_LENGTH) + 1) {
docIds[length++] = i;
}
float[] floatValues = new float[length];
_dataFetcher.fetchFloatValues(column, docIds, length, floatValues);
for (int i = 0; i < length; i++) {
Assert.assertEquals((int) floatValues[i], _values[docIds[i]], ERROR_MESSAGE);
}
}
public void testFetchDoubleValues(String column) {
int[] docIds = new int[NUM_ROWS];
int length = 0;
for (int i = RANDOM.nextInt(MAX_STEP_LENGTH); i < NUM_ROWS; i += RANDOM.nextInt(MAX_STEP_LENGTH) + 1) {
docIds[length++] = i;
}
double[] doubleValues = new double[length];
_dataFetcher.fetchDoubleValues(column, docIds, length, doubleValues);
for (int i = 0; i < length; i++) {
Assert.assertEquals((int) doubleValues[i], _values[docIds[i]], ERROR_MESSAGE);
}
}
public void testFetchBigDecimalValues(String column) {
int[] docIds = new int[NUM_ROWS];
int length = 0;
for (int i = RANDOM.nextInt(MAX_STEP_LENGTH); i < NUM_ROWS; i += RANDOM.nextInt(MAX_STEP_LENGTH) + 1) {
docIds[length++] = i;
}
BigDecimal[] bigDecimalValues = new BigDecimal[length];
_dataFetcher.fetchBigDecimalValues(column, docIds, length, bigDecimalValues);
for (int i = 0; i < length; i++) {
Assert.assertEquals(bigDecimalValues[i].intValue(), _values[docIds[i]], ERROR_MESSAGE);
}
}
public void testFetchStringValues(String column) {
int[] docIds = new int[NUM_ROWS];
int length = 0;
for (int i = RANDOM.nextInt(MAX_STEP_LENGTH); i < NUM_ROWS; i += RANDOM.nextInt(MAX_STEP_LENGTH) + 1) {
docIds[length++] = i;
}
String[] stringValues = new String[length];
_dataFetcher.fetchStringValues(column, docIds, length, stringValues);
for (int i = 0; i < length; i++) {
Assert.assertEquals((int) Double.parseDouble(stringValues[i]), _values[docIds[i]], ERROR_MESSAGE);
}
}
public void testFetchBytesValues(String column) {
int[] docIds = new int[NUM_ROWS];
int length = 0;
for (int i = RANDOM.nextInt(MAX_STEP_LENGTH); i < NUM_ROWS; i += RANDOM.nextInt(MAX_STEP_LENGTH) + 1) {
docIds[length++] = i;
}
byte[][] bytesValues = new byte[length][];
_dataFetcher.fetchBytesValues(column, docIds, length, bytesValues);
for (int i = 0; i < length; i++) {
Assert.assertEquals(new String(bytesValues[i], UTF_8), Integer.toString(_values[docIds[i]]), ERROR_MESSAGE);
}
}
public void testFetchHexStringValues(String column) {
int[] docIds = new int[NUM_ROWS];
int length = 0;
for (int i = RANDOM.nextInt(MAX_STEP_LENGTH); i < NUM_ROWS; i += RANDOM.nextInt(MAX_STEP_LENGTH) + 1) {
docIds[length++] = i;
}
String[] hexStringValues = new String[length];
_dataFetcher.fetchStringValues(column, docIds, length, hexStringValues);
for (int i = 0; i < length; i++) {
Assert.assertEquals(new String(BytesUtils.toBytes(hexStringValues[i]), UTF_8),
Integer.toString(_values[docIds[i]]), ERROR_MESSAGE);
}
}
@AfterClass
public void tearDown()
throws IOException {
_indexSegment.destroy();
FileUtils.deleteDirectory(TEMP_DIR);
}
}