blob: 492fc3bf65e61f3f69207e9e522641402727dc81 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.inlong.sort.pulsar.internal;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.pulsar.internal.ClosableBlockingQueue;
import org.apache.flink.streaming.connectors.pulsar.internal.ExceptionProxy;
import org.apache.flink.streaming.connectors.pulsar.internal.PoisonState;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarCommitCallback;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarTopicPartitionStateWithWatermarkGenerator;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarTopicState;
import org.apache.flink.streaming.connectors.pulsar.internal.SourceContextWatermarkOutputAdapter;
import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
import org.apache.flink.util.SerializedValue;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.shade.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.COMMITTED_OFFSETS_METRICS_GAUGE;
import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.CURRENT_OFFSETS_METRICS_GAUGE;
import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.OFFSETS_BY_TOPIC_METRICS_GROUP;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Copy from io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9,
* From {@link org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher}
* Implements the logic around emitting records and tracking offsets,
* as well as around the optional timestamp assignment and watermark generation.
*
* @param <T> The type of elements deserialized from Pulsar messages, and emitted into
* the Flink data stream.
*/
public class PulsarFetcher<T> {
/** Logger shared by all fetcher instances. */
private static final Logger log = LoggerFactory.getLogger(PulsarFetcher.class);
/** Mode value chosen by the constructor when no watermark strategy was supplied. */
private static final int NO_TIMESTAMPS_WATERMARKS = 0;
/** Mode value chosen by the constructor when a watermark strategy was supplied. */
private static final int WITH_WATERMARK_GENERATOR = 1;
// ------------------------------------------------------------------------
/** The source context to emit records and watermarks to. */
protected final SourceContext<T> sourceContext;
/** The seed topic ranges and their initial message ids, exactly as handed to the constructor. */
protected final Map<TopicRange, MessageId> seedTopicsWithInitialOffsets;
/**
 * Topic ranges whose membership is passed as a boolean flag to every reader thread
 * created by {@link #createReaderThread}.
 * NOTE(review): presumably marks ranges whose start message must be skipped - confirm
 * against ReaderThread.
 */
protected final Set<TopicRange> excludeStartMessageIds;
/** The lock that guarantees that record emission and state updates are atomic,
 * from the view of taking a checkpoint. Obtained from the source context. */
private final Object checkpointLock;
/** All partitions (and their state) that this fetcher is subscribed to. */
protected final List<PulsarTopicState<T>> subscribedPartitionStates;
/**
 * Queue of partitions that are not yet assigned to any reader thread for consuming.
 *
 * <p>All partitions added to this queue are guaranteed to have been added
 * to {@link #subscribedPartitionStates} already.
 */
protected final ClosableBlockingQueue<PulsarTopicState<T>> unassignedPartitionsQueue;
/**
 * The mode describing whether the fetcher also generates timestamps and watermarks;
 * one of {@link #NO_TIMESTAMPS_WATERMARKS} or {@link #WITH_WATERMARK_GENERATOR}.
 */
private final int timestampWatermarkMode;
/**
 * Optional watermark strategy that will be run per pulsar partition, to exploit per-partition
 * timestamp characteristics. The watermark strategy is kept in serialized form, so that it
 * can be deserialized into an independent copy for every partition. May be {@code null}.
 */
private final SerializedValue<WatermarkStrategy<T>> watermarkStrategy;
/** User class loader used to deserialize the watermark strategy. */
private final ClassLoader userCodeClassLoader;
/** Runtime context of the owning task; its task name is used when naming reader threads. */
private final StreamingRuntimeContext runtimeContext;
/** Pulsar client configuration handed to every reader thread. */
protected final ClientConfigurationData clientConf;
/**
 * Reader configuration handed to every reader thread; never {@code null} (an empty map
 * replaces a {@code null} argument). The data-loss settings are read from it, and by the
 * helpers' names removed from it, in the constructor.
 */
protected final Map<String, Object> readerConf;
/** Subscription name handed to every reader thread. */
protected final String subscriptionName;
/** Deserialization schema handed to every reader thread. */
protected final PulsarDeserializationSchema<T> deserializer;
/** Poll timeout handed to every reader thread; by its name, in milliseconds. */
protected final int pollTimeoutMs;
/** Number of retries of a failed cursor commit before the failure is reported. */
private final int commitMaxRetries;
/** Metadata reader used to commit offsets to the Pulsar cursor. */
protected final PulsarMetadataReader metadataReader;
/**
 * Wrapper around our SourceContext for allowing the
 * {@link org.apache.flink.api.common.eventtime.WatermarkGenerator}
 * to emit watermarks and mark idleness.
 */
protected final WatermarkOutput watermarkOutput;
/**
 * {@link WatermarkOutputMultiplexer} for supporting per-partition watermark generation.
 */
private final WatermarkOutputMultiplexer watermarkOutputMultiplexer;
/** Flag to mark the main work loop as alive. */
private volatile boolean running = true;
/**
 * The threads that do the actual reading and hand the records to this fetcher,
 * keyed by the topic range they read. Created anew on every call to {@link #runFetchLoop()}.
 */
private Map<TopicRange, ReaderThread<T>> topicToThread;
/** Data-loss flag handed to every reader thread; overwritten from the reader configuration. */
private boolean failOnDataLoss = true;
/** Second data-loss flag handed to every reader thread; read from the reader configuration. */
private boolean useEarliestWhenDataLoss;
/**
 * Marker element that is put into {@link #unassignedPartitionsQueue} to wake up the
 * fetch loop; obtained reflectively from {@code PoisonState.INSTANCE} in the constructor.
 */
private PoisonState poisonState;
// ------------------------------------------------------------------------
// Metrics
// ------------------------------------------------------------------------
/**
 * Flag indicating whether or not metrics should be exposed.
 * If {@code true}, offset gauges (current offset, committed offset) are registered
 * for the seed partitions in the constructor.
 */
private final boolean useMetrics;
/**
 * The metric group which all metrics for the source should be registered to.
 * Never {@code null}.
 */
private final MetricGroup consumerMetricGroup;
/**
 * Convenience constructor that delegates to the full constructor with an empty set of
 * excluded start message ids and a commit retry limit of {@code 3}.
 *
 * <p>Note that {@code subscriptionName} is the last parameter here, whereas the full
 * constructor takes it right after {@code readerConf}.
 *
 * @param sourceContext The source context to emit records and watermarks to
 * @param seedTopicsWithInitialOffsets The seed topic ranges and their initial message ids
 * @param watermarkStrategy Serialized watermark strategy, or {@code null} for none
 * @param processingTimeProvider Timer service used for periodic watermark emission
 * @param autoWatermarkInterval Interval of periodic watermark emission; only used when positive
 * @param userCodeClassLoader Class loader used to deserialize the watermark strategy
 * @param runtimeContext Runtime context of the owning task
 * @param clientConf Pulsar client configuration
 * @param readerConf Reader configuration; may be {@code null}
 * @param pollTimeoutMs Poll timeout handed to the reader threads
 * @param deserializer Deserialization schema handed to the reader threads
 * @param metadataReader Metadata reader used for committing offsets
 * @param consumerMetricGroup Metric group for the source; must not be {@code null}
 * @param useMetrics Whether offset gauges are registered
 * @param subscriptionName Subscription name handed to the reader threads
 * @throws Exception if the full constructor fails
 */
public PulsarFetcher(
SourceContext<T> sourceContext,
Map<TopicRange, MessageId> seedTopicsWithInitialOffsets,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
StreamingRuntimeContext runtimeContext,
ClientConfigurationData clientConf,
Map<String, Object> readerConf,
int pollTimeoutMs,
PulsarDeserializationSchema<T> deserializer,
PulsarMetadataReader metadataReader,
MetricGroup consumerMetricGroup,
boolean useMetrics,
String subscriptionName) throws Exception {
this(
sourceContext,
seedTopicsWithInitialOffsets,
// no topic range is flagged for excluding its start message id
Collections.emptySet(),
watermarkStrategy,
processingTimeProvider,
autoWatermarkInterval,
userCodeClassLoader,
runtimeContext,
clientConf,
readerConf,
subscriptionName,
pollTimeoutMs,
3, // commit retries before fail
deserializer,
metadataReader,
consumerMetricGroup,
useMetrics);
}
public PulsarFetcher(
SourceContext<T> sourceContext,
Map<TopicRange, MessageId> seedTopicsWithInitialOffsets,
Set<TopicRange> excludeStartMessageIds,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
StreamingRuntimeContext runtimeContext,
ClientConfigurationData clientConf,
Map<String, Object> readerConf,
String subscriptionName,
int pollTimeoutMs,
int commitMaxRetries,
PulsarDeserializationSchema<T> deserializer,
PulsarMetadataReader metadataReader,
MetricGroup consumerMetricGroup,
boolean useMetrics) throws Exception {
this.sourceContext = sourceContext;
this.watermarkOutput = new SourceContextWatermarkOutputAdapter<>(sourceContext);
this.subscriptionName = subscriptionName;
this.watermarkOutputMultiplexer = new WatermarkOutputMultiplexer(watermarkOutput);
this.useMetrics = useMetrics;
this.consumerMetricGroup = checkNotNull(consumerMetricGroup);
this.seedTopicsWithInitialOffsets = seedTopicsWithInitialOffsets;
this.excludeStartMessageIds = excludeStartMessageIds;
this.checkpointLock = sourceContext.getCheckpointLock();
this.userCodeClassLoader = userCodeClassLoader;
this.runtimeContext = runtimeContext;
this.clientConf = clientConf;
this.readerConf = readerConf == null ? new HashMap<>() : readerConf;
this.failOnDataLoss = SourceSinkUtils.getFailOnDataLossAndRemoveKey(this.readerConf);
this.useEarliestWhenDataLoss = SourceSinkUtils.getUseEarliestWhenDataLossAndRemoveKey(this.readerConf);
this.pollTimeoutMs = pollTimeoutMs;
this.commitMaxRetries = commitMaxRetries;
this.deserializer = deserializer;
this.metadataReader = metadataReader;
// figure out what we watermark mode we will be using
this.watermarkStrategy = watermarkStrategy;
if (watermarkStrategy == null) {
timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
} else {
timestampWatermarkMode = WITH_WATERMARK_GENERATOR;
}
this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
// initialize subscribed partition states with seed partitions
this.subscribedPartitionStates = createPartitionStateHolders(
seedTopicsWithInitialOffsets,
timestampWatermarkMode,
watermarkStrategy,
userCodeClassLoader);
// check that all seed partition states have a defined offset
for (PulsarTopicState<T> state : subscribedPartitionStates) {
if (!state.isOffsetDefined()) {
throw new IllegalArgumentException(
"The fetcher was assigned seed partitions with undefined initial offsets.");
}
}
// all seed partitions are not assigned yet, so should be added to the unassigned partitions queue
for (PulsarTopicState<T> state : subscribedPartitionStates) {
unassignedPartitionsQueue.add(state);
}
// register metrics for the initial seed partitions
if (useMetrics) {
registerOffsetMetrics(consumerMetricGroup, subscribedPartitionStates);
}
// if we have periodic watermarks, kick off the interval scheduler
if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {
PeriodicWatermarkEmitter<T> periodicEmitter = new PeriodicWatermarkEmitter<>(
checkpointLock,
subscribedPartitionStates,
watermarkOutputMultiplexer,
processingTimeProvider,
autoWatermarkInterval);
periodicEmitter.start();
}
// get position state
Field poisonInstance = PoisonState.class.getDeclaredField("INSTANCE");
poisonInstance.setAccessible(true);
poisonState = (PoisonState) poisonInstance.get(PoisonState.class);
}
/**
 * Runs the main work loop of the fetcher on the calling thread.
 *
 * <p>While the fetcher is running, the loop takes partition states from
 * {@link #unassignedPartitionsQueue}, starts one reader thread per state and drops reader
 * threads that stopped running. Exceptions raised on reader threads are re-thrown here via
 * the {@link ExceptionProxy}. On exit (normal, cancelled or failed) all reader threads
 * that are still alive are cancelled and joined.
 *
 * @throws Exception the root exception reported by a reader thread, or the
 *         {@link InterruptedException} that interrupted the loop if there is none
 */
public void runFetchLoop() throws Exception {
topicToThread = new HashMap<>();
// reader threads report failures through this proxy, which is bound to the current thread
ExceptionProxy exceptionProxy = new ExceptionProxy(Thread.currentThread());
try {
while (running) {
// re-throw any exception from the concurrent fetcher threads
exceptionProxy.checkAndThrowException();
// wait for max 5 seconds trying to get partitions to assign
// if threads shut down, this poll returns earlier, because the threads inject the
// special marker into the queue
List<PulsarTopicState<T>> topicsToAssign = unassignedPartitionsQueue.getBatchBlocking(5000);
// if there are more markers, remove them all
topicsToAssign.removeIf(s -> s.equals(poisonState));
if (!topicsToAssign.isEmpty()) {
// the fetcher may have been cancelled while waiting; do not start new readers then
if (!running) {
throw BreakingException.INSTANCE;
}
topicToThread.putAll(
createAndStartReaderThread(topicsToAssign, exceptionProxy));
} else {
// there were no partitions to assign. Check if any consumer threads shut down.
// we get into this section of the code, if either the poll timed out, or the
// blocking poll was woken up by the marker element
topicToThread.values().removeIf(t -> !t.isRunning());
}
// no reader is left and nothing is queued: block until a partition or the marker arrives
if (topicToThread.size() == 0 && unassignedPartitionsQueue.isEmpty()) {
PulsarTopicState topicForBlocking = unassignedPartitionsQueue.getElementBlocking();
// the marker means "stop", anything else is a partition that needs a reader
if (topicForBlocking.equals(poisonState)) {
throw BreakingException.INSTANCE;
}
topicToThread.putAll(
createAndStartReaderThread(ImmutableList.of(topicForBlocking), exceptionProxy));
}
}
} catch (BreakingException b) {
// do nothing; this exception only serves to leave the loop
} catch (InterruptedException e) {
// this may be thrown because an exception on one of the concurrent fetcher threads
// woke this thread up. make sure we throw the root exception instead in that case
exceptionProxy.checkAndThrowException();
// no other root exception, throw the interrupted exception
throw e;
} finally {
running = false;
// clear the interruption flag, so that the best-effort join on the reader threads
// below is not aborted right away by an interrupt that was already delivered
Thread.interrupted();
// make sure that in any case (completion, abort, error), all spawned threads are stopped
try {
int runningThreads = 0;
do { // check whether threads are alive and cancel them
runningThreads = 0;
topicToThread.values().removeIf(s -> !s.isAlive());
for (ReaderThread t : topicToThread.values()) {
t.cancel();
runningThreads++;
}
if (runningThreads > 0) {
// spread a budget of roughly 500 ms per round over all threads still alive
for (ReaderThread t : topicToThread.values()) {
t.join(500 / runningThreads + 1);
}
}
} while (runningThreads > 0);
} catch (InterruptedException ignored) {
// waiting for the thread shutdown apparently got interrupted
// restore interrupted state and continue
Thread.currentThread().interrupt();
} catch (Throwable t) {
// we catch all here to preserve the original exception
log.error("Exception while shutting down reader threads", t);
}
}
}
// ------------------------------------------------------------------------
// emitting records
// ------------------------------------------------------------------------
/**
 * Drains the given queue and emits every record with a timestamp attached, then stores
 * the offset in the partition state.
 *
 * <p>Emission and the offset update happen under the checkpoint lock, so a checkpoint
 * sees either none or all of them.
 *
 * @param records The records to emit; the queue is empty afterwards
 * @param partitionState The state of the pulsar partition from which the records were fetched
 * @param offset The offset of the corresponding pulsar message
 * @param pulsarEventTimestamp The timestamp of the pulsar message
 */
protected void emitRecordsWithTimestamps(
        Queue<T> records,
        PulsarTopicState<T> partitionState,
        MessageId offset,
        long pulsarEventTimestamp) {
    synchronized (checkpointLock) {
        for (T next = records.poll(); next != null; next = records.poll()) {
            final long eventTime = partitionState.extractTimestamp(next, pulsarEventTimestamp);
            sourceContext.collectWithTimestamp(next, eventTime);
            // notifying the partition state may emit a watermark, hence only after the record
            partitionState.onEvent(next, eventTime);
        }
        partitionState.setOffset(offset);
    }
}
/**
 * Asks the fetch loop to stop.
 *
 * <p>Clears the running flag and, if the queue of unassigned partitions is still open,
 * puts the poison marker into it.
 *
 * @throws Exception declared for callers; nothing in this method is known to throw a
 *         checked exception
 */
public void cancel() throws Exception {
    // signal the main thread to exit
    this.running = false;
    // the marker makes a blocked poll in the fetch loop return soon
    this.unassignedPartitionsQueue.addIfOpen(this.poisonState);
}
/**
 * Records the given offsets as committed in the matching partition states.
 *
 * <p>Partition states whose topic range has no entry in {@code offset} are left untouched.
 *
 * @param offset The committed message id per topic range
 */
public void commitOffsetToState(Map<TopicRange, MessageId> offset) {
    for (PulsarTopicState<T> topicState : subscribedPartitionStates) {
        final MessageId committed = offset.get(topicState.getTopicRange());
        if (committed == null) {
            continue;
        }
        topicState.setCommittedOffset(committed);
    }
}
/**
 * Commits the given offsets to Pulsar, skipping entries that are
 * {@code MessageId.earliest} or {@code MessageId.latest}.
 *
 * @param offset The message id to commit per topic range
 * @param offsetCommitCallback Callback notified about the outcome of the commit
 */
public void commitOffsetToPulsar(
        Map<TopicRange, MessageId> offset,
        PulsarCommitCallback offsetCommitCallback) {
    final Map<TopicRange, MessageId> committable = removeEarliestAndLatest(offset);
    doCommitOffsetToPulsar(committable, offsetCommitCallback);
}
/**
 * Returns a copy of the given map without the entries whose message id equals
 * {@code MessageId.earliest} or {@code MessageId.latest}.
 *
 * @param offset The message id per topic range; not modified
 * @return A new map holding the remaining entries
 */
public Map<TopicRange, MessageId> removeEarliestAndLatest(Map<TopicRange, MessageId> offset) {
    final Map<TopicRange, MessageId> concreteOffsets = new HashMap<>(offset);
    concreteOffsets.values().removeIf(
            mid -> mid.equals(MessageId.earliest) || mid.equals(MessageId.latest));
    return concreteOffsets;
}
/**
 * Adds newly discovered topic ranges to the fetcher.
 *
 * <p>A state holder starting at {@code MessageId.earliest} is created for every topic
 * range. When metrics are enabled, offset gauges are registered for the new states, the
 * same way the constructor does for the seed partitions. Each state is then added to the
 * subscribed states and queued for assignment to a reader thread.
 *
 * @param newTopics The discovered topic ranges
 * @throws IOException if the watermark strategy cannot be deserialized
 * @throws ClassNotFoundException if a class of the watermark strategy cannot be loaded
 */
public void addDiscoveredTopics(Set<TopicRange> newTopics) throws IOException, ClassNotFoundException {
    final Map<TopicRange, MessageId> startPositions = newTopics.stream()
            .collect(Collectors.toMap(t -> t, t -> MessageId.earliest));
    final List<PulsarTopicState<T>> newStates = createPartitionStateHolders(
            startPositions,
            timestampWatermarkMode,
            watermarkStrategy,
            userCodeClassLoader);

    // discovered partitions get the same offset gauges as the seed partitions
    if (useMetrics) {
        registerOffsetMetrics(consumerMetricGroup, newStates);
    }

    for (PulsarTopicState<T> state : newStates) {
        // the ordering is crucial here; first register the state holder, then
        // push it to the partitions queue to be read
        subscribedPartitionStates.add(state);
        unassignedPartitionsQueue.add(state);
    }
}
// ------------------------------------------------------------------------
// snapshot and restore the state
// ------------------------------------------------------------------------
/**
 * Collects the current offset of every subscribed partition.
 *
 * <p>Important: the caller must hold the checkpoint lock; this is only verified by an
 * {@code assert}.
 *
 * @return A new map from topic range to its current offset.
 */
public Map<TopicRange, MessageId> snapshotCurrentState() {
    // callers are expected to hold the checkpoint lock
    assert Thread.holdsLock(checkpointLock);
    final Map<TopicRange, MessageId> offsetsByRange = new HashMap<>(subscribedPartitionStates.size());
    subscribedPartitionStates.forEach(
            topicState -> offsetsByRange.put(topicState.getTopicRange(), topicState.getOffset()));
    return offsetsByRange;
}
/**
 * Creates, names and starts one daemon reader thread per given partition state.
 *
 * @param states The partition states that need a reader thread
 * @param exceptionProxy Proxy handed to every reader thread
 * @return A new map from topic range to the reader thread started for it
 */
public Map<TopicRange, ReaderThread<T>> createAndStartReaderThread(
        List<PulsarTopicState<T>> states,
        ExceptionProxy exceptionProxy) {
    final Map<TopicRange, ReaderThread<T>> startedReaders = new HashMap<>();
    for (PulsarTopicState<T> topicState : states) {
        final ReaderThread<T> reader = createReaderThread(exceptionProxy, topicState);
        final String threadName = String.format(
                "Pulsar Reader for %s in task %s",
                topicState.getTopicRange(),
                runtimeContext.getTaskName());
        reader.setName(threadName);
        reader.setDaemon(true);
        reader.start();
        log.info("Starting Thread {}", reader.getName());
        startedReaders.put(topicState.getTopicRange(), reader);
    }
    return startedReaders;
}
/**
 * Gives access to the states of all partitions this fetcher is subscribed to.
 *
 * @return The internal list of partition states (not a copy)
 */
protected List<PulsarTopicState<T>> getSubscribedTopicStates() {
    return this.subscribedPartitionStates;
}
/**
 * Builds, without starting it, the reader thread for the given partition state from the
 * fetcher's configuration.
 *
 * @param exceptionProxy Proxy handed to the reader thread
 * @param state The partition state the thread will read for
 * @return The new reader thread
 */
protected ReaderThread<T> createReaderThread(ExceptionProxy exceptionProxy, PulsarTopicState state) {
    // whether this topic range is listed in the exclude-start-message-id set
    final boolean excludeStartMessageId = excludeStartMessageIds.contains(state.getTopicRange());
    return new ReaderThread<>(
            this,
            state,
            clientConf,
            readerConf,
            deserializer,
            pollTimeoutMs,
            exceptionProxy,
            failOnDataLoss,
            useEarliestWhenDataLoss,
            excludeStartMessageId,
            subscriptionName);
}
/**
 * Utility method that takes the topic partitions and creates the topic partition state
 * holders, depending on the timestamp / watermark mode.
 *
 * <p>In watermark-generator mode every partition gets its own deserialized copy of the
 * watermark strategy and its own output registered with the watermark multiplexer.
 *
 * @param partitionsToInitialOffsets The topic ranges and the offset each state starts with
 * @param timestampWatermarkMode {@link #NO_TIMESTAMPS_WATERMARKS} or
 *        {@link #WITH_WATERMARK_GENERATOR}
 * @param watermarkStrategy Serialized watermark strategy; only read in watermark-generator mode
 * @param userCodeClassLoader Class loader used to deserialize the watermark strategy
 * @return A new thread-safe list with one state holder per entry
 * @throws IOException if the watermark strategy cannot be deserialized
 * @throws ClassNotFoundException if a class of the watermark strategy cannot be loaded
 * @throws IllegalStateException if the mode is none of the known values
 */
private List<PulsarTopicState<T>> createPartitionStateHolders(
        Map<TopicRange, MessageId> partitionsToInitialOffsets,
        int timestampWatermarkMode,
        SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
        ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
    // CopyOnWrite as adding discovered partitions could happen in parallel
    // while different threads iterate the partitions list
    List<PulsarTopicState<T>> partitionStates = new CopyOnWriteArrayList<>();
    switch (timestampWatermarkMode) {
        case NO_TIMESTAMPS_WATERMARKS: {
            for (Map.Entry<TopicRange, MessageId> partitionEntry : partitionsToInitialOffsets.entrySet()) {
                PulsarTopicState<T> state = new PulsarTopicState<>(partitionEntry.getKey());
                state.setOffset(partitionEntry.getValue());
                partitionStates.add(state);
            }
            return partitionStates;
        }
        case WITH_WATERMARK_GENERATOR: {
            for (Map.Entry<TopicRange, MessageId> partitionEntry : partitionsToInitialOffsets.entrySet()) {
                final TopicRange topicRange = partitionEntry.getKey();
                PulsarTopicState<T> state = new PulsarTopicState<>(topicRange);
                // every partition works on its own copy of the strategy
                WatermarkStrategy<T> deserializedWatermarkStrategy =
                        watermarkStrategy.deserializeValue(userCodeClassLoader);
                // the format of the ID does not matter, as long as it is unique
                final String partitionId = state.getTopicRange().toString();
                watermarkOutputMultiplexer.registerNewOutput(partitionId);
                WatermarkOutput immediateOutput =
                        watermarkOutputMultiplexer.getImmediateOutput(partitionId);
                WatermarkOutput deferredOutput =
                        watermarkOutputMultiplexer.getDeferredOutput(partitionId);
                PulsarTopicPartitionStateWithWatermarkGenerator<T> partitionState =
                        new PulsarTopicPartitionStateWithWatermarkGenerator<>(
                                topicRange,
                                state,
                                deserializedWatermarkStrategy.createTimestampAssigner(() -> consumerMetricGroup),
                                deserializedWatermarkStrategy.createWatermarkGenerator(() -> consumerMetricGroup),
                                immediateOutput,
                                deferredOutput);
                partitionState.setOffset(partitionEntry.getValue());
                partitionStates.add(partitionState);
            }
            return partitionStates;
        }
        default:
            // cannot happen with the modes defined today; guards against future additions
            throw new IllegalStateException(
                    "Unknown timestamp/watermark mode: " + timestampWatermarkMode);
    }
}
// ------------------------- Metrics ----------------------------------
/**
 * Registers, for each given partition state, a gauge for its current offset and a gauge
 * for its committed offset.
 *
 * <p>The gauges live in a sub-group of the consumer metric group that is keyed by the
 * topic of the partition's topic range.
 *
 * @param consumerMetricGroup The consumer metric group
 * @param partitionOffsetStates The partition offset state holders, whose values will be used to update metrics
 */
private void registerOffsetMetrics(
        MetricGroup consumerMetricGroup,
        List<PulsarTopicState<T>> partitionOffsetStates) {
    partitionOffsetStates.forEach(topicState -> {
        final MetricGroup perTopicGroup = consumerMetricGroup
                .addGroup(OFFSETS_BY_TOPIC_METRICS_GROUP, topicState.getTopicRange().getTopic());
        perTopicGroup.gauge(
                CURRENT_OFFSETS_METRICS_GAUGE,
                new OffsetGauge(topicState, OffsetGaugeType.CURRENT_OFFSET));
        perTopicGroup.gauge(
                COMMITTED_OFFSETS_METRICS_GAUGE,
                new OffsetGauge(topicState, OffsetGaugeType.COMMITTED_OFFSET));
    });
}
/**
 * Gauge types; selects which offset of a partition state an {@code OffsetGauge} reports.
 */
private enum OffsetGaugeType {
// the gauge reports PulsarTopicState#getOffset()
CURRENT_OFFSET,
// the gauge reports PulsarTopicState#getCommittedOffset()
COMMITTED_OFFSET
}
/**
 * Gauge that reports either the current or the committed offset of one PulsarTopicState,
 * read from the state on every call.
 */
private static class OffsetGauge implements Gauge<MessageId> {

