blob: 3a0090f2267b0f197b3d1c619f9df5c5838fb265 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.integration.MonitorableSourceConnector;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.LogReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Arrays;
import java.util.Set;
import java.util.Collections;
import java.util.Collection;
import java.util.concurrent.Executor;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static org.apache.kafka.common.utils.Time.SYSTEM;
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.doReturn;
@RunWith(Parameterized.class)
public class ErrorHandlingTaskTest {
// MockitoRule configured with STRICT_STUBS strictness. No explicit mock initialization call is
// visible in this class, so the @Mock fields below are presumably initialized through this rule.
@Rule
public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
// Topic, partitions and starting offset used when building the records in the tests below.
private static final String TOPIC = "test";
private static final int PARTITION1 = 12;
private static final int PARTITION2 = 13;
private static final long FIRST_OFFSET = 45;
@Mock
Plugins plugins;
// Task properties handed to the tasks under test; filled in by the static initializer below.
private static final Map<String, String> TASK_PROPS = new HashMap<>();
// Keep this initializer ahead of TASK_CONFIG: static initialization runs in textual order and
// TASK_CONFIG is built from the contents of TASK_PROPS.
static {
TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
}
// Arguments passed to every RetryWithToleranceOperator created by operator().
public static final long OPERATOR_RETRY_TIMEOUT_MILLIS = 60000;
public static final long OPERATOR_RETRY_MAX_DELAY_MILLIS = 5000;
public static final ToleranceType OPERATOR_TOLERANCE_TYPE = ToleranceType.ALL;
private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private TargetState initialState = TargetState.STARTED;
// Created fresh in setup() for every test.
private Time time;
private MockConnectMetrics metrics;
@SuppressWarnings("unused")
@Mock
private SinkTask sinkTask;
@SuppressWarnings("unused")
@Mock
private SourceTask sourceTask;
// Built in setup(); workerConfig carries the parameterized topic-creation flag.
private WorkerConfig workerConfig;
private SourceConnectorConfig sourceConfig;
@Mock
private PluginClassLoader pluginLoader;
@SuppressWarnings("unused")
@Mock
private HeaderConverter headerConverter;
// Tasks under test; assigned by createSinkTask(...) and createSourceTask(...) respectively.
private WorkerSinkTask workerSinkTask;
private WorkerSourceTask workerSourceTask;
@SuppressWarnings("unused")
@Mock
private KafkaConsumer<byte[], byte[]> consumer;
@SuppressWarnings("unused")
@Mock
private KafkaProducer<byte[], byte[]> producer;
@SuppressWarnings("unused")
@Mock private TopicAdmin admin;
@Mock
OffsetStorageReaderImpl offsetReader;
@Mock
OffsetStorageWriter offsetWriter;
@Mock
private ConnectorOffsetBackingStore offsetStore;
@SuppressWarnings("unused")
@Mock
private TaskStatus.Listener statusListener;
@SuppressWarnings("unused")
@Mock private StatusBackingStore statusBackingStore;
@Mock
private WorkerErrantRecordReporter workerErrantRecordReporter;
// Metrics shared by the operator, the reporters and the tasks; read by assertErrorHandlingMetricValue.
private ErrorHandlingMetrics errorHandlingMetrics;
// Test parameter: whether the worker is configured with topic creation enabled.
private boolean enableTopicCreation;
/**
 * Supplies the values for the parameterized runner: the suite runs once with topic
 * creation disabled and once with it enabled.
 *
 * @return the two values passed, in turn, to the test constructor
 */
@Parameterized.Parameters
public static Collection<Boolean> parameters() {
    Collection<Boolean> topicCreationFlags = Arrays.asList(Boolean.FALSE, Boolean.TRUE);
    return topicCreationFlags;
}
/**
 * Creates a test instance for one value of the topic-creation parameter.
 *
 * @param enableTopicCreation value later written to the worker's topic-creation setting in
 *                            {@code setup()} and consulted by {@code expectTopicCreation}
 */
public ErrorHandlingTaskTest(boolean enableTopicCreation) {
this.enableTopicCreation = enableTopicCreation;
}
/**
 * Builds the per-test fixtures: a mock clock, fresh metrics, a standalone worker
 * configuration carrying the parameterized topic-creation flag, the source connector
 * configuration, and the error-handling metrics for {@code taskId}.
 */
@Before
public void setup() {
    time = new MockTime(0, 0, 0);
    metrics = new MockConnectMetrics();

    final String jsonConverterClass = "org.apache.kafka.connect.json.JsonConverter";
    Map<String, String> standaloneProps = new HashMap<>();
    standaloneProps.put("key.converter", jsonConverterClass);
    standaloneProps.put("value.converter", jsonConverterClass);
    standaloneProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
    standaloneProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(enableTopicCreation));
    workerConfig = new StandaloneConfig(standaloneProps);

