// blob: e11c27c38c778295564bd741f8b677f8a5f50b41 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.firstlast.first;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.SerializablePairLongLong;
import org.apache.druid.query.aggregation.TestLongColumnSelector;
import org.apache.druid.query.aggregation.TestObjectColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Comparator;
/**
 * Tests for {@link LongFirstAggregatorFactory} and the aggregators, combiner and comparator it produces.
 *
 * <p>All tests share a four-row fixture. Row {@code i} of {@link #times}, {@link #customTimes},
 * {@link #longValues} and {@link #pairs} is exposed through the matching selector, and every call to one of the
 * {@code aggregate(...)} helpers consumes the current row and then advances all selectors together.
 */
public class LongFirstAggregationTest extends InitializedNullHandlingTest
{
  private LongFirstAggregatorFactory longFirstAggFactory;
  private LongFirstAggregatorFactory combiningAggFactory;
  private ColumnSelectorFactory colSelectorFactory;
  private TestLongColumnSelector timeSelector;
  private TestLongColumnSelector customTimeSelector;
  private TestLongColumnSelector valueSelector;
  private TestObjectColumnSelector<SerializablePairLongLong> objectSelector;

  // Values of the aggregated column "nilly".
  private final long[] longValues = {185, -216, -128751132, Long.MIN_VALUE};
  // Values served for ColumnHolder.TIME_COLUMN_NAME; the smallest entry is at index 3.
  private final long[] times = {1123126751, 1784247991, 1854329816, 1000000000};
  // Values served for the "customTime" column; the smallest entry is at index 1.
  private final long[] customTimes = {2, 1, 3, 4};
  // Pre-aggregated (time, value) pairs served for column "billy"; pairs[0] and pairs[1] tie on time.
  private final SerializablePairLongLong[] pairs = {
      new SerializablePairLongLong(1L, 113267L),
      new SerializablePairLongLong(1L, 5437384L),
      new SerializablePairLongLong(6L, 34583458L),
      new SerializablePairLongLong(88L, 34583452L)
  };

