blob: a8b48e844b621b953d87df090e13b300bb394e7d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.fn.stream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.stream.DataStreams.BlockingQueueIterator;
import org.apache.beam.sdk.fn.stream.DataStreams.DataStreamDecoder;
import org.apache.beam.sdk.fn.stream.DataStreams.ElementDelimitedOutputStream;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CountingOutputStream;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link DataStreams}. */
@RunWith(Enclosed.class)
public class DataStreamsTest {
/** Tests for {@link DataStreams.BlockingQueueIterator}. */
@RunWith(JUnit4.class)
public static class BlockingQueueIteratorTest {
@Test(timeout = 10_000)
public void testBlockingQueueIteratorWithoutBlocking() throws Exception {
BlockingQueueIterator<String> iterator =
new BlockingQueueIterator<>(new ArrayBlockingQueue<String>(3));
iterator.accept("A");
iterator.accept("B");
iterator.close();
assertEquals(
Arrays.asList("A", "B"), Arrays.asList(Iterators.toArray(iterator, String.class)));
}
@Test(timeout = 10_000)
public void testBlockingQueueIteratorWithBlocking() throws Exception {
// The synchronous queue only allows for one element to transfer at a time and blocks
// the sending/receiving parties until both parties are there.
final BlockingQueueIterator<String> iterator =
new BlockingQueueIterator<>(new SynchronousQueue<String>());
final SettableFuture<List<String>> valuesFuture = SettableFuture.create();
Thread appender =
new Thread(
() -> valuesFuture.set(Arrays.asList(Iterators.toArray(iterator, String.class))));
appender.start();
iterator.accept("A");
iterator.accept("B");
iterator.close();
assertEquals(Arrays.asList("A", "B"), valuesFuture.get());
appender.join();
}
}
/** Tests for {@link DataStreams.DataStreamDecoder}. */
@RunWith(JUnit4.class)
public static class DataStreamDecoderTest {
@Rule public ExpectedException thrown = ExpectedException.none();
@Test
public void testEmptyInputStream() throws Exception {
testDecoderWith(StringUtf8Coder.of());
}
@Test
public void testNonEmptyInputStream() throws Exception {
testDecoderWith(StringUtf8Coder.of(), "A", "BC", "DEF", "GHIJ");
}
@Test
public void testNonEmptyInputStreamWithZeroLengthEncoding() throws Exception {
CountingOutputStream countingOutputStream =
new CountingOutputStream(ByteStreams.nullOutputStream());
GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, countingOutputStream);
assumeTrue(countingOutputStream.getCount() == 0);
testDecoderWith(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE, GlobalWindow.INSTANCE);
}
@Test
public void testPrefetch() throws Exception {
List<ByteString> encodings = new ArrayList<>();
{
ByteString.Output encoding = ByteString.newOutput();
StringUtf8Coder.of().encode("A", encoding);
StringUtf8Coder.of().encode("BC", encoding);
encodings.add(encoding.toByteString());
}
encodings.add(ByteString.EMPTY);
{
ByteString.Output encoding = ByteString.newOutput();
StringUtf8Coder.of().encode("DEF", encoding);
StringUtf8Coder.of().encode("GHIJ", encoding);
encodings.add(encoding.toByteString());
}
PrefetchableIteratorsTest.ReadyAfterPrefetchUntilNext<ByteString> iterator =
new PrefetchableIteratorsTest.ReadyAfterPrefetchUntilNext<>(encodings.iterator());
PrefetchableIterator<String> decoder =
new DataStreamDecoder<>(StringUtf8Coder.of(), iterator);
assertFalse(decoder.isReady());
decoder.prefetch();
assertTrue(decoder.isReady());
assertEquals(1, iterator.getNumPrefetchCalls());
decoder.next();
// Now we will have moved off of the empty byte array that we start with so prefetch will
// do nothing since we are ready
assertTrue(decoder.isReady());
decoder.prefetch();
assertEquals(1, iterator.getNumPrefetchCalls());
decoder.next();
// Now we are at the end of the first ByteString so we expect a prefetch to pass through
assertFalse(decoder.isReady());
decoder.prefetch();
assertEquals(2, iterator.getNumPrefetchCalls());
// We also expect the decoder to not be ready since the next byte string is empty which
// would require us to move to the next page. This typically wouldn't happen in practice
// though because we expect non empty pages.
assertFalse(decoder.isReady());
// Prefetching will allow us to move to the third ByteString
decoder.prefetch();
assertEquals(3, iterator.getNumPrefetchCalls());
assertTrue(decoder.isReady());
}
private <T> void testDecoderWith(Coder<T> coder, T... expected) throws IOException {
ByteString.Output output = ByteString.newOutput();
for (T value : expected) {
int size = output.size();
coder.encode(value, output);
// Pad an arbitrary byte when values encode to zero bytes
if (output.size() - size == 0) {
output.write(0);
}
}
testDecoderWith(coder, expected, Arrays.asList(output.toByteString()));
testDecoderWith(coder, expected, Arrays.asList(ByteString.EMPTY, output.toByteString()));
testDecoderWith(coder, expected, Arrays.asList(output.toByteString(), ByteString.EMPTY));
}
private <T> void testDecoderWith(Coder<T> coder, T[] expected, List<ByteString> encoded) {
Iterator<T> decoder =
new DataStreamDecoder<>(
coder, PrefetchableIterators.maybePrefetchable(encoded.iterator()));
Object[] actual = Iterators.toArray(decoder, Object.class);
assertArrayEquals(expected, actual);
// Ensure that we are consistent on hasNext at end of stream
assertFalse(decoder.hasNext());
assertFalse(decoder.hasNext());
thrown.expect(NoSuchElementException.class);
decoder.next();
}
}
/** Tests for {@link ElementDelimitedOutputStream delimited streams}. */
@RunWith(JUnit4.class)
public static class ElementDelimitedOutputStreamTest {
@Test
public void testNothingWritten() throws Exception {
List<ByteString> output = new ArrayList<>();
ElementDelimitedOutputStream outputStream = new ElementDelimitedOutputStream(output::add, 3);
outputStream.close();
assertThat(output, hasSize(0));
}
@Test
public void testEmptyElementsArePadded() throws Exception {
List<ByteString> output = new ArrayList<>();
ElementDelimitedOutputStream outputStream = new ElementDelimitedOutputStream(output::add, 3);
outputStream.delimitElement();
outputStream.delimitElement();
outputStream.delimitElement();
outputStream.delimitElement();
outputStream.delimitElement();
outputStream.close();
assertThat(
output, contains(ByteString.copyFrom(new byte[3]), ByteString.copyFrom(new byte[2])));
}
@Test
public void testNonEmptyElementsAreChunked() throws Exception {
List<ByteString> output = new ArrayList<>();
ElementDelimitedOutputStream outputStream = new ElementDelimitedOutputStream(output::add, 3);
outputStream.write(new byte[] {0x01, 0x02});
outputStream.delimitElement();
outputStream.write(new byte[] {0x03, 0x04, 0x05, 0x06, 0x07, 0x08});
outputStream.delimitElement();
outputStream.write(0x09);
outputStream.delimitElement();
outputStream.close();
assertThat(
output,
contains(
ByteString.copyFrom(new byte[] {0x01, 0x02, 0x03}),
ByteString.copyFrom(new byte[] {0x04, 0x05, 0x06}),
ByteString.copyFrom(new byte[] {0x07, 0x08, 0x09})));
}
}
}