blob: 387d2d3329f8d068e02934e7f463e56ead7d5973 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kinesis;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.MockSerializationSchema;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/** Suite of {@link FlinkKinesisProducer} tests. */
public class FlinkKinesisProducerTest extends TestLogger {
// JUnit rule used by the tests that expect a specific exception type and message;
// it starts out expecting no exception at all.
@Rule public ExpectedException exception = ExpectedException.none();
// ----------------------------------------------------------------------
// Tests to verify serializability
// ----------------------------------------------------------------------
/** Constructing the producer with a schema that cannot be serialized must be rejected. */
@Test
public void testCreateWithNonSerializableDeserializerFails() {
    // register the expected failure before touching the producer
    exception.expect(IllegalArgumentException.class);
    exception.expectMessage("The provided serialization schema is not serializable");

    final KinesisSerializationSchema<String> nonSerializableSchema =
            new NonSerializableSerializationSchema();
    final Properties producerConfig = TestUtils.getStandardProperties();

    new FlinkKinesisProducer<>(nonSerializableSchema, producerConfig);
}
/** Constructing the producer with a serializable schema must not throw. */
@Test
public void testCreateWithSerializableDeserializer() {
    final KinesisSerializationSchema<String> serializableSchema =
            new SerializableSerializationSchema();
    final Properties producerConfig = TestUtils.getStandardProperties();

    // the test passes as long as the constructor returns normally
    new FlinkKinesisProducer<>(serializableSchema, producerConfig);
}
/** Setting a custom partitioner that cannot be serialized must be rejected. */
@Test
public void testConfigureWithNonSerializableCustomPartitionerFails() {
    exception.expect(IllegalArgumentException.class);
    exception.expectMessage("The provided custom partitioner is not serializable");

    final FlinkKinesisProducer<String> producer =
            new FlinkKinesisProducer<>(
                    new SimpleStringSchema(), TestUtils.getStandardProperties());

    // expected to throw
    producer.setCustomPartitioner(new NonSerializableCustomPartitioner());
}
/** Setting a serializable custom partitioner must not throw. */
@Test
public void testConfigureWithSerializableCustomPartitioner() {
    final FlinkKinesisProducer<String> producer =
            new FlinkKinesisProducer<>(
                    new SimpleStringSchema(), TestUtils.getStandardProperties());

    // the test passes as long as the setter returns normally
    producer.setCustomPartitioner(new SerializableCustomPartitioner());
}
/** A producer built from a serializable schema must itself be serializable. */
@Test
public void testProducerIsSerializable() {
    final SimpleStringSchema schema = new SimpleStringSchema();
    final Properties producerConfig = TestUtils.getStandardProperties();
    final FlinkKinesisProducer<String> producer =
            new FlinkKinesisProducer<>(schema, producerConfig);

    final boolean serializable = InstantiationUtil.isSerializable(producer);
    assertTrue(serializable);
}
// ----------------------------------------------------------------------
// Tests to verify at-least-once guarantee
// ----------------------------------------------------------------------
/**
 * Verifies that an asynchronous failure reported for an earlier record is rethrown by the very
 * next invoke call.
 */
@SuppressWarnings("ResultOfMethodCallIgnored")
@Test
public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    final String failureMessage = "artificial async exception";

    final DummyFlinkKinesisProducer<String> sink =
            new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    final OneInputStreamOperatorTestHarness<String, Object> harness =
            new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
    harness.open();

    // send one record and fail its pending future
    harness.processElement(new StreamRecord<>("msg-1"));
    final SettableFuture<UserRecordResult> firstRecord = sink.getPendingRecordFutures().get(0);
    firstRecord.setException(new Exception(failureMessage));

    // the next invoke should rethrow the async exception
    Exception rethrown = null;
    try {
        harness.processElement(new StreamRecord<>("msg-2"));
    } catch (Exception e) {
        rethrown = e;
    }

    if (rethrown == null) {
        // nothing was thrown, so the async error got lost
        Assert.fail();
    }
    Assert.assertTrue(
            ExceptionUtils.findThrowableWithMessage(rethrown, failureMessage).isPresent());
}
/**
 * Verifies that an asynchronous failure reported for an earlier record is rethrown by the very
 * next snapshot call.
 */
