| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io.kafka; |
| |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; |
| |
| import java.io.IOException; |
| import java.lang.reflect.InvocationTargetException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.Optional; |
| import java.util.Random; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.SynchronousQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.stream.Collectors; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.io.UnboundedSource; |
| import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; |
| import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; |
| import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark; |
| import org.apache.beam.sdk.io.kafka.KafkaIO.Read; |
| import org.apache.beam.sdk.metrics.Counter; |
| import org.apache.beam.sdk.metrics.Gauge; |
| import org.apache.beam.sdk.metrics.Metrics; |
| import org.apache.beam.sdk.metrics.SourceMetrics; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables; |
| import org.apache.kafka.clients.consumer.Consumer; |
| import org.apache.kafka.clients.consumer.ConsumerConfig; |
| import org.apache.kafka.clients.consumer.ConsumerRecord; |
| import org.apache.kafka.clients.consumer.ConsumerRecords; |
| import org.apache.kafka.clients.consumer.OffsetAndMetadata; |
| import org.apache.kafka.common.TopicPartition; |
| import org.apache.kafka.common.errors.WakeupException; |
| import org.apache.kafka.common.serialization.Deserializer; |
| import org.joda.time.Duration; |
| import org.joda.time.Instant; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * An unbounded reader to read from Kafka. Each reader consumes messages from one or more Kafka |
| * partitions. See {@link KafkaIO} for user visible documentation and example usage. |
| */ |
| class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> { |
| |
| ///////////////////// Reader API //////////////////////////////////////////////////////////// |
  /**
   * Starts this reader: instantiates the main Kafka consumer and key/value deserializers, seeks
   * every assigned partition to its start offset, launches the background poll loop, and starts
   * the periodic latest-offset fetcher used for backlog/watermark estimation.
   *
   * @return result of the first {@link #advance()} call (true iff a record is already available).
   * @throws IOException if the deserializers cannot be instantiated or partition initialization
   *     times out (e.g. the Kafka brokers are unreachable from the workers).
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  @Override
  public boolean start() throws IOException {
    final int defaultPartitionInitTimeout = 60 * 1000;
    final int kafkaRequestTimeoutMultiple = 2;

    Read<K, V> spec = source.getSpec();
    consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
    consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions());

    try {
      keyDeserializerInstance = spec.getKeyDeserializer().getDeclaredConstructor().newInstance();
      valueDeserializerInstance =
          spec.getValueDeserializer().getDeclaredConstructor().newInstance();
    } catch (InstantiationException
        | IllegalAccessException
        | InvocationTargetException
        | NoSuchMethodException e) {
      throw new IOException("Could not instantiate deserializers", e);
    }

    // isKey = true for the key deserializer, false for the value deserializer (Kafka contract).
    keyDeserializerInstance.configure(spec.getConsumerConfig(), true);
    valueDeserializerInstance.configure(spec.getConsumerConfig(), false);

    // Seek to start offset for each partition. This is the first interaction with the server.
    // Unfortunately it can block forever in case of network issues like incorrect ACLs.
    // Initialize partition in a separate thread and cancel it if takes longer than a minute.
    // Note: submitted to consumerPollThread (not an ad-hoc thread) because the Kafka consumer
    // is not thread safe and must only ever be touched from that single thread.
    for (final PartitionState pState : partitionStates) {
      Future<?> future = consumerPollThread.submit(() -> setupInitialOffset(pState));

      try {
        // Timeout : 1 minute OR 2 * Kafka consumer request timeout if it is set.
        Integer reqTimeout =
            (Integer) spec.getConsumerConfig().get(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        future.get(
            reqTimeout != null
                ? kafkaRequestTimeoutMultiple * reqTimeout
                : defaultPartitionInitTimeout,
            TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
        consumer.wakeup(); // This unblocks consumer stuck on network I/O.
        // Likely reason : Kafka servers are configured to advertise internal ips, but
        // those ips are not accessible from workers outside.
        String msg =
            String.format(
                "%s: Timeout while initializing partition '%s'. "
                    + "Kafka client may not be able to connect to servers.",
                this, pState.topicPartition);
        LOG.error("{}", msg);
        throw new IOException(msg);
      } catch (Exception e) {
        throw new IOException(e);
      }
      LOG.info(
          "{}: reading from {} starting at offset {}",
          name,
          pState.topicPartition,
          pState.nextOffset);
    }

    // Start consumer read loop.
    // Note that consumer is not thread safe, should not be accessed out side consumerPollLoop().
    consumerPollThread.submit(this::consumerPollLoop);

    // offsetConsumer setup : a second, independent consumer used only to fetch latest offsets
    // for backlog estimation (see getOffsetConsumerConfig()).
    Map<String, Object> offsetConsumerConfig = getOffsetConsumerConfig();

    offsetConsumer = spec.getConsumerFactoryFn().apply(offsetConsumerConfig);
    consumerSpEL.evaluateAssign(offsetConsumer, spec.getTopicPartitions());

    // Fetch offsets once before running periodically.
    updateLatestOffsets();

    offsetFetcherThread.scheduleAtFixedRate(
        this::updateLatestOffsets, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);

    return advance();
  }
| |
  /**
   * Advances to the next available record, interleaving records across the assigned partitions.
   * Applies the user deserializers and the partition's timestamp policy; returns false when no
   * record is currently available (the runner is expected to retry later).
   */
  @Override
  public boolean advance() throws IOException {
    /* Read first record (if any). we need to loop here because :
     * - (a) some records initially need to be skipped if they are before consumedOffset
     * - (b) if curBatch is empty, we want to fetch next batch and then advance.
     * - (c) curBatch is an iterator of iterators. we interleave the records from each.
     * curBatch.next() might return an empty iterator.
     */
    while (true) {
      if (curBatch.hasNext()) {
        PartitionState<K, V> pState = curBatch.next();

        if (!pState.recordIter.hasNext()) { // -- (c)
          pState.recordIter = Collections.emptyIterator(); // drop ref
          curBatch.remove();
          continue;
        }

        // NOTE(review): these counters are incremented before the stale-offset check below, so
        // records skipped at (a) are still counted as read — confirm this matches intent.
        elementsRead.inc();
        elementsReadBySplit.inc();

        ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
        long expected = pState.nextOffset;
        long offset = rawRecord.offset();

        if (offset < expected) { // -- (a)
          // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
          // should we check if the offset is way off from consumedOffset (say > 1M)?
          LOG.warn(
              "{}: ignoring already consumed offset {} for {}",
              this,
              offset,
              pState.topicPartition);
          continue;
        }

        long offsetGap = offset - expected; // could be > 0 when Kafka log compaction is enabled.

        if (curRecord == null) {
          LOG.info("{}: first record offset {}", name, offset);
          offsetGap = 0;
        }

        // Apply user deserializers. User deserializers might throw, which will be propagated up
        // and 'curRecord' remains unchanged. The runner should close this reader.
        // TODO: write records that can't be deserialized to a "dead-letter" additional output.
        KafkaRecord<K, V> record =
            new KafkaRecord<>(
                rawRecord.topic(),
                rawRecord.partition(),
                rawRecord.offset(),
                consumerSpEL.getRecordTimestamp(rawRecord),
                consumerSpEL.getRecordTimestampType(rawRecord),
                ConsumerSpEL.hasHeaders() ? rawRecord.headers() : null,
                keyDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.key()),
                valueDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.value()));

        curTimestamp =
            pState.timestampPolicy.getTimestampForRecord(pState.mkTimestampPolicyContext(), record);
        curRecord = record;

        // Record size counts only raw key/value bytes (headers and overhead excluded).
        int recordSize =
            (rawRecord.key() == null ? 0 : rawRecord.key().length)
                + (rawRecord.value() == null ? 0 : rawRecord.value().length);
        pState.recordConsumed(offset, recordSize, offsetGap);
        bytesRead.inc(recordSize);
        bytesReadBySplit.inc(recordSize);
        return true;

      } else { // -- (b)
        nextBatch();

        if (!curBatch.hasNext()) {
          return false;
        }
      }
    }
  }