    Map<String, String> connectorProps = sourceConnectorProps(TOPIC);
    sourceConfig = new SourceConnectorConfig(plugins, connectorProps, true);

    errorHandlingMetrics = new ErrorHandlingMetrics(taskId, metrics);
}
/**
 * Builds the properties of the source connector used to create {@code sourceConfig}.
 *
 * @param topic topic the connector writes to; also used as the include regex of the
 *              "foo" topic-creation group
 * @return a new mutable map of connector properties
 */
private Map<String, String> sourceConnectorProps(String topic) {
    final String one = String.valueOf(1);
    final String stringConverterClass = StringConverter.class.getName();

    // set up props for the source connector
    Map<String, String> connectorProps = new HashMap<>();
    connectorProps.put("name", "foo-connector");
    connectorProps.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
    connectorProps.put(TASKS_MAX_CONFIG, one);
    connectorProps.put(TOPIC_CONFIG, topic);
    connectorProps.put(KEY_CONVERTER_CLASS_CONFIG, stringConverterClass);
    connectorProps.put(VALUE_CONVERTER_CLASS_CONFIG, stringConverterClass);

    // topic creation: two groups plus the defaults, with group "foo" matching the topic
    connectorProps.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", "foo", "bar"));
    connectorProps.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, one);
    connectorProps.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, one);
    connectorProps.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "foo" + "." + INCLUDE_REGEX_CONFIG, topic);
    return connectorProps;
}
/**
 * Stops the metrics created in {@code setup()}, if any were created.
 */
@After
public void tearDown() {
    if (metrics == null) {
        // setup() did not get as far as creating the metrics; nothing to stop
        return;
    }
    metrics.stop();
}
/**
 * Verifies that closing a {@link WorkerSinkTask} closes the error reporter registered on its
 * {@link RetryWithToleranceOperator}, and also stops the sink task and closes the consumer
 * and the header converter.
 */
@Test
public void testSinkTasksCloseErrorReporters() throws Exception {
ErrorReporter reporter = mock(ErrorReporter.class);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.reporters(singletonList(reporter));
createSinkTask(initialState, retryWithToleranceOperator);
workerSinkTask.initialize(TASK_CONFIG);
workerSinkTask.initializeAndStart();
workerSinkTask.close();
// verify if invocation happened exactly 1 time
verifyInitializeSink();
verify(reporter).close();
verify(sinkTask).stop();
verify(consumer).close();
verify(headerConverter).close();
}
/**
 * Verifies that closing a {@link WorkerSourceTask} closes the error reporter registered on its
 * {@link RetryWithToleranceOperator}, in addition to the resources checked by
 * {@code verifyCloseSource()}.
 */
@Test
public void testSourceTasksCloseErrorReporters() throws IOException {
ErrorReporter reporter = mock(ErrorReporter.class);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.reporters(singletonList(reporter));
createSourceTask(initialState, retryWithToleranceOperator);
workerSourceTask.initialize(TASK_CONFIG);
workerSourceTask.close();
verifyCloseSource();
verify(reporter).close();
}
/**
 * Verifies that when every registered error reporter throws from {@code close()}, closing the
 * source task still calls {@code close()} on each reporter and still releases the resources
 * checked by {@code verifyCloseSource()}. The call to {@code workerSourceTask.close()} is not
 * wrapped in any exception handling, so the test fails if a reporter exception escapes it.
 */