@SuppressWarnings("ResultOfMethodCallIgnored")
@Test
public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
    final String failureMessage = "artificial async exception";

    final DummyFlinkKinesisProducer<String> sink =
            new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    final OneInputStreamOperatorTestHarness<String, Object> harness =
            new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
    harness.open();

    // send one record and fail its pending future
    harness.processElement(new StreamRecord<>("msg-1"));
    final SettableFuture<UserRecordResult> firstRecord = sink.getPendingRecordFutures().get(0);
    firstRecord.setException(new Exception(failureMessage));

    // the next checkpoint should rethrow the async exception
    Exception rethrown = null;
    try {
        harness.snapshot(123L, 123L);
    } catch (Exception e) {
        rethrown = e;
    }

    if (rethrown == null) {
        // nothing was thrown, so the async error got lost
        Assert.fail();
    }
    Assert.assertTrue(
            ExceptionUtils.findThrowableWithMessage(rethrown, failureMessage).isPresent());
}
/**
 * Verifies that an asynchronous failure of one of the records flushed during a checkpoint is
 * rethrown by that checkpoint. A timeout is set because the test would never finish if the
 * logic were broken.
 *
 * <p>This test does not check that the snapshot method blocks correctly while records are
 * pending; that is covered by testAtLeastOnceProducer.
 */
@SuppressWarnings("ResultOfMethodCallIgnored")
@Test(timeout = 10000)
public void testAsyncErrorRethrownAfterFlush() throws Throwable {
    final String failureMessage = "artificial async failure for 2nd message";

    final DummyFlinkKinesisProducer<String> sink =
            new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    final OneInputStreamOperatorTestHarness<String, Object> harness =
            new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
    harness.open();

    // emit msg-1, msg-2 and msg-3
    for (int i = 1; i <= 3; i++) {
        harness.processElement(new StreamRecord<>("msg-" + i));
    }
    final List<SettableFuture<UserRecordResult>> pending = sink.getPendingRecordFutures();

    // only the first record is acknowledged for now
    final UserRecordResult success = mock(UserRecordResult.class);
    when(success.isSuccessful()).thenReturn(true);
    pending.get(0).set(success);

    final CheckedThread checkpointer =
            new CheckedThread() {
                @Override
                public void go() throws Exception {
                    // expected to block at first: two records still need to be flushed
                    harness.snapshot(123L, 123L);
                }
            };
    checkpointer.start();

    // the 2nd record fails asynchronously, the 3rd one completes
    pending.get(1).setException(new Exception(failureMessage));
    pending.get(2).set(mock(UserRecordResult.class));

    // after the flush, the async exception should have been rethrown
    Exception rethrown = null;
    try {
        checkpointer.sync();
    } catch (Exception e) {
        rethrown = e;
    }

    if (rethrown == null) {
        Assert.fail();
    }
    Assert.assertTrue(
            ExceptionUtils.findThrowableWithMessage(rethrown, failureMessage).isPresent());
}
/**
 * Verifies that buffered records are not dropped: a checkpoint has to wait until every pending
 * record has been acknowledged. A timeout is set because the test would never finish if the
 * logic were broken.
 */