| |
| @Override |
| public Instant getWatermark() { |
| |
| if (source.getSpec().getWatermarkFn() != null) { |
| // Support old API which requires a KafkaRecord to invoke watermarkFn. |
| if (curRecord == null) { |
| LOG.debug("{}: getWatermark() : no records have been read yet.", name); |
| return initialWatermark; |
| } |
| return source.getSpec().getWatermarkFn().apply(curRecord); |
| } |
| |
| // Return minimum watermark among partitions. |
| return partitionStates.stream() |
| .map(PartitionState::updateAndGetWatermark) |
| .min(Comparator.naturalOrder()) |
| .get(); |
| } |
| |
| @Override |
| public CheckpointMark getCheckpointMark() { |
| reportBacklog(); |
| return new KafkaCheckpointMark( |
| partitionStates.stream() |
| .map( |
| p -> |
| new PartitionMark( |
| p.topicPartition.topic(), |
| p.topicPartition.partition(), |
| p.nextOffset, |
| p.lastWatermark.getMillis())) |
| .collect(Collectors.toList()), |
| source.getSpec().isCommitOffsetsInFinalizeEnabled() ? Optional.of(this) : Optional.empty()); |
| } |
| |
  /** Returns the {@link KafkaUnboundedSource} this reader was created from. */
  @Override
  public UnboundedSource<KafkaRecord<K, V>, ?> getCurrentSource() {
    return source;
  }