@Test
public void testCloseErrorReportersExceptionPropagation() throws IOException {
ErrorReporter reporterA = mock(ErrorReporter.class);
ErrorReporter reporterB = mock(ErrorReporter.class);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.reporters(Arrays.asList(reporterA, reporterB));
createSourceTask(initialState, retryWithToleranceOperator);
// Even though the reporters throw exceptions, they should both still be closed.
doThrow(new RuntimeException()).when(reporterA).close();
doThrow(new RuntimeException()).when(reporterB).close();
workerSourceTask.initialize(TASK_CONFIG);
workerSourceTask.close();
verify(reporterA).close();
verify(reporterB).close();
verifyCloseSource();
}
/**
 * Feeds a sink task one well-formed and one malformed JSON record through a
 * {@code FaultyPassthrough} transformation and checks the resulting sink and
 * error-handling metrics.
 *
 * @throws Exception if the task under test throws while initializing or iterating
 */
@Test
public void testErrorHandlingInSinkTasks() throws Exception {
    Map<String, String> reportProps = new HashMap<>();
    reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
    reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
    LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics);

    RetryWithToleranceOperator retryWithToleranceOperator = operator();
    retryWithToleranceOperator.reporters(singletonList(reporter));
    createSinkTask(initialState, retryWithToleranceOperator);

    // Encode the payloads explicitly as UTF-8 instead of relying on the JVM's platform-default
    // charset, so the record bytes are the same on every machine. The type is referenced by
    // its fully-qualified name so this method needs no additional import.
    final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;

    // valid json
    ConsumerRecord<byte[], byte[]> record1 = new ConsumerRecord<>(
        TOPIC, PARTITION1, FIRST_OFFSET,
        null, "{\"a\": 10}".getBytes(utf8));
    // bad json
    ConsumerRecord<byte[], byte[]> record2 = new ConsumerRecord<>(
        TOPIC, PARTITION2, FIRST_OFFSET,
        null, "{\"a\" 10}".getBytes(utf8));

    // one record per poll: the good one first, then the bad one
    when(consumer.poll(any()))
        .thenReturn(records(record1))
        .thenReturn(records(record2));

    workerSinkTask.initialize(TASK_CONFIG);
    workerSinkTask.initializeAndStart();
    workerSinkTask.iteration();
    workerSinkTask.iteration();

    verifyInitializeSink();
    // put() is invoked once per iteration
    verify(sinkTask, times(2)).put(any());

    // two records were consumed from Kafka
    assertSinkMetricValue("sink-record-read-total", 2.0);
    // only one was written to the task
    assertSinkMetricValue("sink-record-send-total", 1.0);
    // one record completely failed (converter issues)
    assertErrorHandlingMetricValue("total-record-errors", 1.0);
    // 2 failures in the transformation, and 1 in the converter
    assertErrorHandlingMetricValue("total-record-failures", 3.0);
    // one record completely failed (converter issues), and thus was skipped
    assertErrorHandlingMetricValue("total-records-skipped", 1.0);
}
/**
 * Creates the retry operator used by the tests, configured from the
 * {@code OPERATOR_*} constants of this class. It is given the {@code SYSTEM} clock,
 * not the mock clock held in {@code time}, and shares {@code errorHandlingMetrics}.
 *
 * @return a new operator with no reporters registered yet
 */
private RetryWithToleranceOperator operator() {
    RetryWithToleranceOperator tolerantOperator = new RetryWithToleranceOperator(
        OPERATOR_RETRY_TIMEOUT_MILLIS,
        OPERATOR_RETRY_MAX_DELAY_MILLIS,
        OPERATOR_TOLERANCE_TYPE,
        SYSTEM,
        errorHandlingMetrics);
    return tolerantOperator;
}
/**
 * Runs a source task that polls two records through the {@code FaultyPassthrough}
 * transformation with a regular {@link JsonConverter}, then checks the task interactions and
 * the resulting source and error-handling metrics.
 */