    /** The partition state the offset is read from. */
    private final PulsarTopicState<?> topicState;

    /** Which of the two offsets this gauge reports. */
    private final OffsetGaugeType reportedOffset;

    OffsetGauge(PulsarTopicState<?> pts, OffsetGaugeType gaugeType) {
        this.topicState = pts;
        this.reportedOffset = gaugeType;
    }

    /**
     * @return the offset selected by the gauge type
     * @throws RuntimeException if the gauge type is not one of the known constants
     */
    @Override
    public MessageId getValue() {
        switch (reportedOffset) {
            case CURRENT_OFFSET:
                return topicState.getOffset();
            case COMMITTED_OFFSET:
                return topicState.getCommittedOffset();
            default:
                throw new RuntimeException("Unknown gauge type: " + reportedOffset);
        }
    }
}
// ------------------------------------------------------------------------
/**
 * Timer callback that periodically triggers watermark emission. Each time it fires, it
 * notifies every partition state and the watermark multiplexer under the checkpoint
 * lock, and then registers itself for the next interval.
 */
private static class PeriodicWatermarkEmitter<T> implements ProcessingTimeCallback {

    /** Lock held while the periodic emission runs. */
    private final Object checkpointLock;

    /** The partition states notified on every firing; the list passed in, not a copy. */
    private final List<PulsarTopicState<T>> allPartitions;

