blob: 68aea948d6b172ade7a5a290800024eec0fdc890 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.firstlast.last;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.IdLookup;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
import org.apache.druid.segment.vector.NoFilterVectorOffset;
import org.apache.druid.segment.vector.ReadableVectorInspector;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.segment.vector.VectorValueSelector;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.concurrent.ThreadLocalRandom;
/**
 * Unit tests for {@link StringLastVectorAggregator} and {@link SingleStringLastDimensionVectorAggregator}, plus the
 * vectorized paths of {@link StringLastAggregatorFactory}.
 *
 * All inputs come from small hand-written selectors backed by the constant arrays below, and every aggregation
 * writes into a single shared {@link ByteBuffer} that is re-created for each test.
 */
public class StringLastVectorAggregatorTest extends InitializedNullHandlingTest
{
  // Object-vector input for the string column; the third row is null.
  private static final String[] VALUES = new String[]{"a", "b", null, "c"};
  // Dictionary ids served by the single-value dimension selector. Its lookupName maps 1 -> "a", 2 -> "b",
  // 3 -> "c" and every other id (here 0) to null, so this array mirrors VALUES row for row.
  private static final int[] DICT_VALUES = new int[]{1, 2, 0, 3};
  // Numeric views of the non-string column, one array per primitive type the selector can be asked for.
  private static final long[] LONG_VALUES = new long[]{1L, 2L, 3L, 4L};
  // Expected string form of LONG_VALUES once aggregated by the string "last" aggregator.
  private static final String[] STRING_VALUES = new String[]{"1", "2", "3", "4"};
  private static final float[] FLOAT_VALUES = new float[]{1.0f, 2.0f, 3.0f, 4.0f};
  private static final double[] DOUBLE_VALUES = new double[]{1.0, 2.0, 3.0, 4.0};
  // Null vector of the non-string column: only the third row is null.
  private static final boolean[] NULLS = new boolean[]{false, false, true, false};
  // Null vector of the time selector used by the pair-based test: no row is null.
  private static final boolean[] NULLS1 = new boolean[]{false, false};
  private static final String NAME = "NAME";
  private static final String FIELD_NAME = "FIELD_NAME";
  private static final String FIELD_NAME_LONG = "LONG_NAME";
  private static final String TIME_COL = "__time";

  // Strictly increasing timestamps, so the last row (index 3) carries the greatest time.
  private final long[] times = {2436, 6879, 7888, 8224};
  // Identical timestamps for the pair-based test.
  private final long[] timesSame = {2436, 2436};
  // Pre-built (timestamp, string) pairs; the first one carries the larger timestamp.
  private final SerializablePairLongString[] pairs = {
      new SerializablePairLongString(2345100L, "last"),
      new SerializablePairLongString(2345001L, "notLast")
  };

  private VectorObjectSelector selector;
  private BaseLongVectorValueSelector timeSelector;
  private VectorValueSelector nonStringValueSelector;
  private ByteBuffer buf;
  private StringLastVectorAggregator target;
  private StringLastVectorAggregator targetWithPairs;
  private StringLastAggregatorFactory stringLastAggregatorFactory;
  private StringLastAggregatorFactory stringLastAggregatorFactory1;
  private SingleStringLastDimensionVectorAggregator targetSingleDim;
  private VectorColumnSelectorFactory selectorFactory;

