blob: 964f0fe276457fe36b0f65648ea2702afd8054e7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.samza.adapter;
import static org.apache.beam.runners.samza.adapter.TestSourceHelpers.createElementMessage;
import static org.apache.beam.runners.samza.adapter.TestSourceHelpers.createEndOfStreamMessage;
import static org.apache.beam.runners.samza.adapter.TestSourceHelpers.createWatermarkMessage;
import static org.apache.beam.runners.samza.adapter.TestSourceHelpers.expectWrappedException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.apache.samza.Partition;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;
import org.joda.time.Instant;
import org.junit.Test;
/**
 * Tests for {@link BoundedSourceSystem}.
 *
 * <p>Each test builds a {@link TestBoundedSource}, wraps it in a {@link
 * BoundedSourceSystem.Consumer}, and compares the envelopes polled from that consumer with the
 * expected sequence of element, watermark and end-of-stream messages.
 */
public class BoundedSourceSystemTest {
  /** Partition used by every test that reads from a single, unsplit source. */
  private static final SystemStreamPartition DEFAULT_SSP = ssp(0);

  // A reasonable time to wait to get all messages from the bounded source assuming no blocking.
  private static final long DEFAULT_TIMEOUT_MILLIS = 1000;

  /** Typed {@code null} passed as the offset argument of {@code register} in {@link #testSplit}. */
  private static final String NULL_STRING = null;