    /** Multiplexer notified after all partition states. */
    private final WatermarkOutputMultiplexer watermarkOutputMultiplexer;

    /** Timer service the callback registers itself with. */
    private final ProcessingTimeService timerService;

    /** Distance between two firings, added to the current processing time. */
    private final long interval;

    // -------------------------------------------------

    PeriodicWatermarkEmitter(
            Object checkpointLock,
            List<PulsarTopicState<T>> allPartitions,
            WatermarkOutputMultiplexer watermarkOutputMultiplexer,
            ProcessingTimeService timerService,
            long autoWatermarkInterval) {
        this.checkpointLock = checkpointLock;
        this.allPartitions = checkNotNull(allPartitions);
        this.watermarkOutputMultiplexer = watermarkOutputMultiplexer;
        this.timerService = checkNotNull(timerService);
        this.interval = autoWatermarkInterval;
    }

    // -------------------------------------------------

    /** Registers the first timer; every firing registers the following one. */
    public void start() {
        scheduleNextEmission();
    }

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        synchronized (checkpointLock) {
            for (PulsarTopicState<?> partition : allPartitions) {
                partition.onPeriodicEmit();
            }
            watermarkOutputMultiplexer.onPeriodicEmit();
        }
        // re-arm outside the lock
        scheduleNextEmission();
    }

