blob: 15712510db184c2a54c88a4552e6780d577769cc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.groupby.epinephelinae;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.groupby.epinephelinae.Grouper.Entry;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
public class StreamingMergeSortedGrouperTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testAggregate()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final StreamingMergeSortedGrouper<Integer> grouper = newGrouper(columnSelectorFactory, 1024);
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
grouper.aggregate(6);
grouper.aggregate(6);
grouper.aggregate(6);
grouper.aggregate(10);
grouper.aggregate(12);
grouper.aggregate(12);
grouper.finish();
final List<Entry<Integer>> expected = ImmutableList.of(
new Grouper.Entry<>(6, new Object[]{30L, 3L}),
new Grouper.Entry<>(10, new Object[]{10L, 1L}),
new Grouper.Entry<>(12, new Object[]{20L, 2L})
);
final List<Entry<Integer>> unsortedEntries = Lists.newArrayList(grouper.iterator(true));
Assert.assertEquals(
expected,
unsortedEntries
);
}
@Test(timeout = 60_000L)
public void testEmptyIterator()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final StreamingMergeSortedGrouper<Integer> grouper = newGrouper(columnSelectorFactory, 1024);
grouper.finish();
Assert.assertTrue(!grouper.iterator(true).hasNext());
}
@Test(timeout = 60_000L)
public void testStreamingAggregateWithLargeBuffer() throws ExecutionException, InterruptedException
{
testStreamingAggregate(1024);
}
@Test(timeout = 60_000L)
public void testStreamingAggregateWithMinimumBuffer() throws ExecutionException, InterruptedException
{
testStreamingAggregate(83);
}
private void testStreamingAggregate(int bufferSize) throws ExecutionException, InterruptedException
{
final ExecutorService exec = Execs.multiThreaded(2, "merge-sorted-grouper-test-%d");
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final StreamingMergeSortedGrouper<Integer> grouper = newGrouper(columnSelectorFactory, bufferSize);
final List<Entry<Integer>> expected = new ArrayList<>(1024);
for (int i = 0; i < 1024; i++) {
expected.add(new Entry<>(i, new Object[]{100L, 10L}));
}
try {
final Future future = exec.submit(() -> {
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
for (int i = 0; i < 1024; i++) {
for (int j = 0; j < 10; j++) {
grouper.aggregate(i);
}
}
grouper.finish();
});
final List<Entry<Integer>> unsortedEntries = Lists.newArrayList(grouper.iterator(true));
final List<Entry<Integer>> actual = Ordering.from((Comparator<Entry<Integer>>) (o1, o2) -> Ints.compare(
o1.getKey(),
o2.getKey()
))
.sortedCopy(unsortedEntries);
if (!actual.equals(expected)) {
future.get(); // Check there is an exception occured
Assert.fail();
}
}
finally {
exec.shutdownNow();
}
}
@Test
public void testNotEnoughBuffer()
{
expectedException.expect(IllegalStateException.class);
if (NullHandling.replaceWithDefault()) {
expectedException.expectMessage("Buffer[50] should be large enough to store at least three records[20]");
} else {
expectedException.expectMessage("Buffer[50] should be large enough to store at least three records[21]");
}
newGrouper(GrouperTestUtil.newColumnSelectorFactory(), 50);
}
@Test
public void testTimeout()
{
expectedException.expect(RuntimeException.class);
expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class));
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final StreamingMergeSortedGrouper<Integer> grouper = newGrouper(columnSelectorFactory, 100);
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
grouper.aggregate(6);
grouper.iterator();
}
private StreamingMergeSortedGrouper<Integer> newGrouper(
TestColumnSelectorFactory columnSelectorFactory,
int bufferSize
)
{
final ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
final StreamingMergeSortedGrouper<Integer> grouper = new StreamingMergeSortedGrouper<>(
Suppliers.ofInstance(buffer),
GrouperTestUtil.intKeySerde(),
columnSelectorFactory,
new AggregatorFactory[]{
new LongSumAggregatorFactory("valueSum", "value"),
new CountAggregatorFactory("count")
},
System.currentTimeMillis() + 1000L
);
grouper.init();
return grouper;
}
}