| |
  /**
   * Returns the record produced by the most recent successful {@link #advance()}.
   *
   * <p>NOTE(review): returns null (rather than throwing NoSuchElementException as the
   * UnboundedReader contract suggests) when no record has been read yet; runners only call this
   * after advance() returns true, so this appears benign — confirm before changing.
   */
  @Override
  public KafkaRecord<K, V> getCurrent() throws NoSuchElementException {
    // should we delay updating consumed offset till this point? Mostly not required.
    return curRecord;
  }
| |
  /**
   * Returns the timestamp assigned to the current record by its partition's timestamp policy.
   *
   * <p>NOTE(review): like getCurrent(), returns null before the first successful advance()
   * instead of throwing NoSuchElementException.
   */
  @Override
  public Instant getCurrentTimestamp() throws NoSuchElementException {
    return curTimestamp;
  }
| |
| @Override |
| public long getSplitBacklogBytes() { |
| long backlogBytes = 0; |
| |
| for (PartitionState p : partitionStates) { |
| long pBacklog = p.approxBacklogInBytes(); |
| if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) { |
| return UnboundedReader.BACKLOG_UNKNOWN; |
| } |
| backlogBytes += pBacklog; |
| } |
| |
| return backlogBytes; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////////////////////// |
| |
| private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedSource.class); |
| |
| @VisibleForTesting static final String METRIC_NAMESPACE = "KafkaIOReader"; |
| |
| @VisibleForTesting |
| static final String CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC = "checkpointMarkCommitsEnqueued"; |
| |
| private static final String CHECKPOINT_MARK_COMMITS_SKIPPED_METRIC = |
| "checkpointMarkCommitsSkipped"; |
| |
| private final KafkaUnboundedSource<K, V> source; |
| private final String name; |
| private Consumer<byte[], byte[]> consumer; |
| private final List<PartitionState<K, V>> partitionStates; |
| private KafkaRecord<K, V> curRecord; |
| private Instant curTimestamp; |
| private Iterator<PartitionState<K, V>> curBatch = Collections.emptyIterator(); |
| |
| private Deserializer<K> keyDeserializerInstance = null; |
| private Deserializer<V> valueDeserializerInstance = null; |
| |
| private final Counter elementsRead = SourceMetrics.elementsRead(); |
| private final Counter bytesRead = SourceMetrics.bytesRead(); |
| private final Counter elementsReadBySplit; |
| private final Counter bytesReadBySplit; |
| private final Gauge backlogBytesOfSplit; |
| private final Gauge backlogElementsOfSplit; |
| private final Counter checkpointMarkCommitsEnqueued = |
| Metrics.counter(METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC); |
| // Checkpoint marks skipped in favor of newer mark (only the latest needs to be committed). |
| private final Counter checkpointMarkCommitsSkipped = |
| Metrics.counter(METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_SKIPPED_METRIC); |
| |
| /** |
| * The poll timeout while reading records from Kafka. If option to commit reader offsets in to |
| * Kafka in {@link KafkaCheckpointMark#finalizeCheckpoint()} is enabled, it would be delayed until |
| * this poll returns. It should be reasonably low as a result. At the same time it probably can't |
| * be very low like 10 millis, I am not sure how it affects when the latency is high. Probably |
| * good to experiment. Often multiple marks would be finalized in a batch, it it reduce |
| * finalization overhead to wait a short while and finalize only the last checkpoint mark. |
| */ |
| private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000); |
| |
| private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT = Duration.millis(10); |
| private static final Duration RECORDS_ENQUEUE_POLL_TIMEOUT = Duration.millis(100); |
| |
| // Use a separate thread to read Kafka messages. Kafka Consumer does all its work including |
| // network I/O inside poll(). Polling only inside #advance(), especially with a small timeout |
| // like 100 milliseconds does not work well. This along with large receive buffer for |
| // consumer achieved best throughput in tests (see `defaultConsumerProperties`). |
| private final ExecutorService consumerPollThread = Executors.newSingleThreadExecutor(); |
| private AtomicReference<Exception> consumerPollException = new AtomicReference<>(); |
| private final SynchronousQueue<ConsumerRecords<byte[], byte[]>> availableRecordsQueue = |
| new SynchronousQueue<>(); |
| private AtomicReference<KafkaCheckpointMark> finalizedCheckpointMark = new AtomicReference<>(); |
| private AtomicBoolean closed = new AtomicBoolean(false); |
| |
| // Backlog support : |
| // Kafka consumer does not have an API to fetch latest offset for topic. We need to seekToEnd() |
| // then look at position(). Use another consumer to do this so that the primary consumer does |
| // not need to be interrupted. The latest offsets are fetched periodically on a thread. This is |
| // still a bit of a hack, but so far there haven't been any issues reported by the users. |
| private Consumer<byte[], byte[]> offsetConsumer; |
| private final ScheduledExecutorService offsetFetcherThread = |
| Executors.newSingleThreadScheduledExecutor(); |
| private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 1; |
| |
| private static final long UNINITIALIZED_OFFSET = -1; |
| |
| // Add SpEL instance to cover the interface difference of Kafka client |
| private transient ConsumerSpEL consumerSpEL; |
| |
| /** watermark before any records have been read. */ |
| private static Instant initialWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; |
| |
  /** Returns the reader name ("Reader-&lt;sourceId&gt;"), used as a prefix in log messages. */
  @Override
  public String toString() {
    return name;
  }