    /** Registers this callback to fire one interval after the current processing time. */
    private void scheduleNextEmission() {
        final long nextFiring = timerService.getCurrentProcessingTime() + interval;
        timerService.registerTimer(nextFiring, this);
    }
}
/**
 * Control-flow marker thrown to leave the fetch loop; it carries no information.
 *
 * <p>Only the shared {@link #INSTANCE} exists. Because one instance is thrown from several
 * places and possibly many times, it is created without a stack trace (a trace captured
 * once at class initialization would point nowhere useful) and with suppression disabled
 * (so the shared instance can never accumulate suppressed exceptions).
 */
private static class BreakingException extends Exception {

    private static final long serialVersionUID = 1L;

    static final BreakingException INSTANCE = new BreakingException();

    private BreakingException() {
        // no message, no cause, suppression disabled, stack trace not writable
        super(null, null, false, false);
    }
}
/**
 * Commits the given offsets to the Pulsar cursor, retrying on failure, and reports the
 * outcome to the callback.
 *
 * <p>A failed commit is retried up to {@code commitMaxRetries} times with a one second
 * pause in between, as long as the fetcher is running. The committed offsets of the
 * partition states are updated only if the commit to Pulsar actually went through; a
 * failed commit is reported via {@code onException} and leaves them untouched. If the
 * fetcher is stopped before the commit succeeds, or by the time a failure would be
 * reported, the method returns without notifying the callback.
 *
 * <p>If the retry pause is interrupted, the interrupt flag of the calling thread is
 * restored before the failure is handled.
 *
 * @param offset The message id to commit per topic range
 * @param offsetCommitCallback Callback notified about success or failure of the commit
 */
