| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.connector.kafka.sink; |
| |
| import org.apache.flink.api.common.operators.ProcessingTimeService; |
| import org.apache.flink.api.common.serialization.SerializationSchema; |
| import org.apache.flink.api.connector.sink2.SinkWriter; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.connector.base.DeliveryGuarantee; |
| import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; |
| import org.apache.flink.metrics.Counter; |
| import org.apache.flink.metrics.Gauge; |
| import org.apache.flink.metrics.MetricGroup; |
| import org.apache.flink.metrics.groups.OperatorIOMetricGroup; |
| import org.apache.flink.metrics.groups.SinkWriterMetricGroup; |
| import org.apache.flink.metrics.testutils.MetricListener; |
| import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; |
| import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; |
| import org.apache.flink.util.TestLoggerExtension; |
| import org.apache.flink.util.UserCodeClassLoader; |
| |
| import org.apache.kafka.clients.consumer.ConsumerRecord; |
| import org.apache.kafka.clients.producer.ProducerRecord; |
| import org.apache.kafka.clients.producer.RecordMetadata; |
| import org.apache.kafka.common.errors.ProducerFencedException; |
| import org.apache.kafka.common.serialization.ByteArraySerializer; |
| import org.junit.jupiter.api.AfterAll; |
| import org.junit.jupiter.api.BeforeAll; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.api.TestInfo; |
| import org.junit.jupiter.api.extension.ExtendWith; |
| import org.junit.jupiter.params.ParameterizedTest; |
| import org.junit.jupiter.params.provider.EnumSource; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.testcontainers.containers.KafkaContainer; |
| import org.testcontainers.containers.Network; |
| |
| import javax.annotation.Nullable; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.OptionalLong; |
| import java.util.PriorityQueue; |
| import java.util.Properties; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.function.Consumer; |
| import java.util.stream.IntStream; |
| |
| import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA; |
| import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer; |
| import static org.apache.flink.connector.kafka.testutils.KafkaUtil.drainAllRecordsFromTopic; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode; |
| |
| /** Tests for the standalone {@link KafkaWriter}. */ |
| @ExtendWith(TestLoggerExtension.class) |
| public class KafkaWriterITCase { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(KafkaWriterITCase.class); |
| private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka"; |
| private static final Network NETWORK = Network.newNetwork(); |
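| // a producer-scoped Kafka metric that the writer mirrors into Flink's metric group |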
| private static final String KAFKA_METRIC_WITH_GROUP_NAME = "KafkaProducer.incoming-byte-total"; |
| private static final SinkWriter.Context SINK_WRITER_CONTEXT = new DummySinkWriterContext(); |
| private String topic; |
| |
| private MetricListener metricListener; |
| private TriggerTimeService timeService; |
| |
| private static final KafkaContainer KAFKA_CONTAINER = |
| createKafkaContainer(KAFKA, LOG) |
| .withEmbeddedZookeeper() |
| .withNetwork(NETWORK) |
| .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS); |
| |
| @BeforeAll |
| public static void beforeAll() { |
| KAFKA_CONTAINER.start(); |
| } |
| |
| @AfterAll |
| public static void afterAll() { |
| KAFKA_CONTAINER.stop(); |
| } |
| |
| @BeforeEach |
| public void setUp(TestInfo testInfo) { |
| metricListener = new MetricListener(); |
| timeService = new TriggerTimeService(); |
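| // derive a unique topic per test from the test name so tests do not observe each other's records |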
| topic = testInfo.getDisplayName().replaceAll("\\W", ""); |
| } |
| |
| @ParameterizedTest |
| @EnumSource(DeliveryGuarantee.class) |
| public void testRegisterMetrics(DeliveryGuarantee guarantee) throws Exception { |
| try (final KafkaWriter<Integer> ignored = |
| createWriterWithConfiguration(getKafkaClientConfiguration(), guarantee)) { |
| assertThat(metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME)).isPresent(); |
| } |
| } |
| |
| @ParameterizedTest |
| @EnumSource(DeliveryGuarantee.class) |
| public void testNotRegisterMetrics(DeliveryGuarantee guarantee) throws Exception { |
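| // both configuration keys suppress the registration of Kafka producer metrics |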
| assertKafkaMetricNotPresent(guarantee, "flink.disable-metrics", "true"); |
| assertKafkaMetricNotPresent(guarantee, "register.producer.metrics", "false"); |
| } |
| |
| @Test |
| public void testIncreasingRecordBasedCounters() throws Exception { |
| final OperatorIOMetricGroup operatorIOMetricGroup = |
| UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup(); |
| final InternalSinkWriterMetricGroup metricGroup = |
| InternalSinkWriterMetricGroup.mock( |
| metricListener.getMetricGroup(), operatorIOMetricGroup); |
| try (final KafkaWriter<Integer> writer = |
| createWriterWithConfiguration( |
| getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) { |
| final Counter numBytesOut = metricGroup.getIOMetricGroup().getNumBytesOutCounter(); |
| final Counter numRecordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter(); |
| final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter(); |
| final Counter numRecordsSendErrors = metricGroup.getNumRecordsSendErrorsCounter(); |
| assertThat(numBytesOut.getCount()).isEqualTo(0L); |
| assertThat(numRecordsOut.getCount()).isEqualTo(0); |
| assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); |
| assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); |
| |
| // elements for which the serializer returns null should be silently skipped |
| writer.write(null, SINK_WRITER_CONTEXT); |
| timeService.trigger(); |
| assertThat(numBytesOut.getCount()).isEqualTo(0L); |
| assertThat(numRecordsOut.getCount()).isEqualTo(0); |
| assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); |
| assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); |
| |
| // but elements for which a non-null producer record is returned should count |
| writer.write(1, SINK_WRITER_CONTEXT); |
| timeService.trigger(); |
| assertThat(numRecordsOut.getCount()).isEqualTo(1); |
| assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); |
| assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); |
| assertThat(numBytesOut.getCount()).isGreaterThan(0L); |
| } |
| } |
| |
| @Test |
| public void testCurrentSendTimeMetric() throws Exception { |
| final InternalSinkWriterMetricGroup metricGroup = |
| InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()); |
| try (final KafkaWriter<Integer> writer = |
| createWriterWithConfiguration( |
| getKafkaClientConfiguration(), |
| DeliveryGuarantee.AT_LEAST_ONCE, |
| metricGroup)) { |
| final Optional<Gauge<Long>> currentSendTime = |
| metricListener.getGauge("currentSendTime"); |
| assertThat(currentSendTime).isPresent(); |
| assertThat(currentSendTime.get().getValue()).isEqualTo(0L); |
| IntStream.range(0, 100) |
| .forEach( |
| (run) -> { |
| try { |
| writer.write(1, SINK_WRITER_CONTEXT); |
| // Manually flush the records to generate a sendTime |
| if (run % 10 == 0) { |
| writer.flush(false); |
| } |
| } catch (IOException | InterruptedException e) { |
| throw new RuntimeException("Failed writing Kafka record.", e); |
| } |
| }); |
| assertThat(currentSendTime.get().getValue()).isGreaterThan(0L); |
| } |
| } |
| |
| @Test |
| void testFlushAsyncErrorPropagationAndErrorCounter() throws Exception { |
| Properties properties = getKafkaClientConfiguration(); |
| |
| SinkInitContext sinkInitContext = |
| new SinkInitContext( |
| InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), |
| timeService, |
| null); |
| final KafkaWriter<Integer> writer = |
| createWriterWithConfiguration( |
| properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext); |
| final Counter numRecordsOutErrors = |
| sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter(); |
| assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); |
| |
| triggerProducerException(writer, properties); |
| |
| // test flush |
| assertThatCode(() -> writer.flush(false)) |
| .hasRootCauseExactlyInstanceOf(ProducerFencedException.class); |
| assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); |
| |
| assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT)) |
| .as("the exception is not thrown again") |
| .doesNotThrowAnyException(); |
| assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); |
| } |
| |
| @Test |
| void testWriteAsyncErrorPropagationAndErrorCounter() throws Exception { |
| Properties properties = getKafkaClientConfiguration(); |
| |
| SinkInitContext sinkInitContext = |
| new SinkInitContext( |
| InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), |
| timeService, |
| null); |
| final KafkaWriter<Integer> writer = |
| createWriterWithConfiguration( |
| properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext); |
| final Counter numRecordsOutErrors = |
| sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter(); |
| assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); |
| |
| triggerProducerException(writer, properties); |
| // flush to ensure that the failing send request has completed |
| writer.getCurrentProducer().flush(); |
| |
| assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT)) |
| .hasRootCauseExactlyInstanceOf(ProducerFencedException.class); |
| assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); |
| |
| assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT)) |
| .as("the exception is not thrown again") |
| .doesNotThrowAnyException(); |
| assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); |
| } |
| |
| @Test |
| void testMailboxAsyncErrorPropagationAndErrorCounter() throws Exception { |
| Properties properties = getKafkaClientConfiguration(); |
| |
| SinkInitContext sinkInitContext = |
| new SinkInitContext( |
| InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), |
| timeService, |
| null); |
| final KafkaWriter<Integer> writer = |
| createWriterWithConfiguration( |
| properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext); |
| final Counter numRecordsOutErrors = |
| sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter(); |
| assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); |
| |
| triggerProducerException(writer, properties); |
| // flush to ensure that the failing send request has completed |
| writer.getCurrentProducer().flush(); |
| |
| while (sinkInitContext.getMailboxExecutor().tryYield()) { |
| // execute all mails |
| } |
| assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); |
| |
| assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT)) |
| .as("the exception is not thrown again") |
| .doesNotThrowAnyException(); |
| assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); |
| } |
| |
| @Test |
| void testCloseAsyncErrorPropagationAndErrorCounter() throws Exception { |
| Properties properties = getKafkaClientConfiguration(); |
| |
| SinkInitContext sinkInitContext = |
| new SinkInitContext( |
| InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), |
| timeService, |
| null); |
| final KafkaWriter<Integer> writer = |
| createWriterWithConfiguration( |
| properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext); |
| final Counter numRecordsOutErrors = |
| sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter(); |
| assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); |
| |
| triggerProducerException(writer, properties); |
| // flush to ensure that the failing send request has completed |
| writer.getCurrentProducer().flush(); |
| |
| // test close |
| assertThatCode(writer::close) |
| .as("close should throw the exception from the WriterCallback") |
| .hasRootCauseExactlyInstanceOf(ProducerFencedException.class); |
| assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); |
| } |
| |
| private void triggerProducerException(KafkaWriter<Integer> writer, Properties properties) |
| throws IOException { |
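| // initializing transactions on a second producer with the same transactional.id bumps |
| // the epoch on the broker and fences the writer's current producer |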
| final String transactionalId = writer.getCurrentProducer().getTransactionalId(); |
| |
| try (FlinkKafkaInternalProducer<byte[], byte[]> producer = |
| new FlinkKafkaInternalProducer<>(properties, transactionalId)) { |
| producer.initTransactions(); |
| producer.beginTransaction(); |
| producer.send(new ProducerRecord<>(topic, "1".getBytes())); |
| producer.commitTransaction(); |
| } |
| |
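| // the writer's producer is now fenced; this write enqueues an async send whose callback |
| // will surface the ProducerFencedException |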
| writer.write(1, SINK_WRITER_CONTEXT); |
| } |
| |
| @Test |
| public void testMetadataPublisher() throws Exception { |
| List<String> metadataList = new ArrayList<>(); |
| try (final KafkaWriter<Integer> writer = |
| createWriterWithConfiguration( |
| getKafkaClientConfiguration(), |
| DeliveryGuarantee.AT_LEAST_ONCE, |
| InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), |
| meta -> metadataList.add(meta.toString()))) { |
| List<String> expected = new ArrayList<>(); |
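| // RecordMetadata#toString has the form <topic>-<partition>@<offset> |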
| for (int i = 0; i < 100; i++) { |
| writer.write(1, SINK_WRITER_CONTEXT); |
| expected.add("testMetadataPublisher-0@" + i); |
| } |
| writer.flush(false); |
| assertThat(metadataList).usingRecursiveComparison().isEqualTo(expected); |
| } |
| } |
| |
| /** Tests that lingering transactions of a failed writer are aborted on recovery, so that only records from committed transactions become visible. */ |
| @Test |
| void testLingeringTransaction() throws Exception { |
| final KafkaWriter<Integer> failedWriter = |
| createWriterWithConfiguration( |
| getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE); |
| |
| // create two lingering transactions |
| failedWriter.flush(false); |
| failedWriter.prepareCommit(); |
| failedWriter.snapshotState(1); |
| failedWriter.flush(false); |
| failedWriter.prepareCommit(); |
| failedWriter.snapshotState(2); |
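| // neither transaction is committed, so both keep lingering on the broker |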
| |
| try (final KafkaWriter<Integer> recoveredWriter = |
| createWriterWithConfiguration( |
| getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) { |
| recoveredWriter.write(1, SINK_WRITER_CONTEXT); |
| |
| recoveredWriter.flush(false); |
| Collection<KafkaCommittable> committables = recoveredWriter.prepareCommit(); |
| recoveredWriter.snapshotState(1); |
| assertThat(committables).hasSize(1); |
| final KafkaCommittable committable = committables.stream().findFirst().get(); |
| assertThat(committable.getProducer()).isPresent(); |
| |
| committable.getProducer().get().getObject().commitTransaction(); |
| |
| List<ConsumerRecord<byte[], byte[]>> records = |
| drainAllRecordsFromTopic(topic, getKafkaClientConfiguration(), true); |
| assertThat(records).hasSize(1); |
| } |
| |
| failedWriter.close(); |
| } |
| |
| /** Tests that for non-transactional deliveries the same producer is reused and the pool stays unused. */ |
| @ParameterizedTest |
| @EnumSource( |
| value = DeliveryGuarantee.class, |
| names = "EXACTLY_ONCE", |
| mode = EnumSource.Mode.EXCLUDE) |
| void useSameProducerForNonTransactional(DeliveryGuarantee guarantee) throws Exception { |
| try (final KafkaWriter<Integer> writer = |
| createWriterWithConfiguration(getKafkaClientConfiguration(), guarantee)) { |
| assertThat(writer.getProducerPool()).hasSize(0); |
| |
| FlinkKafkaInternalProducer<byte[], byte[]> firstProducer = writer.getCurrentProducer(); |
| writer.flush(false); |
| Collection<KafkaCommittable> committables = writer.prepareCommit(); |
| writer.snapshotState(0); |
| assertThat(committables).hasSize(0); |
| |
| assertThat(writer.getCurrentProducer()) |
| .as("Expected same producer") |
| .isSameAs(firstProducer); |
| assertThat(writer.getProducerPool()).hasSize(0); |
| } |
| } |
| |
| /** Tests that committed producers are recycled through the pool and reused. */ |
| @Test |
| void usePoolForTransactional() throws Exception { |
| try (final KafkaWriter<Integer> writer = |
| createWriterWithConfiguration( |
| getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) { |
| assertThat(writer.getProducerPool()).hasSize(0); |
| |
| writer.write(1, SINK_WRITER_CONTEXT); |
| writer.flush(false); |
| Collection<KafkaCommittable> committables0 = writer.prepareCommit(); |
| writer.snapshotState(1); |
| assertThat(committables0).hasSize(1); |
| final KafkaCommittable committable = committables0.stream().findFirst().get(); |
| assertThat(committable.getProducer()).isPresent(); |
| |
| FlinkKafkaInternalProducer<?, ?> firstProducer = |
| committable.getProducer().get().getObject(); |
| assertThat(writer.getCurrentProducer()) |
| .as("Expected different producer") |
| .isNotSameAs(firstProducer); |
| |
| // simulate the KafkaCommitter: committing the transaction and closing the committable returns the producer to the pool |
| assertThat(writer.getProducerPool()).hasSize(0); |
| firstProducer.commitTransaction(); |
| committable.getProducer().get().close(); |
| assertThat(writer.getProducerPool()).hasSize(1); |
| |
| writer.write(1, SINK_WRITER_CONTEXT); |
| writer.flush(false); |
| Collection<KafkaCommittable> committables1 = writer.prepareCommit(); |
| writer.snapshotState(2); |
| assertThat(committables1).hasSize(1); |
| final KafkaCommittable committable1 = committables1.stream().findFirst().get(); |
| assertThat(committable1.getProducer()).isPresent(); |
| |
| assertThat(writer.getCurrentProducer()) |
| .as("Expected recycled producer") |
| .isSameAs(firstProducer); |
| } |
| } |
| |
| /** |
| * Tests that a pre-commit attempt on an empty transaction emits no KafkaCommittable; the |
| * writer instead immediately commits the empty transaction and recycles the producer. |
| */ |
| @Test |
| void prepareCommitForEmptyTransaction() throws Exception { |
| try (final KafkaWriter<Integer> writer = |
| createWriterWithConfiguration( |
| getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) { |
| assertThat(writer.getProducerPool()).hasSize(0); |
| |
| // no data written to current transaction |
| writer.flush(false); |
| Collection<KafkaCommittable> emptyCommittables = writer.prepareCommit(); |
| |
| assertThat(emptyCommittables).hasSize(0); |
| assertThat(writer.getProducerPool()).hasSize(1); |
| final FlinkKafkaInternalProducer<?, ?> recycledProducer = |
| writer.getProducerPool().pop(); |
| assertThat(recycledProducer.isInTransaction()).isFalse(); |
| } |
| } |
| |
| /** |
| * Tests that open transactions are automatically aborted on close such that successive writes |
| * succeed. |
| */ |
| @Test |
| void testAbortOnClose() throws Exception { |
| Properties properties = getKafkaClientConfiguration(); |
| try (final KafkaWriter<Integer> writer = |
| createWriterWithConfiguration(properties, DeliveryGuarantee.EXACTLY_ONCE)) { |
| writer.write(1, SINK_WRITER_CONTEXT); |
| assertThat(drainAllRecordsFromTopic(topic, properties, true)).hasSize(0); |
| } |
| |
| try (final KafkaWriter<Integer> writer = |
| createWriterWithConfiguration(properties, DeliveryGuarantee.EXACTLY_ONCE)) { |
| writer.write(2, SINK_WRITER_CONTEXT); |
| writer.flush(false); |
| Collection<KafkaCommittable> committables = writer.prepareCommit(); |
| writer.snapshotState(1L); |
| |
| // manually commit here, which would only succeed if the first transaction was aborted |
| assertThat(committables).hasSize(1); |
| final KafkaCommittable committable = committables.stream().findFirst().get(); |
| String transactionalId = committable.getTransactionalId(); |
| try (FlinkKafkaInternalProducer<byte[], byte[]> producer = |
| new FlinkKafkaInternalProducer<>(properties, transactionalId)) { |
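| // resume the pre-committed transaction via the producer id and epoch captured in the committable |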
| producer.resumeTransaction(committable.getProducerId(), committable.getEpoch()); |
| producer.commitTransaction(); |
| } |
| |
| assertThat(drainAllRecordsFromTopic(topic, properties, true)).hasSize(1); |
| } |
| } |
| |
| private void assertKafkaMetricNotPresent( |
| DeliveryGuarantee guarantee, String configKey, String configValue) throws Exception { |
| final Properties config = getKafkaClientConfiguration(); |
| config.put(configKey, configValue); |
| try (final KafkaWriter<Integer> ignored = |
| createWriterWithConfiguration(config, guarantee)) { |
| assertThat(metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME)).isNotPresent(); |
| } |
| } |
| |
| private KafkaWriter<Integer> createWriterWithConfiguration( |
| Properties config, DeliveryGuarantee guarantee) { |
| return createWriterWithConfiguration( |
| config, |
| guarantee, |
| InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup())); |
| } |
| |
| private KafkaWriter<Integer> createWriterWithConfiguration( |
| Properties config, |
| DeliveryGuarantee guarantee, |
| SinkWriterMetricGroup sinkWriterMetricGroup) { |
| return createWriterWithConfiguration(config, guarantee, sinkWriterMetricGroup, null); |
| } |
| |
| private KafkaWriter<Integer> createWriterWithConfiguration( |
| Properties config, |
| DeliveryGuarantee guarantee, |
| SinkWriterMetricGroup sinkWriterMetricGroup, |
| @Nullable Consumer<RecordMetadata> metadataConsumer) { |
| return new KafkaWriter<>( |
| guarantee, |
| config, |
| "test-prefix", |
| new SinkInitContext(sinkWriterMetricGroup, timeService, metadataConsumer), |
| new DummyRecordSerializer(), |
| new DummySchemaContext(), |
| Collections.emptyList()); |
| } |
| |
| private KafkaWriter<Integer> createWriterWithConfiguration( |
| Properties config, DeliveryGuarantee guarantee, SinkInitContext sinkInitContext) { |
| return new KafkaWriter<>( |
| guarantee, |
| config, |
| "test-prefix", |
| sinkInitContext, |
| new DummyRecordSerializer(), |
| new DummySchemaContext(), |
| Collections.emptyList()); |
| } |
| |
| private static Properties getKafkaClientConfiguration() { |
| final Properties standardProps = new Properties(); |
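| // consumer-side settings (group.id, auto.offset.reset) are shared with the consumer used by drainAllRecordsFromTopic |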
| standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers()); |
| standardProps.put("group.id", "kafkaWriter-tests"); |
| standardProps.put("enable.auto.commit", false); |
| standardProps.put("key.serializer", ByteArraySerializer.class.getName()); |
| standardProps.put("value.serializer", ByteArraySerializer.class.getName()); |
| standardProps.put("auto.offset.reset", "earliest"); |
| return standardProps; |
| } |
| |
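| /** Test {@code InitContext} exposing the given metric group, a manually triggered time service, and an optional metadata consumer. */ |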
| private static class SinkInitContext extends TestSinkInitContext { |
| |
| private final SinkWriterMetricGroup metricGroup; |
| private final ProcessingTimeService timeService; |
| @Nullable private final Consumer<RecordMetadata> metadataConsumer; |
| |
| SinkInitContext( |
| SinkWriterMetricGroup metricGroup, |
| ProcessingTimeService timeService, |
| @Nullable Consumer<RecordMetadata> metadataConsumer) { |
| this.metricGroup = metricGroup; |
| this.timeService = timeService; |
| this.metadataConsumer = metadataConsumer; |
| } |
| |
| @Override |
| public UserCodeClassLoader getUserCodeClassLoader() { |
| throw new UnsupportedOperationException("Not implemented."); |
| } |
| |
| @Override |
| public ProcessingTimeService getProcessingTimeService() { |
| return timeService; |
| } |
| |
| @Override |
| public int getSubtaskId() { |
| return 0; |
| } |
| |
| @Override |
| public int getNumberOfParallelSubtasks() { |
| return 1; |
| } |
| |
| @Override |
| public int getAttemptNumber() { |
| return 0; |
| } |
| |
| @Override |
| public SinkWriterMetricGroup metricGroup() { |
| return metricGroup; |
| } |
| |
| @Override |
| public OptionalLong getRestoredCheckpointId() { |
| return OptionalLong.empty(); |
| } |
| |
| @Override |
| public SerializationSchema.InitializationContext |
| asSerializationSchemaInitializationContext() { |
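| // unused: the writers under test receive a DummySchemaContext directly |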
| return null; |
| } |
| |
| @Override |
| @SuppressWarnings("unchecked") // cast is safe for the RecordMetadata consumer used by these tests |
| public <MetaT> Optional<Consumer<MetaT>> metadataConsumer() { |
| return Optional.ofNullable((Consumer<MetaT>) metadataConsumer); |
| } |
| } |
| |
| private class DummyRecordSerializer implements KafkaRecordSerializationSchema<Integer> { |
| @Override |
| public ProducerRecord<byte[], byte[]> serialize( |
| Integer element, KafkaSinkContext context, Long timestamp) { |
| if (element == null) { |
| // in general, serializers should be allowed to skip invalid elements |
| return null; |
| } |
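| // encode the element as a 4-byte big-endian integer |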
| return new ProducerRecord<>(topic, ByteBuffer.allocate(4).putInt(element).array()); |
| } |
| } |
| |
| private static class DummySchemaContext implements SerializationSchema.InitializationContext { |
| |
| @Override |
| public MetricGroup getMetricGroup() { |
| throw new UnsupportedOperationException("Not implemented."); |
| } |
| |
| @Override |
| public UserCodeClassLoader getUserCodeClassLoader() { |
| throw new UnsupportedOperationException("Not implemented."); |
| } |
| } |
| |
| private static class DummySinkWriterContext implements SinkWriter.Context { |
| @Override |
| public long currentWatermark() { |
| return 0; |
| } |
| |
| @Override |
| public Long timestamp() { |
| return null; |
| } |
| } |
| |
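| /** A {@link ProcessingTimeService} whose timers fire only when {@link #trigger()} is called manually. */ |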
| private static class TriggerTimeService implements ProcessingTimeService { |
| |
| private final PriorityQueue<Tuple2<Long, ProcessingTimeCallback>> registeredCallbacks = |
| new PriorityQueue<>(Comparator.comparingLong(o -> o.f0)); |
| |
| @Override |
| public long getCurrentProcessingTime() { |
| return 0; |
| } |
| |
| @Override |
| public ScheduledFuture<?> registerTimer( |
| long time, ProcessingTimeCallback processingTimerCallback) { |
| registeredCallbacks.add(new Tuple2<>(time, processingTimerCallback)); |
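| // the returned future is not used by the writer in these tests |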
| return null; |
| } |
| |
| public void trigger() throws Exception { |
| final Tuple2<Long, ProcessingTimeCallback> registered = registeredCallbacks.poll(); |
| if (registered == null) { |
| LOG.warn("Triggered time service but no callback was registered."); |
| return; |
| } |
| registered.f1.onProcessingTime(registered.f0); |
| } |
| } |
| } |