| |
| // Maintains approximate average over last 1000 elements |
| private static class MovingAvg { |
| private static final int MOVING_AVG_WINDOW = 1000; |
| private double avg = 0; |
| private long numUpdates = 0; |
| |
| void update(double quantity) { |
| numUpdates++; |
| avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates); |
| } |
| |
| double get() { |
| return avg; |
| } |
| } |
| |
  /** Immutable snapshot of a partition's backlog state passed to {@link TimestampPolicy}. */
  private static class TimestampPolicyContext extends TimestampPolicy.PartitionContext {

    // Estimated number of messages yet to be consumed in the partition.
    private final long messageBacklog;
    // Time at which the backlog estimate was taken.
    private final Instant backlogCheckTime;

    TimestampPolicyContext(long messageBacklog, Instant backlogCheckTime) {
      this.messageBacklog = messageBacklog;
      this.backlogCheckTime = backlogCheckTime;
    }

    @Override
    public long getMessageBacklog() {
      return messageBacklog;
    }

    @Override
    public Instant getBacklogCheckTime() {
      return backlogCheckTime;
    }
  }
| |
  // maintains state of each assigned partition (buffered records, consumed offset, etc)
  private static class PartitionState<K, V> {
    private final TopicPartition topicPartition;
    // Offset of the next record to be read; also the offset that gets checkpointed/committed.
    private long nextOffset;
    // Latest offset in the partition as last fetched by the offsetConsumer thread.
    private long latestOffset;
    private Instant latestOffsetFetchTime;
    private Instant lastWatermark; // As returned by timestampPolicy
    private final TimestampPolicy<K, V> timestampPolicy;

    // Records of the current batch not yet handed out by advance().
    private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();

    private MovingAvg avgRecordSize = new MovingAvg();
    private MovingAvg avgOffsetGap = new MovingAvg(); // > 0 only when log compaction is enabled.

    PartitionState(
        TopicPartition partition, long nextOffset, TimestampPolicy<K, V> timestampPolicy) {
      this.topicPartition = partition;
      this.nextOffset = nextOffset;
      this.latestOffset = UNINITIALIZED_OFFSET;
      this.latestOffsetFetchTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
      this.lastWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
      this.timestampPolicy = timestampPolicy;
    }

    // Update consumedOffset, avgRecordSize, and avgOffsetGap
    void recordConsumed(long offset, int size, long offsetGap) {
      nextOffset = offset + 1;

      // This is always updated from single thread. Probably not worth making atomic.
      avgRecordSize.update(size);
      avgOffsetGap.update(offsetGap);
    }

    // Called from the offsetFetcher thread; synchronized against the backlog reads below.
    synchronized void setLatestOffset(long latestOffset, Instant fetchTime) {
      this.latestOffset = latestOffset;
      this.latestOffsetFetchTime = fetchTime;
      // NOTE(review): avgRecordSize has no toString(), so this debug line prints the object
      // identity rather than the average value — confirm whether that is intended.
      LOG.debug(
          "{}: latest offset update for {} : {} (consumer offset {}, avg record size {})",
          this,
          topicPartition,
          latestOffset,
          nextOffset,
          avgRecordSize);
    }

    synchronized long approxBacklogInBytes() {
      // Note that is an estimate of uncompressed backlog.
      long backlogMessageCount = backlogMessageCount();
      if (backlogMessageCount == UnboundedReader.BACKLOG_UNKNOWN) {
        return UnboundedReader.BACKLOG_UNKNOWN;
      }
      return (long) (backlogMessageCount * avgRecordSize.get());
    }

    synchronized long backlogMessageCount() {
      if (latestOffset < 0 || nextOffset < 0) {
        return UnboundedReader.BACKLOG_UNKNOWN;
      }
      // Scale the raw offset gap down by the average gap between consecutive consumed offsets
      // (greater than 1 when log compaction has removed records).
      double remaining = (latestOffset - nextOffset) / (1 + avgOffsetGap.get());
      return Math.max(0, (long) Math.ceil(remaining));
    }

    synchronized TimestampPolicyContext mkTimestampPolicyContext() {
      return new TimestampPolicyContext(backlogMessageCount(), latestOffsetFetchTime);
    }

    Instant updateAndGetWatermark() {
      lastWatermark = timestampPolicy.getWatermark(mkTimestampPolicyContext());
      return lastWatermark;
    }
  }
