blob: 4cb00e657fbd1573bec555351bda53ef66164c61 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.operator.transform.function;
import java.io.File;
import java.math.BigDecimal;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pinot.common.function.TransformFunctionType;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.RequestContextUtils;
import org.apache.pinot.core.operator.DocIdSetOperator;
import org.apache.pinot.core.operator.ProjectionOperator;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.filter.MatchAllFilterOperator;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeGranularitySpec;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
public class NullHandlingTransformFunctionTest {
private static final String SEGMENT_NAME = "testSegmentWithNulls";
private static final String INDEX_DIR_PATH = FileUtils.getTempDirectoryPath() + File.separator + SEGMENT_NAME;
private static final Random RANDOM = new Random();
protected static final int NUM_ROWS = 1000;
protected static final String INT_SV_COLUMN = "intSV";
protected static final String LONG_SV_COLUMN = "longSV";
protected static final String FLOAT_SV_COLUMN = "floatSV";
protected static final String DOUBLE_SV_COLUMN = "doubleSV";
protected static final String STRING_SV_COLUMN = "stringSV";
protected static final String BYTES_SV_COLUMN = "bytesSV";
protected static final String TIMESTAMP_COLUMN = "timestampColumn";
protected static final String TIME_COLUMN = "timeColumn";
protected final long[] _timeValues = new long[NUM_ROWS];
protected final int[] _intSVValues = new int[NUM_ROWS];
protected final long[] _longSVValues = new long[NUM_ROWS];
protected final float[] _floatSVValues = new float[NUM_ROWS];
protected final double[] _doubleSVValues = new double[NUM_ROWS];
protected final String[] _stringSVValues = new String[NUM_ROWS];
protected final byte[][] _bytesSVValues = new byte[NUM_ROWS][];
protected final boolean[] _nullValues = new boolean[NUM_ROWS];
protected Map<String, DataSource> _dataSourceMap;
protected ProjectionBlock _projectionBlock;
@BeforeClass
public void setup()
throws Exception {
FileUtils.deleteQuietly(new File(INDEX_DIR_PATH));
DecimalFormat df = new DecimalFormat("0", DecimalFormatSymbols.getInstance(Locale.ENGLISH));
df.setMaximumFractionDigits(340); // 340 = DecimalFormat.DOUBLE_FRACTION_DIGITS
Random nullValueRandom = new Random(42);
long currentTimeMs = System.currentTimeMillis();
for (int i = 0; i < NUM_ROWS; i++) {
_intSVValues[i] = RANDOM.nextInt();
_longSVValues[i] = RANDOM.nextLong();
_floatSVValues[i] = _intSVValues[i] * RANDOM.nextFloat();
_doubleSVValues[i] = _intSVValues[i] * RANDOM.nextDouble();
_stringSVValues[i] = df.format(_intSVValues[i] * RANDOM.nextDouble());
_bytesSVValues[i] = RandomStringUtils.randomAlphanumeric(26).getBytes();
_timeValues[i] = currentTimeMs - RANDOM.nextInt(365 * 24 * 3600) * 1000L;
_nullValues[i] = nullValueRandom.nextInt(2) > 0;
}
List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
for (int i = 0; i < NUM_ROWS; i++) {
Map<String, Object> map = new HashMap<>();
if (!_nullValues[i]) {
map.put(INT_SV_COLUMN, _intSVValues[i]);
map.put(LONG_SV_COLUMN, _longSVValues[i]);
map.put(FLOAT_SV_COLUMN, _floatSVValues[i]);
map.put(DOUBLE_SV_COLUMN, _doubleSVValues[i]);
map.put(STRING_SV_COLUMN, _stringSVValues[i]);
map.put(BYTES_SV_COLUMN, _bytesSVValues[i]);
} else {
map.put(INT_SV_COLUMN, null);
map.put(LONG_SV_COLUMN, null);
map.put(FLOAT_SV_COLUMN, null);
map.put(DOUBLE_SV_COLUMN, null);
map.put(STRING_SV_COLUMN, null);
map.put(BYTES_SV_COLUMN, null);
}
map.put(TIMESTAMP_COLUMN, _timeValues[i]);
map.put(TIME_COLUMN, _timeValues[i]);
GenericRow row = new GenericRow();
row.init(map);
rows.add(row);
}
Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(INT_SV_COLUMN, DataType.INT)
.addSingleValueDimension(LONG_SV_COLUMN, DataType.LONG).addSingleValueDimension(FLOAT_SV_COLUMN, DataType.FLOAT)
.addSingleValueDimension(DOUBLE_SV_COLUMN, DataType.DOUBLE)
.addSingleValueDimension(STRING_SV_COLUMN, DataType.STRING)
.addSingleValueDimension(BYTES_SV_COLUMN, DataType.BYTES)
.addDateTime(TIMESTAMP_COLUMN, DataType.TIMESTAMP, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
.addTime(new TimeGranularitySpec(DataType.LONG, TimeUnit.MILLISECONDS, TIME_COLUMN), null).build();
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("testWithNulls").setNullHandlingEnabled(true)
.setTimeColumnName(TIME_COLUMN).build();
SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
config.setOutDir(INDEX_DIR_PATH);
config.setSegmentName(SEGMENT_NAME);
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(config, new GenericRowRecordReader(rows));
driver.build();
IndexSegment indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR_PATH, SEGMENT_NAME), ReadMode.heap);
Set<String> columnNames = indexSegment.getPhysicalColumnNames();
_dataSourceMap = new HashMap<>(columnNames.size());
for (String columnName : columnNames) {
_dataSourceMap.put(columnName, indexSegment.getDataSource(columnName));
}
_projectionBlock = new ProjectionOperator(_dataSourceMap,
new DocIdSetOperator(new MatchAllFilterOperator(NUM_ROWS), DocIdSetPlanNode.MAX_DOC_PER_CALL)).nextBlock();
}
@Test
public void testIsNullTransformFunction()
throws Exception {
testIsNullTransformFunction(INT_SV_COLUMN);
testIsNullTransformFunction(LONG_SV_COLUMN);
testIsNullTransformFunction(FLOAT_SV_COLUMN);
testIsNullTransformFunction(DOUBLE_SV_COLUMN);
testIsNullTransformFunction(STRING_SV_COLUMN);
testIsNullTransformFunction(BYTES_SV_COLUMN);
}
public void testIsNullTransformFunction(String columnName)
throws Exception {
ExpressionContext expression = RequestContextUtils.getExpression(String.format("%s IS NULL", columnName));
TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
assertTrue(transformFunction instanceof IsNullTransformFunction);
assertEquals(transformFunction.getName(), TransformFunctionType.IS_NULL.getName());
TransformResultMetadata resultMetadata = transformFunction.getResultMetadata();
assertEquals(resultMetadata.getDataType(), DataType.BOOLEAN);
assertTrue(resultMetadata.isSingleValue());
assertFalse(resultMetadata.hasDictionary());
boolean[] expectedValues = new boolean[NUM_ROWS];
for (int i = 0; i < NUM_ROWS; i++) {
expectedValues[i] = _nullValues[i];
}
testTransformFunction(expression, expectedValues);
}
@Test
public void testIsNotNullTransformFunction()
throws Exception {
testIsNotNullTransformFunction(INT_SV_COLUMN);
testIsNotNullTransformFunction(LONG_SV_COLUMN);
testIsNotNullTransformFunction(FLOAT_SV_COLUMN);
testIsNotNullTransformFunction(DOUBLE_SV_COLUMN);
testIsNotNullTransformFunction(STRING_SV_COLUMN);
testIsNotNullTransformFunction(BYTES_SV_COLUMN);
}
public void testIsNotNullTransformFunction(String columnName)
throws Exception {
ExpressionContext expression = RequestContextUtils.getExpression(String.format("%s IS NOT NULL", columnName));
TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
assertTrue(transformFunction instanceof IsNotNullTransformFunction);
assertEquals(transformFunction.getName(), TransformFunctionType.IS_NOT_NULL.getName());
TransformResultMetadata resultMetadata = transformFunction.getResultMetadata();
assertEquals(resultMetadata.getDataType(), DataType.BOOLEAN);
assertTrue(resultMetadata.isSingleValue());
assertFalse(resultMetadata.hasDictionary());
boolean[] expectedValues = new boolean[NUM_ROWS];
for (int i = 0; i < NUM_ROWS; i++) {
expectedValues[i] = !_nullValues[i];
}
testTransformFunction(expression, expectedValues);
}
protected void testTransformFunction(ExpressionContext expression, boolean[] expectedValues)
throws Exception {
int[] intValues = getTransformFunctionInstance(expression).transformToIntValuesSV(_projectionBlock);
long[] longValues = getTransformFunctionInstance(expression).transformToLongValuesSV(_projectionBlock);
float[] floatValues = getTransformFunctionInstance(expression).transformToFloatValuesSV(_projectionBlock);
double[] doubleValues = getTransformFunctionInstance(expression).transformToDoubleValuesSV(_projectionBlock);
BigDecimal[] bigDecimalValues =
getTransformFunctionInstance(expression).transformToBigDecimalValuesSV(_projectionBlock);
// TODO: Support implicit cast from BOOLEAN to STRING
// String[] stringValues = getTransformFunctionInstance(expression).transformToStringValuesSV(_projectionBlock);
for (int i = 0; i < NUM_ROWS; i++) {
assertEquals(intValues[i] == 1, expectedValues[i]);
assertEquals(longValues[i] == 1, expectedValues[i]);
assertEquals(floatValues[i] == 1, expectedValues[i]);
assertEquals(doubleValues[i] == 1, expectedValues[i]);
assertEquals(bigDecimalValues[i].intValue() == 1, expectedValues[i]);
// assertEquals(stringValues[i], Boolean.toString(expectedValues[i]));
}
}
private TransformFunction getTransformFunctionInstance(ExpressionContext expression) {
return TransformFunctionFactory.get(expression, _dataSourceMap);
}
}