@Test
public void testErrorHandlingInSourceTasks() throws Exception {
Map<String, String> reportProps = new HashMap<>();
reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.reporters(singletonList(reporter));
createSourceTask(initialState, retryWithToleranceOperator);
// two well-formed struct records that share one schema
Schema valSchema = SchemaBuilder.struct().field("val", Schema.INT32_SCHEMA).build();
Struct struct1 = new Struct(valSchema).put("val", 1234);
SourceRecord record1 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct1);
Struct struct2 = new Struct(valSchema).put("val", 6789);
SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct2);
// isStopping() answers false twice and then true; poll() is verified below to run twice
when(workerSourceTask.isStopping())
.thenReturn(false)
.thenReturn(false)
.thenReturn(true);
// stub commitOffsets() on the spy so the real implementation is not invoked
doReturn(true).when(workerSourceTask).commitOffsets();
when(sourceTask.poll())
.thenReturn(singletonList(record1))
.thenReturn(singletonList(record2));
expectTopicCreation(TOPIC);
workerSourceTask.initialize(TASK_CONFIG);
workerSourceTask.initializeAndStart();
workerSourceTask.execute();
verify(workerSourceTask, times(3)).isStopping();
verify(workerSourceTask).commitOffsets();
verify(offsetStore).start();
verify(sourceTask).initialize(any());
verify(sourceTask).start(any());
verify(sourceTask, times(2)).poll();
// both records reached the producer despite the transformation failures
verify(producer, times(2)).send(any(), any());
// two records were polled from the source task
assertSourceMetricValue("source-record-poll-total", 2.0);
// no writes are recorded
// NOTE(review): presumably because the mocked producer never completes a send -- confirm
assertSourceMetricValue("source-record-write-total", 0.0);
// no record failed permanently
assertErrorHandlingMetricValue("total-record-errors", 0.0);
// FaultyPassthrough fails twice per record before passing it on the third attempt: 2 records x 2 = 4
assertErrorHandlingMetricValue("total-record-failures", 4.0);
// no record was skipped
assertErrorHandlingMetricValue("total-records-skipped", 0.0);
}
/**
 * Builds a {@link ConnectorConfig} named "test" from the given properties.
 *
 * @param connProps extra connector properties; applied last, so they override the
 *                  name and connector class set here
 * @return the connector configuration built with the {@code plugins} mock
 */
private ConnectorConfig connConfig(Map<String, String> connProps) {
    Map<String, String> merged = new HashMap<>();
    // defaults first ...
    merged.put(ConnectorConfig.NAME_CONFIG, "test");
    merged.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SinkTask.class.getName());
    // ... then the caller's properties on top
    merged.putAll(connProps);

    ConnectorConfig connectorConfig = new ConnectorConfig(plugins, merged);
    return connectorConfig;
}
/**
 * Same scenario as {@code testErrorHandlingInSourceTasks}, but the source task is created with
 * the {@code FaultyConverter} returned by {@code badConverter()}, so conversion fails
 * intermittently in addition to the transformation.
 */