  /**
   * Builds the mock selectors, the selector factory, the aggregators under test and the aggregation buffer.
   */
  @Before
  public void setup()
  {
    // The buffer starts out with random content (presumably so that no test can accidentally rely on
    // zero-filled memory); slots are explicitly initialised before they are read.
    byte[] randomBytes = new byte[1024];
    ThreadLocalRandom.current().nextBytes(randomBytes);
    buf = ByteBuffer.wrap(randomBytes);

    // Time column: serves `times`, never null.
    timeSelector = new BaseLongVectorValueSelector(new NoFilterVectorOffset(times.length, 0, times.length))
    {
      @Override
      public long[] getLongVector()
      {
        return times;
      }

      @Nullable
      @Override
      public boolean[] getNullVector()
      {
        return null;
      }
    };

    // Numeric (non-string) column: serves the same four values as long, float or double, with row 2 null.
    nonStringValueSelector = new BaseLongVectorValueSelector(new NoFilterVectorOffset(
        LONG_VALUES.length,
        0,
        LONG_VALUES.length
    ))
    {
      @Override
      public long[] getLongVector()
      {
        return LONG_VALUES;
      }

      @Override
      public float[] getFloatVector()
      {
        return FLOAT_VALUES;
      }

      @Override
      public double[] getDoubleVector()
      {
        return DOUBLE_VALUES;
      }

      @Nullable
      @Override
      public boolean[] getNullVector()
      {
        return NULLS;
      }

      @Override
      public int getMaxVectorSize()
      {
        return 4;
      }

      @Override
      public int getCurrentVectorSize()
      {
        return 4;
      }
    };

    // String column exposed as an object vector. The reported sizes match the length of the vector that is
    // actually returned, in line with the other mock selectors in this class.
    selector = new VectorObjectSelector()
    {
      @Override
      public Object[] getObjectVector()
      {
        return VALUES;
      }

      @Override
      public int getMaxVectorSize()
      {
        return VALUES.length;
      }

      @Override
      public int getCurrentVectorSize()
      {
        return VALUES.length;
      }
    };

    // Time column for the pair-based test: two identical timestamps with an explicit all-false null vector.
    BaseLongVectorValueSelector timeSelectorForPairs = new BaseLongVectorValueSelector(new NoFilterVectorOffset(
        timesSame.length,
        0,
        timesSame.length
    ))
    {
      @Override
      public long[] getLongVector()
      {
        return timesSame;
      }

      @Nullable
      @Override
      public boolean[] getNullVector()
      {
        return NULLS1;
      }
    };

    // Object column whose rows are already (timestamp, string) pairs.
    VectorObjectSelector selectorForPairs = new VectorObjectSelector()
    {
      @Override
      public Object[] getObjectVector()
      {
        return pairs;
      }

      @Override
      public int getMaxVectorSize()
      {
        return 2;
      }

      @Override
      public int getCurrentVectorSize()
      {
        return 2;
      }
    };

    // Selector factory handed to the aggregator factories: it knows TIME_COL, FIELD_NAME (string) and
    // FIELD_NAME_LONG (long); anything else resolves to null.
    selectorFactory = new VectorColumnSelectorFactory()
    {
      @Override
      public ReadableVectorInspector getReadableVectorInspector()
      {
        return new NoFilterVectorOffset(LONG_VALUES.length, 0, LONG_VALUES.length);
      }

      @Override
      public SingleValueDimensionVectorSelector makeSingleValueDimensionSelector(DimensionSpec dimensionSpec)
      {
        return new SingleValueDimensionVectorSelector()
        {
          @Override
          public int[] getRowVector()
          {
            return DICT_VALUES;
          }

          @Override
          public int getValueCardinality()
          {
            return DICT_VALUES.length;
          }

          @Nullable
          @Override
          public String lookupName(int id)
          {
            switch (id) {
              case 1:
                return "a";
              case 2:
                return "b";
              case 3:
                return "c";
              default:
                return null;
            }
          }

          @Override
          public boolean nameLookupPossibleInAdvance()
          {
            return false;
          }

          @Nullable
          @Override
          public IdLookup idLookup()
          {
            return null;
          }

          @Override
          public int getMaxVectorSize()
          {
            return DICT_VALUES.length;
          }

          @Override
          public int getCurrentVectorSize()
          {
            return DICT_VALUES.length;
          }
        };
      }

      @Override
      public MultiValueDimensionVectorSelector makeMultiValueDimensionSelector(DimensionSpec dimensionSpec)
      {
        return null;
      }

      @Override
      public VectorValueSelector makeValueSelector(String column)
      {
        if (TIME_COL.equals(column)) {
          return timeSelector;
        } else if (FIELD_NAME_LONG.equals(column)) {
          return nonStringValueSelector;
        }
        return null;
      }

      @Override
      public VectorObjectSelector makeObjectSelector(String column)
      {
        if (FIELD_NAME.equals(column)) {
          return selector;
        } else {
          return null;
        }
      }

      @Nullable
      @Override
      public ColumnCapabilities getColumnCapabilities(String column)
      {
        if (FIELD_NAME.equals(column)) {
          return ColumnCapabilitiesImpl.createSimpleSingleValueStringColumnCapabilities();
        } else if (FIELD_NAME_LONG.equals(column)) {
          return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.LONG);
        }
        return null;
      }
    };

