blob: 1a87b3c9f86ad44024c47baac281b2c279d47228 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleEntry;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink.SinkWriter;
import org.apache.beam.sdk.TestUtils;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link ShuffleSink}. */
@RunWith(JUnit4.class)
public class ShuffleSinkTest {
private static final List<KV<Integer, String>> NO_KVS = Collections.emptyList();
private static final List<KV<Integer, String>> KVS =
Arrays.asList(
KV.of(1, "in 1a"),
KV.of(1, "in 1b"),
KV.of(2, "in 2a"),
KV.of(2, "in 2b"),
KV.of(3, "in 3"),
KV.of(4, "in 4a"),
KV.of(4, "in 4b"),
KV.of(4, "in 4c"),
KV.of(4, "in 4d"),
KV.of(5, "in 5"));
private static final List<KV<Integer, KV<String, Integer>>> NO_SORTING_KVS =
Collections.emptyList();
private static final List<KV<Integer, KV<String, Integer>>> SORTING_KVS =
Arrays.asList(
KV.of(1, KV.of("in 1a", 3)),
KV.of(1, KV.of("in 1b", 9)),
KV.of(2, KV.of("in 2a", 2)),
KV.of(2, KV.of("in 2b", 77)),
KV.of(3, KV.of("in 3", 33)),
KV.of(4, KV.of("in 4a", -123)),
KV.of(4, KV.of("in 4b", 0)),
KV.of(4, KV.of("in 4c", -1)),
KV.of(4, KV.of("in 4d", 1)),
KV.of(5, KV.of("in 5", 666)));
private static final Instant timestamp = new Instant(123000);
private static final IntervalWindow window = new IntervalWindow(timestamp, timestamp.plus(1000));
private void runTestWriteUngroupingShuffleSink(List<Integer> expected) throws Exception {
Coder<WindowedValue<Integer>> windowedValueCoder =
WindowedValue.getFullCoder(BigEndianIntegerCoder.of(), new GlobalWindows().windowCoder());
BatchModeExecutionContext executionContext =
BatchModeExecutionContext.forTesting(PipelineOptionsFactory.create(), "STAGE");
ShuffleSink<Integer> shuffleSink =
new ShuffleSink<>(
PipelineOptionsFactory.create(),
null,
ShuffleSink.ShuffleKind.UNGROUPED,
windowedValueCoder,
executionContext,
TestOperationContext.create());
TestShuffleWriter shuffleWriter = new TestShuffleWriter();
List<Long> actualSizes = new ArrayList<>();
try (Sink.SinkWriter<WindowedValue<Integer>> shuffleSinkWriter =
shuffleSink.writer(shuffleWriter, "dataset")) {
for (Integer value : expected) {
actualSizes.add(shuffleSinkWriter.add(WindowedValue.valueInGlobalWindow(value)));
}
}
List<ShuffleEntry> records = shuffleWriter.getRecords();
List<Integer> actual = new ArrayList<>();
for (ShuffleEntry record : records) {
// Ignore the key.
byte[] valueBytes = record.getValue();
WindowedValue<Integer> value = CoderUtils.decodeFromByteArray(windowedValueCoder, valueBytes);
Assert.assertEquals(Lists.newArrayList(GlobalWindow.INSTANCE), value.getWindows());
actual.add(value.getValue());
}
Assert.assertEquals(expected, actual);
Assert.assertEquals(shuffleWriter.getSizes(), actualSizes);
}
void runTestWriteGroupingShuffleSink(List<KV<Integer, String>> expected) throws Exception {
BatchModeExecutionContext executionContext =
BatchModeExecutionContext.forTesting(PipelineOptionsFactory.create(), "STAGE");
ShuffleSink<KV<Integer, String>> shuffleSink =
new ShuffleSink<>(
PipelineOptionsFactory.create(),
null,
ShuffleSink.ShuffleKind.GROUP_KEYS,
WindowedValue.getFullCoder(
KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()),
IntervalWindow.getCoder()),
executionContext,
TestOperationContext.create());
TestShuffleWriter shuffleWriter = new TestShuffleWriter();
List<Long> actualSizes = new ArrayList<>();
try (SinkWriter<WindowedValue<KV<Integer, String>>> shuffleSinkWriter =
shuffleSink.writer(shuffleWriter, "dataset")) {
for (KV<Integer, String> kv : expected) {
actualSizes.add(
shuffleSinkWriter.add(
WindowedValue.of(
KV.of(kv.getKey(), kv.getValue()),
timestamp,
Lists.newArrayList(window),
PaneInfo.NO_FIRING)));
}
}
List<ShuffleEntry> records = shuffleWriter.getRecords();
List<KV<Integer, String>> actual = new ArrayList<>();
for (ShuffleEntry record : records) {
byte[] keyBytes = record.getKey();
byte[] valueBytes = record.getValue();
Assert.assertEquals(
timestamp, CoderUtils.decodeFromByteArray(InstantCoder.of(), record.getSecondaryKey()));
Integer key = CoderUtils.decodeFromByteArray(BigEndianIntegerCoder.of(), keyBytes);
String valueElem = CoderUtils.decodeFromByteArray(StringUtf8Coder.of(), valueBytes);
actual.add(KV.of(key, valueElem));
}
Assert.assertEquals(expected, actual);
Assert.assertEquals(shuffleWriter.getSizes(), actualSizes);
}
void runTestWriteGroupingSortingShuffleSink(List<KV<Integer, KV<String, Integer>>> expected)
throws Exception {
BatchModeExecutionContext executionContext =
BatchModeExecutionContext.forTesting(PipelineOptionsFactory.create(), "STAGE");
ShuffleSink<KV<Integer, KV<String, Integer>>> shuffleSink =
new ShuffleSink<>(
PipelineOptionsFactory.create(),
null,
ShuffleSink.ShuffleKind.GROUP_KEYS_AND_SORT_VALUES,
WindowedValue.getFullCoder(
KvCoder.of(
BigEndianIntegerCoder.of(),
KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())),
new GlobalWindows().windowCoder()),
executionContext,
TestOperationContext.create());
TestShuffleWriter shuffleWriter = new TestShuffleWriter();
List<Long> actualSizes = new ArrayList<>();
try (Sink.SinkWriter<WindowedValue<KV<Integer, KV<String, Integer>>>> shuffleSinkWriter =
shuffleSink.writer(shuffleWriter, "dataset")) {
for (KV<Integer, KV<String, Integer>> kv : expected) {
actualSizes.add(shuffleSinkWriter.add(WindowedValue.valueInGlobalWindow(kv)));
}
}
List<ShuffleEntry> records = shuffleWriter.getRecords();
List<KV<Integer, KV<String, Integer>>> actual = new ArrayList<>();
for (ShuffleEntry record : records) {
byte[] keyBytes = record.getKey();
byte[] valueBytes = record.getValue();
byte[] sortKeyBytes = record.getSecondaryKey();
Integer key = CoderUtils.decodeFromByteArray(BigEndianIntegerCoder.of(), keyBytes);
ByteArrayInputStream bais = new ByteArrayInputStream(sortKeyBytes);
String sortKey = StringUtf8Coder.of().decode(bais);
Integer sortValue = CoderUtils.decodeFromByteArray(BigEndianIntegerCoder.of(), valueBytes);
actual.add(KV.of(key, KV.of(sortKey, sortValue)));
}
Assert.assertEquals(expected, actual);
Assert.assertEquals(shuffleWriter.getSizes(), actualSizes);
}
@Test
public void testWriteEmptyUngroupingShuffleSink() throws Exception {
runTestWriteUngroupingShuffleSink(TestUtils.NO_INTS);
}
@Test
public void testWriteNonEmptyUngroupingShuffleSink() throws Exception {
runTestWriteUngroupingShuffleSink(TestUtils.INTS);
}
@Test
public void testWriteEmptyGroupingShuffleSink() throws Exception {
runTestWriteGroupingShuffleSink(NO_KVS);
}
@Test
public void testWriteNonEmptyGroupingShuffleSink() throws Exception {
runTestWriteGroupingShuffleSink(KVS);
}
@Test
public void testWriteEmptyGroupingSortingShuffleSink() throws Exception {
runTestWriteGroupingSortingShuffleSink(NO_SORTING_KVS);
}
@Test
public void testWriteNonEmptyGroupingSortingShuffleSink() throws Exception {
runTestWriteGroupingSortingShuffleSink(SORTING_KVS);
}
}