  /** An empty source yields only the max-timestamp watermark followed by end-of-stream. */
  @Test
  public void testConsumerStartStop() throws IOException, InterruptedException {
    final TestBoundedSource<String> source = TestBoundedSource.<String>createBuilder().build();
    final BoundedSourceSystem.Consumer<String> consumer = createConsumer(source);

    consumer.register(DEFAULT_SSP, "0");
    consumer.start();
    try {
      assertEquals(
          Arrays.asList(
              createWatermarkMessage(DEFAULT_SSP, BoundedWindow.TIMESTAMP_MAX_VALUE),
              createEndOfStreamMessage(DEFAULT_SSP)),
          consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
    } finally {
      // Always stop the started consumer, even when the assertion above fails.
      consumer.stop();
    }
  }

  /** A single element is delivered with offset "0", then the watermark and end-of-stream. */
  @Test
  public void testConsumeOneMessage() throws IOException, InterruptedException {
    final TestBoundedSource<String> source =
        TestBoundedSource.<String>createBuilder().addElements("test").build();
    final BoundedSourceSystem.Consumer<String> consumer = createConsumer(source);

    consumer.register(DEFAULT_SSP, "0");
    consumer.start();
    try {
      assertEquals(
          Arrays.asList(
              createElementMessage(DEFAULT_SSP, "0", "test", BoundedWindow.TIMESTAMP_MIN_VALUE),
              createWatermarkMessage(DEFAULT_SSP, BoundedWindow.TIMESTAMP_MAX_VALUE),
              createEndOfStreamMessage(DEFAULT_SSP)),
          consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
    } finally {
      consumer.stop();
    }
  }

  /** Elements added after {@code setTimestamp} carry the new timestamp; earlier ones do not. */
  @Test
  public void testAdvanceTimestamp() throws InterruptedException {
    final Instant timestamp = Instant.now();

    final TestBoundedSource<String> source =
        TestBoundedSource.<String>createBuilder()
            .addElements("before")
            .setTimestamp(timestamp)
            .addElements("after")
            .build();
    final BoundedSourceSystem.Consumer<String> consumer = createConsumer(source);

    consumer.register(DEFAULT_SSP, "0");
    consumer.start();
    try {
      assertEquals(
          Arrays.asList(
              createElementMessage(DEFAULT_SSP, "0", "before", BoundedWindow.TIMESTAMP_MIN_VALUE),
              createElementMessage(DEFAULT_SSP, "1", "after", timestamp),
              createWatermarkMessage(DEFAULT_SSP, BoundedWindow.TIMESTAMP_MAX_VALUE),
              createEndOfStreamMessage(DEFAULT_SSP)),
          consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
    } finally {
      consumer.stop();
    }
  }

  /** Several elements are delivered in order with offsets "0", "1", "2", "3". */
  @Test
  public void testConsumeMultipleMessages() throws IOException, InterruptedException {
    final TestBoundedSource<String> source =
        TestBoundedSource.<String>createBuilder()
            .addElements("test", "a", "few", "messages")
            .build();
    final BoundedSourceSystem.Consumer<String> consumer = createConsumer(source);

    consumer.register(DEFAULT_SSP, "0");
    consumer.start();
    try {
      assertEquals(
          Arrays.asList(
              createElementMessage(DEFAULT_SSP, "0", "test", BoundedWindow.TIMESTAMP_MIN_VALUE),
              createElementMessage(DEFAULT_SSP, "1", "a", BoundedWindow.TIMESTAMP_MIN_VALUE),
              createElementMessage(DEFAULT_SSP, "2", "few", BoundedWindow.TIMESTAMP_MIN_VALUE),
              createElementMessage(
                  DEFAULT_SSP, "3", "messages", BoundedWindow.TIMESTAMP_MIN_VALUE),
              createWatermarkMessage(DEFAULT_SSP, BoundedWindow.TIMESTAMP_MAX_VALUE),
              createEndOfStreamMessage(DEFAULT_SSP)),
          consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
    } finally {
      consumer.stop();
    }
  }

  /** An exception configured as the source's first event is expected to surface from polling. */
  @Test
  public void testReaderThrowsAtStart() throws Exception {
    final IOException exception = new IOException("Expected exception");

    final TestBoundedSource<String> source =
        TestBoundedSource.<String>createBuilder().addException(exception).build();
    final BoundedSourceSystem.Consumer<String> consumer = createConsumer(source);

    consumer.register(DEFAULT_SSP, "0");
    consumer.start();
    try {
      expectWrappedException(
          exception,
          () -> consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
    } finally {
      consumer.stop();
    }
  }

  /** An exception configured after several good elements is expected to surface from polling. */
  @Test
  public void testReaderThrowsAtAdvance() throws Exception {
    final IOException exception = new IOException("Expected exception");

    final TestBoundedSource<String> source =
        TestBoundedSource.<String>createBuilder()
            .addElements("test", "a", "few", "good", "messages", "then", "...")
            .addException(exception)
            .build();
    final BoundedSourceSystem.Consumer<String> consumer = createConsumer(source);

    consumer.register(DEFAULT_SSP, "0");
    consumer.start();
    try {
      expectWrappedException(
          exception,
          () -> consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
    } finally {
      consumer.stop();
    }
  }

  /**
   * While the source is held on a latch, polling times out having seen only the first element;
   * once the latch is released the remaining messages arrive.
   */
  @Test
  public void testTimeout() throws Exception {
    final CountDownLatch advanceLatch = new CountDownLatch(1);

    final TestBoundedSource<String> source =
        TestBoundedSource.<String>createBuilder()
            .addElements("before")
            .addLatch(advanceLatch)
            .addElements("after")
            .build();
    final BoundedSourceSystem.Consumer<String> consumer = createConsumer(source);

    consumer.register(DEFAULT_SSP, "0");
    consumer.start();
    try {
      assertEquals(
          Collections.singletonList(
              createElementMessage(DEFAULT_SSP, "0", "before", BoundedWindow.TIMESTAMP_MIN_VALUE)),
          consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));

      advanceLatch.countDown();

      assertEquals(
          Arrays.asList(
              createElementMessage(DEFAULT_SSP, "1", "after", BoundedWindow.TIMESTAMP_MIN_VALUE),
              createWatermarkMessage(DEFAULT_SSP, BoundedWindow.TIMESTAMP_MAX_VALUE),
              createEndOfStreamMessage(DEFAULT_SSP)),
          consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
    } finally {
      // Release the latch even if the first assertion failed, so nothing stays parked on it.
      // Counting down a latch that is already at zero is a no-op.
      advanceLatch.countDown();
      consumer.stop();
    }
  }

  /**
   * With three splits, each partition delivers its own element, watermark and end-of-stream, and
   * the element offsets across the partitions are exactly "0", "1" and "2".
   */
  @Test
  public void testSplit() throws IOException, InterruptedException {
    final int splitCount = 3;

    final TestBoundedSource.SplittableBuilder<String> builder =
        TestBoundedSource.<String>createSplits(splitCount);
    for (int split = 0; split < splitCount; split++) {
      builder.forSplit(split).addElements("split-" + split);
    }
    final TestBoundedSource<String> source = builder.build();

    final BoundedSourceSystem.Consumer<String> consumer = createConsumer(source, splitCount);
    for (int split = 0; split < splitCount; split++) {
      consumer.register(ssp(split), NULL_STRING);
    }
    consumer.start();

    try {
      final Set<String> offsets = new HashSet<>();

      for (int split = 0; split < splitCount; split++) {
        final SystemStreamPartition partition = ssp(split);
        final List<IncomingMessageEnvelope> envelopes =
            consumeUntilTimeoutOrEos(consumer, partition, DEFAULT_TIMEOUT_MILLIS);

        // Fail with a readable message instead of an IndexOutOfBoundsException below.
        assertTrue("Expected at least one envelope for split " + split, !envelopes.isEmpty());

        // The offset of the element is taken from the received envelope here; the set of
        // offsets across all partitions is verified after the loop.
        final String offset = envelopes.get(0).getOffset();
        assertEquals(
            Arrays.asList(
                createElementMessage(
                    partition, offset, "split-" + split, BoundedWindow.TIMESTAMP_MIN_VALUE),
                createWatermarkMessage(partition, BoundedWindow.TIMESTAMP_MAX_VALUE),
                createEndOfStreamMessage(partition)),
            envelopes);
        offsets.add(offset);
      }

      // check offsets
      assertEquals(Sets.newHashSet("0", "1", "2"), offsets);
    } finally {
      consumer.stop();
    }
  }

  /**
   * Polls {@code consumer} for {@code ssp} repeatedly and collects the returned envelopes until
   * either the last collected envelope is an end-of-stream message or {@code timeoutMillis} has
   * elapsed.
   *
   * @param consumer the consumer to poll
   * @param ssp the partition to poll for
   * @param timeoutMillis the maximum time to keep polling; must be {@code >= 0}
   * @return all envelopes collected, in the order they were returned by the consumer
   * @throws InterruptedException if a poll is interrupted
   */
  private static List<IncomingMessageEnvelope> consumeUntilTimeoutOrEos(
      SystemConsumer consumer, SystemStreamPartition ssp, long timeoutMillis)
      throws InterruptedException {
    assertTrue("Expected timeoutMillis (" + timeoutMillis + ") >= 0", timeoutMillis >= 0);

    final List<IncomingMessageEnvelope> accumulator = new ArrayList<>();
    final long start = System.currentTimeMillis();
    final long deadline = start + timeoutMillis;
    long now = start;
    while (now <= deadline) {
      // Hand the consumer the time still remaining. This is never negative inside the loop; a
      // negative timeout asks SystemConsumer.poll to block until a message is available, which
      // would defeat the timeout this helper is meant to enforce.
      accumulator.addAll(pollOnce(consumer, ssp, deadline - now));
      if (!accumulator.isEmpty() && accumulator.get(accumulator.size() - 1).isEndOfStream()) {
        break;
      }
      now = System.currentTimeMillis();
    }
    return accumulator;
  }

  /**
   * Performs a single poll for {@code ssp} and asserts that the result contains exactly that
   * partition mapped to a non-null list.
   *
   * @param consumer the consumer to poll
   * @param ssp the only partition requested
   * @param timeoutMillis the timeout forwarded to {@link SystemConsumer#poll}
   * @return the envelopes returned for {@code ssp}
   * @throws InterruptedException if the poll is interrupted
   */
  private static List<IncomingMessageEnvelope> pollOnce(
      SystemConsumer consumer, SystemStreamPartition ssp, long timeoutMillis)
      throws InterruptedException {
    final Set<SystemStreamPartition> sspSet = Collections.singleton(ssp);
    final Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollResult =
        consumer.poll(sspSet, timeoutMillis);
    assertEquals(sspSet, pollResult.keySet());

    final List<IncomingMessageEnvelope> envelopes = pollResult.get(ssp);
    assertNotNull(envelopes);
    return envelopes;
  }

  /** Creates a consumer for {@code source} with a maximum source parallelism of one. */
  private static BoundedSourceSystem.Consumer<String> createConsumer(
      BoundedSource<String> source) {
    return createConsumer(source, 1);
  }

  /**
   * Creates a consumer for {@code source} whose pipeline options have the maximum source
   * parallelism set to {@code splitNum}.
   */
  private static BoundedSourceSystem.Consumer<String> createConsumer(
      BoundedSource<String> source, int splitNum) {
    SamzaPipelineOptions pipelineOptions = PipelineOptionsFactory.as(SamzaPipelineOptions.class);
    pipelineOptions.setMaxSourceParallelism(splitNum);
    return new BoundedSourceSystem.Consumer<>(
        source, pipelineOptions, new SamzaMetricsContainer(new MetricsRegistryMap()), "test-step");
  }

  /** Returns the partition with the given id on the "default-system" system and stream. */
  private static SystemStreamPartition ssp(int partition) {
    return new SystemStreamPartition("default-system", "default-system", new Partition(partition));
  }
}