@Test
public void testErrorHandlingInSourceTasksWthBadConverter() throws Exception {
Map<String, String> reportProps = new HashMap<>();
reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.reporters(singletonList(reporter));
createSourceTask(initialState, retryWithToleranceOperator, badConverter());
// two well-formed struct records that share one schema
Schema valSchema = SchemaBuilder.struct().field("val", Schema.INT32_SCHEMA).build();
Struct struct1 = new Struct(valSchema).put("val", 1234);
SourceRecord record1 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct1);
Struct struct2 = new Struct(valSchema).put("val", 6789);
SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct2);
// isStopping() answers false twice and then true; poll() is verified below to run twice
when(workerSourceTask.isStopping())
.thenReturn(false)
.thenReturn(false)
.thenReturn(true);
// stub commitOffsets() on the spy so the real implementation is not invoked
doReturn(true).when(workerSourceTask).commitOffsets();
when(sourceTask.poll())
.thenReturn(singletonList(record1))
.thenReturn(singletonList(record2));
expectTopicCreation(TOPIC);
workerSourceTask.initialize(TASK_CONFIG);
workerSourceTask.initializeAndStart();
workerSourceTask.execute();
// two records were polled from the source task
assertSourceMetricValue("source-record-poll-total", 2.0);
// no writes are recorded
// NOTE(review): presumably because the mocked producer never completes a send -- confirm
assertSourceMetricValue("source-record-write-total", 0.0);
// no record failed permanently
assertErrorHandlingMetricValue("total-record-errors", 0.0);
// per record: 2 failures in FaultyPassthrough plus 2 in FaultyConverter before each succeeds
// on its third attempt; 2 records x 4 = 8. FaultyConverter does not count null values, so
// this presumes the records' keys are null -- confirm against SourceRecord.
assertErrorHandlingMetricValue("total-record-failures", 8.0);
// no record was skipped
assertErrorHandlingMetricValue("total-records-skipped", 0.0);
verify(workerSourceTask, times(3)).isStopping();
verify(workerSourceTask).commitOffsets();
verify(offsetStore).start();
verify(sourceTask).initialize(any());
verify(sourceTask).start(any());
verify(sourceTask, times(2)).poll();
// both records reached the producer despite the intermittent failures
verify(producer, times(2)).send(any(), any());
}
/**
 * Asserts the current value of a metric in the sink task's metric group.
 *
 * @param name     metric name within the sink task metric group
 * @param expected expected value, compared with a tolerance of 0.001
 */
private void assertSinkMetricValue(String name, double expected) {
    double actual = metrics.currentMetricValueAsDouble(
        workerSinkTask.sinkTaskMetricsGroup().metricGroup(), name);
    assertEquals(expected, actual, 0.001d);
}
/**
 * Verifies the interactions expected from starting the sink task: the task was started
 * with {@code TASK_PROPS} and initialized with a {@link WorkerSinkTaskContext}, and the
 * consumer subscribed to exactly {@code TOPIC} with a rebalance listener.
 */
private void verifyInitializeSink() {
    verify(sinkTask).start(TASK_PROPS);
    verify(sinkTask).initialize(any(WorkerSinkTaskContext.class));

    Collection<String> expectedTopics = singletonList(TOPIC);
    verify(consumer).subscribe(eq(expectedTopics), any(ConsumerRebalanceListener.class));
}
/**
 * Asserts the current value of a metric in the source task's metric group.
 *
 * @param name     metric name within the source task metric group
 * @param expected expected value, compared with a tolerance of 0.001
 */
private void assertSourceMetricValue(String name, double expected) {
    double actual = metrics.currentMetricValueAsDouble(
        workerSourceTask.sourceTaskMetricsGroup().metricGroup(), name);
    assertEquals(expected, actual, 0.001d);
}
/**
 * Asserts the current value of a metric in the error-handling metric group.
 *
 * @param name     metric name within the error-handling metric group
 * @param expected expected value, compared with a tolerance of 0.001
 */
private void assertErrorHandlingMetricValue(String name, double expected) {
    double actual = metrics.currentMetricValueAsDouble(errorHandlingMetrics.metricGroup(), name);
    assertEquals(expected, actual, 0.001d);
}
/**
 * Verifies that closing the source task released its resources: the producer and the
 * topic admin were closed with a timeout, the offset reader was closed, the offset store
 * was stopped and the header converter was closed.
 *
 * @throws IOException declared only because {@code headerConverter.close()} can throw it
 */
private void verifyCloseSource() throws IOException {
    // clients closed with a bounded wait
    verify(producer).close(any(Duration.class));
    verify(admin).close(any(Duration.class));

    // offset handling shut down
    verify(offsetReader).close();
    verify(offsetStore).stop();

    // headerConverter.close() can throw IOException
    verify(headerConverter).close();
}
/**
 * Stubs the topic admin for a run in which the topic does not exist yet and is then
 * created. Does nothing when the test runs with topic creation disabled, so no unused
 * stubbing is registered in that case.
 *
 * @param topic the topic that is reported as missing and then as created
 */
