blob: 20e4d487107eba6b19350b1cad1b5acfca4f6168 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.querykit.common;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.allocation.SingleMemoryAllocatorFactory;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.ReadableNilFrameChannel;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.frame.processor.FrameProcessorExecutor;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.segment.FrameStorageAdapter;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyRowsWithSameKeyFault;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.StagePartition;
import org.apache.druid.msq.test.LimitedFrameWriterFactory;
import org.apache.druid.segment.RowBasedSegment;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinTestHelper;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.SegmentId;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@RunWith(Parameterized.class)
public class SortMergeJoinFrameProcessorTest extends InitializedNullHandlingTest
{
// Stage partition handed to ReadableInput.channel(...) when a test wires up a nil input channel.
private static final StagePartition STAGE_PARTITION = new StagePartition(new StageId("q", 0), 0);
// Passed as the final constructor argument of each SortMergeJoinFrameProcessor built in the tests visible here.
private static final long MAX_BUFFERED_BYTES = 10_000_000;
// Parameterized-run settings assigned by the constructor; the values come from constructorFeeder().
// NOTE(review): presumably these cap the rows per frame in helpers outside this view -- confirm.
private final int rowsPerInputFrame;
private final int rowsPerOutputFrame;
// Executor created in setUp() and shut down in tearDown().
private FrameProcessorExecutor exec;
// NOTE(review): JUnit temporary-folder rule; where it is used is not visible in this part of the file.
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
/**
 * Creates one parameterized instance of this test.
 *
 * @param rowsPerInputFrame  input-frame row setting for this run, taken from {@link #constructorFeeder()}
 * @param rowsPerOutputFrame output-frame row setting for this run, taken from {@link #constructorFeeder()}
 */
public SortMergeJoinFrameProcessorTest(int rowsPerInputFrame, int rowsPerOutputFrame)
{
  // The two assignments are independent of each other.
  this.rowsPerOutputFrame = rowsPerOutputFrame;
  this.rowsPerInputFrame = rowsPerInputFrame;
}
/**
 * Supplies the constructor arguments for the parameterized runner: every pairing of the values
 * 1, 2, 7 and {@link Integer#MAX_VALUE} for the input-frame and output-frame row settings
 * (16 combinations in total).
 *
 * @return one {@code {rowsPerInputFrame, rowsPerOutputFrame}} array per combination, with the
 * input-frame value varying slowest
 */
@Parameterized.Parameters(name = "rowsPerInputFrame = {0}, rowsPerOutputFrame = {1}")
public static Iterable<Object[]> constructorFeeder()
{
  // The same candidate values are used for both dimensions.
  final int[] rowCounts = {1, 2, 7, Integer.MAX_VALUE};
  final List<Object[]> parameterSets = new ArrayList<>();

  for (int inputIdx = 0; inputIdx < rowCounts.length; inputIdx++) {
    for (int outputIdx = 0; outputIdx < rowCounts.length; outputIdx++) {
      parameterSets.add(new Object[]{rowCounts[inputIdx], rowCounts[outputIdx]});
    }
  }

  return parameterSets;
}
/**
 * Builds a fresh {@link FrameProcessorExecutor} before each test, wrapping the executor obtained
 * from {@code Execs.singleThreaded("test-exec")} in a listening decorator.
 */
@Before
public void setUp()
{
  this.exec = new FrameProcessorExecutor(
      MoreExecutors.listeningDecorator(
          Execs.singleThreaded("test-exec")
      )
  );
}
/**
 * Shuts down the executor created in {@link #setUp()} after each test and waits up to ten minutes
 * for it to terminate. The outcome of the wait is not checked.
 *
 * @throws Exception if the wait for termination is interrupted
 */
@After
public void tearDown() throws Exception
{
  // Request immediate shutdown first, then block until the executor has wound down.
  this.exec.getExecutorService().shutdownNow();
  this.exec.getExecutorService().awaitTermination(10, TimeUnit.MINUTES);
}
/**
 * LEFT join on {@code countryIsoCode} where the left (fact) input is a nil channel and the right
 * input is the countries table. The expected result is empty.
 */
@Test
public void testLeftJoinEmptyLeftSide() throws Exception
{
  // Signature of the joined output; right-hand columns carry the "j0." prefix.
  final RowSignature outputSignature = RowSignature
      .builder()
      .add("page", ColumnType.STRING)
      .add("countryIsoCode", ColumnType.STRING)
      .add("j0.countryIsoCode", ColumnType.STRING)
      .add("j0.countryName", ColumnType.STRING)
      .add("j0.countryNumber", ColumnType.LONG)
      .build();

  // Join key for each side, in (left, right) order.
  final List<KeyColumn> leftKey = ImmutableList.of(new KeyColumn("countryIsoCode", KeyOrder.ASCENDING));
  final List<KeyColumn> rightKey = ImmutableList.of(new KeyColumn("countryIsoCode", KeyOrder.ASCENDING));

  // Left side: nil channel read with the fact signature.
  final ReadableInput emptyFacts = ReadableInput.channel(
      ReadableNilFrameChannel.INSTANCE,
      FrameReader.create(JoinTestHelper.FACT_SIGNATURE),
      STAGE_PARTITION
  );

  // Right side: countries input built with countryIsoCode as its key column.
  final ReadableInput countries = buildCountriesInput(
      ImmutableList.of(new KeyColumn("countryIsoCode", KeyOrder.ASCENDING))
  );

  final BlockingQueueFrameChannel joinOutput = BlockingQueueFrameChannel.minimal();

  final SortMergeJoinFrameProcessor joinProcessor = new SortMergeJoinFrameProcessor(
      emptyFacts,
      countries,
      joinOutput.writable(),
      makeFrameWriterFactory(outputSignature),
      "j0.",
      ImmutableList.of(leftKey, rightKey),
      new int[]{0},
      JoinType.LEFT,
      MAX_BUFFERED_BYTES
  );

  assertResult(joinProcessor, joinOutput.readable(), outputSignature, Collections.emptyList());
}
/**
 * LEFT join on {@code countryIsoCode} where the right (countries) input is a nil channel. Every
 * fact row is expected in the output, with all three {@code j0.*} columns null.
 */
@Test
public void testLeftJoinEmptyRightSide() throws Exception
{
  // Expected output: one row per fact row, with nothing matched on the right.
  final List<List<Object>> unmatchedFactRows = Arrays.asList(
      Arrays.asList("Agama mossambica", null, null, null, null),
      Arrays.asList("Apamea abruzzorum", null, null, null, null),
      Arrays.asList("Atractus flammigerus", null, null, null, null),
      Arrays.asList("Rallicula", null, null, null, null),
      Arrays.asList("Talk:Oswald Tilghman", null, null, null, null),
      Arrays.asList("Peremptory norm", "AU", null, null, null),
      Arrays.asList("Didier Leclair", "CA", null, null, null),
      Arrays.asList("Les Argonautes", "CA", null, null, null),
      Arrays.asList("Sarah Michelle Gellar", "CA", null, null, null),
      Arrays.asList("Golpe de Estado en Chile de 1973", "CL", null, null, null),
      Arrays.asList("Diskussion:Sebastian Schulz", "DE", null, null, null),
      Arrays.asList("Gabinete Ministerial de Rafael Correa", "EC", null, null, null),
      Arrays.asList("Saison 9 de Secret Story", "FR", null, null, null),
      Arrays.asList("Glasgow", "GB", null, null, null),
      Arrays.asList("Giusy Ferreri discography", "IT", null, null, null),
      Arrays.asList("Roma-Bangkok", "IT", null, null, null),
      Arrays.asList("青野武", "JP", null, null, null),
      Arrays.asList("유희왕 GX", "KR", null, null, null),
      Arrays.asList("History of Fourems", "MMMM", null, null, null),
      Arrays.asList("Mathis Bolly", "MX", null, null, null),
      Arrays.asList("Orange Soda", "MatchNothing", null, null, null),
      Arrays.asList("Алиса в Зазеркалье", "NO", null, null, null),
      Arrays.asList("Cream Soda", "SU", null, null, null),
      Arrays.asList("Wendigo", "SV", null, null, null),
      Arrays.asList("Carlo Curti", "US", null, null, null),
      Arrays.asList("DirecTV", "US", null, null, null),
      Arrays.asList("Old Anatolian Turkish", "US", null, null, null),
      Arrays.asList("Otjiwarongo Airport", "US", null, null, null),
      Arrays.asList("President of India", "US", null, null, null)
  );

  final RowSignature outputSignature = RowSignature
      .builder()
      .add("page", ColumnType.STRING)
      .add("countryIsoCode", ColumnType.STRING)
      .add("j0.countryIsoCode", ColumnType.STRING)
      .add("j0.countryName", ColumnType.STRING)
      .add("j0.countryNumber", ColumnType.LONG)
      .build();

  // Join key for each side, in (left, right) order.
  final List<KeyColumn> leftKey = ImmutableList.of(new KeyColumn("countryIsoCode", KeyOrder.ASCENDING));
  final List<KeyColumn> rightKey = ImmutableList.of(new KeyColumn("countryIsoCode", KeyOrder.ASCENDING));

  // Left side: fact input built with (countryIsoCode, page) as its key columns.
  final ReadableInput facts = buildFactInput(
      ImmutableList.of(
          new KeyColumn("countryIsoCode", KeyOrder.ASCENDING),
          new KeyColumn("page", KeyOrder.ASCENDING)
      )
  );

  // Right side: nil channel read with the countries signature.
  final ReadableInput emptyCountries = ReadableInput.channel(
      ReadableNilFrameChannel.INSTANCE,
      FrameReader.create(JoinTestHelper.COUNTRIES_SIGNATURE),
      STAGE_PARTITION
  );

  final BlockingQueueFrameChannel joinOutput = BlockingQueueFrameChannel.minimal();

  final SortMergeJoinFrameProcessor joinProcessor = new SortMergeJoinFrameProcessor(
      facts,
      emptyCountries,
      joinOutput.writable(),
      makeFrameWriterFactory(outputSignature),
      "j0.",
      ImmutableList.of(leftKey, rightKey),
      new int[]{0},
      JoinType.LEFT,
      MAX_BUFFERED_BYTES
  );

  assertResult(joinProcessor, joinOutput.readable(), outputSignature, unmatchedFactRows);
}
/**
 * INNER join on {@code countryIsoCode} where the right (countries) input is a nil channel. The
 * expected result is empty.
 */
@Test
public void testInnerJoinEmptyRightSide() throws Exception
{
  final RowSignature outputSignature = RowSignature
      .builder()
      .add("page", ColumnType.STRING)
      .add("countryIsoCode", ColumnType.STRING)
      .add("j0.countryIsoCode", ColumnType.STRING)
      .add("j0.countryName", ColumnType.STRING)
      .add("j0.countryNumber", ColumnType.LONG)
      .build();

  // Join key for each side, in (left, right) order.
  final List<KeyColumn> leftKey = ImmutableList.of(new KeyColumn("countryIsoCode", KeyOrder.ASCENDING));
  final List<KeyColumn> rightKey = ImmutableList.of(new KeyColumn("countryIsoCode", KeyOrder.ASCENDING));

  // Left side: fact input built with (countryIsoCode, page) as its key columns.
  final ReadableInput facts = buildFactInput(
      ImmutableList.of(
          new KeyColumn("countryIsoCode", KeyOrder.ASCENDING),
          new KeyColumn("page", KeyOrder.ASCENDING)
      )
  );

  // Right side: nil channel read with the countries signature.
  final ReadableInput emptyCountries = ReadableInput.channel(
      ReadableNilFrameChannel.INSTANCE,
      FrameReader.create(JoinTestHelper.COUNTRIES_SIGNATURE),
      STAGE_PARTITION
  );

  final BlockingQueueFrameChannel joinOutput = BlockingQueueFrameChannel.minimal();

  final SortMergeJoinFrameProcessor joinProcessor = new SortMergeJoinFrameProcessor(
      facts,
      emptyCountries,
      joinOutput.writable(),
      makeFrameWriterFactory(outputSignature),
      "j0.",
      ImmutableList.of(leftKey, rightKey),
      new int[]{0},
      JoinType.INNER,
      MAX_BUFFERED_BYTES
  );

  assertResult(joinProcessor, joinOutput.readable(), outputSignature, Collections.emptyList());
}
/**
 * LEFT join of the fact input to the countries input on {@code countryIsoCode}. Fact rows with a
 * null ISO code, and the "MatchNothing" row, are expected with null {@code j0.*} columns; every
 * other row is expected with its country's code, name and number.
 */
@Test
public void testLeftJoinCountryIsoCode() throws Exception
{
  final List<List<Object>> joinedRows = Arrays.asList(
      Arrays.asList("Agama mossambica", null, null, null, null),
      Arrays.asList("Apamea abruzzorum", null, null, null, null),
      Arrays.asList("Atractus flammigerus", null, null, null, null),
      Arrays.asList("Rallicula", null, null, null, null),
      Arrays.asList("Talk:Oswald Tilghman", null, null, null, null),
      Arrays.asList("Peremptory norm", "AU", "AU", "Australia", 0L),
      Arrays.asList("Didier Leclair", "CA", "CA", "Canada", 1L),
      Arrays.asList("Les Argonautes", "CA", "CA", "Canada", 1L),
      Arrays.asList("Sarah Michelle Gellar", "CA", "CA", "Canada", 1L),
      Arrays.asList("Golpe de Estado en Chile de 1973", "CL", "CL", "Chile", 2L),
      Arrays.asList("Diskussion:Sebastian Schulz", "DE", "DE", "Germany", 3L),
      Arrays.asList("Gabinete Ministerial de Rafael Correa", "EC", "EC", "Ecuador", 4L),
      Arrays.asList("Saison 9 de Secret Story", "FR", "FR", "France", 5L),
      Arrays.asList("Glasgow", "GB", "GB", "United Kingdom", 6L),
      Arrays.asList("Giusy Ferreri discography", "IT", "IT", "Italy", 7L),
      Arrays.asList("Roma-Bangkok", "IT", "IT", "Italy", 7L),
      Arrays.asList("青野武", "JP", "JP", "Japan", 8L),
      Arrays.asList("유희왕 GX", "KR", "KR", "Republic of Korea", 9L),
      Arrays.asList("History of Fourems", "MMMM", "MMMM", "Fourems", 205L),
      Arrays.asList("Mathis Bolly", "MX", "MX", "Mexico", 10L),
      Arrays.asList("Orange Soda", "MatchNothing", null, null, null),
      Arrays.asList("Алиса в Зазеркалье", "NO", "NO", "Norway", 11L),
      Arrays.asList("Cream Soda", "SU", "SU", "States United", 15L),
      Arrays.asList("Wendigo", "SV", "SV", "El Salvador", 12L),
      Arrays.asList("Carlo Curti", "US", "US", "United States", 13L),
      Arrays.asList("DirecTV", "US", "US", "United States", 13L),
      Arrays.asList("Old Anatolian Turkish", "US", "US", "United States", 13L),
      Arrays.asList("Otjiwarongo Airport", "US", "US", "United States", 13L),
      Arrays.asList("President of India", "US", "US", "United States", 13L)
  );

  final RowSignature outputSignature = RowSignature
      .builder()
      .add("page", ColumnType.STRING)
      .add("countryIsoCode", ColumnType.STRING)
      .add("j0.countryIsoCode", ColumnType.STRING)
      .add("j0.countryName", ColumnType.STRING)
      .add("j0.countryNumber", ColumnType.LONG)
      .build();

  // Join key for each side, in (left, right) order.
  final List<KeyColumn> leftKey = ImmutableList.of(new KeyColumn("countryIsoCode", KeyOrder.ASCENDING));
  final List<KeyColumn> rightKey = ImmutableList.of(new KeyColumn("countryIsoCode", KeyOrder.ASCENDING));

  // Left side: fact input built with (countryIsoCode, page) as its key columns.
  final ReadableInput facts = buildFactInput(
      ImmutableList.of(
          new KeyColumn("countryIsoCode", KeyOrder.ASCENDING),
          new KeyColumn("page", KeyOrder.ASCENDING)
      )
  );

  // Right side: countries input built with countryIsoCode as its key column.
  final ReadableInput countries = buildCountriesInput(
      ImmutableList.of(new KeyColumn("countryIsoCode", KeyOrder.ASCENDING))
  );

  final BlockingQueueFrameChannel joinOutput = BlockingQueueFrameChannel.minimal();

  final SortMergeJoinFrameProcessor joinProcessor = new SortMergeJoinFrameProcessor(
      facts,
      countries,
      joinOutput.writable(),
      makeFrameWriterFactory(outputSignature),
      "j0.",
      ImmutableList.of(leftKey, rightKey),
      new int[]{0},
      JoinType.LEFT,
      MAX_BUFFERED_BYTES
  );

  assertResult(joinProcessor, joinOutput.readable(), outputSignature, joinedRows);
}
/**
 * INNER join with empty key lists on both sides (a cross join). The left input is the countries
 * resource built with a limit argument of 2 and the right input is the fact table. Each fact page
 * is expected paired with both "AU" and "CA".
 */
@Test
public void testCrossJoin() throws Exception
{
  final List<List<Object>> crossProduct = Arrays.asList(
      Arrays.asList("Agama mossambica", "AU"),
      Arrays.asList("Agama mossambica", "CA"),
      Arrays.asList("Apamea abruzzorum", "AU"),
      Arrays.asList("Apamea abruzzorum", "CA"),
      Arrays.asList("Atractus flammigerus", "AU"),
      Arrays.asList("Atractus flammigerus", "CA"),
      Arrays.asList("Rallicula", "AU"),
      Arrays.asList("Rallicula", "CA"),
      Arrays.asList("Talk:Oswald Tilghman", "AU"),
      Arrays.asList("Talk:Oswald Tilghman", "CA"),
      Arrays.asList("Peremptory norm", "AU"),
      Arrays.asList("Peremptory norm", "CA"),
      Arrays.asList("Didier Leclair", "AU"),
      Arrays.asList("Didier Leclair", "CA"),
      Arrays.asList("Les Argonautes", "AU"),
      Arrays.asList("Les Argonautes", "CA"),
      Arrays.asList("Sarah Michelle Gellar", "AU"),
      Arrays.asList("Sarah Michelle Gellar", "CA"),
      Arrays.asList("Golpe de Estado en Chile de 1973", "AU"),
      Arrays.asList("Golpe de Estado en Chile de 1973", "CA"),
      Arrays.asList("Diskussion:Sebastian Schulz", "AU"),
      Arrays.asList("Diskussion:Sebastian Schulz", "CA"),
      Arrays.asList("Gabinete Ministerial de Rafael Correa", "AU"),
      Arrays.asList("Gabinete Ministerial de Rafael Correa", "CA"),
      Arrays.asList("Saison 9 de Secret Story", "AU"),
      Arrays.asList("Saison 9 de Secret Story", "CA"),
      Arrays.asList("Glasgow", "AU"),
      Arrays.asList("Glasgow", "CA"),
      Arrays.asList("Giusy Ferreri discography", "AU"),
      Arrays.asList("Giusy Ferreri discography", "CA"),
      Arrays.asList("Roma-Bangkok", "AU"),
      Arrays.asList("Roma-Bangkok", "CA"),
      Arrays.asList("青野武", "AU"),
      Arrays.asList("青野武", "CA"),
      Arrays.asList("유희왕 GX", "AU"),
      Arrays.asList("유희왕 GX", "CA"),
      Arrays.asList("History of Fourems", "AU"),
      Arrays.asList("History of Fourems", "CA"),
      Arrays.asList("Mathis Bolly", "AU"),
      Arrays.asList("Mathis Bolly", "CA"),
      Arrays.asList("Orange Soda", "AU"),
      Arrays.asList("Orange Soda", "CA"),
      Arrays.asList("Алиса в Зазеркалье", "AU"),
      Arrays.asList("Алиса в Зазеркалье", "CA"),
      Arrays.asList("Cream Soda", "AU"),
      Arrays.asList("Cream Soda", "CA"),
      Arrays.asList("Wendigo", "AU"),
      Arrays.asList("Wendigo", "CA"),
      Arrays.asList("Carlo Curti", "AU"),
      Arrays.asList("Carlo Curti", "CA"),
      Arrays.asList("DirecTV", "AU"),
      Arrays.asList("DirecTV", "CA"),
      Arrays.asList("Old Anatolian Turkish", "AU"),
      Arrays.asList("Old Anatolian Turkish", "CA"),
      Arrays.asList("Otjiwarongo Airport", "AU"),
      Arrays.asList("Otjiwarongo Airport", "CA"),
      Arrays.asList("President of India", "AU"),
      Arrays.asList("President of India", "CA")
  );

  // Only the right-hand page and the left-hand ISO code are written to the output.
  final RowSignature outputSignature = RowSignature
      .builder()
      .add("j0.page", ColumnType.STRING)
      .add("countryIsoCode", ColumnType.STRING)
      .build();

  final ReadableInput facts = buildFactInput(
      ImmutableList.of(
          new KeyColumn("countryIsoCode", KeyOrder.ASCENDING),
          new KeyColumn("page", KeyOrder.ASCENDING)
      )
  );

  final ReadableInput limitedCountries = makeChannelFromResourceWithLimit(
      JoinTestHelper.COUNTRIES_RESOURCE,
      JoinTestHelper.COUNTRIES_SIGNATURE,
      ImmutableList.of(new KeyColumn("countryIsoCode", KeyOrder.ASCENDING)),
      2
  );

  final BlockingQueueFrameChannel joinOutput = BlockingQueueFrameChannel.minimal();

  // Countries are the left input here and facts are the right input.
  final SortMergeJoinFrameProcessor joinProcessor = new SortMergeJoinFrameProcessor(
      limitedCountries,
      facts,
      joinOutput.writable(),
      makeFrameWriterFactory(outputSignature),
      "j0.",
      ImmutableList.of(Collections.emptyList(), Collections.emptyList()),
      new int[0],
      JoinType.INNER,
      MAX_BUFFERED_BYTES
  );

  assertResult(joinProcessor, joinOutput.readable(), outputSignature, crossProduct);
}
/**
 * LEFT join of the fact input to the regions input on the two-part key
 * ({@code countryIsoCode}, {@code regionIsoCode}). The output carries the page, the matched
 * region name (null where nothing matched) and the fact-side country ISO code.
 */
@Test
public void testLeftJoinRegions() throws Exception
{
  final List<List<Object>> joinedRows = Arrays.asList(
      Arrays.asList("Agama mossambica", null, null),
      Arrays.asList("Apamea abruzzorum", null, null),
      Arrays.asList("Atractus flammigerus", null, null),
      Arrays.asList("Rallicula", null, null),
      Arrays.asList("Talk:Oswald Tilghman", null, null),
      Arrays.asList("Peremptory norm", "New South Wales", "AU"),
      Arrays.asList("Didier Leclair", "Ontario", "CA"),
      Arrays.asList("Sarah Michelle Gellar", "Ontario", "CA"),
      Arrays.asList("Les Argonautes", "Quebec", "CA"),
      Arrays.asList("Golpe de Estado en Chile de 1973", "Santiago Metropolitan", "CL"),
      Arrays.asList("Diskussion:Sebastian Schulz", "Hesse", "DE"),
      Arrays.asList("Gabinete Ministerial de Rafael Correa", "Provincia del Guayas", "EC"),
      Arrays.asList("Saison 9 de Secret Story", "Val d'Oise", "FR"),
      Arrays.asList("Glasgow", "Kingston upon Hull", "GB"),
      Arrays.asList("Giusy Ferreri discography", "Provincia di Varese", "IT"),
      Arrays.asList("Roma-Bangkok", "Provincia di Varese", "IT"),
      Arrays.asList("青野武", "Tōkyō", "JP"),
      Arrays.asList("유희왕 GX", "Seoul", "KR"),
      Arrays.asList("History of Fourems", "Fourems Province", "MMMM"),
      Arrays.asList("Mathis Bolly", "Mexico City", "MX"),
      Arrays.asList("Orange Soda", null, "MatchNothing"),
      Arrays.asList("Алиса в Зазеркалье", "Finnmark Fylke", "NO"),
      Arrays.asList("Cream Soda", "Ainigriv", "SU"),
      Arrays.asList("Wendigo", "Departamento de San Salvador", "SV"),
      Arrays.asList("Carlo Curti", "California", "US"),
      Arrays.asList("Otjiwarongo Airport", "California", "US"),
      Arrays.asList("President of India", "California", "US"),
      Arrays.asList("DirecTV", "North Carolina", "US"),
      Arrays.asList("Old Anatolian Turkish", "Virginia", "US")
  );

  final RowSignature outputSignature = RowSignature
      .builder()
      .add("page", ColumnType.STRING)
      .add("j0.regionName", ColumnType.STRING)
      .add("countryIsoCode", ColumnType.STRING)
      .build();

  // Two-part join key for each side, in (left, right) order.
  final List<KeyColumn> leftKey = ImmutableList.of(
      new KeyColumn("countryIsoCode", KeyOrder.ASCENDING),
      new KeyColumn("regionIsoCode", KeyOrder.ASCENDING)
  );
  final List<KeyColumn> rightKey = ImmutableList.of(
      new KeyColumn("countryIsoCode", KeyOrder.ASCENDING),
      new KeyColumn("regionIsoCode", KeyOrder.ASCENDING)
  );

  // Left side: fact input built with (countryIsoCode, regionIsoCode, page) as its key columns.
  final ReadableInput facts = buildFactInput(
      ImmutableList.of(
          new KeyColumn("countryIsoCode", KeyOrder.ASCENDING),
          new KeyColumn("regionIsoCode", KeyOrder.ASCENDING),
          new KeyColumn("page", KeyOrder.ASCENDING)
      )
  );

  // Right side: regions input built with (countryIsoCode, regionIsoCode) as its key columns.
  final ReadableInput regions = buildRegionsInput(
      ImmutableList.of(
          new KeyColumn("countryIsoCode", KeyOrder.ASCENDING),
          new KeyColumn("regionIsoCode", KeyOrder.ASCENDING)
      )
  );

  final BlockingQueueFrameChannel joinOutput = BlockingQueueFrameChannel.minimal();

  final SortMergeJoinFrameProcessor joinProcessor = new SortMergeJoinFrameProcessor(
      facts,
      regions,
      joinOutput.writable(),
      makeFrameWriterFactory(outputSignature),
      "j0.",
      ImmutableList.of(leftKey, rightKey),
      new int[]{0, 1},
      JoinType.LEFT,
      MAX_BUFFERED_BYTES
  );

  assertResult(joinProcessor, joinOutput.readable(), outputSignature, joinedRows);
}
/**
 * RIGHT join with the regions input on the left and the fact input on the right, keyed on
 * {@code regionIsoCode} alone. Every fact row is expected in the output; fact rows without a
 * matching region carry a null {@code regionName}.
 */
@Test
public void testRightJoinRegionCodeOnly() throws Exception
{
  // This join generates duplicates.
  final List<List<Object>> joinedRows = Arrays.asList(
      Arrays.asList("Agama mossambica", null, null),
      Arrays.asList("Apamea abruzzorum", null, null),
      Arrays.asList("Atractus flammigerus", null, null),
      Arrays.asList("Rallicula", null, null),
      Arrays.asList("Talk:Oswald Tilghman", null, null),
      Arrays.asList("유희왕 GX", "Seoul", "KR"),
      Arrays.asList("青野武", "Tōkyō", "JP"),
      Arrays.asList("Алиса в Зазеркалье", "Finnmark Fylke", "NO"),
      Arrays.asList("Saison 9 de Secret Story", "Val d'Oise", "FR"),
      Arrays.asList("Cream Soda", "Ainigriv", "SU"),
      Arrays.asList("Carlo Curti", "California", "US"),
      Arrays.asList("Otjiwarongo Airport", "California", "US"),
      Arrays.asList("President of India", "California", "US"),
      Arrays.asList("Mathis Bolly", "Mexico City", "MX"),
      Arrays.asList("Gabinete Ministerial de Rafael Correa", "Provincia del Guayas", "EC"),
      Arrays.asList("Diskussion:Sebastian Schulz", "Hesse", "DE"),
      Arrays.asList("Glasgow", "Kingston upon Hull", "GB"),
      Arrays.asList("History of Fourems", "Fourems Province", "MMMM"),
      Arrays.asList("Orange Soda", null, "MatchNothing"),
      Arrays.asList("DirecTV", "North Carolina", "US"),
      Arrays.asList("Peremptory norm", "New South Wales", "AU"),
      Arrays.asList("Didier Leclair", "Ontario", "CA"),
      Arrays.asList("Sarah Michelle Gellar", "Ontario", "CA"),
      Arrays.asList("Les Argonautes", "Quebec", "CA"),
      Arrays.asList("Golpe de Estado en Chile de 1973", "Santiago Metropolitan", "CL"),
      Arrays.asList("Wendigo", "Departamento de San Salvador", "SV"),
      Arrays.asList("Giusy Ferreri discography", "Provincia di Varese", "IT"),
      Arrays.asList("Giusy Ferreri discography", "Virginia", "IT"),
      Arrays.asList("Old Anatolian Turkish", "Provincia di Varese", "US"),
      Arrays.asList("Old Anatolian Turkish", "Virginia", "US"),
      Arrays.asList("Roma-Bangkok", "Provincia di Varese", "IT"),
      Arrays.asList("Roma-Bangkok", "Virginia", "IT")
  );

  final RowSignature outputSignature = RowSignature
      .builder()
      .add("j0.page", ColumnType.STRING)
      .add("regionName", ColumnType.STRING)
      .add("j0.countryIsoCode", ColumnType.STRING)
      .build();

  // Join key for each side, in (left, right) order.
  final List<KeyColumn> leftKey = ImmutableList.of(new KeyColumn("regionIsoCode", KeyOrder.ASCENDING));
  final List<KeyColumn> rightKey = ImmutableList.of(new KeyColumn("regionIsoCode", KeyOrder.ASCENDING));

  // Fact input built with (regionIsoCode, page) as its key columns.
  final ReadableInput facts = buildFactInput(
      ImmutableList.of(
          new KeyColumn("regionIsoCode", KeyOrder.ASCENDING),
          new KeyColumn("page", KeyOrder.ASCENDING)
      )
  );

  // Regions input built with (regionIsoCode, countryIsoCode) as its key columns.
  final ReadableInput regions = buildRegionsInput(
      ImmutableList.of(
          new KeyColumn("regionIsoCode", KeyOrder.ASCENDING),
          new KeyColumn("countryIsoCode", KeyOrder.ASCENDING)
      )
  );

  final BlockingQueueFrameChannel joinOutput = BlockingQueueFrameChannel.minimal();

  // Regions are the left input here and facts are the right input.
  final SortMergeJoinFrameProcessor joinProcessor = new SortMergeJoinFrameProcessor(
      regions,
      facts,
      joinOutput.writable(),
      makeFrameWriterFactory(outputSignature),
      "j0.",
      ImmutableList.of(leftKey, rightKey),
      new int[]{0},
      JoinType.RIGHT,
      MAX_BUFFERED_BYTES
  );

  assertResult(joinProcessor, joinOutput.readable(), outputSignature, joinedRows);
}
/**
 * FULL join with the regions input on the left and the fact input on the right, keyed on
 * {@code regionIsoCode} alone. Besides the matched rows, the expected output contains fact rows
 * with no region (null {@code regionName}) and regions with no fact row (null {@code j0.*}
 * columns).
 */
@Test
public void testFullOuterJoinRegionCodeOnly() throws Exception
{
  // This join generates duplicates.
  final List<List<Object>> joinedRows = Arrays.asList(
      Arrays.asList(null, "Nulland", null),
      Arrays.asList("Agama mossambica", null, null),
      Arrays.asList("Apamea abruzzorum", null, null),
      Arrays.asList("Atractus flammigerus", null, null),
      Arrays.asList("Rallicula", null, null),
      Arrays.asList("Talk:Oswald Tilghman", null, null),
      Arrays.asList("유희왕 GX", "Seoul", "KR"),
      Arrays.asList("青野武", "Tōkyō", "JP"),
      Arrays.asList("Алиса в Зазеркалье", "Finnmark Fylke", "NO"),
      Arrays.asList("Saison 9 de Secret Story", "Val d'Oise", "FR"),
      Arrays.asList(null, "Foureis Province", null),
      Arrays.asList("Cream Soda", "Ainigriv", "SU"),
      Arrays.asList("Carlo Curti", "California", "US"),
      Arrays.asList("Otjiwarongo Airport", "California", "US"),
      Arrays.asList("President of India", "California", "US"),
      Arrays.asList("Mathis Bolly", "Mexico City", "MX"),
      Arrays.asList("Gabinete Ministerial de Rafael Correa", "Provincia del Guayas", "EC"),
      Arrays.asList("Diskussion:Sebastian Schulz", "Hesse", "DE"),
      Arrays.asList("Glasgow", "Kingston upon Hull", "GB"),
      Arrays.asList("History of Fourems", "Fourems Province", "MMMM"),
      Arrays.asList("Orange Soda", null, "MatchNothing"),
      Arrays.asList("DirecTV", "North Carolina", "US"),
      Arrays.asList("Peremptory norm", "New South Wales", "AU"),
      Arrays.asList("Didier Leclair", "Ontario", "CA"),
      Arrays.asList("Sarah Michelle Gellar", "Ontario", "CA"),
      Arrays.asList("Les Argonautes", "Quebec", "CA"),
      Arrays.asList("Golpe de Estado en Chile de 1973", "Santiago Metropolitan", "CL"),
      Arrays.asList("Wendigo", "Departamento de San Salvador", "SV"),
      Arrays.asList("Giusy Ferreri discography", "Provincia di Varese", "IT"),
      Arrays.asList("Giusy Ferreri discography", "Virginia", "IT"),
      Arrays.asList("Old Anatolian Turkish", "Provincia di Varese", "US"),
      Arrays.asList("Old Anatolian Turkish", "Virginia", "US"),
      Arrays.asList("Roma-Bangkok", "Provincia di Varese", "IT"),
      Arrays.asList("Roma-Bangkok", "Virginia", "IT"),
      Arrays.asList(null, "Usca City", null)
  );

  final RowSignature outputSignature = RowSignature
      .builder()
      .add("j0.page", ColumnType.STRING)
      .add("regionName", ColumnType.STRING)
      .add("j0.countryIsoCode", ColumnType.STRING)
      .build();

  // Join key for each side, in (left, right) order.
  final List<KeyColumn> leftKey = ImmutableList.of(new KeyColumn("regionIsoCode", KeyOrder.ASCENDING));
  final List<KeyColumn> rightKey = ImmutableList.of(new KeyColumn("regionIsoCode", KeyOrder.ASCENDING));

  // Fact input built with (regionIsoCode, page) as its key columns.
  final ReadableInput facts = buildFactInput(
      ImmutableList.of(
          new KeyColumn("regionIsoCode", KeyOrder.ASCENDING),
          new KeyColumn("page", KeyOrder.ASCENDING)
      )
  );

  // Regions input built with (regionIsoCode, countryIsoCode) as its key columns.
  final ReadableInput regions = buildRegionsInput(
      ImmutableList.of(
          new KeyColumn("regionIsoCode", KeyOrder.ASCENDING),
          new KeyColumn("countryIsoCode", KeyOrder.ASCENDING)
      )
  );

  final BlockingQueueFrameChannel joinOutput = BlockingQueueFrameChannel.minimal();

  // Regions are the left input here and facts are the right input.
  final SortMergeJoinFrameProcessor joinProcessor = new SortMergeJoinFrameProcessor(
      regions,
      facts,
      joinOutput.writable(),
      makeFrameWriterFactory(outputSignature),
      "j0.",
      ImmutableList.of(leftKey, rightKey),
      new int[]{0},
      JoinType.FULL,
      MAX_BUFFERED_BYTES
  );

  assertResult(joinProcessor, joinOutput.readable(), outputSignature, joinedRows);
}
/**
 * INNER join with the regions input on the left and the fact input on the right, keyed on
 * {@code regionIsoCode} alone. Only rows with a match on both sides are expected.
 */
@Test
public void testInnerJoinRegionCodeOnly() throws Exception
{
  // This join generates duplicates.
  final List<List<Object>> joinedRows = Arrays.asList(
      Arrays.asList("유희왕 GX", "Seoul", "KR"),
      Arrays.asList("青野武", "Tōkyō", "JP"),
      Arrays.asList("Алиса в Зазеркалье", "Finnmark Fylke", "NO"),
      Arrays.asList("Saison 9 de Secret Story", "Val d'Oise", "FR"),
      Arrays.asList("Cream Soda", "Ainigriv", "SU"),
      Arrays.asList("Carlo Curti", "California", "US"),
      Arrays.asList("Otjiwarongo Airport", "California", "US"),
      Arrays.asList("President of India", "California", "US"),
      Arrays.asList("Mathis Bolly", "Mexico City", "MX"),
      Arrays.asList("Gabinete Ministerial de Rafael Correa", "Provincia del Guayas", "EC"),
      Arrays.asList("Diskussion:Sebastian Schulz", "Hesse", "DE"),
      Arrays.asList("Glasgow", "Kingston upon Hull", "GB"),
      Arrays.asList("History of Fourems", "Fourems Province", "MMMM"),
      Arrays.asList("DirecTV", "North Carolina", "US"),
      Arrays.asList("Peremptory norm", "New South Wales", "AU"),
      Arrays.asList("Didier Leclair", "Ontario", "CA"),
      Arrays.asList("Sarah Michelle Gellar", "Ontario", "CA"),
      Arrays.asList("Les Argonautes", "Quebec", "CA"),
      Arrays.asList("Golpe de Estado en Chile de 1973", "Santiago Metropolitan", "CL"),
      Arrays.asList("Wendigo", "Departamento de San Salvador", "SV"),
      Arrays.asList("Giusy Ferreri discography", "Provincia di Varese", "IT"),
      Arrays.asList("Giusy Ferreri discography", "Virginia", "IT"),
      Arrays.asList("Old Anatolian Turkish", "Provincia di Varese", "US"),
      Arrays.asList("Old Anatolian Turkish", "Virginia", "US"),
      Arrays.asList("Roma-Bangkok", "Provincia di Varese", "IT"),
      Arrays.asList("Roma-Bangkok", "Virginia", "IT")
  );

  final RowSignature outputSignature = RowSignature
      .builder()
      .add("j0.page", ColumnType.STRING)
      .add("regionName", ColumnType.STRING)
      .add("j0.countryIsoCode", ColumnType.STRING)
      .build();

  // Join key for each side, in (left, right) order.
  final List<KeyColumn> leftKey = ImmutableList.of(new KeyColumn("regionIsoCode", KeyOrder.ASCENDING));
  final List<KeyColumn> rightKey = ImmutableList.of(new KeyColumn("regionIsoCode", KeyOrder.ASCENDING));

  // Fact input built with (regionIsoCode, page) as its key columns.
  final ReadableInput facts = buildFactInput(
      ImmutableList.of(
          new KeyColumn("regionIsoCode", KeyOrder.ASCENDING),
          new KeyColumn("page", KeyOrder.ASCENDING)
      )
  );

  // Regions input built with (regionIsoCode, countryIsoCode) as its key columns.
  final ReadableInput regions = buildRegionsInput(
      ImmutableList.of(
          new KeyColumn("regionIsoCode", KeyOrder.ASCENDING),
          new KeyColumn("countryIsoCode", KeyOrder.ASCENDING)
      )
  );

  final BlockingQueueFrameChannel joinOutput = BlockingQueueFrameChannel.minimal();

  // Regions are the left input here and facts are the right input.
  final SortMergeJoinFrameProcessor joinProcessor = new SortMergeJoinFrameProcessor(
      regions,
      facts,
      joinOutput.writable(),
      makeFrameWriterFactory(outputSignature),
      "j0.",
      ImmutableList.of(leftKey, rightKey),
      new int[]{0},
      JoinType.INNER,
      MAX_BUFFERED_BYTES
  );

  assertResult(joinProcessor, joinOutput.readable(), outputSignature, joinedRows);
}
/**
 * Same INNER join on {@code regionIsoCode} as {@code testInnerJoinRegionCodeOnly}, but with an
 * empty int array passed to the processor so that the join acts as IS NOT DISTINCT FROM. The
 * fact rows whose region code is null are then expected to pair with the "Nulland" region.
 */
@Test
public void testInnerJoinRegionCodeOnlyIsNotDistinctFrom() throws Exception
{
  // This join generates duplicates.
  final List<List<Object>> joinedRows = Arrays.asList(
      Arrays.asList("Agama mossambica", "Nulland", null),
      Arrays.asList("Apamea abruzzorum", "Nulland", null),
      Arrays.asList("Atractus flammigerus", "Nulland", null),
      Arrays.asList("Rallicula", "Nulland", null),
      Arrays.asList("Talk:Oswald Tilghman", "Nulland", null),
      Arrays.asList("유희왕 GX", "Seoul", "KR"),
      Arrays.asList("青野武", "Tōkyō", "JP"),
      Arrays.asList("Алиса в Зазеркалье", "Finnmark Fylke", "NO"),
      Arrays.asList("Saison 9 de Secret Story", "Val d'Oise", "FR"),
      Arrays.asList("Cream Soda", "Ainigriv", "SU"),
      Arrays.asList("Carlo Curti", "California", "US"),
      Arrays.asList("Otjiwarongo Airport", "California", "US"),
      Arrays.asList("President of India", "California", "US"),
      Arrays.asList("Mathis Bolly", "Mexico City", "MX"),
      Arrays.asList("Gabinete Ministerial de Rafael Correa", "Provincia del Guayas", "EC"),
      Arrays.asList("Diskussion:Sebastian Schulz", "Hesse", "DE"),
      Arrays.asList("Glasgow", "Kingston upon Hull", "GB"),
      Arrays.asList("History of Fourems", "Fourems Province", "MMMM"),
      Arrays.asList("DirecTV", "North Carolina", "US"),
      Arrays.asList("Peremptory norm", "New South Wales", "AU"),
      Arrays.asList("Didier Leclair", "Ontario", "CA"),
      Arrays.asList("Sarah Michelle Gellar", "Ontario", "CA"),
      Arrays.asList("Les Argonautes", "Quebec", "CA"),
      Arrays.asList("Golpe de Estado en Chile de 1973", "Santiago Metropolitan", "CL"),
      Arrays.asList("Wendigo", "Departamento de San Salvador", "SV"),
      Arrays.asList("Giusy Ferreri discography", "Provincia di Varese", "IT"),
      Arrays.asList("Giusy Ferreri discography", "Virginia", "IT"),
      Arrays.asList("Old Anatolian Turkish", "Provincia di Varese", "US"),
      Arrays.asList("Old Anatolian Turkish", "Virginia", "US"),
      Arrays.asList("Roma-Bangkok", "Provincia di Varese", "IT"),
      Arrays.asList("Roma-Bangkok", "Virginia", "IT")
  );

  final RowSignature outputSignature = RowSignature
      .builder()
      .add("j0.page", ColumnType.STRING)
      .add("regionName", ColumnType.STRING)
      .add("j0.countryIsoCode", ColumnType.STRING)
      .build();

  // Join key for each side, in (left, right) order.
  final List<KeyColumn> leftKey = ImmutableList.of(new KeyColumn("regionIsoCode", KeyOrder.ASCENDING));
  final List<KeyColumn> rightKey = ImmutableList.of(new KeyColumn("regionIsoCode", KeyOrder.ASCENDING));

  // Fact input built with (regionIsoCode, page) as its key columns.
  final ReadableInput facts = buildFactInput(
      ImmutableList.of(
          new KeyColumn("regionIsoCode", KeyOrder.ASCENDING),
          new KeyColumn("page", KeyOrder.ASCENDING)
      )
  );

  // Regions input built with (regionIsoCode, countryIsoCode) as its key columns.
  final ReadableInput regions = buildRegionsInput(
      ImmutableList.of(
          new KeyColumn("regionIsoCode", KeyOrder.ASCENDING),
          new KeyColumn("countryIsoCode", KeyOrder.ASCENDING)
      )
  );

  final BlockingQueueFrameChannel joinOutput = BlockingQueueFrameChannel.minimal();

  // Regions are the left input here and facts are the right input.
  final SortMergeJoinFrameProcessor joinProcessor = new SortMergeJoinFrameProcessor(
      regions,
      facts,
      joinOutput.writable(),
      makeFrameWriterFactory(outputSignature),
      "j0.",
      ImmutableList.of(leftKey, rightKey),
      new int[0], // empty array: act as if IS NOT DISTINCT FROM
      JoinType.INNER,
      MAX_BUFFERED_BYTES
  );

  assertResult(joinProcessor, joinOutput.readable(), outputSignature, joinedRows);
}
/**
 * LEFT join keyed on "countryNumber": the fact table is the left input and the countries table is the right
 * input, whose columns are exposed under the "j0." prefix. The expected rows differ between SQL-compatible and
 * default-value null handling.
 */
@Test
public void testLeftJoinCountryNumber() throws Exception
{
  final KeyColumn countryNumberKey = new KeyColumn("countryNumber", KeyOrder.ASCENDING);
  final KeyColumn pageKey = new KeyColumn("page", KeyOrder.ASCENDING);

  // Fact rows are sorted by the join key, then by page. Countries are sorted by the join key only.
  final ReadableInput factInput = buildFactInput(ImmutableList.of(countryNumberKey, pageKey));
  final ReadableInput countriesInput = buildCountriesInput(ImmutableList.of(countryNumberKey));

  final BlockingQueueFrameChannel output = BlockingQueueFrameChannel.minimal();

  final RowSignature outputSignature =
      RowSignature.builder()
                  .add("page", ColumnType.STRING)
                  .add("countryIsoCode", ColumnType.STRING)
                  .add("j0.countryIsoCode", ColumnType.STRING)
                  .add("j0.countryName", ColumnType.STRING)
                  .add("j0.countryNumber", ColumnType.LONG)
                  .build();

  final SortMergeJoinFrameProcessor joinProcessor = new SortMergeJoinFrameProcessor(
      factInput,
      countriesInput,
      output.writable(),
      makeFrameWriterFactory(outputSignature),
      "j0.",
      ImmutableList.of(ImmutableList.of(countryNumberKey), ImmutableList.of(countryNumberKey)),
      new int[]{0},
      JoinType.LEFT,
      MAX_BUFFERED_BYTES
  );

  final boolean sqlCompatible = NullHandling.sqlCompatible();

  // In default-value mode, null country number from the left-hand table converts to zero, which matches Australia.
  // In SQL-compatible mode those rows have no match, so the right-hand columns are null.
  final String nullCode = sqlCompatible ? null : "AU";
  final String nullName = sqlCompatible ? null : "Australia";
  final Long nullNumber = sqlCompatible ? null : Long.valueOf(0L);

  final List<List<Object>> expected = Lists.newArrayList(
      Arrays.asList("Agama mossambica", null, nullCode, nullName, nullNumber),
      Arrays.asList("Apamea abruzzorum", null, nullCode, nullName, nullNumber),
      Arrays.asList("Atractus flammigerus", null, nullCode, nullName, nullNumber),
      Arrays.asList("Rallicula", null, nullCode, nullName, nullNumber),
      Arrays.asList("Talk:Oswald Tilghman", null, nullCode, nullName, nullNumber),
      Arrays.asList("Peremptory norm", "AU", "AU", "Australia", 0L),
      Arrays.asList("Didier Leclair", "CA", "CA", "Canada", 1L),
      Arrays.asList("Les Argonautes", "CA", "CA", "Canada", 1L),
      Arrays.asList("Sarah Michelle Gellar", "CA", "CA", "Canada", 1L),
      Arrays.asList("Golpe de Estado en Chile de 1973", "CL", "CL", "Chile", 2L),
      Arrays.asList("Diskussion:Sebastian Schulz", "DE", "DE", "Germany", 3L),
      Arrays.asList("Gabinete Ministerial de Rafael Correa", "EC", "EC", "Ecuador", 4L),
      Arrays.asList("Saison 9 de Secret Story", "FR", "FR", "France", 5L),
      Arrays.asList("Glasgow", "GB", "GB", "United Kingdom", 6L),
      Arrays.asList("Giusy Ferreri discography", "IT", "IT", "Italy", 7L),
      Arrays.asList("Roma-Bangkok", "IT", "IT", "Italy", 7L),
      Arrays.asList("青野武", "JP", "JP", "Japan", 8L),
      Arrays.asList("유희왕 GX", "KR", "KR", "Republic of Korea", 9L),
      Arrays.asList("Mathis Bolly", "MX", "MX", "Mexico", 10L),
      Arrays.asList("Алиса в Зазеркалье", "NO", "NO", "Norway", 11L),
      Arrays.asList("Wendigo", "SV", "SV", "El Salvador", 12L),
      Arrays.asList("Carlo Curti", "US", "US", "United States", 13L),
      Arrays.asList("DirecTV", "US", "US", "United States", 13L),
      Arrays.asList("Old Anatolian Turkish", "US", "US", "United States", 13L),
      Arrays.asList("Otjiwarongo Airport", "US", "US", "United States", 13L),
      Arrays.asList("President of India", "US", "US", "United States", 13L),
      Arrays.asList("Cream Soda", "SU", "SU", "States United", 15L),
      Arrays.asList("Orange Soda", "MatchNothing", null, null, null),
      Arrays.asList("History of Fourems", "MMMM", "MMMM", "Fourems", 205L)
  );

  if (!sqlCompatible) {
    // Sorting order is different in default-value mode, since 0 and null collapse.
    // "Peremptory norm" (index 5) moves before "Rallicula" (index 3).
    final List<Object> peremptoryNorm = expected.remove(5);
    expected.add(3, peremptoryNorm);
  }

  assertResult(joinProcessor, output.readable(), outputSignature, expected);
}
/**
 * RIGHT join keyed on "countryNumber": the countries table is the left input and the fact table is the right
 * input, whose columns are exposed under the "j0." prefix. The expected rows differ between SQL-compatible and
 * default-value null handling.
 */
@Test
public void testRightJoinCountryNumber() throws Exception
{
  final KeyColumn countryNumberKey = new KeyColumn("countryNumber", KeyOrder.ASCENDING);
  final KeyColumn pageKey = new KeyColumn("page", KeyOrder.ASCENDING);

  // Fact rows are sorted by the join key, then by page. Countries are sorted by the join key only.
  final ReadableInput factInput = buildFactInput(ImmutableList.of(countryNumberKey, pageKey));
  final ReadableInput countriesInput = buildCountriesInput(ImmutableList.of(countryNumberKey));

  final BlockingQueueFrameChannel output = BlockingQueueFrameChannel.minimal();

  final RowSignature outputSignature =
      RowSignature.builder()
                  .add("j0.page", ColumnType.STRING)
                  .add("j0.countryIsoCode", ColumnType.STRING)
                  .add("countryIsoCode", ColumnType.STRING)
                  .add("countryName", ColumnType.STRING)
                  .add("countryNumber", ColumnType.LONG)
                  .build();

  // Inputs are swapped relative to the LEFT join test: countries on the left, facts on the right.
  final SortMergeJoinFrameProcessor joinProcessor = new SortMergeJoinFrameProcessor(
      countriesInput,
      factInput,
      output.writable(),
      makeFrameWriterFactory(outputSignature),
      "j0.",
      ImmutableList.of(ImmutableList.of(countryNumberKey), ImmutableList.of(countryNumberKey)),
      new int[]{0},
      JoinType.RIGHT,
      MAX_BUFFERED_BYTES
  );

  final boolean sqlCompatible = NullHandling.sqlCompatible();

  // In default-value mode, null country number from the fact table (the right-hand input here) converts to zero,
  // which matches Australia. In SQL-compatible mode those rows have no match, so the countries columns are null.
  final String nullCode = sqlCompatible ? null : "AU";
  final String nullName = sqlCompatible ? null : "Australia";
  final Long nullNumber = sqlCompatible ? null : Long.valueOf(0L);

  final List<List<Object>> expected = Lists.newArrayList(
      Arrays.asList("Agama mossambica", null, nullCode, nullName, nullNumber),
      Arrays.asList("Apamea abruzzorum", null, nullCode, nullName, nullNumber),
      Arrays.asList("Atractus flammigerus", null, nullCode, nullName, nullNumber),
      Arrays.asList("Rallicula", null, nullCode, nullName, nullNumber),
      Arrays.asList("Talk:Oswald Tilghman", null, nullCode, nullName, nullNumber),
      Arrays.asList("Peremptory norm", "AU", "AU", "Australia", 0L),
      Arrays.asList("Didier Leclair", "CA", "CA", "Canada", 1L),
      Arrays.asList("Les Argonautes", "CA", "CA", "Canada", 1L),
      Arrays.asList("Sarah Michelle Gellar", "CA", "CA", "Canada", 1L),
      Arrays.asList("Golpe de Estado en Chile de 1973", "CL", "CL", "Chile", 2L),
      Arrays.asList("Diskussion:Sebastian Schulz", "DE", "DE", "Germany", 3L),
      Arrays.asList("Gabinete Ministerial de Rafael Correa", "EC", "EC", "Ecuador", 4L),
      Arrays.asList("Saison 9 de Secret Story", "FR", "FR", "France", 5L),
      Arrays.asList("Glasgow", "GB", "GB", "United Kingdom", 6L),
      Arrays.asList("Giusy Ferreri discography", "IT", "IT", "Italy", 7L),
      Arrays.asList("Roma-Bangkok", "IT", "IT", "Italy", 7L),
      Arrays.asList("青野武", "JP", "JP", "Japan", 8L),
      Arrays.asList("유희왕 GX", "KR", "KR", "Republic of Korea", 9L),
      Arrays.asList("Mathis Bolly", "MX", "MX", "Mexico", 10L),
      Arrays.asList("Алиса в Зазеркалье", "NO", "NO", "Norway", 11L),
      Arrays.asList("Wendigo", "SV", "SV", "El Salvador", 12L),
      Arrays.asList("Carlo Curti", "US", "US", "United States", 13L),
      Arrays.asList("DirecTV", "US", "US", "United States", 13L),
      Arrays.asList("Old Anatolian Turkish", "US", "US", "United States", 13L),
      Arrays.asList("Otjiwarongo Airport", "US", "US", "United States", 13L),
      Arrays.asList("President of India", "US", "US", "United States", 13L),
      Arrays.asList("Cream Soda", "SU", "SU", "States United", 15L),
      Arrays.asList("Orange Soda", "MatchNothing", null, null, null),
      Arrays.asList("History of Fourems", "MMMM", "MMMM", "Fourems", 205L)
  );

  if (!sqlCompatible) {
    // Sorting order is different in default-value mode, since 0 and null collapse.
    // "Peremptory norm" (index 5) moves before "Rallicula" (index 3).
    final List<Object> peremptoryNorm = expected.remove(5);
    expected.add(3, peremptoryNorm);
  }

  assertResult(joinProcessor, output.readable(), outputSignature, expected);
}
/**
 * INNER join keyed on "countryIsoCode": the fact table is the left input and the countries table is the right
 * input, whose columns are exposed under the "j0." prefix. Only fact rows with a matching country are expected.
 */
@Test
public void testInnerJoinCountryIsoCode() throws Exception
{
  final KeyColumn isoCodeKey = new KeyColumn("countryIsoCode", KeyOrder.ASCENDING);
  final KeyColumn pageKey = new KeyColumn("page", KeyOrder.ASCENDING);

  // Fact rows are sorted by the join key, then by page. Countries are sorted by the join key only.
  final ReadableInput factInput = buildFactInput(ImmutableList.of(isoCodeKey, pageKey));
  final ReadableInput countriesInput = buildCountriesInput(ImmutableList.of(isoCodeKey));

  final BlockingQueueFrameChannel output = BlockingQueueFrameChannel.minimal();

  final RowSignature outputSignature =
      RowSignature.builder()
                  .add("page", ColumnType.STRING)
                  .add("countryIsoCode", ColumnType.STRING)
                  .add("j0.countryIsoCode", ColumnType.STRING)
                  .add("j0.countryName", ColumnType.STRING)
                  .add("j0.countryNumber", ColumnType.LONG)
                  .build();

  final SortMergeJoinFrameProcessor joinProcessor = new SortMergeJoinFrameProcessor(
      factInput,
      countriesInput,
      output.writable(),
      makeFrameWriterFactory(outputSignature),
      "j0.",
      ImmutableList.of(ImmutableList.of(isoCodeKey), ImmutableList.of(isoCodeKey)),
      new int[]{0},
      JoinType.INNER,
      MAX_BUFFERED_BYTES
  );

  // Rows are listed in the order of the string join key ("MMMM" before "MX", "SU" before "SV").
  final List<List<Object>> expected = Arrays.asList(
      Arrays.asList("Peremptory norm", "AU", "AU", "Australia", 0L),
      Arrays.asList("Didier Leclair", "CA", "CA", "Canada", 1L),
      Arrays.asList("Les Argonautes", "CA", "CA", "Canada", 1L),
      Arrays.asList("Sarah Michelle Gellar", "CA", "CA", "Canada", 1L),
      Arrays.asList("Golpe de Estado en Chile de 1973", "CL", "CL", "Chile", 2L),
      Arrays.asList("Diskussion:Sebastian Schulz", "DE", "DE", "Germany", 3L),
      Arrays.asList("Gabinete Ministerial de Rafael Correa", "EC", "EC", "Ecuador", 4L),
      Arrays.asList("Saison 9 de Secret Story", "FR", "FR", "France", 5L),
      Arrays.asList("Glasgow", "GB", "GB", "United Kingdom", 6L),
      Arrays.asList("Giusy Ferreri discography", "IT", "IT", "Italy", 7L),
      Arrays.asList("Roma-Bangkok", "IT", "IT", "Italy", 7L),
      Arrays.asList("青野武", "JP", "JP", "Japan", 8L),
      Arrays.asList("유희왕 GX", "KR", "KR", "Republic of Korea", 9L),
      Arrays.asList("History of Fourems", "MMMM", "MMMM", "Fourems", 205L),
      Arrays.asList("Mathis Bolly", "MX", "MX", "Mexico", 10L),
      Arrays.asList("Алиса в Зазеркалье", "NO", "NO", "Norway", 11L),
      Arrays.asList("Cream Soda", "SU", "SU", "States United", 15L),
      Arrays.asList("Wendigo", "SV", "SV", "El Salvador", 12L),
      Arrays.asList("Carlo Curti", "US", "US", "United States", 13L),
      Arrays.asList("DirecTV", "US", "US", "United States", 13L),
      Arrays.asList("Old Anatolian Turkish", "US", "US", "United States", 13L),
      Arrays.asList("Otjiwarongo Airport", "US", "US", "United States", 13L),
      Arrays.asList("President of India", "US", "US", "United States", 13L)
  );

  assertResult(joinProcessor, output.readable(), outputSignature, expected);
}
/**
 * Same INNER join on "countryIsoCode" as {@link #testInnerJoinCountryIsoCode()}, except that an empty int array
 * is passed to the processor so the join condition acts like IS NOT DISTINCT FROM. The expected rows are the
 * same as for the plain inner join.
 */
@Test
public void testInnerJoinCountryIsoCodeNotDistinctFrom() throws Exception
{
  final KeyColumn isoCodeKey = new KeyColumn("countryIsoCode", KeyOrder.ASCENDING);
  final KeyColumn pageKey = new KeyColumn("page", KeyOrder.ASCENDING);

  // Fact rows are sorted by the join key, then by page. Countries are sorted by the join key only.
  final ReadableInput factInput = buildFactInput(ImmutableList.of(isoCodeKey, pageKey));
  final ReadableInput countriesInput = buildCountriesInput(ImmutableList.of(isoCodeKey));

  final BlockingQueueFrameChannel output = BlockingQueueFrameChannel.minimal();

  final RowSignature outputSignature =
      RowSignature.builder()
                  .add("page", ColumnType.STRING)
                  .add("countryIsoCode", ColumnType.STRING)
                  .add("j0.countryIsoCode", ColumnType.STRING)
                  .add("j0.countryName", ColumnType.STRING)
                  .add("j0.countryNumber", ColumnType.LONG)
                  .build();

  final SortMergeJoinFrameProcessor joinProcessor = new SortMergeJoinFrameProcessor(
      factInput,
      countriesInput,
      output.writable(),
      makeFrameWriterFactory(outputSignature),
      "j0.",
      ImmutableList.of(ImmutableList.of(isoCodeKey), ImmutableList.of(isoCodeKey)),
      new int[0], // empty array: act as if IS NOT DISTINCT FROM
      JoinType.INNER,
      MAX_BUFFERED_BYTES
  );

  // Rows are listed in the order of the string join key ("MMMM" before "MX", "SU" before "SV").
  final List<List<Object>> expected = Arrays.asList(
      Arrays.asList("Peremptory norm", "AU", "AU", "Australia", 0L),
      Arrays.asList("Didier Leclair", "CA", "CA", "Canada", 1L),
      Arrays.asList("Les Argonautes", "CA", "CA", "Canada", 1L),
      Arrays.asList("Sarah Michelle Gellar", "CA", "CA", "Canada", 1L),
      Arrays.asList("Golpe de Estado en Chile de 1973", "CL", "CL", "Chile", 2L),
      Arrays.asList("Diskussion:Sebastian Schulz", "DE", "DE", "Germany", 3L),
      Arrays.asList("Gabinete Ministerial de Rafael Correa", "EC", "EC", "Ecuador", 4L),
      Arrays.asList("Saison 9 de Secret Story", "FR", "FR", "France", 5L),
      Arrays.asList("Glasgow", "GB", "GB", "United Kingdom", 6L),
      Arrays.asList("Giusy Ferreri discography", "IT", "IT", "Italy", 7L),
      Arrays.asList("Roma-Bangkok", "IT", "IT", "Italy", 7L),
      Arrays.asList("青野武", "JP", "JP", "Japan", 8L),
      Arrays.asList("유희왕 GX", "KR", "KR", "Republic of Korea", 9L),
      Arrays.asList("History of Fourems", "MMMM", "MMMM", "Fourems", 205L),
      Arrays.asList("Mathis Bolly", "MX", "MX", "Mexico", 10L),
      Arrays.asList("Алиса в Зазеркалье", "NO", "NO", "Norway", 11L),
      Arrays.asList("Cream Soda", "SU", "SU", "States United", 15L),
      Arrays.asList("Wendigo", "SV", "SV", "El Salvador", 12L),
      Arrays.asList("Carlo Curti", "US", "US", "United States", 13L),
      Arrays.asList("DirecTV", "US", "US", "United States", 13L),
      Arrays.asList("Old Anatolian Turkish", "US", "US", "United States", 13L),
      Arrays.asList("Otjiwarongo Airport", "US", "US", "United States", 13L),
      Arrays.asList("President of India", "US", "US", "United States", 13L)
  );

  assertResult(joinProcessor, output.readable(), outputSignature, expected);
}
/**
 * INNER join on "countryIsoCode" (fact table on the left, countries on the right under the "j0." prefix) with
 * the final constructor argument set to 1 instead of MAX_BUFFERED_BYTES. The join is still expected to produce
 * the full inner-join result.
 */
@Test
public void testInnerJoinCountryIsoCode_withMaxBufferedBytesLimit_succeeds() throws Exception
{
  final KeyColumn isoCodeKey = new KeyColumn("countryIsoCode", KeyOrder.ASCENDING);
  final KeyColumn pageKey = new KeyColumn("page", KeyOrder.ASCENDING);

  // Fact rows are sorted by the join key, then by page. Countries are sorted by the join key only.
  final ReadableInput factInput = buildFactInput(ImmutableList.of(isoCodeKey, pageKey));
  final ReadableInput countriesInput = buildCountriesInput(ImmutableList.of(isoCodeKey));

  final BlockingQueueFrameChannel output = BlockingQueueFrameChannel.minimal();

  final RowSignature outputSignature =
      RowSignature.builder()
                  .add("page", ColumnType.STRING)
                  .add("countryIsoCode", ColumnType.STRING)
                  .add("j0.countryIsoCode", ColumnType.STRING)
                  .add("j0.countryName", ColumnType.STRING)
                  .add("j0.countryNumber", ColumnType.LONG)
                  .build();

  final SortMergeJoinFrameProcessor joinProcessor = new SortMergeJoinFrameProcessor(
      factInput,
      countriesInput,
      output.writable(),
      makeFrameWriterFactory(outputSignature),
      "j0.",
      ImmutableList.of(ImmutableList.of(isoCodeKey), ImmutableList.of(isoCodeKey)),
      new int[]{0},
      JoinType.INNER,
      1 // tiny limit in place of MAX_BUFFERED_BYTES
  );

  // Rows are listed in the order of the string join key ("MMMM" before "MX", "SU" before "SV").
  final List<List<Object>> expected = Arrays.asList(
      Arrays.asList("Peremptory norm", "AU", "AU", "Australia", 0L),
      Arrays.asList("Didier Leclair", "CA", "CA", "Canada", 1L),
      Arrays.asList("Les Argonautes", "CA", "CA", "Canada", 1L),
      Arrays.asList("Sarah Michelle Gellar", "CA", "CA", "Canada", 1L),
      Arrays.asList("Golpe de Estado en Chile de 1973", "CL", "CL", "Chile", 2L),
      Arrays.asList("Diskussion:Sebastian Schulz", "DE", "DE", "Germany", 3L),
      Arrays.asList("Gabinete Ministerial de Rafael Correa", "EC", "EC", "Ecuador", 4L),
      Arrays.asList("Saison 9 de Secret Story", "FR", "FR", "France", 5L),
      Arrays.asList("Glasgow", "GB", "GB", "United Kingdom", 6L),
      Arrays.asList("Giusy Ferreri discography", "IT", "IT", "Italy", 7L),
      Arrays.asList("Roma-Bangkok", "IT", "IT", "Italy", 7L),
      Arrays.asList("青野武", "JP", "JP", "Japan", 8L),
      Arrays.asList("유희왕 GX", "KR", "KR", "Republic of Korea", 9L),
      Arrays.asList("History of Fourems", "MMMM", "MMMM", "Fourems", 205L),
      Arrays.asList("Mathis Bolly", "MX", "MX", "Mexico", 10L),
      Arrays.asList("Алиса в Зазеркалье", "NO", "NO", "Norway", 11L),
      Arrays.asList("Cream Soda", "SU", "SU", "States United", 15L),
      Arrays.asList("Wendigo", "SV", "SV", "El Salvador", 12L),
      Arrays.asList("Carlo Curti", "US", "US", "United States", 13L),
      Arrays.asList("DirecTV", "US", "US", "United States", 13L),
      Arrays.asList("Old Anatolian Turkish", "US", "US", "United States", 13L),
      Arrays.asList("Otjiwarongo Airport", "US", "US", "United States", 13L),
      Arrays.asList("President of India", "US", "US", "United States", 13L)
  );

  assertResult(joinProcessor, output.readable(), outputSignature, expected);
}
/**
 * INNER join on "countryIsoCode" with the inputs reversed: countries on the left and the fact table on the
 * right under the "j0." prefix. The final constructor argument is 1 instead of MAX_BUFFERED_BYTES, and the join
 * is still expected to produce the full inner-join result.
 */
@Test
public void testInnerJoinCountryIsoCode_backwards_withMaxBufferedBytesLimit_succeeds() throws Exception
{
  final KeyColumn isoCodeKey = new KeyColumn("countryIsoCode", KeyOrder.ASCENDING);
  final KeyColumn pageKey = new KeyColumn("page", KeyOrder.ASCENDING);

  // Fact rows are sorted by the join key, then by page. Countries are sorted by the join key only.
  final ReadableInput factInput = buildFactInput(ImmutableList.of(isoCodeKey, pageKey));
  final ReadableInput countriesInput = buildCountriesInput(ImmutableList.of(isoCodeKey));

  final BlockingQueueFrameChannel output = BlockingQueueFrameChannel.minimal();

  final RowSignature outputSignature =
      RowSignature.builder()
                  .add("j0.page", ColumnType.STRING)
                  .add("j0.countryIsoCode", ColumnType.STRING)
                  .add("countryIsoCode", ColumnType.STRING)
                  .add("countryName", ColumnType.STRING)
                  .add("countryNumber", ColumnType.LONG)
                  .build();

  // "Backwards": countries first (left), facts second (right).
  final SortMergeJoinFrameProcessor joinProcessor = new SortMergeJoinFrameProcessor(
      countriesInput,
      factInput,
      output.writable(),
      makeFrameWriterFactory(outputSignature),
      "j0.",
      ImmutableList.of(ImmutableList.of(isoCodeKey), ImmutableList.of(isoCodeKey)),
      new int[]{0},
      JoinType.INNER,
      1 // tiny limit in place of MAX_BUFFERED_BYTES
  );

  // Rows are listed in the order of the string join key ("MMMM" before "MX", "SU" before "SV").
  final List<List<Object>> expected = Arrays.asList(
      Arrays.asList("Peremptory norm", "AU", "AU", "Australia", 0L),
      Arrays.asList("Didier Leclair", "CA", "CA", "Canada", 1L),
      Arrays.asList("Les Argonautes", "CA", "CA", "Canada", 1L),
      Arrays.asList("Sarah Michelle Gellar", "CA", "CA", "Canada", 1L),
      Arrays.asList("Golpe de Estado en Chile de 1973", "CL", "CL", "Chile", 2L),
      Arrays.asList("Diskussion:Sebastian Schulz", "DE", "DE", "Germany", 3L),
      Arrays.asList("Gabinete Ministerial de Rafael Correa", "EC", "EC", "Ecuador", 4L),
      Arrays.asList("Saison 9 de Secret Story", "FR", "FR", "France", 5L),
      Arrays.asList("Glasgow", "GB", "GB", "United Kingdom", 6L),
      Arrays.asList("Giusy Ferreri discography", "IT", "IT", "Italy", 7L),
      Arrays.asList("Roma-Bangkok", "IT", "IT", "Italy", 7L),
      Arrays.asList("青野武", "JP", "JP", "Japan", 8L),
      Arrays.asList("유희왕 GX", "KR", "KR", "Republic of Korea", 9L),
      Arrays.asList("History of Fourems", "MMMM", "MMMM", "Fourems", 205L),
      Arrays.asList("Mathis Bolly", "MX", "MX", "Mexico", 10L),
      Arrays.asList("Алиса в Зазеркалье", "NO", "NO", "Norway", 11L),
      Arrays.asList("Cream Soda", "SU", "SU", "States United", 15L),
      Arrays.asList("Wendigo", "SV", "SV", "El Salvador", 12L),
      Arrays.asList("Carlo Curti", "US", "US", "United States", 13L),
      Arrays.asList("DirecTV", "US", "US", "United States", 13L),
      Arrays.asList("Old Anatolian Turkish", "US", "US", "United States", 13L),
      Arrays.asList("Otjiwarongo Airport", "US", "US", "United States", 13L),
      Arrays.asList("President of India", "US", "US", "United States", 13L)
  );

  assertResult(joinProcessor, output.readable(), outputSignature, expected);
}
/**
 * INNER self-join of the fact table on "channel", projecting only the "channel" column. The expected output is
 * each channel value repeated a fixed number of times, in the insertion order of the expected-count map.
 */
@Test
public void testCountrySelfJoin() throws Exception
{
  final KeyColumn channelKey = new KeyColumn("channel", KeyOrder.ASCENDING);

  // Two independent readers over the same fact data, both sorted by channel.
  final ReadableInput leftFacts = buildFactInput(ImmutableList.of(channelKey));
  final ReadableInput rightFacts = buildFactInput(ImmutableList.of(channelKey));

  final BlockingQueueFrameChannel output = BlockingQueueFrameChannel.minimal();

  final RowSignature outputSignature = RowSignature.builder().add("channel", ColumnType.STRING).build();

  final SortMergeJoinFrameProcessor joinProcessor = new SortMergeJoinFrameProcessor(
      leftFacts,
      rightFacts,
      output.writable(),
      makeFrameWriterFactory(outputSignature),
      "j0.",
      ImmutableList.of(ImmutableList.of(channelKey), ImmutableList.of(channelKey)),
      new int[]{0},
      JoinType.INNER,
      MAX_BUFFERED_BYTES
  );

  // Number of joined rows expected per channel.
  // NOTE(review): these look like squares of per-channel row counts (196 = 14 * 14); confirm against the fixture.
  final ImmutableMap<String, Long> rowsPerChannel =
      ImmutableMap.<String, Long>builder()
                  .put("#ca.wikipedia", 1L)
                  .put("#de.wikipedia", 1L)
                  .put("#en.wikipedia", 196L)
                  .put("#es.wikipedia", 16L)
                  .put("#fr.wikipedia", 9L)
                  .put("#ja.wikipedia", 1L)
                  .put("#ko.wikipedia", 1L)
                  .put("#ru.wikipedia", 1L)
                  .put("#vi.wikipedia", 9L)
                  .build();

  // Expand the counts into one single-column row per expected output row.
  final List<List<Object>> expected = new ArrayList<>();
  for (final Map.Entry<String, Long> channelAndCount : rowsPerChannel.entrySet()) {
    final List<Object> row = Collections.singletonList(channelAndCount.getKey());
    expected.addAll(Collections.nCopies(Ints.checkedCast(channelAndCount.getValue()), row));
  }

  assertResult(joinProcessor, output.readable(), outputSignature, expected);
}
/**
 * INNER self-join of the fact table on "channel" with the final constructor argument set to 1 instead of
 * MAX_BUFFERED_BYTES. Running the processor is expected to fail with an {@link MSQException} carrying a
 * {@link TooManyRowsWithSameKeyFault}, wrapped in two layers of {@link RuntimeException}.
 */
@Test
public void testCountrySelfJoin_withMaxBufferedBytesLimit_fails() throws Exception
{
  // Test is only valid when rowsPerInputFrame is low enough that we get multiple frames.
  Assume.assumeThat(rowsPerInputFrame, Matchers.lessThanOrEqualTo(7));

  final KeyColumn channelKey = new KeyColumn("channel", KeyOrder.ASCENDING);

  final ReadableInput leftFacts = buildFactInput(ImmutableList.of(channelKey));
  final ReadableInput rightFacts = buildFactInput(ImmutableList.of(channelKey));

  final BlockingQueueFrameChannel output = BlockingQueueFrameChannel.minimal();

  final RowSignature outputSignature = RowSignature.builder().add("channel", ColumnType.STRING).build();

  final SortMergeJoinFrameProcessor joinProcessor = new SortMergeJoinFrameProcessor(
      leftFacts,
      rightFacts,
      output.writable(),
      makeFrameWriterFactory(outputSignature),
      "j0.",
      ImmutableList.of(ImmutableList.of(channelKey), ImmutableList.of(channelKey)),
      new int[]{0},
      JoinType.INNER,
      1 // tiny limit in place of MAX_BUFFERED_BYTES
  );

  final RuntimeException thrown = Assert.assertThrows(
      RuntimeException.class,
      () -> run(joinProcessor, output.readable(), outputSignature)
  );

  // Unwrap one layer at a time, checking each layer before looking at the next.
  final Throwable wrapper = thrown.getCause();
  MatcherAssert.assertThat(wrapper, CoreMatchers.instanceOf(RuntimeException.class));

  final Throwable msqFailure = wrapper.getCause();
  MatcherAssert.assertThat(msqFailure, CoreMatchers.instanceOf(MSQException.class));

  MatcherAssert.assertThat(
      ((MSQException) msqFailure).getFault(),
      CoreMatchers.instanceOf(TooManyRowsWithSameKeyFault.class)
  );
}
/**
 * Runs the processor via {@link #run} and asserts that the rows it produced equal {@code expectedRows}.
 *
 * @param processor             join processor under test
 * @param readableOutputChannel readable side of the channel the processor writes to
 * @param joinSignature         signature used to decode the output frames
 * @param expectedRows          rows the processor is expected to emit
 */
private void assertResult(
    final SortMergeJoinFrameProcessor processor,
    final ReadableFrameChannel readableOutputChannel,
    final RowSignature joinSignature,
    final List<List<Object>> expectedRows
)
{
  final Sequence<List<Object>> actual = Sequences.simple(run(processor, readableOutputChannel, joinSignature));
  final Sequence<List<Object>> expected = Sequences.simple(expectedRows);
  FrameTestUtil.assertRowsEqual(expected, actual);
}
/**
 * Submits the processor to {@code exec}, reads every row from the output channel, and then asserts that the
 * processor's return value is {@link Unit#instance()}.
 *
 * @param processor             join processor to run
 * @param readableOutputChannel readable side of the channel the processor writes to
 * @param joinSignature         signature used to decode the output frames
 *
 * @return all rows read from the output channel
 */
private List<List<Object>> run(
    final SortMergeJoinFrameProcessor processor,
    final ReadableFrameChannel readableOutputChannel,
    final RowSignature joinSignature
)
{
  final ListenableFuture<Object> processorFuture = exec.runFully(processor, null);

  // The output is read in full before the future's value is checked.
  final FrameReader outputReader = FrameReader.create(joinSignature);
  final List<List<Object>> outputRows =
      FrameTestUtil.readRowsFromFrameChannel(readableOutputChannel, outputReader).toList();

  Assert.assertEquals(Unit.instance(), FutureUtils.getUnchecked(processorFuture, true));
  return outputRows;
}
/**
 * Builds a {@link ReadableInput} over the fact test resource, sorted by {@code keyColumns}.
 */
private ReadableInput buildFactInput(final List<KeyColumn> keyColumns) throws IOException
{
  final String factResource = JoinTestHelper.FACT_RESOURCE;
  final RowSignature factSignature = JoinTestHelper.FACT_SIGNATURE;
  return makeChannelFromResource(factResource, factSignature, keyColumns);
}
/**
 * Builds a {@link ReadableInput} over the countries test resource, sorted by {@code keyColumns}.
 */
private ReadableInput buildCountriesInput(final List<KeyColumn> keyColumns) throws IOException
{
  final String countriesResource = JoinTestHelper.COUNTRIES_RESOURCE;
  final RowSignature countriesSignature = JoinTestHelper.COUNTRIES_SIGNATURE;
  return makeChannelFromResource(countriesResource, countriesSignature, keyColumns);
}
/**
 * Builds a {@link ReadableInput} over the regions test resource, sorted by {@code keyColumns}.
 */
private ReadableInput buildRegionsInput(final List<KeyColumn> keyColumns) throws IOException
{
  final String regionsResource = JoinTestHelper.REGIONS_RESOURCE;
  final RowSignature regionsSignature = JoinTestHelper.REGIONS_SIGNATURE;
  return makeChannelFromResource(regionsResource, regionsSignature, keyColumns);
}
/**
 * Builds a {@link ReadableInput} over all rows of a test resource, sorted by {@code keyColumns}. Delegates to
 * {@link #makeChannelFromResourceWithLimit} with a negative limit, which means "no limit".
 */
private ReadableInput makeChannelFromResource(
    final String resource,
    final RowSignature signature,
    final List<KeyColumn> keyColumns
) throws IOException
{
  final long noLimit = -1;
  return makeChannelFromResourceWithLimit(resource, signature, keyColumns, noLimit);
}
/**
 * Builds a {@link ReadableInput} over the rows of a test resource, sorted by {@code keyColumns}.
 *
 * @param resource   resource to load rows from
 * @param signature  signature of the rows
 * @param keyColumns sort order for the resulting frames
 * @param limit      maximum number of rows to read; a negative value means all rows
 */
private ReadableInput makeChannelFromResourceWithLimit(
    final String resource,
    final RowSignature signature,
    final List<KeyColumn> keyColumns,
    final long limit
) throws IOException
{
  // The segment is only needed while the frames are being built, so it is closed before returning.
  try (
      final RowBasedSegment<Map<String, Object>> rowSegment = JoinTestHelper.withRowsFromResource(
          resource,
          rows -> new RowBasedSegment<>(
              SegmentId.dummy(resource),
              limit >= 0 ? Sequences.simple(rows).limit(limit) : Sequences.simple(rows),
              columnName -> m -> m.get(columnName),
              signature
          )
      )
  ) {
    return makeChannelFromAdapter(rowSegment.asStorageAdapter(), keyColumns);
  }
}
/**
 * Reads the adapter into one sorted row-based frame, re-splits that frame into frames of at most
 * {@code rowsPerInputFrame} rows, and writes them to an in-memory channel that is closed for writing before it
 * is returned.
 *
 * @param adapter    source of rows
 * @param keyColumns sort order applied to the rows
 */
private ReadableInput makeChannelFromAdapter(
    final StorageAdapter adapter,
    final List<KeyColumn> keyColumns
) throws IOException
{
  // Step 1: create a single, sorted frame.
  final FrameSequenceBuilder sortedBuilder = FrameSequenceBuilder.fromAdapter(adapter)
                                                                 .frameType(FrameType.ROW_BASED)
                                                                 .maxRowsPerFrame(Integer.MAX_VALUE)
                                                                 .sortBy(keyColumns);
  final RowSignature sortedSignature = sortedBuilder.signature();
  final Frame sortedFrame = Iterables.getOnlyElement(sortedBuilder.frames().toList());

  // Step 2: split it up into frames that match rowsPerInputFrame.
  final StorageAdapter sortedAdapter =
      new FrameStorageAdapter(sortedFrame, FrameReader.create(sortedSignature), Intervals.ETERNITY);
  final Sequence<Frame> splitFrames = FrameSequenceBuilder.fromAdapter(sortedAdapter)
                                                          .frameType(FrameType.ROW_BASED)
                                                          .maxRowsPerFrame(rowsPerInputFrame)
                                                          .frames();

  // Max size is set large enough to hold all rows we might ever want to use.
  final BlockingQueueFrameChannel frameChannel = new BlockingQueueFrameChannel(10_000);

  splitFrames.forEach(
      splitFrame -> {
        try {
          frameChannel.writable().write(splitFrame);
        }
        catch (IOException e) {
          // The lambda cannot throw checked exceptions, so wrap and keep the cause.
          throw new RuntimeException(e);
        }
      }
  );

  frameChannel.writable().close();

  return ReadableInput.channel(frameChannel.readable(), FrameReader.create(sortedSignature), STAGE_PARTITION);
}
/**
 * Creates a row-based frame writer factory for {@code signature}, backed by a 1,000,000-byte on-heap arena and
 * wrapped in a {@link LimitedFrameWriterFactory} configured with {@code rowsPerOutputFrame}.
 */
private FrameWriterFactory makeFrameWriterFactory(final RowSignature signature)
{
  final SingleMemoryAllocatorFactory allocatorFactory =
      new SingleMemoryAllocatorFactory(ArenaMemoryAllocator.createOnHeap(1_000_000));

  final FrameWriterFactory baseFactory = FrameWriters.makeFrameWriterFactory(
      FrameType.ROW_BASED,
      allocatorFactory,
      signature,
      Collections.emptyList()
  );

  return new LimitedFrameWriterFactory(baseFactory, rowsPerOutputFrame);
}
}