| |
  /**
   * Creates a reader for {@code source}, optionally resuming from {@code checkpointMark}.
   *
   * <p>If a checkpoint is supplied it must cover exactly the partitions assigned to this source,
   * in the same order; each partition then resumes from its checkpointed offset and watermark.
   * Otherwise offsets start uninitialized and are resolved later in setupInitialOffset().
   */
  KafkaUnboundedReader(
      KafkaUnboundedSource<K, V> source, @Nullable KafkaCheckpointMark checkpointMark) {
    this.consumerSpEL = new ConsumerSpEL();
    this.source = source;
    this.name = "Reader-" + source.getId();

    List<TopicPartition> partitions = source.getSpec().getTopicPartitions();
    List<PartitionState<K, V>> states = new ArrayList<>(partitions.size());

    if (checkpointMark != null) {
      checkState(
          checkpointMark.getPartitions().size() == partitions.size(),
          "checkPointMark and assignedPartitions should match");
    }

    for (int i = 0; i < partitions.size(); i++) {
      TopicPartition tp = partitions.get(i);
      long nextOffset = UNINITIALIZED_OFFSET;
      Optional<Instant> prevWatermark = Optional.empty();

      if (checkpointMark != null) {
        // Verify that assigned and check-pointed partitions match exactly and set next offset.

        PartitionMark ckptMark = checkpointMark.getPartitions().get(i);

        TopicPartition partition = new TopicPartition(ckptMark.getTopic(), ckptMark.getPartition());
        checkState(
            partition.equals(tp),
            "checkpointed partition %s and assigned partition %s don't match",
            partition,
            tp);
        nextOffset = ckptMark.getNextOffset();
        prevWatermark = Optional.of(new Instant(ckptMark.getWatermarkMillis()));
      }

      states.add(
          new PartitionState<>(
              tp,
              nextOffset,
              source
                  .getSpec()
                  .getTimestampPolicyFactory()
                  .createTimestampPolicy(tp, prevWatermark)));
    }

    partitionStates = ImmutableList.copyOf(states);

    // Per-split metrics, keyed by this split's id.
    String splitId = String.valueOf(source.getId());
    elementsReadBySplit = SourceMetrics.elementsReadBySplit(splitId);
    bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId);
    backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId);
    backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit(splitId);
  }