@SuppressWarnings({"unchecked", "ResultOfMethodCallIgnored"})
@Test(timeout = 10000)
public void testAtLeastOnceProducer() throws Throwable {
    final String returnedTooEarly = "Snapshot returned before all records were flushed";

    final DummyFlinkKinesisProducer<String> sink =
            new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    final OneInputStreamOperatorTestHarness<String, Object> harness =
            new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
    harness.open();

    // emit msg-1, msg-2 and msg-3
    for (int i = 1; i <= 3; i++) {
        harness.processElement(new StreamRecord<>("msg-" + i));
    }

    // run the checkpoint on its own thread
    final CheckedThread checkpointer =
            new CheckedThread() {
                @Override
                public void go() throws Exception {
                    // should block until all records are flushed; the dummy producer's
                    // snapshotState throws if it returns while records are still outstanding
                    harness.snapshot(123L, 123L);
                }
            };
    checkpointer.start();

    // wait for flushing to begin; this would block forever if the snapshot didn't perform a
    // flush. The snapshot must still be blocked at this point.
    sink.waitUntilFlushStarted();
    Assert.assertTrue(returnedTooEarly, checkpointer.isAlive());

    // now complete the callbacks one by one
    final UserRecordResult ack = mock(UserRecordResult.class);
    when(ack.isSuccessful()).thenReturn(true);
    final List<SettableFuture<UserRecordResult>> pending = sink.getPendingRecordFutures();

    // after each of the first two acknowledgements the snapshot must still be blocked
    for (int i = 0; i < 2; i++) {
        pending.get(i).set(ack);
        Assert.assertTrue(returnedTooEarly, checkpointer.isAlive());
    }
    pending.get(2).set(ack);

    // fails with an exception if flushing wasn't completed before the snapshot method returned
    checkpointer.sync();

    harness.close();
}
/**
 * Test ensuring that the producer blocks if the queue limit is exceeded, until the queue length
 * drops below the limit; we set a timeout because the test will not finish if the logic is
 * broken.
 *
 * <p>The queue limit is set to 1, so at most one record may be outstanding before further
 * {@code processElement} calls have to wait.
 */
@Test(timeout = 10000)
public void testBackpressure() throws Throwable {
    final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));

    final DummyFlinkKinesisProducer<String> producer =
            new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    producer.setQueueLimit(1);

    OneInputStreamOperatorTestHarness<String, Object> testHarness =
            new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    testHarness.open();

    UserRecordResult result = mock(UserRecordResult.class);
    when(result.isSuccessful()).thenReturn(true);

    // msg-1 goes into an empty queue, so the call must return without blocking
    CheckedThread msg1 =
            new CheckedThread() {
                @Override
                public void go() throws Exception {
                    testHarness.processElement(new StreamRecord<>("msg-1"));
                }
            };
    msg1.start();
    msg1.trySync(deadline.timeLeftIfAny().toMillis());
    assertFalse("Flush triggered before reaching queue limit", msg1.isAlive());

    // consume msg-1 so that queue is empty again
    producer.getPendingRecordFutures().get(0).set(result);

    // msg-2 again goes into an empty queue and must not block either
    CheckedThread msg2 =
            new CheckedThread() {
                @Override
                public void go() throws Exception {
                    testHarness.processElement(new StreamRecord<>("msg-2"));
                }
            };
    msg2.start();
    msg2.trySync(deadline.timeLeftIfAny().toMillis());
    assertFalse("Flush triggered before reaching queue limit", msg2.isAlive());

    // msg-2 is still outstanding, so the following calls have to wait for the queue to drain
    CheckedThread moreElementsThread =
            new CheckedThread() {
                @Override
                public void go() throws Exception {
                    // this should block until msg-2 is consumed
                    testHarness.processElement(new StreamRecord<>("msg-3"));
                    // this should block until msg-3 is consumed
                    testHarness.processElement(new StreamRecord<>("msg-4"));
                }
            };
    moreElementsThread.start();

    assertTrue("Producer should still block, but doesn't", moreElementsThread.isAlive());

    // consume msg-2 from the queue, leaving msg-3 in the queue and msg-4 blocked
    while (producer.getPendingRecordFutures().size() < 2) {
        Thread.sleep(50);
    }
    producer.getPendingRecordFutures().get(1).set(result);

    assertTrue("Producer should still block, but doesn't", moreElementsThread.isAlive());

    // consume msg-3, blocked msg-4 can be inserted into the queue and block is released
    while (producer.getPendingRecordFutures().size() < 3) {
        Thread.sleep(50);
    }
    producer.getPendingRecordFutures().get(2).set(result);

    moreElementsThread.trySync(deadline.timeLeftIfAny().toMillis());

    assertFalse(
            "Producer still blocks although the queue is flushed",
            moreElementsThread.isAlive());

    // acknowledge msg-4 so that nothing is left outstanding when the harness is closed
    producer.getPendingRecordFutures().get(3).set(result);

    testHarness.close();
}
/** Opening the sink operator must call {@code open} on the serialization schema. */
@Test
public void testOpen() throws Exception {
    final MockSerializationSchema<Object> schema = new MockSerializationSchema<>();
    final Properties producerConfig = TestUtils.getStandardProperties();
    final FlinkKinesisProducer<Object> producer =
            new FlinkKinesisProducer<>(schema, producerConfig);

    final StreamSink<Object> sinkOperator = new StreamSink<>(producer);
    final AbstractStreamOperatorTestHarness<Object> harness =
            new AbstractStreamOperatorTestHarness<>(sinkOperator, 1, 1, 0);
    harness.open();

    final boolean openCalled = schema.isOpenCalled();
    assertThat("Open method was not called", openCalled, is(true));
}
// ----------------------------------------------------------------------
// Utility test classes
// ----------------------------------------------------------------------
/**
 * A non-serializable {@link KinesisSerializationSchema} (because it is a nested class with
 * reference to the enclosing class, which is not serializable) used for testing.
 *
 * <p>The class is intentionally NOT static; making it static would remove the reference to the
 * enclosing class that this description relies on.
 */