    target = new StringLastVectorAggregator(timeSelector, selector, 10);
    targetWithPairs = new StringLastVectorAggregator(timeSelectorForPairs, selectorForPairs, 10);
    targetSingleDim = new SingleStringLastDimensionVectorAggregator(
        timeSelector,
        selectorFactory.makeSingleValueDimensionSelector(DefaultDimensionSpec.of(FIELD_NAME)),
        10
    );
    // Initialise the aggregation slot at buffer position 0, which the non-batch tests read from.
    clearBufferForPositions(0, 0);
    stringLastAggregatorFactory = new StringLastAggregatorFactory(NAME, FIELD_NAME, TIME_COL, 10);
    stringLastAggregatorFactory1 = new StringLastAggregatorFactory(NAME, FIELD_NAME_LONG, TIME_COL, 10);
  }

  /**
   * Aggregates rows that are already (timestamp, string) pairs and expects the first pair to be returned.
   */
  @Test
  public void testAggregateWithPairs()
  {
    targetWithPairs.aggregate(buf, 0, 0, pairs.length);
    Pair<Long, String> result = asPair(targetWithPairs.get(buf, 0));
    // pairs[0] is expected to win because its own left-hand timestamp is the greater of the two.
    Assert.assertEquals(pairs[0].lhs.longValue(), result.lhs.longValue());
    Assert.assertEquals(pairs[0].rhs, result.rhs);
  }

  /**
   * The factory reports that a string column can be vectorized and builds a {@link StringLastVectorAggregator}.
   */
  @Test
  public void testFactory()
  {
    Assert.assertTrue(stringLastAggregatorFactory.canVectorize(selectorFactory));
    VectorAggregator vectorAggregator = stringLastAggregatorFactory.factorizeVector(selectorFactory);
    Assert.assertNotNull(vectorAggregator);
    Assert.assertEquals(StringLastVectorAggregator.class, vectorAggregator.getClass());
  }

  /**
   * A LONG column can also be vectorized; the result is the last row's timestamp with the value as a string.
   */
  @Test
  public void testStringLastOnNonStringColumns()
  {
    Assert.assertTrue(stringLastAggregatorFactory1.canVectorize(selectorFactory));
    VectorAggregator vectorAggregator = stringLastAggregatorFactory1.factorizeVector(selectorFactory);
    Assert.assertNotNull(vectorAggregator);
    Assert.assertEquals(StringLastVectorAggregator.class, vectorAggregator.getClass());
    vectorAggregator.aggregate(buf, 0, 0, LONG_VALUES.length);
    Pair<Long, String> result = asPair(vectorAggregator.get(buf, 0));
    Assert.assertEquals(times[3], result.lhs.longValue());
    Assert.assertEquals(STRING_VALUES[3], result.rhs);
  }

  /**
   * {@code init} stores {@code DateTimes.MIN} as the initial timestamp at the start of the slot.
   */
  @Test
  public void testInit()
  {
    target.init(buf, 0);
    long initVal = buf.getLong(0);
    Assert.assertEquals(DateTimes.MIN.getMillis(), initVal);
  }

  /**
   * Aggregating the whole vector into one slot yields the row with the greatest timestamp (index 3).
   */
  @Test
  public void testAggregate()
  {
    target.aggregate(buf, 0, 0, VALUES.length);
    Pair<Long, String> result = asPair(target.get(buf, 0));
    Assert.assertEquals(times[3], result.lhs.longValue());
    Assert.assertEquals(VALUES[3], result.rhs);
  }

  /**
   * Single-slot aggregation must complete without throwing when the aggregator has no time selector.
   */
  @Test
  public void testAggregateNoOp()
  {
    // The aggregator is built with a null time selector; the call only has to return normally.
    StringLastVectorAggregator aggregator = new StringLastVectorAggregator(null, selector, 10);
    aggregator.aggregate(buf, 0, 0, VALUES.length);
  }

  /**
   * Batch aggregation must complete without throwing when the aggregator has no time selector.
   */
  @Test
  public void testAggregateBatchNoOp()
  {
    // The aggregator is built with a null time selector; the call only has to return normally.
    StringLastVectorAggregator aggregator = new StringLastVectorAggregator(null, selector, 10);
    int[] positions = new int[]{0, 43, 70};
    int positionOffset = 2;
    clearBufferForPositions(positionOffset, positions);
    aggregator.aggregate(buf, 3, positions, null, positionOffset);
  }

  /**
   * Batch aggregation with {@code rows == null}: slot {@code i} is expected to hold vector row {@code i}.
   */
  @Test
  public void testAggregateBatchWithoutRows()
  {
    int[] positions = new int[]{0, 43, 70};
    int positionOffset = 2;
    clearBufferForPositions(positionOffset, positions);
    target.aggregate(buf, 3, positions, null, positionOffset);
    for (int i = 0; i < positions.length; i++) {
      Pair<Long, String> result = asPair(target.get(buf, positions[i] + positionOffset));
      Assert.assertEquals(times[i], result.lhs.longValue());
      Assert.assertEquals(NullHandling.nullToEmptyIfNeeded(VALUES[i]), result.rhs);
    }
  }

  /**
   * Batch aggregation with an explicit row mapping: slot {@code i} is expected to hold vector row {@code rows[i]}.
   */
  @Test
  public void testAggregateBatchWithRows()
  {
    int[] positions = new int[]{0, 43, 70};
    int[] rows = new int[]{3, 2, 0};
    int positionOffset = 2;
    clearBufferForPositions(positionOffset, positions);
    target.aggregate(buf, 3, positions, rows, positionOffset);
    for (int i = 0; i < positions.length; i++) {
      Pair<Long, String> result = asPair(target.get(buf, positions[i] + positionOffset));
      Assert.assertEquals(times[rows[i]], result.lhs.longValue());
      Assert.assertEquals(NullHandling.nullToEmptyIfNeeded(VALUES[rows[i]]), result.rhs);
    }
  }

  /**
   * Single-slot aggregation through the dictionary-encoded dimension selector yields the last row (index 3).
   */
  @Test
  public void testAggregateSingleDim()
  {
    targetSingleDim.aggregate(buf, 0, 0, VALUES.length);
    Pair<Long, String> result = asPair(targetSingleDim.get(buf, 0));
    Assert.assertEquals(times[3], result.lhs.longValue());
    Assert.assertEquals(VALUES[3], result.rhs);
  }

  /**
   * Dimension-selector variant of the batch test with {@code rows == null}; values are compared without
   * null-to-empty conversion.
   */
  @Test
  public void testAggregateBatchWithoutRowsSingleDim()
  {
    int[] positions = new int[]{0, 43, 70};
    int positionOffset = 2;
    clearBufferForPositions(positionOffset, positions);
    targetSingleDim.aggregate(buf, 3, positions, null, positionOffset);
    for (int i = 0; i < positions.length; i++) {
      Pair<Long, String> result = asPair(targetSingleDim.get(buf, positions[i] + positionOffset));
      Assert.assertEquals(times[i], result.lhs.longValue());
      Assert.assertEquals(VALUES[i], result.rhs);
    }
  }

  /**
   * Dimension-selector variant of the batch test with an explicit row mapping; values are compared without
   * null-to-empty conversion.
   */
  @Test
  public void testAggregateBatchWithRowsSingleDim()
  {
    int[] positions = new int[]{0, 43, 70};
    int[] rows = new int[]{3, 2, 0};
    int positionOffset = 2;
    clearBufferForPositions(positionOffset, positions);
    targetSingleDim.aggregate(buf, 3, positions, rows, positionOffset);
    for (int i = 0; i < positions.length; i++) {
      Pair<Long, String> result = asPair(targetSingleDim.get(buf, positions[i] + positionOffset));
      Assert.assertEquals(times[rows[i]], result.lhs.longValue());
      Assert.assertEquals(VALUES[rows[i]], result.rhs);
    }
  }

  /**
   * Runs {@code target.init} on the slot at {@code offset + position} for every given position, replacing the
   * random bytes there with the aggregator's initial state.
   *
   * @param offset    value added to every position
   * @param positions slot positions to initialise
   */
  private void clearBufferForPositions(int offset, int... positions)
  {
    for (int position : positions) {
      target.init(buf, offset + position);
    }
  }

  /**
   * Narrows an aggregator's untyped result to the (timestamp, string) pair the assertions work with. Keeping the
   * cast here confines the unchecked-cast warning to a single place.
   *
   * @param aggregated value returned by an aggregator's {@code get}
   * @return the same object viewed as a {@code Pair<Long, String>}
   */
  @SuppressWarnings("unchecked")
  private static Pair<Long, String> asPair(Object aggregated)
  {
    return (Pair<Long, String>) aggregated;
  }
}