private void expectTopicCreation(String topic) {
    if (!enableTopicCreation) {
        return;
    }
    // describing the topic finds nothing ...
    when(admin.describeTopics(topic)).thenReturn(Collections.emptyMap());

    // ... and creating it reports the topic as newly created, with none pre-existing
    Set<String> noExistingTopics = Collections.emptySet();
    TopicAdmin.TopicCreationResponse creationResponse =
        new TopicAdmin.TopicCreationResponse(Collections.singleton(topic), noExistingTopics);
    when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(creationResponse);
}
/**
 * Creates the {@link WorkerSinkTask} under test and stores it in {@code workerSinkTask}.
 * The task uses one schemaless {@link JsonConverter} for both keys and values and a
 * transformation chain consisting of a single {@code FaultyPassthrough}.
 *
 * @param initialState               target state the task is created with
 * @param retryWithToleranceOperator operator shared by the transformation chain and the task
 */
private void createSinkTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
    Map<String, Object> converterProps = workerConfig.originalsWithPrefix("value.converter.");
    converterProps.put("converter.type", "value");
    converterProps.put("schemas.enable", "false");
    JsonConverter jsonConverter = new JsonConverter();
    jsonConverter.configure(converterProps);

    TransformationStage<SinkRecord> faultyStage =
        new TransformationStage<>(new FaultyPassthrough<SinkRecord>());
    TransformationChain<SinkRecord> sinkTransforms =
        new TransformationChain<>(singletonList(faultyStage), retryWithToleranceOperator);

    workerSinkTask = new WorkerSinkTask(
        taskId, sinkTask, statusListener, initialState, workerConfig,
        ClusterConfigState.EMPTY, metrics, jsonConverter, jsonConverter, errorHandlingMetrics,
        headerConverter, sinkTransforms, consumer, pluginLoader, time,
        retryWithToleranceOperator, workerErrantRecordReporter,
        statusBackingStore);
}
/**
 * Creates the source task under test with a regular, schemaless {@link JsonConverter}.
 *
 * @param initialState               target state the task is created with
 * @param retryWithToleranceOperator operator shared by the transformation chain and the task
 */
private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
    Map<String, Object> converterProps = workerConfig.originalsWithPrefix("value.converter.");
    converterProps.put("converter.type", "value");
    converterProps.put("schemas.enable", "false");

    JsonConverter jsonConverter = new JsonConverter();
    jsonConverter.configure(converterProps);

    createSourceTask(initialState, retryWithToleranceOperator, jsonConverter);
}
/**
 * Creates a schemaless {@code FaultyConverter}, configured the same way as the regular
 * converter used by the other tests.
 *
 * @return the configured faulty converter
 */
private Converter badConverter() {
    Map<String, Object> converterProps = workerConfig.originalsWithPrefix("value.converter.");
    converterProps.put("converter.type", "value");
    converterProps.put("schemas.enable", "false");

    FaultyConverter faultyConverter = new FaultyConverter();
    faultyConverter.configure(converterProps);
    return faultyConverter;
}
/**
 * Creates the {@link WorkerSourceTask} under test, wrapped in a Mockito spy so individual
 * methods can be stubbed, and stores it in {@code workerSourceTask}. The task uses the
 * given converter for both keys and values, a transformation chain consisting of a single
 * {@code FaultyPassthrough}, and an executor that runs submitted work on the calling thread.
 *
 * @param initialState               target state the task is created with
 * @param retryWithToleranceOperator operator shared by the transformation chain and the task
 * @param converter                  converter used for both keys and values
 */