| |
  /**
   * Body of the single background consumer thread: polls Kafka in a loop and hands each non-empty
   * batch to the reader thread through {@code availableRecordsQueue} (a SynchronousQueue, so the
   * hand-off blocks until the reader takes the batch or the offer times out). Also commits any
   * finalized checkpoint mark here, since the consumer may only be used from this thread.
   */
  private void consumerPollLoop() {
    // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue.

    try {
      ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
      while (!closed.get()) {
        try {
          if (records.isEmpty()) {
            records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
          } else if (availableRecordsQueue.offer(
              records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) {
            records = ConsumerRecords.empty();
          }
          // Commit offsets for the latest finalized checkpoint, if any; earlier marks that were
          // replaced before this point are skipped (see finalizeCheckpointMarkAsync()).
          KafkaCheckpointMark checkpointMark = finalizedCheckpointMark.getAndSet(null);
          if (checkpointMark != null) {
            commitCheckpointMark(checkpointMark);
          }
        } catch (InterruptedException e) {
          LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
          break;
        } catch (WakeupException e) {
          // Expected during close(): consumer.wakeup() is used to break out of poll().
          break;
        }
      }
      LOG.info("{}: Returning from consumer pool loop", this);
    } catch (Exception e) { // mostly an unrecoverable KafkaException.
      LOG.error("{}: Exception while reading from Kafka", this, e);
      consumerPollException.set(e);
      throw e;
    }
  }
| |
| private void commitCheckpointMark(KafkaCheckpointMark checkpointMark) { |
| LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark); |
| |
| consumer.commitSync( |
| checkpointMark.getPartitions().stream() |
| .filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET) |
| .collect( |
| Collectors.toMap( |
| p -> new TopicPartition(p.getTopic(), p.getPartition()), |
| p -> new OffsetAndMetadata(p.getNextOffset())))); |
| } |
| |
| /** |
| * Enqueue checkpoint mark to be committed to Kafka. This does not block until it is committed. |
| * There could be a delay of up to KAFKA_POLL_TIMEOUT (1 second). Any checkpoint mark enqueued |
| * earlier is dropped in favor of this checkpoint mark. Documentation for {@link |
| * CheckpointMark#finalizeCheckpoint()} says these are finalized in order. Only the latest offsets |
| * need to be committed. |
| */ |
| void finalizeCheckpointMarkAsync(KafkaCheckpointMark checkpointMark) { |
| if (finalizedCheckpointMark.getAndSet(checkpointMark) != null) { |
| checkpointMarkCommitsSkipped.inc(); |
| } |
| checkpointMarkCommitsEnqueued.inc(); |
| } |
| |
  /**
   * Takes the next batch of records from the poll thread, if one becomes available within
   * RECORDS_DEQUEUE_POLL_TIMEOUT, and resets {@code curBatch} to cycle through the partitions so
   * advance() interleaves records from each. When no batch is available, rethrows (as
   * IOException) any exception recorded by the poll thread; otherwise leaves {@code curBatch}
   * empty so advance() returns false.
   */
  private void nextBatch() throws IOException {
    curBatch = Collections.emptyIterator();

    ConsumerRecords<byte[], byte[]> records;
    try {
      // poll available records, wait (if necessary) up to the specified timeout.
      records =
          availableRecordsQueue.poll(
              RECORDS_DEQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOG.warn("{}: Unexpected", this, e);
      return;
    }

    if (records == null) {
      // Check if the poll thread failed with an exception.
      if (consumerPollException.get() != null) {
        throw new IOException("Exception while reading from Kafka", consumerPollException.get());
      }
      return;
    }

    partitionStates.forEach(p -> p.recordIter = records.records(p.topicPartition).iterator());

    // cycle through the partitions in order to interleave records from each.
    curBatch = Iterators.cycle(new ArrayList<>(partitionStates));
  }
| |
| private void setupInitialOffset(PartitionState pState) { |
| Read<K, V> spec = source.getSpec(); |
| |
| if (pState.nextOffset != UNINITIALIZED_OFFSET) { |
| consumer.seek(pState.topicPartition, pState.nextOffset); |
| } else { |
| // nextOffset is uninitialized here, meaning start reading from latest record as of now |
| // ('latest' is the default, and is configurable) or 'look up offset by startReadTime. |
| // Remember the current position without waiting until the first record is read. This |
| // ensures checkpoint is accurate even if the reader is closed before reading any records. |
| Instant startReadTime = spec.getStartReadTime(); |
| if (startReadTime != null) { |
| pState.nextOffset = |
| consumerSpEL.offsetForTime(consumer, pState.topicPartition, spec.getStartReadTime()); |
| consumer.seek(pState.topicPartition, pState.nextOffset); |
| } else { |
| pState.nextOffset = consumer.position(pState.topicPartition); |
| } |
| } |
| } |
| |
  // Update latest offset for each partition.
  // Called once directly from start() and then periodically from the offsetFetcher thread. Uses
  // the dedicated offsetConsumer (seekToEnd + position) so the main consumer is not disturbed.
  private void updateLatestOffsets() {
    for (PartitionState p : partitionStates) {
      try {
        // Capture the fetch time before the network round trip so the backlog-check timestamp
        // is never later than the offset it describes.
        Instant fetchTime = Instant.now();
        consumerSpEL.evaluateSeek2End(offsetConsumer, p.topicPartition);
        long offset = offsetConsumer.position(p.topicPartition);
        p.setLatestOffset(offset, fetchTime);
      } catch (Exception e) {
        if (closed.get()) { // Ignore the exception if the reader is closed.
          break;
        }
        LOG.warn(
            "{}: exception while fetching latest offset for partition {}. will be retried.",
            this,
            p.topicPartition,
            e);
        // Don't update the latest offset.
      }
    }

    LOG.debug("{}: backlog {}", this, getSplitBacklogBytes());
  }
| |
| private void reportBacklog() { |
| long splitBacklogBytes = getSplitBacklogBytes(); |
| if (splitBacklogBytes < 0) { |
| splitBacklogBytes = UnboundedReader.BACKLOG_UNKNOWN; |
| } |
| backlogBytesOfSplit.set(splitBacklogBytes); |
| long splitBacklogMessages = getSplitBacklogMessageCount(); |
| if (splitBacklogMessages < 0) { |
| splitBacklogMessages = UnboundedReader.BACKLOG_UNKNOWN; |
| } |
| backlogElementsOfSplit.set(splitBacklogMessages); |
| } |
| |
| private long getSplitBacklogMessageCount() { |
| long backlogCount = 0; |
| |
| for (PartitionState p : partitionStates) { |
| long pBacklog = p.backlogMessageCount(); |
| if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) { |
| return UnboundedReader.BACKLOG_UNKNOWN; |
| } |
| backlogCount += pBacklog; |
| } |
| |
| return backlogCount; |
| } |
| |
  /**
   * Builds the config for the secondary "offset" consumer used only to fetch latest offsets for
   * backlog estimation. Starts from the main consumer config, disables auto-commit, and assigns a
   * unique random group id so it does not interfere with the main consumer. Any user-supplied
   * offsetConsumerConfig entries are applied on top (and may override the generated group id),
   * but the isolation level is always forced to "read_uncommitted" afterwards.
   */
  @VisibleForTesting
  Map<String, Object> getOffsetConsumerConfig() {
    Map<String, Object> offsetConsumerConfig = new HashMap<>(source.getSpec().getConsumerConfig());
    offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    Object groupId = source.getSpec().getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG);
    // override group_id and disable auto_commit so that it does not interfere with main consumer
    String offsetGroupId =
        String.format(
            "%s_offset_consumer_%d_%s",
            name, (new Random()).nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId));
    offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId);

    if (source.getSpec().getOffsetConsumerConfig() != null) {
      offsetConsumerConfig.putAll(source.getSpec().getOffsetConsumerConfig());
    }

    // Force read isolation level to 'read_uncommitted' for offset consumer. This consumer
    // fetches latest offset for two reasons : (a) to calculate backlog (number of records
    // yet to be consumed) (b) to advance watermark if the backlog is zero. The right thing to do
    // for (a) is to leave this config unchanged from the main config (i.e. if there are records
    // that can't be read because of uncommitted records before them, they shouldn't
    // ideally count towards backlog when "read_committed" is enabled. But (b)
    // requires finding out if there are any records left to be read (committed or uncommitted).
    // Rather than using two separate consumers we will go with better support for (b). If we do
    // hit a case where a lot of records are not readable (due to some stuck transactions), the
    // pipeline would report more backlog, but would not be able to consume it. It might be ok
    // since CPU consumed on the workers would be low and will likely avoid unnecessary upscale.
    offsetConsumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");

    return offsetConsumerConfig;
  }