  /**
   * Builds the factories under test and a mocked {@link ColumnSelectorFactory} that hands out the fixture
   * selectors for the time column, "customTime", "nilly" and "billy".
   */
  @Before
  public void setup()
  {
    longFirstAggFactory = new LongFirstAggregatorFactory("billy", "nilly", null);
    combiningAggFactory = (LongFirstAggregatorFactory) longFirstAggFactory.getCombiningFactory();

    timeSelector = new TestLongColumnSelector(times);
    customTimeSelector = new TestLongColumnSelector(customTimes);
    valueSelector = new TestLongColumnSelector(longValues);
    objectSelector = new TestObjectColumnSelector<>(pairs);

    colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
    EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME))
            .andReturn(timeSelector);
    EasyMock.expect(colSelectorFactory.makeColumnValueSelector("customTime")).andReturn(customTimeSelector);
    EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
    EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
    EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly"))
            .andReturn(new ColumnCapabilitiesImpl().setType(ColumnType.LONG));
    EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(null);
    EasyMock.replay(colSelectorFactory);
  }

  /**
   * Heap aggregator built with a null time column: the row with the smallest {@link #times} entry (index 3)
   * is expected to win.
   */
  @Test
  public void testLongFirstAggregator()
  {
    Aggregator agg = longFirstAggFactory.factorize(colSelectorFactory);

    aggregateAllRows(agg);

    Pair<Long, Long> result = asPair(agg.get());
    Assert.assertEquals(times[3], result.lhs.longValue());
    Assert.assertEquals(longValues[3], result.rhs.longValue());
    Assert.assertEquals(longValues[3], agg.getLong());
    Assert.assertEquals(longValues[3], agg.getFloat(), 0.0001);
  }

  /**
   * Heap aggregator built with the "customTime" column: the row with the smallest {@link #customTimes} entry
   * (index 1) is expected to win.
   */
  @Test
  public void testLongFirstAggregatorWithTimeColumn()
  {
    Aggregator agg = new LongFirstAggregatorFactory("billy", "nilly", "customTime").factorize(colSelectorFactory);

    aggregateAllRows(agg);

    Pair<Long, Long> result = asPair(agg.get());
    Assert.assertEquals(customTimes[1], result.lhs.longValue());
    Assert.assertEquals(longValues[1], result.rhs.longValue());
    Assert.assertEquals(longValues[1], agg.getLong());
    Assert.assertEquals(longValues[1], agg.getFloat(), 0.0001);
  }

  /**
   * Buffer aggregator built with a null time column; mirrors {@link #testLongFirstAggregator()}.
   */
  @Test
  public void testLongFirstBufferAggregator()
  {
    BufferAggregator agg = longFirstAggFactory.factorizeBuffered(colSelectorFactory);
    ByteBuffer buffer = newBuffer(longFirstAggFactory);
    agg.init(buffer, 0);

    aggregateAllRows(agg, buffer, 0);

    Pair<Long, Long> result = asPair(agg.get(buffer, 0));
    Assert.assertEquals(times[3], result.lhs.longValue());
    Assert.assertEquals(longValues[3], result.rhs.longValue());
    Assert.assertEquals(longValues[3], agg.getLong(buffer, 0));
    Assert.assertEquals(longValues[3], agg.getFloat(buffer, 0), 0.0001);
  }

  /**
   * Buffer aggregator built with the "customTime" column; mirrors
   * {@link #testLongFirstAggregatorWithTimeColumn()}.
   */
  @Test
  public void testLongFirstBufferAggregatorWithTimeColumn()
  {
    LongFirstAggregatorFactory customTimeFactory = new LongFirstAggregatorFactory("billy", "nilly", "customTime");
    BufferAggregator agg = customTimeFactory.factorizeBuffered(colSelectorFactory);
    ByteBuffer buffer = newBuffer(customTimeFactory);
    agg.init(buffer, 0);

    aggregateAllRows(agg, buffer, 0);

    Pair<Long, Long> result = asPair(agg.get(buffer, 0));
    Assert.assertEquals(customTimes[1], result.lhs.longValue());
    Assert.assertEquals(longValues[1], result.rhs.longValue());
    Assert.assertEquals(longValues[1], agg.getLong(buffer, 0));
    Assert.assertEquals(longValues[1], agg.getFloat(buffer, 0), 0.0001);
  }

  /**
   * Combining two pairs is expected to return the one with the smaller timestamp.
   */
  @Test
  public void testCombine()
  {
    SerializablePairLongLong earlier = new SerializablePairLongLong(1467225000L, 1263L);
    SerializablePairLongLong later = new SerializablePairLongLong(1467240000L, 752713L);

    Assert.assertEquals(earlier, longFirstAggFactory.combine(earlier, later));
  }

  /**
   * The factory comparator is expected to rank a pair holding a null value below a pair holding a non-null
   * value, and to report a pair as equal to itself whether or not its value is null.
   */
  @Test
  public void testComparatorWithNulls()
  {
    SerializablePairLongLong withValue = new SerializablePairLongLong(1467225000L, 1263L);
    SerializablePairLongLong withNull = new SerializablePairLongLong(1467240000L, null);
    // Kept raw on purpose: the generic signature of getComparator() is not visible from this test.
    Comparator comparator = longFirstAggFactory.getComparator();

    Assert.assertEquals(1, comparator.compare(withValue, withNull));
    Assert.assertEquals(0, comparator.compare(withValue, withValue));
    Assert.assertEquals(0, comparator.compare(withNull, withNull));
    Assert.assertEquals(-1, comparator.compare(withNull, withValue));
  }

  /**
   * Heap aggregator from the combining factory, fed the pre-aggregated {@link #pairs}: {@code pairs[0]} is
   * expected to win, including over {@code pairs[1]}, which carries the same timestamp.
   */
  @Test
  public void testLongFirstCombiningAggregator()
  {
    Aggregator agg = combiningAggFactory.factorize(colSelectorFactory);

    aggregateAllRows(agg);

    Pair<Long, Long> result = asPair(agg.get());
    SerializablePairLongLong expected = pairs[0];
    Assert.assertEquals(expected.lhs, result.lhs);
    Assert.assertEquals(expected.rhs, result.rhs);
    Assert.assertEquals(expected.rhs.longValue(), agg.getLong());
    Assert.assertEquals(expected.rhs.longValue(), agg.getFloat(), 0.0001);
  }

  /**
   * Buffer aggregator from the combining factory; mirrors {@link #testLongFirstCombiningAggregator()}.
   */
  @Test
  public void testLongFirstCombiningBufferAggregator()
  {
    BufferAggregator agg = combiningAggFactory.factorizeBuffered(colSelectorFactory);
    ByteBuffer buffer = newBuffer(combiningAggFactory);
    agg.init(buffer, 0);

    aggregateAllRows(agg, buffer, 0);

    Pair<Long, Long> result = asPair(agg.get(buffer, 0));
    SerializablePairLongLong expected = pairs[0];
    Assert.assertEquals(expected.lhs, result.lhs);
    Assert.assertEquals(expected.rhs, result.rhs);
    Assert.assertEquals(expected.rhs.longValue(), agg.getLong(buffer, 0));
    Assert.assertEquals(expected.rhs.longValue(), agg.getFloat(buffer, 0), 0.0001);
  }

  /**
   * A factory deserialized from its JSON spec must equal the hand-built one and produce the same cache key.
   */
  @Test
  public void testSerde() throws Exception
  {
    DefaultObjectMapper mapper = new DefaultObjectMapper();
    String longSpecJson = "{\"type\":\"longFirst\",\"name\":\"billy\",\"fieldName\":\"nilly\"}";

    AggregatorFactory deserialized = mapper.readValue(longSpecJson, AggregatorFactory.class);

    Assert.assertEquals(longFirstAggFactory, deserialized);
    Assert.assertArrayEquals(longFirstAggFactory.getCacheKey(), deserialized.getCacheKey());
  }

  /**
   * Drives the aggregate combiner over {@link #pairs}: after a reset on row 0, folding rows 1 and 2 is expected
   * to keep {@code pairs[0]}; a second reset while positioned on row 2 is expected to yield {@code pairs[2]}.
   */
  @Test
  public void testLongFirstAggregeCombiner()
  {
    TestObjectColumnSelector<SerializablePairLongLong> columnSelector = new TestObjectColumnSelector<>(pairs);
    // Kept raw on purpose: the generic signature of makeAggregateCombiner() is not visible from this test.
    AggregateCombiner longFirstAggregateCombiner = combiningAggFactory.makeAggregateCombiner();

    longFirstAggregateCombiner.reset(columnSelector);
    Assert.assertEquals(pairs[0], longFirstAggregateCombiner.getObject());

    columnSelector.increment();
    longFirstAggregateCombiner.fold(columnSelector);
    Assert.assertEquals(pairs[0], longFirstAggregateCombiner.getObject());

    columnSelector.increment();
    longFirstAggregateCombiner.fold(columnSelector);
    Assert.assertEquals(pairs[0], longFirstAggregateCombiner.getObject());

    longFirstAggregateCombiner.reset(columnSelector);
    Assert.assertEquals(pairs[2], longFirstAggregateCombiner.getObject());
  }

  /**
   * Narrows an aggregation result to the pair type the assertions inspect. The unchecked cast is confined to
   * this helper so that the individual tests stay warning-free.
   */
  @SuppressWarnings("unchecked")
  private static Pair<Long, Long> asPair(Object aggregated)
  {
    return (Pair<Long, Long>) aggregated;
  }

  /**
   * Allocates a heap buffer sized by {@code factory.getMaxIntermediateSizeWithNulls()}, i.e. by the factory
   * that produced the buffer aggregator under test.
   */
  private static ByteBuffer newBuffer(LongFirstAggregatorFactory factory)
  {
    return ByteBuffer.wrap(new byte[factory.getMaxIntermediateSizeWithNulls()]);
  }

  /**
   * Feeds every fixture row to a heap aggregator, one {@link #aggregate(Aggregator)} call per row.
   */
  private void aggregateAllRows(Aggregator agg)
  {
    for (int row = 0; row < longValues.length; row++) {
      aggregate(agg);
    }
  }

  /**
   * Feeds every fixture row to a buffer aggregator, one
   * {@link #aggregate(BufferAggregator, ByteBuffer, int)} call per row.
   */
  private void aggregateAllRows(BufferAggregator agg, ByteBuffer buff, int position)
  {
    for (int row = 0; row < longValues.length; row++) {
      aggregate(agg, buff, position);
    }
  }

  /**
   * Aggregates the current row, then advances every fixture selector to the next row.
   */
  private void aggregate(
      Aggregator agg
  )
  {
    agg.aggregate();
    timeSelector.increment();
    customTimeSelector.increment();
    valueSelector.increment();
    objectSelector.increment();
  }

  /**
   * Aggregates the current row into {@code buff} at {@code position}, then advances every fixture selector to
   * the next row.
   */
  private void aggregate(
      BufferAggregator agg,
      ByteBuffer buff,
      int position
  )
  {
    agg.aggregate(buff, position);
    timeSelector.increment();
    customTimeSelector.increment();
    valueSelector.increment();
    objectSelector.increment();
  }
}