private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator, Converter converter) {
    TransformationStage<SourceRecord> faultyStage =
        new TransformationStage<>(new FaultyPassthrough<SourceRecord>());
    TransformationChain<SourceRecord> sourceTransforms =
        new TransformationChain<>(singletonList(faultyStage), retryWithToleranceOperator);

    // run submitted work directly on the calling thread
    Executor sameThreadExecutor = Runnable::run;

    WorkerSourceTask realTask = new WorkerSourceTask(
        taskId, sourceTask, statusListener, initialState, converter,
        converter, errorHandlingMetrics, headerConverter,
        sourceTransforms, producer, admin,
        TopicCreationGroup.configuredGroups(sourceConfig),
        offsetReader, offsetWriter, offsetStore, workerConfig,
        ClusterConfigState.EMPTY, metrics, pluginLoader, time,
        retryWithToleranceOperator,
        statusBackingStore, sameThreadExecutor);
    workerSourceTask = spy(realTask);
}
/**
 * Wraps a single consumer record into a {@link ConsumerRecords} batch keyed by the
 * record's own topic and partition.
 *
 * @param record the only record of the batch
 * @return a batch containing just {@code record}
 */
private ConsumerRecords<byte[], byte[]> records(ConsumerRecord<byte[], byte[]> record) {
    TopicPartition recordPartition = new TopicPartition(record.topic(), record.partition());
    return new ConsumerRecords<>(Collections.singletonMap(recordPartition, singletonList(record)));
}
/**
 * Placeholder sink task type. In this file it is referenced only for its class name, which
 * is stored under {@code TaskConfig.TASK_CLASS_CONFIG} in {@code TASK_PROPS}.
 */
private abstract static class TestSinkTask extends SinkTask {
}
static class FaultyConverter extends JsonConverter {
private static final Logger log = LoggerFactory.getLogger(FaultyConverter.class);
private int invocations = 0;
public byte[] fromConnectData(String topic, Schema schema, Object value) {
if (value == null) {
return super.fromConnectData(topic, schema, null);
}
invocations++;
if (invocations % 3 == 0) {
log.debug("Succeeding record: {} where invocations={}", value, invocations);
return super.fromConnectData(topic, schema, value);
} else {
log.debug("Failing record: {} at invocations={}", value, invocations);
throw new RetriableException("Bad invocations " + invocations + " for mod 3");
}
}
}
/**
 * A pass-through transformation that deliberately fails most invocations: a record is
 * returned unchanged only when the running invocation count is a multiple of {@code mod}
 * (3 unless configured otherwise), and every other call throws a {@link RetriableException}.
 *
 * @param <R> the record type being transformed
 */
static class FaultyPassthrough<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final Logger log = LoggerFactory.getLogger(FaultyPassthrough.class);
    private static final String MOD_CONFIG = "mod";
    private static final int MOD_CONFIG_DEFAULT = 3;

    // The description refers to the invocation count because apply() gates on the
    // invocation counter, not on any record timestamp.
    public static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(MOD_CONFIG, ConfigDef.Type.INT, MOD_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM,
            "Pass records without failure only if invocation count % mod == 0");

    // Used as-is when configure() is never called.
    private int mod = MOD_CONFIG_DEFAULT;
    // Number of times apply() has been called on this instance.
    private int invocations = 0;

    /**
     * Returns the record unchanged on every {@code mod}-th invocation and fails otherwise.
     *
     * @param record the record to pass through
     * @return the same record instance
     * @throws RetriableException when the invocation count is not a multiple of {@code mod}
     */
    @Override
    public R apply(R record) {
        invocations++;
        if (invocations % mod == 0) {
            log.debug("Succeeding record: {} where invocations={}", record, invocations);
            return record;
        } else {
            log.debug("Failing record: {} at invocations={}", record, invocations);
            throw new RetriableException("Bad invocations " + invocations + " for mod " + mod);
        }
    }

    /**
     * @return the configuration definition containing the {@code mod} setting
     */
    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    /**
     * Logs the shutdown; this transformation holds nothing that needs releasing.
     */
    @Override
    public void close() {
        log.info("Shutting down transform");
    }

    /**
     * Reads {@code mod} from the given configuration, raising values below 2 to 2.
     *
     * @param configs configuration values, parsed against {@link #CONFIG_DEF}
     */
    @Override
    public void configure(Map<String, ?> configs) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
        mod = Math.max(config.getInt(MOD_CONFIG), 2);
        log.info("Configuring {}. Setting mod to {}", this.getClass(), mod);
    }
}
}