protected void doCommitOffsetToPulsar(
        Map<TopicRange, MessageId> offset,
        PulsarCommitCallback offsetCommitCallback) {
    boolean committed = false;
    try {
        int retries = 0;
        while (running) {
            try {
                metadataReader.commitOffsetToCursor(offset);
                committed = true;
                break;
            } catch (Exception e) {
                log.warn("Failed to commit cursor to Pulsar.", e);
                if (retries >= commitMaxRetries) {
                    // the first try plus all retries have failed
                    log.error("Failed to commit cursor to Pulsar after {} attempts", retries + 1);
                    throw e;
                }
                retries += 1;
                Thread.sleep(1000);
            }
        }
        if (!committed) {
            // the fetcher was stopped before the commit went through
            return;
        }
        offsetCommitCallback.onSuccess();
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // do not swallow the interrupt of the retry pause
            Thread.currentThread().interrupt();
        }
        if (!running) {
            return;
        }
        offsetCommitCallback.onException(e);
        if (!committed) {
            // nothing reached Pulsar, so the committed offsets must stay as they are
            return;
        }
    }
    // the cursor was moved; reflect that in the partition states
    for (PulsarTopicState<T> state : subscribedPartitionStates) {
        MessageId off = offset.get(state.getTopicRange());
        if (off != null) {
            state.setCommittedOffset(off);
        }
    }
}
/**
 * Gives access to the metadata reader this fetcher uses.
 *
 * @return The metadata reader passed to the constructor
 */
public PulsarMetadataReader getMetaDataReader() {
    return metadataReader;
}
}