/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.IExpectationSetters;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
@PrepareForTest(WorkerSinkTask.class)
@PowerMockIgnore("javax.management.*")
public class WorkerSinkTaskTest {
// These are fixed to keep this code simpler. In this example we assume byte[] raw values
// that are converted to a mix of integer/string values in Connect.
private static final String TOPIC = "test";
private static final int PARTITION = 12;
private static final int PARTITION2 = 13;
private static final int PARTITION3 = 14;
private static final long FIRST_OFFSET = 45;
private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
private static final int KEY = 12;
private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
private static final String VALUE = "VALUE";
private static final byte[] RAW_KEY = "key".getBytes();
private static final byte[] RAW_VALUE = "value".getBytes();
private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
private static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2);
private static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3);
private static final Set<TopicPartition> INITIAL_ASSIGNMENT =
new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
private static final Map<String, String> TASK_PROPS = new HashMap<>();
static {
TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
}
private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1);
private TargetState initialState = TargetState.STARTED;
private MockTime time;
private WorkerSinkTask workerTask;
@Mock
private SinkTask sinkTask;
private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
private WorkerConfig workerConfig;
private MockConnectMetrics metrics;
@Mock
private PluginClassLoader pluginLoader;
@Mock
private Converter keyConverter;
@Mock
private Converter valueConverter;
@Mock
private HeaderConverter headerConverter;
@Mock
private TransformationChain<SinkRecord> transformationChain;
@Mock
private TaskStatus.Listener statusListener;
@Mock
private StatusBackingStore statusBackingStore;
@Mock
private KafkaConsumer<byte[], byte[]> consumer;
@Mock
private ErrorHandlingMetrics errorHandlingMetrics;
private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
private Capture<Pattern> topicsRegex = EasyMock.newCapture();
private long recordsReturnedTp1;
private long recordsReturnedTp3;
@Before
public void setUp() {
time = new MockTime();
Map<String, String> workerProps = new HashMap<>();
workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
workerConfig = new StandaloneConfig(workerProps);
pluginLoader = PowerMock.createMock(PluginClassLoader.class);
metrics = new MockConnectMetrics(time);
recordsReturnedTp1 = 0;
recordsReturnedTp3 = 0;
}
private void createTask(TargetState initialState) {
createTask(initialState, keyConverter, valueConverter, headerConverter);
}
private void createTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
workerTask = new WorkerSinkTask(
taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics,
keyConverter, valueConverter, errorHandlingMetrics, headerConverter,
transformationChain, consumer, pluginLoader, time,
RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, statusBackingStore);
}
@After
public void tearDown() {
if (metrics != null) metrics.stop();
}
@Test
public void testStartPaused() throws Exception {
createTask(TargetState.PAUSED);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT);
consumer.pause(INITIAL_ASSIGNMENT);
PowerMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration();
time.sleep(10000L);
assertSinkMetricValue("partition-count", 2);
assertTaskMetricValue("status", "paused");
assertTaskMetricValue("running-ratio", 0.0);
assertTaskMetricValue("pause-ratio", 1.0);
assertTaskMetricValue("offset-commit-max-time-ms", Double.NaN);
PowerMock.verifyAll();
}
@Test
public void testPause() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
// Pause
statusListener.onPause(taskId);
EasyMock.expectLastCall();
expectConsumerWakeup();
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT);
consumer.pause(INITIAL_ASSIGNMENT);
PowerMock.expectLastCall();
// Offset commit as requested when pausing; no records returned by consumer.poll()
sinkTask.preCommit(EasyMock.anyObject());
EasyMock.expectLastCall().andStubReturn(Collections.emptyMap());
expectConsumerPoll(0);
sinkTask.put(Collections.emptyList());
EasyMock.expectLastCall();
// And unpause
statusListener.onResume(taskId);
EasyMock.expectLastCall();
expectConsumerWakeup();
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
INITIAL_ASSIGNMENT.forEach(tp -> {
consumer.resume(Collections.singleton(tp));
PowerMock.expectLastCall();
});
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // initial assignment
workerTask.iteration(); // fetch some data
workerTask.transitionTo(TargetState.PAUSED);
time.sleep(10000L);
assertSinkMetricValue("partition-count", 2);
assertSinkMetricValue("sink-record-read-total", 1.0);
assertSinkMetricValue("sink-record-send-total", 1.0);
assertSinkMetricValue("sink-record-active-count", 1.0);
assertSinkMetricValue("sink-record-active-count-max", 1.0);
assertSinkMetricValue("sink-record-active-count-avg", 0.333333);
assertSinkMetricValue("offset-commit-seq-no", 0.0);
assertSinkMetricValue("offset-commit-completion-rate", 0.0);
assertSinkMetricValue("offset-commit-completion-total", 0.0);
assertSinkMetricValue("offset-commit-skip-rate", 0.0);
assertSinkMetricValue("offset-commit-skip-total", 0.0);
assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("pause-ratio", 0.0);
assertTaskMetricValue("batch-size-max", 1.0);
assertTaskMetricValue("batch-size-avg", 0.5);
assertTaskMetricValue("offset-commit-max-time-ms", Double.NaN);
assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
assertTaskMetricValue("offset-commit-success-percentage", 0.0);
workerTask.iteration(); // wakeup
workerTask.iteration(); // now paused
time.sleep(30000L);
assertSinkMetricValue("offset-commit-seq-no", 1.0);
assertSinkMetricValue("offset-commit-completion-rate", 0.0333);
assertSinkMetricValue("offset-commit-completion-total", 1.0);
assertSinkMetricValue("offset-commit-skip-rate", 0.0);
assertSinkMetricValue("offset-commit-skip-total", 0.0);
assertTaskMetricValue("status", "paused");
assertTaskMetricValue("running-ratio", 0.25);
assertTaskMetricValue("pause-ratio", 0.75);
workerTask.transitionTo(TargetState.STARTED);
workerTask.iteration(); // wakeup
workerTask.iteration(); // now unpaused
//printMetrics();
PowerMock.verifyAll();
}
@Test
public void testShutdown() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
// first iteration
expectPollInitialAssignment();
// second iteration
EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject())).andReturn(Collections.emptyMap());
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
// WorkerSinkTask::stop
consumer.wakeup();
PowerMock.expectLastCall();
sinkTask.stop();
PowerMock.expectLastCall();
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT);
// WorkerSinkTask::close
consumer.close();
PowerMock.expectLastCall().andAnswer(() -> {
rebalanceListener.getValue().onPartitionsRevoked(
INITIAL_ASSIGNMENT
);
return null;
});
transformationChain.close();
PowerMock.expectLastCall();
headerConverter.close();
PowerMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration();
sinkTaskContext.getValue().requestCommit(); // Force an offset commit
workerTask.iteration();
workerTask.stop();
workerTask.close();
PowerMock.verifyAll();
}
@Test
public void testPollRedelivery() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
// If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime
expectConsumerPoll(1);
expectConversionAndTransformation(1);
Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(records));
EasyMock.expectLastCall().andThrow(new RetriableException("retry"));
// Pause
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT);
consumer.pause(INITIAL_ASSIGNMENT);
PowerMock.expectLastCall();
// Retry delivery should succeed
expectConsumerPoll(0);
sinkTask.put(EasyMock.capture(records));
EasyMock.expectLastCall();
// And unpause
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT);
INITIAL_ASSIGNMENT.forEach(tp -> {
consumer.resume(singleton(tp));
PowerMock.expectLastCall();
});
// Expect commit
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
// Offset advances by one for the single record that was delivered
workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
// Nothing polled for this partition
workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
EasyMock.expect(sinkTask.preCommit(workerCurrentOffsets)).andReturn(workerCurrentOffsets);
final Capture<OffsetCommitCallback> callback = EasyMock.newCapture();
consumer.commitAsync(EasyMock.eq(workerCurrentOffsets), EasyMock.capture(callback));
EasyMock.expectLastCall().andAnswer(() -> {
callback.getValue().onComplete(workerCurrentOffsets, null);
return null;
});
expectConsumerPoll(0);
sinkTask.put(EasyMock.eq(Collections.emptyList()));
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration();
time.sleep(10000L);
assertSinkMetricValue("partition-count", 2);
assertSinkMetricValue("sink-record-read-total", 0.0);
assertSinkMetricValue("sink-record-send-total", 0.0);
assertSinkMetricValue("sink-record-active-count", 0.0);
assertSinkMetricValue("sink-record-active-count-max", 0.0);
assertSinkMetricValue("sink-record-active-count-avg", 0.0);
assertSinkMetricValue("offset-commit-seq-no", 0.0);
assertSinkMetricValue("offset-commit-completion-rate", 0.0);
assertSinkMetricValue("offset-commit-completion-total", 0.0);
assertSinkMetricValue("offset-commit-skip-rate", 0.0);
assertSinkMetricValue("offset-commit-skip-total", 0.0);
assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("pause-ratio", 0.0);
assertTaskMetricValue("batch-size-max", 0.0);
assertTaskMetricValue("batch-size-avg", 0.0);
assertTaskMetricValue("offset-commit-max-time-ms", Double.NaN);
assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
assertTaskMetricValue("offset-commit-success-percentage", 0.0);
workerTask.iteration();
workerTask.iteration();
time.sleep(30000L);
assertSinkMetricValue("sink-record-read-total", 1.0);
assertSinkMetricValue("sink-record-send-total", 1.0);
assertSinkMetricValue("sink-record-active-count", 1.0);
assertSinkMetricValue("sink-record-active-count-max", 1.0);
assertSinkMetricValue("sink-record-active-count-avg", 0.5);
assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("batch-size-max", 1.0);
assertTaskMetricValue("batch-size-avg", 0.5);
sinkTaskContext.getValue().requestCommit();
time.sleep(10000L);
workerTask.iteration();
assertSinkMetricValue("offset-commit-completion-total", 1.0);
PowerMock.verifyAll();
}
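// Illustrative sketch (not part of the original suite): a minimal SinkTask that signals a
// transient failure by throwing RetriableException from put(), which is the behavior the
// redelivery tests above simulate with mocks. On RetriableException the worker pauses the
// consumer and redelivers the same batch on a later iteration. The "failFirstNAttempts"
// knob is hypothetical and exists only for this sketch.
private static class FlakySinkTask extends SinkTask {
    private final int failFirstNAttempts;
    private int attempts = 0;

    FlakySinkTask(int failFirstNAttempts) {
        this.failFirstNAttempts = failFirstNAttempts;
    }

    @Override
    public String version() {
        return "test";
    }

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        if (attempts++ < failFirstNAttempts) {
            // The worker catches this, pauses all assigned partitions, and retries the
            // same records on the next poll cycle.
            throw new RetriableException("transient sink failure, attempt " + attempts);
        }
        // A real task would deliver the records to the external system here.
    }

    @Override
    public void stop() {
    }
}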
@Test
public void testPollRedeliveryWithConsumerRebalance() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
// If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall().andThrow(new RetriableException("retry"));
// Pause
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT);
consumer.pause(INITIAL_ASSIGNMENT);
PowerMock.expectLastCall();
// Empty consumer poll (all partitions are paused) with rebalance; one new partition is assigned
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
() -> {
rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet());
rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3));
return ConsumerRecords.empty();
});
Set<TopicPartition> newAssignment = new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3));
EasyMock.expect(consumer.assignment()).andReturn(newAssignment).times(3);
EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
sinkTask.open(Collections.singleton(TOPIC_PARTITION3));
EasyMock.expectLastCall();
// All partitions are re-paused in order to pause any newly-assigned partitions so that redelivery efforts can continue
consumer.pause(newAssignment);
EasyMock.expectLastCall();
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall().andThrow(new RetriableException("retry"));
// Next delivery attempt fails again
expectConsumerPoll(0);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall().andThrow(new RetriableException("retry"));
// Non-empty consumer poll; all initially-assigned partitions are revoked in rebalance, and new partitions are allowed to resume
ConsumerRecord<byte[], byte[]> newRecord = new ConsumerRecord<>(TOPIC, PARTITION3, FIRST_OFFSET, RAW_KEY, RAW_VALUE);
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
() -> {
rebalanceListener.getValue().onPartitionsRevoked(INITIAL_ASSIGNMENT);
rebalanceListener.getValue().onPartitionsAssigned(Collections.emptyList());
return new ConsumerRecords<>(Collections.singletonMap(TOPIC_PARTITION3, Collections.singletonList(newRecord)));
});
newAssignment = Collections.singleton(TOPIC_PARTITION3);
EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(newAssignment)).times(3);
final Map<TopicPartition, OffsetAndMetadata> offsets = INITIAL_ASSIGNMENT.stream()
.collect(Collectors.toMap(Function.identity(), tp -> new OffsetAndMetadata(FIRST_OFFSET)));
sinkTask.preCommit(offsets);
EasyMock.expectLastCall().andReturn(offsets);
sinkTask.close(INITIAL_ASSIGNMENT);
EasyMock.expectLastCall();
// All partitions are resumed, as all previously paused-for-redelivery partitions were revoked
newAssignment.forEach(tp -> {
consumer.resume(Collections.singleton(tp));
EasyMock.expectLastCall();
});
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration();
workerTask.iteration();
workerTask.iteration();
workerTask.iteration();
workerTask.iteration();
PowerMock.verifyAll();
}
@Test
public void testErrorInRebalancePartitionLoss() throws Exception {
RuntimeException exception = new RuntimeException("Revocation error");
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
expectRebalanceLossError(exception);
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration();
try {
workerTask.iteration();
fail("Poll should have raised the rebalance exception");
} catch (RuntimeException e) {
assertEquals(exception, e);
}
PowerMock.verifyAll();
}
@Test
public void testErrorInRebalancePartitionRevocation() throws Exception {
RuntimeException exception = new RuntimeException("Revocation error");
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
expectRebalanceRevocationError(exception);
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration();
try {
workerTask.iteration();
fail("Poll should have raised the rebalance exception");
} catch (RuntimeException e) {
assertEquals(exception, e);
}
PowerMock.verifyAll();
}
@Test
public void testErrorInRebalancePartitionAssignment() throws Exception {
RuntimeException exception = new RuntimeException("Assignment error");
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
expectRebalanceAssignmentError(exception);
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration();
try {
workerTask.iteration();
fail("Poll should have raised the rebalance exception");
} catch (RuntimeException e) {
assertEquals(exception, e);
}
PowerMock.verifyAll();
}
@Test
public void testPartialRevocationAndAssignment() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
() -> {
rebalanceListener.getValue().onPartitionsRevoked(Collections.singleton(TOPIC_PARTITION));
rebalanceListener.getValue().onPartitionsAssigned(Collections.emptySet());
return ConsumerRecords.empty();
});
EasyMock.expect(consumer.assignment()).andReturn(Collections.singleton(TOPIC_PARTITION)).times(2);
final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
sinkTask.preCommit(offsets);
EasyMock.expectLastCall().andReturn(offsets);
sinkTask.close(Collections.singleton(TOPIC_PARTITION));
EasyMock.expectLastCall();
sinkTask.put(Collections.emptyList());
EasyMock.expectLastCall();
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
() -> {
rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet());
rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3));
return ConsumerRecords.empty();
});
EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))).times(2);
EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
sinkTask.open(Collections.singleton(TOPIC_PARTITION3));
EasyMock.expectLastCall();
sinkTask.put(Collections.emptyList());
EasyMock.expectLastCall();
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
() -> {
rebalanceListener.getValue().onPartitionsLost(Collections.singleton(TOPIC_PARTITION3));
rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION));
return ConsumerRecords.empty();
});
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(4);
sinkTask.close(Collections.singleton(TOPIC_PARTITION3));
EasyMock.expectLastCall();
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
sinkTask.open(Collections.singleton(TOPIC_PARTITION));
EasyMock.expectLastCall();
sinkTask.put(Collections.emptyList());
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
// First iteration--first call to poll, first consumer assignment
workerTask.iteration();
// Second iteration--second call to poll, partial consumer revocation
workerTask.iteration();
// Third iteration--third call to poll, partial consumer assignment
workerTask.iteration();
// Fourth iteration--fourth call to poll, one partition lost; can't commit offsets for it, one new partition assigned
workerTask.iteration();
PowerMock.verifyAll();
}
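// Hedged sketch of the cooperative-rebalance semantics the test above walks through; this
// illustrates the listener contract and is not the worker's actual ConsumerRebalanceListener.
// Revoked partitions are closed (after their offsets are flushed, elided here), newly
// assigned partitions are opened, and lost partitions are closed without a commit because
// their offsets can no longer be flushed safely.
private static ConsumerRebalanceListener rebalanceHandlerSketch(SinkTask task) {
    return new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Only the revoked partitions are closed; the rest of the assignment keeps running.
            task.close(partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Only the newly assigned partitions are opened.
            task.open(partitions);
        }

        @Override
        public void onPartitionsLost(Collection<TopicPartition> partitions) {
            // Lost partitions are closed without committing offsets.
            task.close(partitions);
        }
    };
}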
@Test
public void testPreCommitFailureAfterPartialRevocationAndAssignment() throws Exception {
createTask(initialState);
// First poll; assignment is [TP1, TP2]
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
// Second poll; a single record is delivered from TP1
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
// Third poll; assignment changes to [TP2]
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
() -> {
rebalanceListener.getValue().onPartitionsRevoked(Collections.singleton(TOPIC_PARTITION));
rebalanceListener.getValue().onPartitionsAssigned(Collections.emptySet());
return ConsumerRecords.empty();
});
EasyMock.expect(consumer.assignment()).andReturn(Collections.singleton(TOPIC_PARTITION)).times(2);
final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
sinkTask.preCommit(offsets);
EasyMock.expectLastCall().andReturn(offsets);
consumer.commitSync(offsets);
EasyMock.expectLastCall();
sinkTask.close(Collections.singleton(TOPIC_PARTITION));
EasyMock.expectLastCall();
sinkTask.put(Collections.emptyList());
EasyMock.expectLastCall();
// Fourth poll; assignment changes to [TP2, TP3]
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
() -> {
rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet());
rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3));
return ConsumerRecords.empty();
});
EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))).times(2);
EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
sinkTask.open(Collections.singleton(TOPIC_PARTITION3));
EasyMock.expectLastCall();
sinkTask.put(Collections.emptyList());
EasyMock.expectLastCall();
// Fifth poll; an offset commit takes place
EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))).times(2);
final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
workerCurrentOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET));
sinkTask.preCommit(workerCurrentOffsets);
EasyMock.expectLastCall().andThrow(new ConnectException("Failed to flush"));
consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET);
EasyMock.expectLastCall();
consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET);
EasyMock.expectLastCall();
expectConsumerPoll(0);
sinkTask.put(EasyMock.eq(Collections.emptyList()));
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
// First iteration--first call to poll, first consumer assignment
workerTask.iteration();
// Second iteration--second call to poll, delivery of one record
workerTask.iteration();
// Third iteration--third call to poll, partial consumer revocation
workerTask.iteration();
// Fourth iteration--fourth call to poll, partial consumer assignment
workerTask.iteration();
// Fifth iteration--task-requested offset commit with failure in SinkTask::preCommit
sinkTaskContext.getValue().requestCommit();
workerTask.iteration();
PowerMock.verifyAll();
}
@Test
public void testWakeupInCommitSyncCausesRetry() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
sinkTask.preCommit(offsets);
EasyMock.expectLastCall().andReturn(offsets);
// first one raises wakeup
consumer.commitSync(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
EasyMock.expectLastCall().andThrow(new WakeupException());
// we should retry and complete the commit
consumer.commitSync(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
EasyMock.expectLastCall();
sinkTask.close(INITIAL_ASSIGNMENT);
EasyMock.expectLastCall();
INITIAL_ASSIGNMENT.forEach(tp -> EasyMock.expect(consumer.position(tp)).andReturn(FIRST_OFFSET));
sinkTask.open(INITIAL_ASSIGNMENT);
EasyMock.expectLastCall();
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(5);
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
() -> {
rebalanceListener.getValue().onPartitionsRevoked(INITIAL_ASSIGNMENT);
rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
return ConsumerRecords.empty();
});
INITIAL_ASSIGNMENT.forEach(tp -> {
consumer.resume(Collections.singleton(tp));
EasyMock.expectLastCall();
});
statusListener.onResume(taskId);
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
time.sleep(30000L);
workerTask.initializeAndStart();
time.sleep(30000L);
workerTask.iteration(); // poll for initial assignment
time.sleep(30000L);
workerTask.iteration(); // first record delivered
workerTask.iteration(); // now rebalance with the wakeup triggered
time.sleep(30000L);
assertSinkMetricValue("partition-count", 2);
assertSinkMetricValue("sink-record-read-total", 1.0);
assertSinkMetricValue("sink-record-send-total", 1.0);
assertSinkMetricValue("sink-record-active-count", 0.0);
assertSinkMetricValue("sink-record-active-count-max", 1.0);
assertSinkMetricValue("sink-record-active-count-avg", 0.33333);
assertSinkMetricValue("offset-commit-seq-no", 1.0);
assertSinkMetricValue("offset-commit-completion-total", 1.0);
assertSinkMetricValue("offset-commit-skip-total", 0.0);
assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("pause-ratio", 0.0);
assertTaskMetricValue("batch-size-max", 1.0);
assertTaskMetricValue("batch-size-avg", 1.0);
assertTaskMetricValue("offset-commit-max-time-ms", 0.0);
assertTaskMetricValue("offset-commit-avg-time-ms", 0.0);
assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
assertTaskMetricValue("offset-commit-success-percentage", 1.0);
PowerMock.verifyAll();
}
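// A sketch of the commit pattern exercised above; this approximates the worker's behavior
// and is not the actual WorkerSinkTask code. A wakeup that fires during a synchronous
// commit surfaces as WakeupException, so the commit is retried once and the caller is told
// that a wakeup is pending so the poll loop can still observe it.
private static boolean commitSyncWithWakeupRetry(KafkaConsumer<byte[], byte[]> consumer,
                                                 Map<TopicPartition, OffsetAndMetadata> offsets) {
    try {
        consumer.commitSync(offsets);
        return false; // no wakeup observed
    } catch (WakeupException e) {
        // Retry so the offsets are still pushed, then report the pending wakeup.
        consumer.commitSync(offsets);
        return true;
    }
}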
@Test
public void testWakeupNotThrownDuringShutdown() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(() -> {
// stop the task during its second iteration
workerTask.stop();
return new ConsumerRecords<>(Collections.emptyMap());
});
consumer.wakeup();
EasyMock.expectLastCall();
sinkTask.put(EasyMock.eq(Collections.emptyList()));
EasyMock.expectLastCall();
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(1);
final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
sinkTask.preCommit(offsets);
EasyMock.expectLastCall().andReturn(offsets);
sinkTask.close(EasyMock.anyObject());
PowerMock.expectLastCall();
// fail the first time
consumer.commitSync(EasyMock.eq(offsets));
EasyMock.expectLastCall().andThrow(new WakeupException());
// and succeed the second time
consumer.commitSync(EasyMock.eq(offsets));
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.execute();
assertEquals(0, workerTask.commitFailures());
PowerMock.verifyAll();
}
@Test
public void testRequestCommit() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
sinkTask.preCommit(offsets);
EasyMock.expectLastCall().andReturn(offsets);
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
final Capture<OffsetCommitCallback> callback = EasyMock.newCapture();
consumer.commitAsync(EasyMock.eq(offsets), EasyMock.capture(callback));
EasyMock.expectLastCall().andAnswer(() -> {
callback.getValue().onComplete(offsets, null);
return null;
});
expectConsumerPoll(0);
sinkTask.put(Collections.emptyList());
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
// Initial assignment
time.sleep(30000L);
workerTask.iteration();
assertSinkMetricValue("partition-count", 2);
// First record delivered
workerTask.iteration();
assertSinkMetricValue("partition-count", 2);
assertSinkMetricValue("sink-record-read-total", 1.0);
assertSinkMetricValue("sink-record-send-total", 1.0);
assertSinkMetricValue("sink-record-active-count", 1.0);
assertSinkMetricValue("sink-record-active-count-max", 1.0);
assertSinkMetricValue("sink-record-active-count-avg", 0.333333);
assertSinkMetricValue("offset-commit-seq-no", 0.0);
assertSinkMetricValue("offset-commit-completion-total", 0.0);
assertSinkMetricValue("offset-commit-skip-total", 0.0);
assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("pause-ratio", 0.0);
assertTaskMetricValue("batch-size-max", 1.0);
assertTaskMetricValue("batch-size-avg", 0.5);
assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
assertTaskMetricValue("offset-commit-success-percentage", 0.0);
// Grab the commit time prior to requesting a commit.
// This time should advance slightly after committing.
// KAFKA-8229
final long previousCommitValue = workerTask.getNextCommit();
sinkTaskContext.getValue().requestCommit();
assertTrue(sinkTaskContext.getValue().isCommitRequested());
assertNotEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
time.sleep(10000L);
workerTask.iteration(); // triggers the commit
time.sleep(10000L);
assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared
assertEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
assertEquals(0, workerTask.commitFailures());
// Assert that the next commit time advances slightly: the amount it advances is the
// normal commit interval less the two sleeps since it started, each of which was
// 10 seconds.
// KAFKA-8229
assertEquals("Should have only advanced by 40 seconds",
previousCommitValue +
(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT - 10000L * 2),
workerTask.getNextCommit());
assertSinkMetricValue("partition-count", 2);
assertSinkMetricValue("sink-record-read-total", 1.0);
assertSinkMetricValue("sink-record-send-total", 1.0);
assertSinkMetricValue("sink-record-active-count", 0.0);
assertSinkMetricValue("sink-record-active-count-max", 1.0);
assertSinkMetricValue("sink-record-active-count-avg", 0.2);
assertSinkMetricValue("offset-commit-seq-no", 1.0);
assertSinkMetricValue("offset-commit-completion-total", 1.0);
assertSinkMetricValue("offset-commit-skip-total", 0.0);
assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("pause-ratio", 0.0);
assertTaskMetricValue("batch-size-max", 1.0);
assertTaskMetricValue("batch-size-avg", 0.33333);
assertTaskMetricValue("offset-commit-max-time-ms", 0.0);
assertTaskMetricValue("offset-commit-avg-time-ms", 0.0);
assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
assertTaskMetricValue("offset-commit-success-percentage", 1.0);
PowerMock.verifyAll();
}
@Test
public void testPreCommit() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
// iter 1
expectPollInitialAssignment();
// iter 2
expectConsumerPoll(2);
expectConversionAndTransformation(2);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
final Map<TopicPartition, OffsetAndMetadata> workerStartingOffsets = new HashMap<>();
workerStartingOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
workerStartingOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2));
workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
final Map<TopicPartition, OffsetAndMetadata> taskOffsets = new HashMap<>();
taskOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); // act like FIRST_OFFSET+2 has not yet been flushed by the task
taskOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET + 1)); // should be ignored because > current offset
taskOffsets.put(new TopicPartition(TOPIC, 3), new OffsetAndMetadata(FIRST_OFFSET)); // should be ignored because this partition is not assigned
final Map<TopicPartition, OffsetAndMetadata> committableOffsets = new HashMap<>();
committableOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
committableOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
sinkTask.preCommit(workerCurrentOffsets);
EasyMock.expectLastCall().andReturn(taskOffsets);
// Expect extra invalid topic partition to be filtered, which causes the consumer assignment to be logged
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
final Capture<OffsetCommitCallback> callback = EasyMock.newCapture();
consumer.commitAsync(EasyMock.eq(committableOffsets), EasyMock.capture(callback));
EasyMock.expectLastCall().andAnswer(() -> {
callback.getValue().onComplete(committableOffsets, null);
return null;
});
expectConsumerPoll(0);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // iter 1 -- initial assignment
assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));
workerTask.iteration(); // iter 2 -- deliver 2 records
assertEquals(workerCurrentOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));
assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
sinkTaskContext.getValue().requestCommit();
workerTask.iteration(); // iter 3 -- commit
assertEquals(committableOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
PowerMock.verifyAll();
}
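// Illustrative helper, for exposition only: an approximation of how committable offsets are
// derived from what preCommit returns, matching what the test above asserts. The worker's
// real implementation lives in WorkerSinkTask; this sketch starts from the last committed
// offsets and overlays task-provided offsets that are for assigned partitions and do not
// run ahead of what has actually been consumed.
private static Map<TopicPartition, OffsetAndMetadata> committableOffsetsSketch(
        Map<TopicPartition, OffsetAndMetadata> taskProvidedOffsets,
        Map<TopicPartition, OffsetAndMetadata> currentOffsets,
        Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets) {
    Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>(lastCommittedOffsets);
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : taskProvidedOffsets.entrySet()) {
        OffsetAndMetadata current = currentOffsets.get(entry.getKey());
        if (current == null)
            continue; // partition not assigned to this task: ignored
        if (entry.getValue().offset() > current.offset())
            continue; // ahead of what the worker has consumed: ignored
        result.put(entry.getKey(), entry.getValue());
    }
    return result;
}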
@Test
public void testPreCommitFailure() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
EasyMock.expect(consumer.assignment()).andStubReturn(INITIAL_ASSIGNMENT);
// iter 1
expectPollInitialAssignment();
// iter 2
expectConsumerPoll(2);
expectConversionAndTransformation(2);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
// iter 3
final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2));
workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
sinkTask.preCommit(workerCurrentOffsets);
EasyMock.expectLastCall().andThrow(new ConnectException("Failed to flush"));
consumer.seek(TOPIC_PARTITION, FIRST_OFFSET);
EasyMock.expectLastCall();
consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET);
EasyMock.expectLastCall();
expectConsumerPoll(0);
sinkTask.put(EasyMock.eq(Collections.emptyList()));
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // iter 1 -- initial assignment
workerTask.iteration(); // iter 2 -- deliver 2 records
sinkTaskContext.getValue().requestCommit();
workerTask.iteration(); // iter 3 -- commit
PowerMock.verifyAll();
}
@Test
public void testIgnoredCommit() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
// iter 1
expectPollInitialAssignment();
// iter 2
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
final Map<TopicPartition, OffsetAndMetadata> workerStartingOffsets = new HashMap<>();
workerStartingOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
workerStartingOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
// iter 3
sinkTask.preCommit(workerCurrentOffsets);
EasyMock.expectLastCall().andReturn(workerStartingOffsets);
// no actual consumer.commit() triggered
expectConsumerPoll(0);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // iter 1 -- initial assignment
assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));
assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
workerTask.iteration(); // iter 2 -- deliver 1 record
sinkTaskContext.getValue().requestCommit();
workerTask.iteration(); // iter 3 -- commit
PowerMock.verifyAll();
}
// Test that the commitTimeoutMs timestamp is correctly computed and checked in WorkerSinkTask.iteration()
// when there is a long-running commit in progress. See KAFKA-4942 for more information.
@Test
public void testLongRunningCommitWithoutTimeout() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
// iter 1
expectPollInitialAssignment();
// iter 2
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
final Map<TopicPartition, OffsetAndMetadata> workerStartingOffsets = new HashMap<>();
workerStartingOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
workerStartingOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
// iter 3 - note that we return the current offsets to indicate they should be committed
sinkTask.preCommit(workerCurrentOffsets);
EasyMock.expectLastCall().andReturn(workerCurrentOffsets);
// We need to delay the result of trying to commit offsets to Kafka via the consumer.commitAsync
// method. We do this so that we can test that we do not erroneously mark a commit as timed out
// while it is still running and within its timeout. To fake this in tests, we run the commit in a
// separate thread and have it wait on a latch that we control from the main thread.
final ExecutorService executor = Executors.newSingleThreadExecutor();
final CountDownLatch latch = new CountDownLatch(1);
consumer.commitAsync(EasyMock.eq(workerCurrentOffsets), EasyMock.anyObject());
EasyMock.expectLastCall().andAnswer(() -> {
// Grab the arguments passed to the consumer.commitAsync method
final Object[] args = EasyMock.getCurrentArguments();
@SuppressWarnings("unchecked")
final Map<TopicPartition, OffsetAndMetadata> offsets = (Map<TopicPartition, OffsetAndMetadata>) args[0];
final OffsetCommitCallback callback = (OffsetCommitCallback) args[1];
executor.execute(() -> {
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
callback.onComplete(offsets, null);
});
return null;
});
// no actual consumer.commit() triggered
expectConsumerPoll(0);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // iter 1 -- initial assignment
assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));
assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
time.sleep(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT);
workerTask.iteration(); // iter 2 -- deliver 1 record
sinkTaskContext.getValue().requestCommit();
workerTask.iteration(); // iter 3 -- commit in progress
// Make sure the "committing" flag didn't immediately get flipped back to false due to an incorrect timeout
assertTrue("Expected worker to be in the process of committing offsets", workerTask.isCommitting());
// Let the async commit finish and wait for it to end
latch.countDown();
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
assertEquals(workerCurrentOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));
assertEquals(workerCurrentOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
PowerMock.verifyAll();
}
@Test
public void testSinkTasksHandleCloseErrors() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
// Put one message through the task to get some offsets to commit
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
PowerMock.expectLastCall().andVoid();
// Stop the task during the next put
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
PowerMock.expectLastCall().andAnswer(() -> {
workerTask.stop();
return null;
});
consumer.wakeup();
PowerMock.expectLastCall();
// Throw another exception while closing the task's assignment
EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
.andStubReturn(Collections.emptyMap());
Throwable closeException = new RuntimeException();
sinkTask.close(EasyMock.anyObject());
PowerMock.expectLastCall().andThrow(closeException);
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
try {
workerTask.execute();
fail("workerTask.execute should have thrown an exception");
} catch (RuntimeException e) {
PowerMock.verifyAll();
assertSame("Exception from close should propagate as-is", closeException, e);
}
}
@Test
public void testSuppressCloseErrors() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
// Put one message through the task to get some offsets to commit
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
PowerMock.expectLastCall().andVoid();
// Throw an exception on the next put to trigger shutdown behavior
// This exception is the true "cause" of the failure
expectConsumerPoll(1);
expectConversionAndTransformation(1);
Throwable putException = new RuntimeException();
sinkTask.put(EasyMock.anyObject());
PowerMock.expectLastCall().andThrow(putException);
// Throw another exception while closing the task's assignment
EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
.andStubReturn(Collections.emptyMap());
Throwable closeException = new RuntimeException();
sinkTask.close(EasyMock.anyObject());
PowerMock.expectLastCall().andThrow(closeException);
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
try {
workerTask.execute();
fail("workerTask.execute should have thrown an exception");
} catch (ConnectException e) {
PowerMock.verifyAll();
assertSame("Exception from put should be the cause", putException, e.getCause());
assertTrue("Exception from close should be suppressed", e.getSuppressed().length > 0);
assertSame(closeException, e.getSuppressed()[0]);
}
}
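// Sketch of the suppression contract verified above (illustrative, with a hypothetical
// helper name): when put() has already failed, a secondary failure from close() must not
// mask the original cause, so it is attached as a suppressed exception instead of thrown.
private static void closeSuppressingErrors(AutoCloseable resource, Throwable primary) {
    try {
        resource.close();
    } catch (Throwable t) {
        if (primary != null) {
            primary.addSuppressed(t); // keep the original failure as the primary cause
        } else {
            throw new ConnectException("Failed to close resource", t);
        }
    }
}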
@Test
public void testTaskCancelPreventsFinalOffsetCommit() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
// Put one message through the task to get some offsets to commit
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
PowerMock.expectLastCall();
// the second put will return after the task is stopped and cancelled (asynchronously)
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
PowerMock.expectLastCall().andAnswer(() -> {
workerTask.stop();
workerTask.cancel();
return null;
});
// stop wakes up the consumer
consumer.wakeup();
EasyMock.expectLastCall();
// task performs normal steps in advance of committing offsets
final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2));
offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
sinkTask.preCommit(offsets);
EasyMock.expectLastCall().andReturn(offsets);
sinkTask.close(EasyMock.anyObject());
PowerMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.execute();
PowerMock.verifyAll();
}
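// Hedged sketch of the cancellation guard exercised above (hypothetical method; the
// worker's actual fields and flow differ): once a task has been cancelled, the final
// offset commit during shutdown is skipped, so a cancelled task cannot overwrite offsets
// that may now be owned by its replacement after a rebalance.
private static void maybeCommitOnClose(KafkaConsumer<byte[], byte[]> consumer,
                                       Map<TopicPartition, OffsetAndMetadata> offsets,
                                       boolean cancelled) {
    if (cancelled)
        return; // cancelled tasks must not commit their final offsets
    consumer.commitSync(offsets);
}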
// Verify that when commitAsync is called but the supplied callback is not called by the consumer before a
// rebalance occurs, the async callback does not overwrite the last committed offsets established by the
// rebalance. See KAFKA-5731 for more information.
@Test
public void testCommitWithOutOfOrderCallback() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
// iter 1
expectPollInitialAssignment();
// iter 2
expectConsumerPoll(1);
expectConversionAndTransformation(4);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
final Map<TopicPartition, OffsetAndMetadata> workerStartingOffsets = new HashMap<>();
workerStartingOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
workerStartingOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
final List<TopicPartition> originalPartitions = new ArrayList<>(INITIAL_ASSIGNMENT);
final List<TopicPartition> rebalancedPartitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3);
final Map<TopicPartition, OffsetAndMetadata> rebalanceOffsets = new HashMap<>();
rebalanceOffsets.put(TOPIC_PARTITION, workerCurrentOffsets.get(TOPIC_PARTITION));
rebalanceOffsets.put(TOPIC_PARTITION2, workerCurrentOffsets.get(TOPIC_PARTITION2));
rebalanceOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET));
final Map<TopicPartition, OffsetAndMetadata> postRebalanceCurrentOffsets = new HashMap<>();
postRebalanceCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 3));
postRebalanceCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
postRebalanceCurrentOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET + 2));
EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(originalPartitions)).times(2);
// iter 3 - note that we return the current offsets to indicate they should be committed
sinkTask.preCommit(workerCurrentOffsets);
EasyMock.expectLastCall().andReturn(workerCurrentOffsets);
// We need to delay the result of trying to commit offsets to Kafka via the consumer.commitAsync
// method. We do this so that we can test that the callback is not called until after the rebalance
// changes the lastCommittedOffsets. To fake this for tests we have the commitAsync build a function
// that will call the callback with the appropriate parameters, and we'll run that function later.
final AtomicReference<Runnable> asyncCallbackRunner = new AtomicReference<>();
final AtomicBoolean asyncCallbackRan = new AtomicBoolean();
consumer.commitAsync(EasyMock.eq(workerCurrentOffsets), EasyMock.anyObject());
EasyMock.expectLastCall().andAnswer(() -> {
// Grab the arguments passed to the consumer.commitAsync method
final Object[] args = EasyMock.getCurrentArguments();
@SuppressWarnings("unchecked")
final Map<TopicPartition, OffsetAndMetadata> offsets = (Map<TopicPartition, OffsetAndMetadata>) args[0];
final OffsetCommitCallback callback = (OffsetCommitCallback) args[1];
asyncCallbackRunner.set(() -> {
callback.onComplete(offsets, null);
asyncCallbackRan.set(true);
});
return null;
});
// Expect the next poll to discover and perform the rebalance, THEN complete the previous callback handler,
// and then return one record for TP1 and one for TP3.
final AtomicBoolean rebalanced = new AtomicBoolean();
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
() -> {
// Rebalance always begins with revoking current partitions ...
rebalanceListener.getValue().onPartitionsRevoked(originalPartitions);
// Respond to the rebalance
Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(TOPIC_PARTITION, rebalanceOffsets.get(TOPIC_PARTITION).offset());
offsets.put(TOPIC_PARTITION2, rebalanceOffsets.get(TOPIC_PARTITION2).offset());
offsets.put(TOPIC_PARTITION3, rebalanceOffsets.get(TOPIC_PARTITION3).offset());
sinkTaskContext.getValue().offset(offsets);
rebalanceListener.getValue().onPartitionsAssigned(rebalancedPartitions);
rebalanced.set(true);
// Run the previous async commit handler
asyncCallbackRunner.get().run();
// And prep the two records to return
long timestamp = RecordBatch.NO_TIMESTAMP;
TimestampType timestampType = TimestampType.NO_TIMESTAMP_TYPE;
List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 1, timestamp, timestampType,
0, 0, RAW_KEY, RAW_VALUE, new RecordHeaders(), Optional.empty()));
records.add(new ConsumerRecord<>(TOPIC, PARTITION3, FIRST_OFFSET + recordsReturnedTp3 + 1, timestamp, timestampType,
0, 0, RAW_KEY, RAW_VALUE, new RecordHeaders(), Optional.empty()));
recordsReturnedTp1 += 1;
recordsReturnedTp3 += 1;
return new ConsumerRecords<>(Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records));
});
// onPartitionsRevoked
sinkTask.preCommit(workerCurrentOffsets);
EasyMock.expectLastCall().andReturn(workerCurrentOffsets);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
sinkTask.close(new ArrayList<>(workerCurrentOffsets.keySet()));
EasyMock.expectLastCall();
consumer.commitSync(workerCurrentOffsets);
EasyMock.expectLastCall();
// onPartitionsAssigned - step 1
final long offsetTp1 = rebalanceOffsets.get(TOPIC_PARTITION).offset();
final long offsetTp2 = rebalanceOffsets.get(TOPIC_PARTITION2).offset();
final long offsetTp3 = rebalanceOffsets.get(TOPIC_PARTITION3).offset();
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(offsetTp1);
EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(offsetTp2);
EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(offsetTp3);
EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(rebalancedPartitions)).times(6);
// onPartitionsAssigned - step 2
sinkTask.open(EasyMock.eq(rebalancedPartitions));
EasyMock.expectLastCall();
// onPartitionsAssigned - step 3 rewind
consumer.seek(TOPIC_PARTITION, offsetTp1);
EasyMock.expectLastCall();
consumer.seek(TOPIC_PARTITION2, offsetTp2);
EasyMock.expectLastCall();
consumer.seek(TOPIC_PARTITION3, offsetTp3);
EasyMock.expectLastCall();
// iter 4 -- note that we return the current offsets to indicate that they should be committed
sinkTask.preCommit(postRebalanceCurrentOffsets);
EasyMock.expectLastCall().andReturn(postRebalanceCurrentOffsets);
final Capture<OffsetCommitCallback> callback = EasyMock.newCapture();
consumer.commitAsync(EasyMock.eq(postRebalanceCurrentOffsets), EasyMock.capture(callback));
EasyMock.expectLastCall().andAnswer(() -> {
callback.getValue().onComplete(postRebalanceCurrentOffsets, null);
return null;
});
// no synchronous commit (consumer.commitSync) is triggered; the commit completes via the async callback above
expectConsumerPoll(1);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // iter 1 -- initial assignment
assertEquals(workerStartingOffsets, Whitebox.getInternalState(workerTask, "currentOffsets"));
assertEquals(workerStartingOffsets, Whitebox.getInternalState(workerTask, "lastCommittedOffsets"));
time.sleep(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT);
workerTask.iteration(); // iter 2 -- deliver 2 records
sinkTaskContext.getValue().requestCommit();
workerTask.iteration(); // iter 3 -- commit in progress
assertSinkMetricValue("partition-count", 3);
assertSinkMetricValue("sink-record-read-total", 3.0);
assertSinkMetricValue("sink-record-send-total", 3.0);
assertSinkMetricValue("sink-record-active-count", 4.0);
assertSinkMetricValue("sink-record-active-count-max", 4.0);
assertSinkMetricValue("sink-record-active-count-avg", 0.71429);
assertSinkMetricValue("offset-commit-seq-no", 2.0);
assertSinkMetricValue("offset-commit-completion-total", 1.0);
assertSinkMetricValue("offset-commit-skip-total", 1.0);
assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("pause-ratio", 0.0);
assertTaskMetricValue("batch-size-max", 2.0);
assertTaskMetricValue("batch-size-avg", 1.0);
assertTaskMetricValue("offset-commit-max-time-ms", 0.0);
assertTaskMetricValue("offset-commit-avg-time-ms", 0.0);
assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
assertTaskMetricValue("offset-commit-success-percentage", 1.0);
assertTrue(asyncCallbackRan.get());
assertTrue(rebalanced.get());
// Check that the offsets were not reset by the out-of-order async commit callback
assertEquals(postRebalanceCurrentOffsets, Whitebox.getInternalState(workerTask, "currentOffsets"));
assertEquals(rebalanceOffsets, Whitebox.getInternalState(workerTask, "lastCommittedOffsets"));
time.sleep(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT);
sinkTaskContext.getValue().requestCommit();
workerTask.iteration(); // iter 4 -- commit in progress
// Check that the offsets were not reset by the out-of-order async commit callback
assertEquals(postRebalanceCurrentOffsets, Whitebox.getInternalState(workerTask, "currentOffsets"));
assertEquals(postRebalanceCurrentOffsets, Whitebox.getInternalState(workerTask, "lastCommittedOffsets"));
assertSinkMetricValue("partition-count", 3);
assertSinkMetricValue("sink-record-read-total", 4.0);
assertSinkMetricValue("sink-record-send-total", 4.0);
assertSinkMetricValue("sink-record-active-count", 0.0);
assertSinkMetricValue("sink-record-active-count-max", 4.0);
assertSinkMetricValue("sink-record-active-count-avg", 0.5555555);
assertSinkMetricValue("offset-commit-seq-no", 3.0);
assertSinkMetricValue("offset-commit-completion-total", 2.0);
assertSinkMetricValue("offset-commit-skip-total", 1.0);
assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("pause-ratio", 0.0);
assertTaskMetricValue("batch-size-max", 2.0);
assertTaskMetricValue("batch-size-avg", 1.0);
assertTaskMetricValue("offset-commit-max-time-ms", 0.0);
assertTaskMetricValue("offset-commit-avg-time-ms", 0.0);
assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
assertTaskMetricValue("offset-commit-success-percentage", 1.0);
PowerMock.verifyAll();
}
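// Verify that when a transformation mutates the record (here, prefixing the topic name),
// offsets are still tracked and committed against the original consumer topic partitions.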
@Test
public void testDeliveryWithMutatingTransform() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
expectConsumerPoll(1);
expectConversionAndTransformation(1, "newtopic_");
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
sinkTask.preCommit(offsets);
EasyMock.expectLastCall().andReturn(offsets);
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
final Capture<OffsetCommitCallback> callback = EasyMock.newCapture();
consumer.commitAsync(EasyMock.eq(offsets), EasyMock.capture(callback));
EasyMock.expectLastCall().andAnswer(() -> {
callback.getValue().onComplete(offsets, null);
return null;
});
expectConsumerPoll(0);
sinkTask.put(Collections.emptyList());
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // initial assignment
workerTask.iteration(); // first record delivered
sinkTaskContext.getValue().requestCommit();
assertTrue(sinkTaskContext.getValue().isCommitRequested());
assertNotEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
workerTask.iteration(); // triggers the commit
assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared
assertEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
assertEquals(0, workerTask.commitFailures());
assertEquals(1.0, metrics.currentMetricValueAsDouble(workerTask.taskMetricsGroup().metricGroup(), "batch-size-max"), 0.0001);
PowerMock.verifyAll();
}
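// Verify that a record carrying the sentinel timestamp RecordBatch.NO_TIMESTAMP is delivered
// to the task with a null timestamp rather than the raw sentinel value.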
@Test
public void testMissingTimestampPropagation() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
expectConsumerPoll(1, RecordBatch.NO_TIMESTAMP, TimestampType.CREATE_TIME);
expectConversionAndTransformation(1);
Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(records));
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // iter 1 -- initial assignment
workerTask.iteration(); // iter 2 -- deliver 1 record
SinkRecord record = records.getValue().iterator().next();
// we expect null for the missing timestamp; the sentinel value RecordBatch.NO_TIMESTAMP is
// internal to Kafka's API and should not leak into the SinkRecord
assertNull(record.timestamp());
assertEquals(TimestampType.CREATE_TIME, record.timestampType());
PowerMock.verifyAll();
}
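// Verify that a real timestamp and its timestamp type are propagated unchanged to the SinkRecord.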
@Test
public void testTimestampPropagation() throws Exception {
final Long timestamp = System.currentTimeMillis();
final TimestampType timestampType = TimestampType.CREATE_TIME;
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
expectConsumerPoll(1, timestamp, timestampType);
expectConversionAndTransformation(1);
Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(records));
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // iter 1 -- initial assignment
workerTask.iteration(); // iter 2 -- deliver 1 record
SinkRecord record = records.getValue().iterator().next();
assertEquals(timestamp, record.timestamp());
assertEquals(timestampType, record.timestampType());
PowerMock.verifyAll();
}
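// Verify that configuring topics.regex (instead of a fixed topics list) subscribes the consumer
// with a regex pattern, and that a PAUSED target state pauses the assigned partitions.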
@Test
public void testTopicsRegex() {
Map<String, String> props = new HashMap<>(TASK_PROPS);
props.remove("topics");
props.put("topics.regex", "te.*");
TaskConfig taskConfig = new TaskConfig(props);
createTask(TargetState.PAUSED);
consumer.subscribe(EasyMock.capture(topicsRegex), EasyMock.capture(rebalanceListener));
PowerMock.expectLastCall();
sinkTask.initialize(EasyMock.capture(sinkTaskContext));
PowerMock.expectLastCall();
sinkTask.start(props);
PowerMock.expectLastCall();
expectPollInitialAssignment();
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT);
consumer.pause(INITIAL_ASSIGNMENT);
PowerMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(taskConfig);
workerTask.initializeAndStart();
workerTask.iteration();
time.sleep(10000L);
PowerMock.verifyAll();
}
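// Exercise SinkTaskMetricsGroup directly: two task groups record independent values, the
// computed rates and gauges are checked per group, and closing one group deregisters its
// metrics and sensors without affecting the other group.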
@Test
public void testMetricsGroup() {
SinkTaskMetricsGroup group = new SinkTaskMetricsGroup(taskId, metrics);
SinkTaskMetricsGroup group1 = new SinkTaskMetricsGroup(taskId1, metrics);
for (int i = 0; i != 10; ++i) {
group.recordRead(1);
group.recordSend(2);
group.recordPut(3);
group.recordPartitionCount(4);
group.recordOffsetSequenceNumber(5);
}
Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
committedOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
group.recordCommittedOffsets(committedOffsets);
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = new HashMap<>();
consumedOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 10));
group.recordConsumedOffsets(consumedOffsets);
for (int i = 0; i != 20; ++i) {
group1.recordRead(1);
group1.recordSend(2);
group1.recordPut(30);
group1.recordPartitionCount(40);
group1.recordOffsetSequenceNumber(50);
}
committedOffsets = new HashMap<>();
committedOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET + 2));
committedOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET + 3));
group1.recordCommittedOffsets(committedOffsets);
consumedOffsets = new HashMap<>();
consumedOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET + 20));
consumedOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET + 30));
group1.recordConsumedOffsets(consumedOffsets);
assertEquals(0.333, metrics.currentMetricValueAsDouble(group.metricGroup(), "sink-record-read-rate"), 0.001d);
assertEquals(0.667, metrics.currentMetricValueAsDouble(group.metricGroup(), "sink-record-send-rate"), 0.001d);
assertEquals(9, metrics.currentMetricValueAsDouble(group.metricGroup(), "sink-record-active-count"), 0.001d);
assertEquals(4, metrics.currentMetricValueAsDouble(group.metricGroup(), "partition-count"), 0.001d);
assertEquals(5, metrics.currentMetricValueAsDouble(group.metricGroup(), "offset-commit-seq-no"), 0.001d);
assertEquals(3, metrics.currentMetricValueAsDouble(group.metricGroup(), "put-batch-max-time-ms"), 0.001d);
// Close the group
group.close();
for (MetricName metricName : group.metricGroup().metrics().metrics().keySet()) {
// Metrics for this group should no longer exist
assertFalse(group.metricGroup().groupId().includes(metricName));
}
// Sensors for this group should no longer exist
assertNull(group.metricGroup().metrics().getSensor("source-record-poll"));
assertNull(group.metricGroup().metrics().getSensor("source-record-write"));
assertNull(group.metricGroup().metrics().getSensor("poll-batch-time"));
assertEquals(0.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "sink-record-read-rate"), 0.001d);
assertEquals(1.333, metrics.currentMetricValueAsDouble(group1.metricGroup(), "sink-record-send-rate"), 0.001d);
assertEquals(45, metrics.currentMetricValueAsDouble(group1.metricGroup(), "sink-record-active-count"), 0.001d);
assertEquals(40, metrics.currentMetricValueAsDouble(group1.metricGroup(), "partition-count"), 0.001d);
assertEquals(50, metrics.currentMetricValueAsDouble(group1.metricGroup(), "offset-commit-seq-no"), 0.001d);
assertEquals(30, metrics.currentMetricValueAsDouble(group1.metricGroup(), "put-batch-max-time-ms"), 0.001d);
}
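// Verify that consumer record headers are passed through conversion and delivered to the task.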
@Test
public void testHeaders() throws Exception {
Headers headers = new RecordHeaders();
headers.add("header_key", "header_value".getBytes());
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
expectConsumerPoll(1, headers);
expectConversionAndTransformation(1, null, headers);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // iter 1 -- initial assignment
workerTask.iteration(); // iter 2 -- deliver 1 record
PowerMock.verifyAll();
}
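// Verify that a header-aware value converter can use a per-record "encoding" header to decode
// values that were serialized with different character sets.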
@Test
public void testHeadersWithCustomConverter() throws Exception {
StringConverter stringConverter = new StringConverter();
SampleConverterWithHeaders testConverter = new SampleConverterWithHeaders();
createTask(initialState, stringConverter, testConverter, stringConverter);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
String keyA = "a";
String valueA = "Árvíztűrő tükörfúrógép";
Headers headersA = new RecordHeaders();
String encodingA = "latin2";
headersA.add("encoding", encodingA.getBytes());
String keyB = "b";
String valueB = "Тестовое сообщение";
Headers headersB = new RecordHeaders();
String encodingB = "koi8_r";
headersB.add("encoding", encodingB.getBytes());
expectConsumerPoll(Arrays.asList(
new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 1, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
0, 0, keyA.getBytes(), valueA.getBytes(encodingA), headersA, Optional.empty()),
new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 2, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
0, 0, keyB.getBytes(), valueB.getBytes(encodingB), headersB, Optional.empty())
));
expectTransformation(2, null);
Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(records));
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // iter 1 -- initial assignment
workerTask.iteration(); // iter 2 -- deliver 2 records
Iterator<SinkRecord> iterator = records.getValue().iterator();
SinkRecord recordA = iterator.next();
assertEquals(keyA, recordA.key());
assertEquals(valueA, recordA.value());
SinkRecord recordB = iterator.next();
assertEquals(keyB, recordB.key());
assertEquals(valueB, recordB.value());
PowerMock.verifyAll();
}
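// Expect the standard startup sequence: subscribe to the configured topics, hand the task its
// context, and start it with the task properties.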
private void expectInitializeTask() {
consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener));
PowerMock.expectLastCall();
sinkTask.initialize(EasyMock.capture(sinkTaskContext));
PowerMock.expectLastCall();
sinkTask.start(TASK_PROPS);
PowerMock.expectLastCall();
}
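// Expect a poll during which the consumer reports the initial assignment as lost and the
// task's close(...) throws the given exception.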
private void expectRebalanceLossError(RuntimeException e) {
sinkTask.close(new HashSet<>(INITIAL_ASSIGNMENT));
EasyMock.expectLastCall().andThrow(e);
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
() -> {
rebalanceListener.getValue().onPartitionsLost(INITIAL_ASSIGNMENT);
return ConsumerRecords.empty();
});
}
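// Expect a poll during which the initial assignment is revoked; preCommit succeeds and returns
// no offsets, but the task's close(...) throws the given exception.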
private void expectRebalanceRevocationError(RuntimeException e) {
sinkTask.close(new HashSet<>(INITIAL_ASSIGNMENT));
EasyMock.expectLastCall().andThrow(e);
sinkTask.preCommit(EasyMock.anyObject());
EasyMock.expectLastCall().andReturn(Collections.emptyMap());
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
() -> {
rebalanceListener.getValue().onPartitionsRevoked(INITIAL_ASSIGNMENT);
return ConsumerRecords.empty();
});
}
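// Expect a full revoke/re-assign cycle in which close and preCommit succeed but open(...) of
// the re-assigned partitions throws the given exception.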
private void expectRebalanceAssignmentError(RuntimeException e) {
sinkTask.close(INITIAL_ASSIGNMENT);
EasyMock.expectLastCall();
sinkTask.preCommit(EasyMock.anyObject());
EasyMock.expectLastCall().andReturn(Collections.emptyMap());
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
sinkTask.open(INITIAL_ASSIGNMENT);
EasyMock.expectLastCall().andThrow(e);
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(3);
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
() -> {
rebalanceListener.getValue().onPartitionsRevoked(INITIAL_ASSIGNMENT);
rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
return ConsumerRecords.empty();
});
}
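// Expect the first poll to deliver the initial assignment: the rebalance listener fires, the
// task opens the partitions, positions are read, and an empty batch is put to the task.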
private void expectPollInitialAssignment() {
sinkTask.open(INITIAL_ASSIGNMENT);
EasyMock.expectLastCall();
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(() -> {
rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
return ConsumerRecords.empty();
});
INITIAL_ASSIGNMENT.forEach(tp -> EasyMock.expect(consumer.position(tp)).andReturn(FIRST_OFFSET));
sinkTask.put(Collections.emptyList());
EasyMock.expectLastCall();
}
private void expectConsumerWakeup() {
consumer.wakeup();
EasyMock.expectLastCall();
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andThrow(new WakeupException());
}
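// The expectConsumerPoll overloads stub consumer.poll(...) to return numMessages records on
// TOPIC/PARTITION, with offsets continuing after the records already returned for that partition.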
private void expectConsumerPoll(final int numMessages) {
expectConsumerPoll(numMessages, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, emptyHeaders());
}
private void expectConsumerPoll(final int numMessages, Headers headers) {
expectConsumerPoll(numMessages, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, headers);
}
private void expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType) {
expectConsumerPoll(numMessages, timestamp, timestampType, emptyHeaders());
}
private void expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType, Headers headers) {
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
() -> {
List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
for (int i = 0; i < numMessages; i++)
records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + i, timestamp, timestampType,
0, 0, RAW_KEY, RAW_VALUE, headers, Optional.empty()));
recordsReturnedTp1 += numMessages;
return new ConsumerRecords<>(
numMessages > 0 ?
Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records) :
Collections.emptyMap()
);
});
}
private void expectConsumerPoll(List<ConsumerRecord<byte[], byte[]>> records) {
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
() -> new ConsumerRecords<>(
records.isEmpty() ?
Collections.emptyMap() :
Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records)
));
}
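// Expect key, value, and header conversion for numMessages records, followed by the
// transformation chain (optionally rewriting the topic with the given prefix).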
private void expectConversionAndTransformation(final int numMessages) {
expectConversionAndTransformation(numMessages, null);
}
private void expectConversionAndTransformation(final int numMessages, final String topicPrefix) {
expectConversionAndTransformation(numMessages, topicPrefix, emptyHeaders());
}
private void expectConversionAndTransformation(final int numMessages, final String topicPrefix, final Headers headers) {
EasyMock.expect(keyConverter.toConnectData(TOPIC, headers, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).times(numMessages);
EasyMock.expect(valueConverter.toConnectData(TOPIC, headers, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).times(numMessages);
for (Header header : headers) {
EasyMock.expect(headerConverter.toConnectHeader(TOPIC, header.key(), header.value())).andReturn(new SchemaAndValue(VALUE_SCHEMA, new String(header.value()))).times(1);
}
expectTransformation(numMessages, topicPrefix);
}
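// Expect the transformation chain to be applied to each record; with a topic prefix the record
// is rewritten onto the prefixed topic, otherwise it passes through unchanged.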
private void expectTransformation(final int numMessages, final String topicPrefix) {
final Capture<SinkRecord> recordCapture = EasyMock.newCapture();
EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture)))
.andAnswer(() -> {
SinkRecord origRecord = recordCapture.getValue();
return topicPrefix != null && !topicPrefix.isEmpty()
? origRecord.newRecord(
topicPrefix + origRecord.topic(),
origRecord.kafkaPartition(),
origRecord.keySchema(),
origRecord.key(),
origRecord.valueSchema(),
origRecord.value(),
origRecord.timestamp(),
origRecord.headers()
)
: origRecord;
}).times(numMessages);
}
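// Stub statusBackingStore.getTopic(...) to answer with a TopicStatus for the requested topic;
// the trailing check only sanity-checks the captures once the stub has actually been invoked.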
private void expectTaskGetTopic(boolean anyTimes) {
final Capture<String> connectorCapture = EasyMock.newCapture();
final Capture<String> topicCapture = EasyMock.newCapture();
IExpectationSetters<TopicStatus> expect = EasyMock.expect(statusBackingStore.getTopic(
EasyMock.capture(connectorCapture),
EasyMock.capture(topicCapture)));
if (anyTimes) {
expect.andStubAnswer(() -> new TopicStatus(
topicCapture.getValue(),
new ConnectorTaskId(connectorCapture.getValue(), 0),
Time.SYSTEM.milliseconds()));
} else {
expect.andAnswer(() -> new TopicStatus(
topicCapture.getValue(),
new ConnectorTaskId(connectorCapture.getValue(), 0),
Time.SYSTEM.milliseconds()));
}
if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
assertEquals("job", connectorCapture.getValue());
assertEquals(TOPIC, topicCapture.getValue());
}
}
private void assertSinkMetricValue(String name, double expected) {
MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup();
double measured = metrics.currentMetricValueAsDouble(sinkTaskGroup, name);
assertEquals(expected, measured, 0.001d);
}
private void assertTaskMetricValue(String name, double expected) {
MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
double measured = metrics.currentMetricValueAsDouble(taskGroup, name);
assertEquals(expected, measured, 0.001d);
}
private void assertTaskMetricValue(String name, String expected) {
MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
String measured = metrics.currentMetricValueAsString(taskGroup, name);
assertEquals(expected, measured);
}
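// Debugging aid: dump the current value of every sink and task metric to stdout.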
private void printMetrics() {
System.out.println();
sinkMetricValue("sink-record-read-rate");
sinkMetricValue("sink-record-read-total");
sinkMetricValue("sink-record-send-rate");
sinkMetricValue("sink-record-send-total");
sinkMetricValue("sink-record-active-count");
sinkMetricValue("sink-record-active-count-max");
sinkMetricValue("sink-record-active-count-avg");
sinkMetricValue("partition-count");
sinkMetricValue("offset-commit-seq-no");
sinkMetricValue("offset-commit-completion-rate");
sinkMetricValue("offset-commit-completion-total");
sinkMetricValue("offset-commit-skip-rate");
sinkMetricValue("offset-commit-skip-total");
sinkMetricValue("put-batch-max-time-ms");
sinkMetricValue("put-batch-avg-time-ms");
taskMetricValue("status-unassigned");
taskMetricValue("status-running");
taskMetricValue("status-paused");
taskMetricValue("status-failed");
taskMetricValue("status-destroyed");
taskMetricValue("running-ratio");
taskMetricValue("pause-ratio");
taskMetricValue("offset-commit-max-time-ms");
taskMetricValue("offset-commit-avg-time-ms");
taskMetricValue("batch-size-max");
taskMetricValue("batch-size-avg");
taskMetricValue("offset-commit-failure-percentage");
taskMetricValue("offset-commit-success-percentage");
}
private double sinkMetricValue(String metricName) {
MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup();
double value = metrics.currentMetricValueAsDouble(sinkTaskGroup, metricName);
System.out.println("** " + metricName + "=" + value);
return value;
}
private double taskMetricValue(String metricName) {
MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
double value = metrics.currentMetricValueAsDouble(taskGroup, metricName);
System.out.println("** " + metricName + "=" + value);
return value;
}
private void assertMetrics(int minimumPollCountExpected) {
MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup();
double readTotal = metrics.currentMetricValueAsDouble(sinkTaskGroup, "sink-record-read-total");
double sendTotal = metrics.currentMetricValueAsDouble(sinkTaskGroup, "sink-record-send-total");
// The helper previously computed these values without asserting anything; assert the
// minimum implied by the parameter name: each expected poll delivers at least one record.
assertTrue(readTotal >= minimumPollCountExpected);
assertTrue(sendTotal >= minimumPollCountExpected);
}
private RecordHeaders emptyHeaders() {
return new RecordHeaders();
}
private abstract static class TestSinkTask extends SinkTask {
}
}