| |
  /**
   * Stops the background threads and closes the consumers and deserializers. Uses wakeup() plus
   * draining of availableRecordsQueue in a retry loop to unblock the poll thread, which may be
   * blocked either inside poll() or in the synchronous-queue hand-off. Safe to call even if
   * start() failed part way through (hence the null checks).
   */
  @Override
  public void close() throws IOException {
    closed.set(true);
    consumerPollThread.shutdown();
    offsetFetcherThread.shutdown();

    boolean isShutdown = false;

    // Wait for threads to shutdown. Trying this as a loop to handle a tiny race where poll thread
    // might block to enqueue right after availableRecordsQueue.poll() below.
    while (!isShutdown) {

      if (consumer != null) {
        consumer.wakeup();
      }
      if (offsetConsumer != null) {
        offsetConsumer.wakeup();
      }
      availableRecordsQueue.poll(); // drain unread batch, this unblocks consumer thread.
      try {
        isShutdown =
            consumerPollThread.awaitTermination(10, TimeUnit.SECONDS)
                && offsetFetcherThread.awaitTermination(10, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e); // not expected
      }

      if (!isShutdown) {
        LOG.warn("An internal thread is taking a long time to shutdown. will retry.");
      }
    }

    // Closeables.close(..., true) : best-effort close that logs and swallows IOExceptions.
    Closeables.close(keyDeserializerInstance, true);
    Closeables.close(valueDeserializerInstance, true);

    Closeables.close(offsetConsumer, true);
    Closeables.close(consumer, true);
  }
| } |