/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.integration.MonitorableSourceConnector;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.TransactionContext;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.test.util.ConcurrencyUtils;
import org.apache.kafka.connect.test.util.MockitoUtils;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.ParameterizedTest;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.stubbing.OngoingStubbing;
import org.mockito.verification.VerificationMode;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static java.util.Collections.emptySet;
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG;
import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TRANSACTION_BOUNDARY_INTERVAL_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class ExactlyOnceWorkerSourceTaskTest {
private static final String TOPIC = "topic";
private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes());
private static final Map<String, ?> OFFSET = offset(12);
// Connect-format data
private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
private static final Integer KEY = -1;
private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA;
private static final Long VALUE_1 = 12L;
private static final Long VALUE_2 = 13L;
// Serialized data. The actual format of this data doesn't matter -- we just want to see that the right version
// is used in the right place.
private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes();
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private WorkerConfig config;
private SourceConnectorConfig sourceConfig;
private Plugins plugins;
private MockConnectMetrics metrics;
@Mock private ErrorHandlingMetrics errorHandlingMetrics;
private Time time;
private ExactlyOnceWorkerSourceTask workerTask;
@Mock private SourceTask sourceTask;
@Mock private Converter keyConverter;
@Mock private Converter valueConverter;
@Mock private HeaderConverter headerConverter;
@Mock private TransformationChain<SourceRecord> transformationChain;
@Mock private Producer<byte[], byte[]> producer;
@Mock private TopicAdmin admin;
@Mock private CloseableOffsetStorageReader offsetReader;
@Mock private OffsetStorageWriter offsetWriter;
@Mock private ClusterConfigState clusterConfigState;
@Mock private TaskStatus.Listener statusListener;
@Mock private StatusBackingStore statusBackingStore;
@Mock private ConnectorOffsetBackingStore offsetStore;
@Mock private Runnable preProducerCheck;
@Mock private Runnable postProducerCheck;
@Rule public MockitoRule rule = MockitoJUnit.rule();
private static final Map<String, String> TASK_PROPS = new HashMap<>();
static {
TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
}
private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
private static final SourceRecord SOURCE_RECORD_1 =
new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, VALUE_1);
private static final SourceRecord SOURCE_RECORD_2 =
new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, VALUE_2);
private static final List<SourceRecord> RECORDS = Arrays.asList(SOURCE_RECORD_1, SOURCE_RECORD_2);
private final AtomicReference<CountDownLatch> pollLatch = new AtomicReference<>(new CountDownLatch(0));
private final AtomicReference<List<SourceRecord>> pollRecords = new AtomicReference<>(RECORDS);
private final boolean enableTopicCreation;
private boolean taskStarted;
private Future<?> workerTaskFuture;
@ParameterizedTest.Parameters
public static Collection<Boolean> parameters() {
return Arrays.asList(false, true);
}
public ExactlyOnceWorkerSourceTaskTest(boolean enableTopicCreation) {
this.enableTopicCreation = enableTopicCreation;
this.taskStarted = false;
}
@Before
public void setup() throws Exception {
Map<String, String> workerProps = workerProps();
plugins = new Plugins(workerProps);
config = new StandaloneConfig(workerProps);
sourceConfig = new SourceConnectorConfig(plugins, sourceConnectorProps(), true);
metrics = new MockConnectMetrics();
time = Time.SYSTEM;
when(offsetStore.primaryOffsetsTopic()).thenReturn("offsets-topic");
when(sourceTask.poll()).thenAnswer(invocation -> {
// Capture the records we'll return before decrementing the poll latch,
// since test cases may mutate the poll records field immediately
// after awaiting the latch
List<SourceRecord> result = pollRecords.get();
pollLatch.get().countDown();
Thread.sleep(10);
return result;
});
}
@After
public void teardown() throws Exception {
// In most tests, we don't really care about how many times the task got polled,
// how many times we prepared to write offsets, etc.
// We add these verify(..., anyTimes())... calls to make sure that those interactions
// with our mocks don't cause the verifyNoMoreInteractions(...) call to fail.
// Individual tests can add more fine-grained verification logic if necessary; the
// verifications here will not conflict with them
verify(sourceTask, MockitoUtils.anyTimes()).poll();
verify(sourceTask, MockitoUtils.anyTimes()).commit();
verify(sourceTask, MockitoUtils.anyTimes()).commitRecord(any(), any());
verify(offsetWriter, MockitoUtils.anyTimes()).offset(PARTITION, OFFSET);
verify(offsetWriter, MockitoUtils.anyTimes()).beginFlush();
verify(offsetWriter, MockitoUtils.anyTimes()).doFlush(any());
if (enableTopicCreation) {
verify(admin, MockitoUtils.anyTimes()).describeTopics(TOPIC);
}
verify(statusBackingStore, MockitoUtils.anyTimes()).getTopic(any(), any());
verify(offsetStore, MockitoUtils.anyTimes()).primaryOffsetsTopic();
verifyNoMoreInteractions(statusListener, producer, sourceTask, admin, offsetWriter, statusBackingStore, offsetStore, preProducerCheck, postProducerCheck);
if (metrics != null) metrics.stop();
}
private Map<String, String> workerProps() {
Map<String, String> props = new HashMap<>();
props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
props.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
props.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
props.put("internal.key.converter.schemas.enable", "false");
props.put("internal.value.converter.schemas.enable", "false");
props.put("offset.storage.file.filename", "/tmp/connect.offsets");
props.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(enableTopicCreation));
return props;
}
private Map<String, String> sourceConnectorProps() {
return sourceConnectorProps(SourceTask.TransactionBoundary.DEFAULT);
}
private Map<String, String> sourceConnectorProps(SourceTask.TransactionBoundary transactionBoundary) {
// Set up props for the source connector
Map<String, String> props = new HashMap<>();
props.put("name", "foo-connector");
props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
props.put(TASKS_MAX_CONFIG, String.valueOf(1));
props.put(TOPIC_CONFIG, TOPIC);
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", "foo", "bar"));
props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, String.valueOf(1));
props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, String.valueOf(1));
props.put(TRANSACTION_BOUNDARY_CONFIG, transactionBoundary.toString());
props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "foo" + "." + INCLUDE_REGEX_CONFIG, TOPIC);
props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + INCLUDE_REGEX_CONFIG, ".*");
props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + EXCLUDE_REGEX_CONFIG, TOPIC);
return props;
}
private static Map<String, ?> offset(int n) {
return Collections.singletonMap("key", n);
}
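/**
 * Create the {@link ExactlyOnceWorkerSourceTask} under test, wiring in the mocked task, converters,
 * producer, offset machinery, and preflight checks configured in {@link #setup()}.
 */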
private void createWorkerTask() {
createWorkerTask(TargetState.STARTED);
}
private void createWorkerTask(TargetState initialState) {
createWorkerTask(initialState, keyConverter, valueConverter, headerConverter);
}
private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
workerTask = new ExactlyOnceWorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore,
config, clusterConfigState, metrics, errorHandlingMetrics, plugins.delegatingLoader(), time, RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore,
sourceConfig, Runnable::run, preProducerCheck, postProducerCheck);
}
@Test
public void testRemoveMetrics() {
createWorkerTask();
workerTask.removeMetrics();
assertEquals(emptySet(), filterToTaskMetrics(metrics.metrics().metrics().keySet()));
}
private Set<MetricName> filterToTaskMetrics(Set<MetricName> metricNames) {
return metricNames
.stream()
.filter(m -> metrics.registry().taskGroupName().equals(m.group())
|| metrics.registry().sourceTaskGroupName().equals(m.group()))
.collect(Collectors.toSet());
}
@Test
public void testStartPaused() throws Exception {
createWorkerTask(TargetState.PAUSED);
final CountDownLatch pauseLatch = new CountDownLatch(1);
doAnswer(invocation -> {
pauseLatch.countDown();
return null;
}).when(statusListener).onPause(eq(taskId));
startTaskThread();
assertTrue(pauseLatch.await(5, TimeUnit.SECONDS));
awaitShutdown();
verify(statusListener).onPause(taskId);
verify(sourceTask, never()).start(any());
verify(sourceTask, never()).poll();
verifyCleanShutdown();
assertPollMetrics(0);
}
@Test
public void testPause() throws Exception {
createWorkerTask();
expectSuccessfulSends();
expectSuccessfulFlushes();
final CountDownLatch pauseLatch = new CountDownLatch(1);
doAnswer(invocation -> {
pauseLatch.countDown();
return null;
}).when(statusListener).onPause(eq(taskId));
startTaskThread();
awaitPolls(2);
workerTask.transitionTo(TargetState.PAUSED);
long pollsBeforePause = pollCount();
ConcurrencyUtils.awaitLatch(pauseLatch, "task did not pause in time");
// since the transition is observed asynchronously, the count could be off by one loop iteration
assertTrue(pollCount() - pollsBeforePause <= 1);
awaitShutdown();
verify(statusListener).onPause(taskId);
// Task should have flushed offsets for every record poll, once on pause, and once for end-of-life offset commit
verifyTransactions(pollCount() + 2, pollCount() + 2);
verifySends();
verifyPossibleTopicCreation();
// make sure we didn't poll again after triggering shutdown
assertTrue(pollCount() - pollsBeforePause <= 1);
verifyPreflight();
verifyStartup();
verifyCleanShutdown();
}
@Test
public void testFailureInPreProducerCheck() throws Exception {
createWorkerTask();
Exception exception = new ConnectException("Failed to perform zombie fencing");
doThrow(exception).when(preProducerCheck).run();
workerTask.initialize(TASK_CONFIG);
// No need to execute on a separate thread; preflight checks should all take place before the poll-send loop starts
workerTask.run();
verify(preProducerCheck).run();
verifyShutdown(true, false);
}
@Test
public void testFailureInProducerInitialization() throws Exception {
createWorkerTask();
Exception exception = new ConnectException("You can't do that!");
doThrow(exception).when(producer).initTransactions();
workerTask.initialize(TASK_CONFIG);
// No need to execute on a separate thread; preflight checks should all take place before the poll-send loop starts
workerTask.run();
verify(preProducerCheck).run();
verify(producer).initTransactions();
verifyShutdown(true, false);
}
@Test
public void testFailureInPostProducerCheck() throws Exception {
createWorkerTask();
Exception exception = new ConnectException("New task configs for the connector have already been generated");
doThrow(exception).when(postProducerCheck).run();
workerTask.initialize(TASK_CONFIG);
// No need to execute on a separate thread; preflight checks should all take place before the poll-send loop starts
workerTask.run();
verify(preProducerCheck).run();
verify(producer).initTransactions();
verify(postProducerCheck).run();
verifyShutdown(true, false);
}
@Test
public void testFailureInOffsetStoreStart() throws Exception {
createWorkerTask();
Exception exception = new ConnectException("No soup for you!");
doThrow(exception).when(offsetStore).start();
workerTask.initialize(TASK_CONFIG);
// No need to execute on a separate thread; preflight checks should all take place before the poll-send loop starts
workerTask.run();
verify(preProducerCheck).run();
verify(producer).initTransactions();
verify(postProducerCheck).run();
verify(offsetStore).start();
verifyShutdown(true, false);
}
@Test
public void testPollsInBackground() throws Exception {
createWorkerTask();
expectSuccessfulSends();
expectSuccessfulFlushes();
startTaskThread();
int minPolls = 10;
awaitPolls(minPolls);
awaitShutdown();
// Task should have flushed offsets for every record poll and for end-of-life offset commit
verifyTransactions(pollCount() + 1, pollCount() + 1);
verifySends();
verifyPossibleTopicCreation();
verifyPreflight();
verifyStartup();
verifyCleanShutdown();
assertPollMetrics(minPolls);
assertTransactionMetrics(RECORDS.size());
}
@Test
public void testFailureInPoll() throws Exception {
createWorkerTask();
final CountDownLatch pollLatch = new CountDownLatch(1);
final RuntimeException exception = new RuntimeException();
when(sourceTask.poll()).thenAnswer(invocation -> {
pollLatch.countDown();
throw exception;
});
startTaskThread();
ConcurrencyUtils.awaitLatch(pollLatch, "task was not polled in time");
// Failure in poll should trigger automatic stop of the worker
awaitShutdown(false);
verifyPreflight();
verifyStartup();
verifyShutdown(true, false);
assertPollMetrics(0);
}
@Test
public void testFailureInPollAfterCancel() throws Exception {
createWorkerTask();
final CountDownLatch pollLatch = new CountDownLatch(1);
final CountDownLatch workerCancelLatch = new CountDownLatch(1);
final RuntimeException exception = new RuntimeException();
when(sourceTask.poll()).thenAnswer(invocation -> {
pollLatch.countDown();
ConcurrencyUtils.awaitLatch(workerCancelLatch, "task was not cancelled in time");
throw exception;
});
startTaskThread();
ConcurrencyUtils.awaitLatch(pollLatch, "task was not polled in time");
workerTask.cancel();
workerCancelLatch.countDown();
awaitShutdown(false);
verifyPreflight();
verifyStartup();
verifyShutdown(false, true);
assertPollMetrics(0);
}
@Test
public void testFailureInPollAfterStop() throws Exception {
createWorkerTask();
final CountDownLatch pollLatch = new CountDownLatch(1);
final CountDownLatch workerStopLatch = new CountDownLatch(1);
final RuntimeException exception = new RuntimeException();
// We check if there are offsets that need to be committed before shutting down the task
when(sourceTask.poll()).thenAnswer(invocation -> {
pollLatch.countDown();
ConcurrencyUtils.awaitLatch(workerStopLatch, "task was not stopped in time");
throw exception;
});
startTaskThread();
ConcurrencyUtils.awaitLatch(pollLatch, "task was not polled in time");
workerTask.stop();
workerStopLatch.countDown();
awaitShutdown(false);
verifyTransactions(0, 0);
verifyPreflight();
verifyStartup();
verifyCleanShutdown();
assertPollMetrics(0);
}
@Test
public void testPollReturnsNoRecords() throws Exception {
// Test that the task handles an empty list of records
createWorkerTask();
// Make sure the task returns empty batches from poll before we start polling it
pollRecords.set(Collections.emptyList());
when(offsetWriter.beginFlush()).thenReturn(false);
startTaskThread();
awaitEmptyPolls(10);
awaitShutdown();
// task commits for each empty poll, plus the final commit
verifyTransactions(pollCount() + 1, 0);
verifyPreflight();
verifyStartup();
verifyCleanShutdown();
assertPollMetrics(0);
}
@Test
public void testPollBasedCommit() throws Exception {
Map<String, String> connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.POLL);
sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation);
createWorkerTask();
expectSuccessfulSends();
expectSuccessfulFlushes();
startTaskThread();
awaitPolls(10);
awaitShutdown();
// Task should have flushed offsets for every record poll, and for end-of-life offset commit
verifyTransactions(pollCount() + 1, pollCount() + 1);
verifySends();
verifyPossibleTopicCreation();
verifyPreflight();
verifyStartup();
verifyCleanShutdown();
assertPollMetrics(1);
assertTransactionMetrics(RECORDS.size());
}
@Test
public void testIntervalBasedCommit() throws Exception {
long commitInterval = 618;
Map<String, String> connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.INTERVAL);
connectorProps.put(TRANSACTION_BOUNDARY_INTERVAL_CONFIG, Long.toString(commitInterval));
sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation);
time = new MockTime();
createWorkerTask();
expectSuccessfulSends();
expectSuccessfulFlushes();
startTaskThread();
awaitPolls(2);
assertEquals("No flushes should have taken place before offset commit interval has elapsed", 0, flushCount());
time.sleep(commitInterval);
awaitPolls(2);
assertEquals("One flush should have taken place after offset commit interval has elapsed", 1, flushCount());
time.sleep(commitInterval * 2);
awaitPolls(2);
assertEquals("Two flushes should have taken place after offset commit interval has elapsed again", 2, flushCount());
awaitShutdown();
// Task should have flushed offsets twice based on offset commit interval, and performed final end-of-life offset commit
verifyTransactions(3, 3);
verifySends();
verifyPossibleTopicCreation();
verifyPreflight();
verifyStartup();
verifyCleanShutdown();
assertPollMetrics(2);
assertTransactionMetrics(RECORDS.size() * 2);
}
@Test
public void testConnectorCommitOnBatch() throws Exception {
testConnectorBasedCommit(TransactionContext::commitTransaction, false);
}
@Test
public void testConnectorCommitOnRecord() throws Exception {
testConnectorBasedCommit(ctx -> ctx.commitTransaction(SOURCE_RECORD_2), false);
}
@Test
public void testConnectorAbortOnBatch() throws Exception {
testConnectorBasedCommit(TransactionContext::abortTransaction, true);
}
@Test
public void testConnectorAbortOnRecord() throws Exception {
testConnectorBasedCommit(ctx -> ctx.abortTransaction(SOURCE_RECORD_2), true);
}
private void testConnectorBasedCommit(Consumer<TransactionContext> requestCommit, boolean abort) throws Exception {
Map<String, String> connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.CONNECTOR);
sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation);
createWorkerTask();
expectSuccessfulSends();
expectSuccessfulFlushes();
TransactionContext transactionContext = workerTask.sourceTaskContext.transactionContext();
startTaskThread();
awaitPolls(3);
assertEquals("No flushes should have taken place without connector requesting transaction commit",
0, flushCount());
requestCommit.accept(transactionContext);
awaitPolls(3);
assertEquals("One flush should have taken place after transaction commit/abort was requested",
1, flushCount());
awaitPolls(3);
assertEquals("Only one flush should still have taken place without connector re-requesting commit/abort, even on identical records",
1, flushCount());
awaitShutdown();
assertEquals("Task should have flushed offsets once based on connector-defined boundaries, and skipped final end-of-life offset commit",
1, flushCount());
// We begin a new transaction after connector-requested aborts so that we can still write offsets for the source records that were aborted
verify(producer, times(abort ? 3 : 2)).beginTransaction();
verifySends();
if (abort) {
verify(producer).abortTransaction();
}
verify(producer).commitTransaction();
verifyPreflight();
verifyStartup();
verifyCleanShutdown();
verifyPossibleTopicCreation();
assertPollMetrics(1);
assertTransactionMetrics(abort ? 0 : (3 * RECORDS.size()));
}
@Test
public void testConnectorAbortsEmptyTransaction() throws Exception {
Map<String, String> connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.CONNECTOR);
sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation);
createWorkerTask();
expectPossibleTopicCreation();
expectTaskGetTopic();
expectApplyTransformationChain();
expectConvertHeadersAndKeyValue();
TransactionContext transactionContext = workerTask.sourceTaskContext.transactionContext();
startTaskThread();
// Ensure the task produces at least one non-empty batch
awaitPolls(1);
// Start returning empty batches from SourceTask::poll
awaitEmptyPolls(1);
// The task commits the active transaction, which should not be empty and should
// result in a single call to Producer::commitTransaction
transactionContext.commitTransaction();
// Ensure the task produces at least one empty batch after the transaction has been committed
awaitEmptyPolls(1);
// The task aborts the active transaction, which has no records in it and should not trigger a call to
// Producer::abortTransaction
transactionContext.abortTransaction();
// Make sure we go through at least one more poll, to give the framework a chance to handle the
// transaction abort request
awaitEmptyPolls(1);
awaitShutdown(true);
verify(producer, times(1)).beginTransaction();
verify(producer, times(1)).commitTransaction();
verify(producer, atLeast(RECORDS.size())).send(any(), any());
// We never abort transactions in this test!
verify(producer, never()).abortTransaction();
verifyPreflight();
verifyStartup();
verifyCleanShutdown();
verifyPossibleTopicCreation();
}
@Test
public void testMixedConnectorTransactionBoundaryCommitLastRecordAbortBatch() throws Exception {
// We fail tasks that try to abort and commit a transaction for the same record or same batch
// But we don't fail if they try to commit the last record of a batch and abort the entire batch
// Instead, we give precedence to the record-based operation
Map<String, String> connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.CONNECTOR);
sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation);
createWorkerTask();
expectPossibleTopicCreation();
expectTaskGetTopic();
expectApplyTransformationChain();
expectConvertHeadersAndKeyValue();
when(offsetWriter.beginFlush())
.thenReturn(true)
.thenReturn(false);
workerTask.initialize(TASK_CONFIG);
TransactionContext transactionContext = workerTask.sourceTaskContext.transactionContext();
// Request that the batch be aborted
transactionContext.abortTransaction();
// Request that the last record in the batch be committed
transactionContext.commitTransaction(RECORDS.get(RECORDS.size() - 1));
workerTask.toSend = RECORDS;
assertTrue(workerTask.sendRecords());
verifyTransactions(2, 1);
verifySends(RECORDS.size());
// We never abort transactions in this test!
verify(producer, never()).abortTransaction();
verifyPossibleTopicCreation();
}
@Test
public void testMixedConnectorTransactionBoundaryAbortLastRecordCommitBatch() throws Exception {
// We fail tasks that try to abort and commit a transaction for the same record or same batch
// But we don't fail if they try to abort the last record of a batch and commit the entire batch
// Instead, we give precedence to the record-based operation
Map<String, String> connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.CONNECTOR);
sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation);
createWorkerTask();
expectPossibleTopicCreation();
expectTaskGetTopic();
expectApplyTransformationChain();
expectConvertHeadersAndKeyValue();
when(offsetWriter.beginFlush())
.thenReturn(true)
.thenReturn(false);
workerTask.initialize(TASK_CONFIG);
TransactionContext transactionContext = workerTask.sourceTaskContext.transactionContext();
// Request that the last record in the batch be aborted
transactionContext.abortTransaction(RECORDS.get(RECORDS.size() - 1));
// Request that the batch be committed
transactionContext.commitTransaction();
workerTask.toSend = RECORDS;
assertTrue(workerTask.sendRecords());
verify(offsetWriter, times(2)).beginFlush();
verify(sourceTask, times(2)).commit();
// We open a transaction once for the aborted batch, and once to commit offsets for that batch
verify(producer, times(2)).beginTransaction();
verify(producer, times(1)).commitTransaction();
verify(offsetWriter, times(1)).doFlush(any());
verify(producer, times(1)).abortTransaction();
verifySends(RECORDS.size());
verifyPossibleTopicCreation();
}
@Test
public void testCommitFlushSyncCallbackFailure() throws Exception {
Exception failure = new RecordTooLargeException();
when(offsetWriter.beginFlush()).thenReturn(true);
when(offsetWriter.doFlush(any())).thenAnswer(invocation -> {
Callback<Void> callback = invocation.getArgument(0);
callback.onCompletion(failure, null);
return null;
});
testCommitFailure(failure, false);
}
@Test
public void testCommitFlushAsyncCallbackFailure() throws Exception {
Exception failure = new RecordTooLargeException();
when(offsetWriter.beginFlush()).thenReturn(true);
// doFlush delegates its callback to the producer,
// which delays completing the callback until commitTransaction
AtomicReference<Callback<Void>> callback = new AtomicReference<>();
when(offsetWriter.doFlush(any())).thenAnswer(invocation -> {
callback.set(invocation.getArgument(0));
return null;
});
doAnswer(invocation -> {
callback.get().onCompletion(failure, null);
return null;
}).when(producer).commitTransaction();
testCommitFailure(failure, true);
}
@Test
public void testCommitTransactionFailure() throws Exception {
Exception failure = new RecordTooLargeException();
when(offsetWriter.beginFlush()).thenReturn(true);
doThrow(failure).when(producer).commitTransaction();
testCommitFailure(failure, true);
}
private void testCommitFailure(Exception commitException, boolean executeCommit) throws Exception {
createWorkerTask();
// Unlike the standard WorkerSourceTask class, this one fails permanently when offset commits don't succeed
final CountDownLatch taskFailure = new CountDownLatch(1);
final AtomicReference<Exception> taskException = new AtomicReference<>();
doAnswer(invocation -> {
taskFailure.countDown();
taskException.set(invocation.getArgument(1));
return null;
}).when(statusListener).onFailure(eq(taskId), any());
expectSuccessfulSends();
startTaskThread();
ConcurrencyUtils.awaitLatch(taskFailure, "task did not fail in time");
awaitShutdown();
assertEquals(commitException, taskException.get().getCause());
verifySends();
verifyPossibleTopicCreation();
verify(producer).beginTransaction();
if (executeCommit) {
verify(producer).commitTransaction();
}
verify(offsetWriter).cancelFlush();
verifyPreflight();
verifyStartup();
verifyShutdown(true, false);
}
@Test
public void testSendRecordsRetries() {
createWorkerTask();
// Differentiate only by Kafka partition so we can reuse conversion expectations
SourceRecord record1 = new SourceRecord(PARTITION, offset(1), TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, VALUE_1);
SourceRecord record2 = new SourceRecord(PARTITION, offset(2), TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, VALUE_1);
SourceRecord record3 = new SourceRecord(PARTITION, offset(3), TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, VALUE_1);
expectPossibleTopicCreation();
expectTaskGetTopic();
expectApplyTransformationChain();
expectConvertHeadersAndKeyValue();
// We're trying to send three records
workerTask.toSend = Arrays.asList(record1, record2, record3);
OngoingStubbing<Future<RecordMetadata>> producerSend = when(producer.send(any(), any()));
// The first one is sent successfully
producerSend = expectSuccessfulSend(producerSend);
// The second one encounters a retriable failure
producerSend = expectSynchronousFailedSend(producerSend, new org.apache.kafka.common.errors.TimeoutException("retriable sync failure"));
// All subsequent sends (which should include a retry of the second record and an initial try for the third) succeed
expectSuccessfulSend(producerSend);
assertFalse(workerTask.sendRecords());
assertEquals(Arrays.asList(record2, record3), workerTask.toSend);
verify(producer).beginTransaction();
// When using poll-based transaction boundaries, we do not commit transactions while retrying delivery for a batch
verify(producer, never()).commitTransaction();
verifySends(2);
verifyPossibleTopicCreation();
// Next they all succeed
assertTrue(workerTask.sendRecords());
assertNull(workerTask.toSend);
// After the entire batch is successfully dispatched to the producer, we can finally commit the transaction
verify(producer, times(1)).commitTransaction();
verifySends(4);
verify(offsetWriter).offset(PARTITION, offset(1));
verify(offsetWriter).offset(PARTITION, offset(2));
verify(offsetWriter).offset(PARTITION, offset(3));
}
@Test
public void testSendRecordsProducerSendFailsImmediately() {
createWorkerTask();
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, VALUE_1);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, VALUE_1);
expectPossibleTopicCreation();
expectConvertHeadersAndKeyValue();
expectApplyTransformationChain();
when(producer.send(any(), any()))
.thenThrow(new KafkaException("Producer closed while send in progress", new InvalidTopicException(TOPIC)));
workerTask.toSend = Arrays.asList(record1, record2);
assertThrows(ConnectException.class, workerTask::sendRecords);
verify(producer).beginTransaction();
verify(producer).send(any(), any());
verify(offsetWriter, never()).offset(any(), any());
verifyPossibleTopicCreation();
}
@Test
public void testSlowTaskStart() throws Exception {
final CountDownLatch startupLatch = new CountDownLatch(1);
final CountDownLatch finishStartupLatch = new CountDownLatch(1);
createWorkerTask();
doAnswer(invocation -> {
startupLatch.countDown();
ConcurrencyUtils.awaitLatch(finishStartupLatch, "task was not allowed to finish startup in time");
return null;
}).when(sourceTask).start(eq(TASK_PROPS));
when(offsetWriter.beginFlush()).thenReturn(false);
startTaskThread();
// Stopping immediately while the other thread still has work to do should result in no polling, no offset commits,
// and the work thread exiting promptly; the stop() method will be invoked on the background thread since it
// cannot be invoked immediately on the thread trying to stop the task.
ConcurrencyUtils.awaitLatch(startupLatch, "task did not start up in time");
workerTask.stop();
finishStartupLatch.countDown();
awaitShutdown(false);
verify(sourceTask, never()).poll();
// task commit called on shutdown
verifyTransactions(1, 0);
verifySends(0);
verifyPreflight();
verifyStartup();
verifyCleanShutdown();
}
@Test
public void testCancel() {
createWorkerTask();
// Cancel the task before it has been started; this should immediately close the offset reader and the producer
workerTask.cancel();
verify(offsetReader).close();
verify(producer).close(Duration.ZERO);
}
/**
* @return how many times the source task was {@link SourceTask#poll() polled}
*/
private int pollCount() {
return (int) MockitoUtils.countInvocations(sourceTask, "poll");
}
/**
* @return how many times an {@link OffsetStorageWriter#doFlush(Callback) offset flush} took place
*/
private int flushCount() {
return (int) MockitoUtils.countInvocations(offsetWriter, "doFlush", Callback.class);
}
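/**
 * Point the mocked {@link SourceTask#poll()} at the given records and block until it has been invoked
 * at least {@code minimum} more times.
 */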
private void awaitPolls(int minimum, List<SourceRecord> records) {
pollRecords.set(records);
pollLatch.set(new CountDownLatch(minimum));
ConcurrencyUtils.awaitLatch(pollLatch.get(), "task was not polled " + minimum + " time(s) quickly enough");
}
private void awaitEmptyPolls(int minimum) throws InterruptedException {
awaitPolls(minimum, Collections.emptyList());
}
private void awaitPolls(int minimum) throws InterruptedException {
awaitPolls(minimum, RECORDS);
}
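/**
 * Stop the task and wait for it and its background worker thread to finish.
 */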
private void awaitShutdown() throws InterruptedException, ExecutionException {
awaitShutdown(true);
}
private void awaitShutdown(boolean triggerStop) throws InterruptedException, ExecutionException {
if (triggerStop)
workerTask.stop();
assertTrue(workerTask.awaitStop(1000));
workerTaskFuture.get();
}
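/**
 * Initialize the task with the standard task config and run it on the single-threaded executor.
 */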
private void startTaskThread() {
workerTask.initialize(TASK_CONFIG);
workerTaskFuture = executor.submit(workerTask);
}
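/**
 * Stub the offset writer so that every flush attempt reports pending offsets and completes its callback
 * without error.
 */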
private void expectSuccessfulFlushes() throws InterruptedException, TimeoutException {
when(offsetWriter.beginFlush()).thenReturn(true);
when(offsetWriter.doFlush(any())).thenAnswer(invocation -> {
Callback<Void> cb = invocation.getArgument(0);
cb.onCompletion(null, null);
return null;
});
}
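/**
 * Stub conversion, transformation, producer sends, topic status lookups, and (if enabled) topic creation
 * so that every dispatched record is delivered successfully.
 */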
private void expectSuccessfulSends() {
expectConvertHeadersAndKeyValue();
expectApplyTransformationChain();
expectSuccessfulSend(when(producer.send(any(), any())));
expectTaskGetTopic();
expectPossibleTopicCreation();
}
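/**
 * Chain an answer onto the given {@code producer.send} stubbing that immediately completes the send
 * callback with dummy {@link RecordMetadata} and no error.
 */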
private OngoingStubbing<Future<RecordMetadata>> expectSuccessfulSend(OngoingStubbing<Future<RecordMetadata>> whenSend) {
return whenSend.thenAnswer(invocation -> {
org.apache.kafka.clients.producer.Callback cb = invocation.getArgument(1);
cb.onCompletion(
new RecordMetadata(
new TopicPartition(TOPIC, 0),
0,
0,
0L,
0,
0),
null);
return null;
});
}
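/**
 * Chain an answer onto the given {@code producer.send} stubbing that throws the given failure synchronously.
 */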
private OngoingStubbing<Future<RecordMetadata>> expectSynchronousFailedSend(OngoingStubbing<Future<RecordMetadata>> whenSend, Exception failure) {
return whenSend.thenThrow(failure);
}
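/**
 * Stub key and value conversion to return the canned serialized bytes. The header conversion loop never
 * executes here since the {@link RecordHeaders} instance is empty.
 */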
private void expectConvertHeadersAndKeyValue() {
Headers headers = new RecordHeaders();
for (Header header : headers) {
when(headerConverter.fromConnectHeader(eq(TOPIC), eq(header.key()), eq(Schema.STRING_SCHEMA), eq(new String(header.value()))))
.thenReturn(header.value());
}
when(keyConverter.fromConnectData(eq(TOPIC), eq(headers), eq(KEY_SCHEMA), eq(KEY)))
.thenReturn(SERIALIZED_KEY);
when(valueConverter.fromConnectData(eq(TOPIC), eq(headers), eq(RECORD_SCHEMA), eq(VALUE_1)))
.thenReturn(SERIALIZED_RECORD);
}
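/**
 * Stub the transformation chain to pass every record through unchanged.
 */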
private void expectApplyTransformationChain() {
when(transformationChain.apply(any()))
.thenAnswer(invocation -> invocation.getArgument(0));
}
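/**
 * Stub the status backing store to return a {@link TopicStatus} for the test topic whenever it is queried.
 */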
private void expectTaskGetTopic() {
when(statusBackingStore.getTopic(any(), eq(TOPIC)))
.thenAnswer(invocation -> new TopicStatus(
invocation.getArgument(1),
new ConnectorTaskId(invocation.getArgument(0), 0),
time.milliseconds()
));
}
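/**
 * If topic creation is enabled for this test run, stub the admin client to report the test topic as newly created.
 */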
private void expectPossibleTopicCreation() {
if (config.topicCreationEnable()) {
Set<String> created = Collections.singleton(TOPIC);
Set<String> existing = Collections.emptySet();
TopicAdmin.TopicCreationResponse creationResponse = new TopicAdmin.TopicCreationResponse(created, existing);
when(admin.createOrFindTopics(any())).thenReturn(creationResponse);
}
}
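/**
 * Verify the preflight sequence: the pre-producer check, transactional producer initialization,
 * the post-producer check, and offset store startup.
 */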
private void verifyPreflight() {
verify(preProducerCheck).run();
verify(producer).initTransactions();
verify(postProducerCheck).run();
verify(offsetStore).start();
}
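/**
 * Verify that the source task was initialized and started and that the startup status transition was reported.
 */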
private void verifyStartup() {
verify(sourceTask).initialize(any());
verify(sourceTask).start(TASK_PROPS);
taskStarted = true;
verify(statusListener).onStartup(taskId);
}
private void verifyCleanShutdown() throws Exception {
verifyShutdown(false, false);
}
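/**
 * Verify that the producer, offset reader, offset store, admin client, transformation chain, and header
 * converter were all closed, and that the terminal status transition matches how the task ended.
 */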
private void verifyShutdown(boolean taskFailed, boolean cancelled) throws Exception {
if (cancelled) {
verify(producer).close(Duration.ZERO);
verify(producer, times(2)).close(any());
verify(offsetReader, times(2)).close();
} else {
verify(producer).close(any());
verify(offsetReader).close();
}
verify(offsetStore).stop();
verify(admin).close(any());
verify(transformationChain).close();
verify(headerConverter).close();
if (taskStarted) {
verify(sourceTask).stop();
}
if (taskFailed) {
verify(statusListener).onFailure(eq(taskId), any());
} else if (!cancelled) {
verify(statusListener).onShutdown(taskId);
}
}
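/**
 * Verify that topic creation was attempted exactly once when enabled and never when disabled.
 */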
private void verifyPossibleTopicCreation() {
if (enableTopicCreation) {
verify(admin).createOrFindTopics(any());
} else {
verify(admin, never()).createOrFindTopics(any());
}
}
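/**
 * Verify that the producer was asked to send one record per polled source record.
 */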
private void verifySends() {
verifySends(pollCount() * RECORDS.size());
}
private void verifySends(int count) {
verify(producer, times(count)).send(any(), any());
}
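/**
 * Verify the number of commit opportunities (offset flush attempts and task commits) and the number of
 * non-empty commits (producer transactions and completed offset flushes) that took place.
 */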
private void verifyTransactions(int numTaskCommits, int numProducerCommits) throws InterruptedException {
// these operations happen on every commit opportunity
VerificationMode commitOpportunities = times(numTaskCommits);
verify(offsetWriter, commitOpportunities).beginFlush();
verify(sourceTask, commitOpportunities).commit();
// these operations only happen on non-empty commits
VerificationMode commits = times(numProducerCommits);
verify(producer, commits).beginTransaction();
verify(producer, commits).commitTransaction();
verify(offsetWriter, commits).doFlush(any());
}
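/**
 * Assert that the recorded transaction-size metrics are internally consistent and that the maximum
 * observed transaction size is at least {@code minimumMaxSizeExpected}.
 */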
private void assertTransactionMetrics(int minimumMaxSizeExpected) {
MetricGroup transactionGroup = workerTask.transactionMetricsGroup().metricGroup();
double actualMin = metrics.currentMetricValueAsDouble(transactionGroup, "transaction-size-min");
double actualMax = metrics.currentMetricValueAsDouble(transactionGroup, "transaction-size-max");
double actualAvg = metrics.currentMetricValueAsDouble(transactionGroup, "transaction-size-avg");
assertTrue(actualMin >= 0);
assertTrue(actualMax >= minimumMaxSizeExpected);
if (actualMax - actualMin <= 0.000001d) {
assertEquals(actualMax, actualAvg, 0.000002d);
} else {
assertTrue("Average transaction size should be greater than minimum transaction size", actualAvg > actualMin);
assertTrue("Average transaction size should be less than maximum transaction size", actualAvg < actualMax);
}
}
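/**
 * Assert that the poll, write, batch-size, and active-record metrics are consistent with at least
 * {@code minimumPollCountExpected} record-producing polls.
 */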
private void assertPollMetrics(int minimumPollCountExpected) {
MetricGroup sourceTaskGroup = workerTask.sourceTaskMetricsGroup().metricGroup();
MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
double pollRate = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-poll-rate");
double pollTotal = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-poll-total");
if (minimumPollCountExpected > 0) {
assertEquals(RECORDS.size(), metrics.currentMetricValueAsDouble(taskGroup, "batch-size-max"), 0.000001d);
assertEquals(RECORDS.size(), metrics.currentMetricValueAsDouble(taskGroup, "batch-size-avg"), 0.000001d);
assertTrue(pollRate > 0.0d);
} else {
assertEquals(0.0d, pollRate, 0.0);
}
assertTrue(pollTotal >= minimumPollCountExpected);
double writeRate = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-write-rate");
double writeTotal = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-write-total");
if (minimumPollCountExpected > 0) {
assertTrue(writeRate > 0.0d);
} else {
assertEquals(0.0d, writeRate, 0.0);
}
assertTrue(writeTotal >= minimumPollCountExpected);
double pollBatchTimeMax = metrics.currentMetricValueAsDouble(sourceTaskGroup, "poll-batch-max-time-ms");
double pollBatchTimeAvg = metrics.currentMetricValueAsDouble(sourceTaskGroup, "poll-batch-avg-time-ms");
if (minimumPollCountExpected > 0) {
assertTrue(pollBatchTimeMax >= 0.0d);
}
assertTrue(Double.isNaN(pollBatchTimeAvg) || pollBatchTimeAvg > 0.0d);
double activeCount = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count");
double activeCountMax = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count-max");
assertEquals(0, activeCount, 0.000001d);
if (minimumPollCountExpected > 0) {
assertEquals(RECORDS.size(), activeCountMax, 0.000001d);
}
}
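/** Abstract placeholder task; only its class name is needed for the task config. */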
private abstract static class TestSourceTask extends SourceTask {
}
}