private final class NonSerializableSerializationSchema
        implements KinesisSerializationSchema<String> {

    private static final long serialVersionUID = 3361337188490178780L;

    /**
     * Wraps the UTF-8 encoding of the element in a {@link ByteBuffer}.
     *
     * @param element the record to serialize
     * @return a buffer backed by the element's UTF-8 bytes
     */
    @Override
    public ByteBuffer serialize(String element) {
        // explicit charset: the no-arg getBytes() uses the platform default encoding
        return ByteBuffer.wrap(element.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Returns the same stream name for every element.
     *
     * @param element the record about to be sent (ignored)
     * @return the constant {@code "test-stream"}
     */
    @Override
    public String getTargetStream(String element) {
        return "test-stream";
    }
}
/** A static, serializable {@link KinesisSerializationSchema}. */
private static final class SerializableSerializationSchema
        implements KinesisSerializationSchema<String> {

    private static final long serialVersionUID = 6298573834520052886L;

    /**
     * Wraps the UTF-8 encoding of the element in a {@link ByteBuffer}.
     *
     * @param element the record to serialize
     * @return a buffer backed by the element's UTF-8 bytes
     */
    @Override
    public ByteBuffer serialize(String element) {
        // explicit charset: the no-arg getBytes() uses the platform default encoding
        return ByteBuffer.wrap(element.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Returns the same stream name for every element.
     *
     * @param element the record about to be sent (ignored)
     * @return the constant {@code "test-stream"}
     */
    @Override
    public String getTargetStream(String element) {
        return "test-stream";
    }
}
/**
 * A {@link KinesisPartitioner} used to test the non-serializable case: it is a nested class
 * with a reference to the enclosing class, which is not serializable.
 */
private final class NonSerializableCustomPartitioner extends KinesisPartitioner<String> {

    private static final long serialVersionUID = -5961578876056779161L;

    /** The partition id handed out for every element. */
    private static final String PARTITION_ID = "test-partition";

    @Override
    public String getPartitionId(String element) {
        return PARTITION_ID;
    }
}
/** A static nested {@link KinesisPartitioner} used to test the serializable case. */
private static final class SerializableCustomPartitioner extends KinesisPartitioner<String> {

    private static final long serialVersionUID = -4996071893997035695L;

    /** The partition id handed out for every element. */
    private static final String PARTITION_ID = "test-partition";

    @Override
    public String getPartitionId(String element) {
        return PARTITION_ID;
    }
}
/**
 * A {@link FlinkKinesisProducer} backed by a mocked {@link KinesisProducer}.
 *
 * <p>Every call to the mock's {@code addUserRecord} creates a {@link SettableFuture} that is
 * kept in a list, so that tests can complete or fail individual records by hand. The mock
 * reports the number of futures that are not done yet as its outstanding record count, and each
 * call to the mock's {@code flush} triggers a latch that {@link #waitUntilFlushStarted()} waits
 * on.
 */
private static class DummyFlinkKinesisProducer<T> extends FlinkKinesisProducer<T> {

    private static final long serialVersionUID = -1212425318784651817L;

    private static final String DUMMY_STREAM = "dummy-stream";
    private static final String DUMMY_PARTITION = "dummy-partition";

    private transient KinesisProducer mockProducer;

    // Futures are appended by whichever thread sends a record, while tests read the list from
    // a different thread (e.g. polling size() while another thread is sending), so the list
    // is wrapped to make those accesses thread-safe.
    private final List<SettableFuture<UserRecordResult>> pendingRecordFutures =
            Collections.synchronizedList(new LinkedList<>());

    private transient MultiShotLatch flushLatch;

    /**
     * Creates the producer with the standard test properties, a default stream and partition,
     * fail-on-error enabled, and the mocked Kinesis producer wired up.
     *
     * @param schema serialization schema handed to the super class
     */
    DummyFlinkKinesisProducer(SerializationSchema<T> schema) {
        super(schema, TestUtils.getStandardProperties());

        setDefaultStream(DUMMY_STREAM);
        setDefaultPartition(DUMMY_PARTITION);
        setFailOnError(true);

        // created before the stubbing below, which triggers it on flush()
        this.flushLatch = new MultiShotLatch();

        // set up mock producer
        this.mockProducer = mock(KinesisProducer.class);

        // every added record gets its own future, which the test completes manually
        when(mockProducer.addUserRecord(
                        anyString(),
                        anyString(),
                        nullable(String.class),
                        any(ByteBuffer.class)))
                .thenAnswer(
                        new Answer<SettableFuture<UserRecordResult>>() {
                            @Override
                            public SettableFuture<UserRecordResult> answer(
                                    InvocationOnMock invocationOnMock) throws Throwable {
                                SettableFuture<UserRecordResult> future =
                                        SettableFuture.create();
                                pendingRecordFutures.add(future);
                                return future;
                            }
                        });

        // outstanding records = futures that have not been completed yet
        when(mockProducer.getOutstandingRecordsCount())
                .thenAnswer(
                        new Answer<Integer>() {
                            @Override
                            public Integer answer(InvocationOnMock invocationOnMock)
                                    throws Throwable {
                                return getNumPendingRecordFutures();
                            }
                        });

        // signal waiting tests whenever a flush is requested
        doAnswer(
                        new Answer<Void>() {
                            @Override
                            public Void answer(InvocationOnMock invocationOnMock)
                                    throws Throwable {
                                flushLatch.trigger();
                                return null;
                            }
                        })
                .when(mockProducer)
                .flush();
    }

    /** Returns the mocked producer instead of creating a real one; the config is ignored. */
    @Override
    protected KinesisProducer getKinesisProducer(KinesisProducerConfiguration producerConfig) {
        return mockProducer;
    }

    /**
     * Delegates to the super class and then fails if any record is still outstanding.
     *
     * @throws RuntimeException if records are still pending after the super call returned
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        super.snapshotState(context);

        // if the snapshot implementation doesn't wait until all pending records are flushed, we
        // should fail the test
        if (mockProducer.getOutstandingRecordsCount() > 0) {
            throw new RuntimeException(
                    "Flushing is enabled; snapshots should be blocked until all pending records are flushed");
        }
    }

    /** Returns the live list of futures, one per added record, in insertion order. */
    List<SettableFuture<UserRecordResult>> getPendingRecordFutures() {
        return pendingRecordFutures;
    }

    /** Blocks until the mocked producer's {@code flush} has been called. */
    void waitUntilFlushStarted() throws Exception {
        flushLatch.await();
    }

    /** Counts the futures that have not been completed (successfully or exceptionally) yet. */
    private int getNumPendingRecordFutures() {
        int numPending = 0;
        // a synchronized list has to be locked manually while it is being iterated
        synchronized (pendingRecordFutures) {
            for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
                if (!future.isDone()) {
                    numPending++;
                }
            }
        }
        return numPending;
    }
}
}