/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.inlong.sort.pulsar.internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient;
import org.apache.flink.streaming.connectors.pulsar.internal.MessageIdSerializer;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarCommitCallback;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarSourceStateSerializer;
import org.apache.flink.streaming.connectors.pulsar.internal.SerializableRange;
import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscription;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscriptionSerializer;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.sort.pulsar.table.DynamicPulsarDeserializationSchema;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.COMMITS_FAILED_METRICS_COUNTER;
import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.COMMITS_SUCCEEDED_METRICS_COUNTER;
import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.PULSAR_SOURCE_METRICS_GROUP;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
/**
* Pulsar data source.
* Copied from io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9
* and extended with InLong metric and audit support.
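*
* <p>Illustrative usage only; the URLs, topic, deserialization schema, and InLong metric/audit
* values below are placeholders rather than values prescribed by this class, and {@code env}
* stands for a StreamExecutionEnvironment:
* <pre>{@code
* Properties props = new Properties();
* props.setProperty("topic", "persistent://public/default/my-topic");
* ClientConfigurationData clientConf = new ClientConfigurationData();
* clientConf.setServiceUrl("pulsar://localhost:6650");
* FlinkPulsarSource<RowData> source = new FlinkPulsarSource<>(
*         "http://localhost:8080",        // Pulsar admin URL
*         "pulsar://localhost:6650",      // Pulsar service URL
*         clientConf,
*         myDeserializationSchema,        // a PulsarDeserializationSchema<RowData>
*         props,
*         inlongMetricLabels,
*         inlongAuditAddress,
*         inlongAuditKeys);
* DataStream<RowData> stream = env.addSource(source);
* }</pre>
*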
* @param <T> The type of records produced by this data source.
*/
@Slf4j
public class FlinkPulsarSource<T>
extends
RichParallelSourceFunction<T>
implements
ResultTypeQueryable<T>,
CheckpointListener,
CheckpointedFunction {
/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
/** Boolean configuration key to disable metrics tracking. **/
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
/** State name of the consumer's partition offset states. */
private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
private static final String OFFSETS_STATE_NAME_V3 = "topic-offset-states";
// ------------------------------------------------------------------------
// configuration state, set on the client relevant for all subtasks
// ------------------------------------------------------------------------
protected String adminUrl;
protected ClientConfigurationData clientConfigurationData;
protected final Map<String, String> caseInsensitiveParams;
protected final Map<String, Object> readerConf;
protected volatile PulsarDeserializationSchema<T> deserializer;
private Map<TopicRange, MessageId> ownedTopicStarts;
/**
* Optional watermark strategy that will be run per Pulsar partition, to exploit per-partition
* timestamp characteristics. The watermark strategy is kept in serialized form so that it can
* be deserialized into an independent copy for each partition.
*/
private SerializedValue<WatermarkStrategy<T>> watermarkStrategy;
/** User configured value for discovery interval, in milliseconds. */
private final long discoveryIntervalMillis;
protected final int pollTimeoutMs;
protected final int commitMaxRetries;
/** The startup mode for the reader (default is {@link StartupMode#LATEST}). */
private StartupMode startupMode = StartupMode.LATEST;
/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
private transient Map<TopicRange, MessageId> specificStartupOffsets;
/**
* The subscription name to be used; only relevant when startup mode is {@link StartupMode#EXTERNAL_SUBSCRIPTION}.
* If the subscription exists for a partition, reading of that partition starts from the subscription cursor.
* At the same time, checkpoints of the job advance the cursor of that subscription.
*/
private String externalSubscriptionName;
/**
* The subscription position to use when the subscription does not exist (default is {@link MessageId#latest});
* Only relevant when startup mode is {@link StartupMode#EXTERNAL_SUBSCRIPTION}.
*/
private MessageId subscriptionPosition = MessageId.latest;
// TODO: remove this when MessageId is serializable itself.
// see: https://github.com/apache/pulsar/pull/6064
private Map<TopicRange, byte[]> specificStartupOffsetsAsBytes;
protected final Properties properties;
protected final UUID uuid = UUID.randomUUID();
// ------------------------------------------------------------------------
// runtime state (used individually by each parallel subtask)
// ------------------------------------------------------------------------
/** Data for pending but uncommitted offsets. */
private final LinkedHashMap<Long, Map<TopicRange, MessageId>> pendingOffsetsToCommit = new LinkedHashMap<>();
/** The fetcher that connects to Pulsar and pulls data from the owned topic partitions. */
private transient volatile PulsarFetcher<T> pulsarFetcher;
/** The partition discoverer, used to find new partitions. */
protected transient volatile PulsarMetadataReader metadataReader;
/**
* The offsets to restore to, if the consumer restores state from a checkpoint.
*
* <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} method.
*
* <p>A sorted map is used because the ordering matters when the restored state
* is used to seed the partition discoverer.
*/
private transient volatile TreeMap<TopicRange, MessageId> restoredState;
private transient volatile Set<TopicRange> excludeStartMessageIds;
/**
* Handle to the union list state in the operator state backend that holds the per-topic offsets.
* On restore, every subtask receives the full union of offsets and filters out its own share.
*/
private transient ListState<Tuple2<TopicSubscription, MessageId>> unionOffsetStates;
private int oldStateVersion = 2;
private volatile boolean stateSubEqualexternalSub = false;
/** Discovery loop, executed in a separate thread. */
private transient volatile Thread discoveryLoopThread;
/** Flag indicating whether the consumer is still running. */
private volatile boolean running = true;
/**
* Flag indicating whether or not metrics should be exposed.
* If {@code true}, offset metrics (e.g. current offset, committed offset) and
* other metrics will be registered.
*/
private final boolean useMetrics;
/** Counter for successful Pulsar offset commits. */
private transient Counter successfulCommits;
/** Counter for failed Pulsar offset commits. */
private transient Counter failedCommits;
/** Callback interface that will be invoked upon async pulsar commit completion. */
private transient PulsarCommitCallback offsetCommitCallback;
private transient int taskIndex;
private transient int numParallelTasks;
protected String inlongMetric;
protected String auditHostAndPorts;
protected String auditKeys;
protected String serverUrl;
public FlinkPulsarSource(
String adminUrl,
String serverUrl,
ClientConfigurationData clientConf,
PulsarDeserializationSchema<T> deserializer,
Properties properties,
String inlongMetric,
String inlongAudit,
String auditKeys) {
this.adminUrl = checkNotNull(adminUrl);
this.clientConfigurationData = checkNotNull(clientConf);
this.deserializer = deserializer;
this.properties = properties;
this.caseInsensitiveParams =
SourceSinkUtils.validateStreamSourceOptions(Maps.fromProperties(properties));
this.readerConf =
SourceSinkUtils.getReaderParams(Maps.fromProperties(properties));
this.discoveryIntervalMillis =
SourceSinkUtils.getPartitionDiscoveryIntervalInMillis(caseInsensitiveParams);
this.pollTimeoutMs =
SourceSinkUtils.getPollTimeoutMs(caseInsensitiveParams);
this.commitMaxRetries =
SourceSinkUtils.getCommitMaxRetries(caseInsensitiveParams);
this.useMetrics =
SourceSinkUtils.getUseMetrics(caseInsensitiveParams);
this.serverUrl = serverUrl;
CachedPulsarClient.setCacheSize(SourceSinkUtils.getClientCacheSize(caseInsensitiveParams));
if (this.clientConfigurationData.getServiceUrl() == null) {
throw new IllegalArgumentException("ServiceUrl must be supplied in the client configuration");
}
this.oldStateVersion = SourceSinkUtils.getOldStateVersion(caseInsensitiveParams, oldStateVersion);
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = inlongAudit;
this.auditKeys = auditKeys;
}
// ------------------------------------------------------------------------
// Configuration
// ------------------------------------------------------------------------
/**
* Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks
* in a punctuated manner. The watermark extractor will run per Pulsar partition, and
* watermarks will be merged across partitions in the same way that the Flink runtime
* merges them when streams are unioned.
*
* <p>When a subtask of a FlinkPulsarSource source reads multiple Pulsar partitions,
* the streams from the partitions are unioned in a "first come first serve" fashion.
* Per-partition characteristics are usually lost that way.
* For example, if the timestamps are strictly ascending per Pulsar partition,
* they will not be strictly ascending in the resulting Flink DataStream, if the
* parallel source subtask reads more than one partition.
*
* <p>Running timestamp extractors / watermark generators directly inside the Pulsar source,
* per Pulsar partition, allows users to let them exploit the per-partition characteristics.
*
* <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
* {@link AssignerWithPeriodicWatermarks}, not both at the same time.
*
* @param assigner The timestamp assigner / watermark generator to use.
* @return The reader object, to allow function chaining.
*/
@Deprecated
public FlinkPulsarSource<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
checkNotNull(assigner);
if (this.watermarkStrategy != null) {
throw new IllegalStateException("Some watermark strategy has already been set.");
}
try {
ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
final WatermarkStrategy<T> wms = new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(assigner);
return assignTimestampsAndWatermarks(wms);
} catch (Exception e) {
throw new IllegalArgumentException("The given assigner is not serializable", e);
}
}
/**
* Specifies an {@link AssignerWithPeriodicWatermarks} to emit watermarks
* in a periodic manner. The watermark extractor will run per Pulsar partition, and
* watermarks will be merged across partitions in the same way that the Flink runtime
* merges them when streams are unioned.
*
* <p>When a subtask of a FlinkPulsarSource source reads multiple Pulsar partitions,
* the streams from the partitions are unioned in a "first come first serve" fashion.
* Per-partition characteristics are usually lost that way.
* For example, if the timestamps are strictly ascending per Pulsar partition,
* they will not be strictly ascending in the resulting Flink DataStream,
* if the parallel source subtask reads more than one partition.
*
* <p>Running timestamp extractors / watermark generators directly inside the Pulsar source,
* per Pulsar partition, allows users to let them exploit the per-partition characteristics.
*
* <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
* {@link AssignerWithPeriodicWatermarks}, not both at the same time.
*
* @param assigner The timestamp assigner / watermark generator to use.
* @return The reader object, to allow function chaining.
*/
@Deprecated
public FlinkPulsarSource<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
checkNotNull(assigner);
if (this.watermarkStrategy != null) {
throw new IllegalStateException("Some watermark strategy has already been set.");
}
try {
ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
final WatermarkStrategy<T> wms = new AssignerWithPeriodicWatermarksAdapter.Strategy<>(assigner);
return assignTimestampsAndWatermarks(wms);
} catch (Exception e) {
throw new IllegalArgumentException("The given assigner is not serializable", e);
}
}
/**
* Sets the given {@link WatermarkStrategy} on this source. It will be used to assign
* timestamps to records and to generate watermarks that signal event time progress.
*
* <p>Running timestamp extractors / watermark generators directly inside the Pulsar source
* (which you can do by using this method), per Pulsar partition, allows users to let them
* exploit the per-partition characteristics.
*
* <p>When a subtask of a FlinkPulsarSource reads multiple pulsar partitions,
* the streams from the partitions are unioned in a "first come first serve" fashion.
* Per-partition characteristics are usually lost that way. For example, if the timestamps are
* strictly ascending per Pulsar partition, they will not be strictly ascending in the resulting
* Flink DataStream, if the parallel source subtask reads more than one partition.
*
* <p>Common watermark generation patterns can be found as static methods in the
* {@link org.apache.flink.api.common.eventtime.WatermarkStrategy} class.
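*
* <p>A minimal sketch for illustration; the record type, the five-second lag, and the
* {@code extractEventTime} helper are hypothetical, not part of this class:
* <pre>{@code
* source.assignTimestampsAndWatermarks(
*         WatermarkStrategy.<RowData>forBoundedOutOfOrderness(Duration.ofSeconds(5))
*                 .withTimestampAssigner((event, ts) -> extractEventTime(event)));
* }</pre>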
*
* @return The source object, to allow function chaining.
*/
public FlinkPulsarSource<T> assignTimestampsAndWatermarks(
WatermarkStrategy<T> watermarkStrategy) {
checkNotNull(watermarkStrategy);
try {
ClosureCleaner.clean(watermarkStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
this.watermarkStrategy = new SerializedValue<>(watermarkStrategy);
} catch (Exception e) {
throw new IllegalArgumentException("The given WatermarkStrategy is not serializable", e);
}
return this;
}
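/**
* Specifies that the source should start reading from the earliest available message of each
* topic. Restored checkpoint state, if present, still takes precedence over this setting.
*
* @return The source object, to allow function chaining.
*/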
public FlinkPulsarSource<T> setStartFromEarliest() {
this.startupMode = StartupMode.EARLIEST;
this.specificStartupOffsets = null;
return this;
}
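/**
* Specifies that the source should start reading from the latest message of each topic.
* Restored checkpoint state, if present, still takes precedence over this setting.
*
* @return The source object, to allow function chaining.
*/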
public FlinkPulsarSource<T> setStartFromLatest() {
this.startupMode = StartupMode.LATEST;
this.specificStartupOffsets = null;
return this;
}
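/**
* Specifies that the source should start reading from the given {@link MessageId} for each
* topic. Topics owned by this source that have no entry in the map fall back to
* {@link MessageId#latest}.
*
* <p>Illustrative only; the topic name and {@code someMessageId} are placeholders:
* <pre>{@code
* Map<String, MessageId> offsets = new HashMap<>();
* offsets.put("persistent://public/default/my-topic", someMessageId);
* source.setStartFromSpecificOffsets(offsets);
* }</pre>
*
* @param specificStartupOffsets The starting message id per topic.
* @return The source object, to allow function chaining.
*/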
public FlinkPulsarSource<T> setStartFromSpecificOffsets(Map<String, MessageId> specificStartupOffsets) {
checkNotNull(specificStartupOffsets);
this.specificStartupOffsets = specificStartupOffsets.entrySet()
.stream()
.collect(Collectors.toMap(e -> new TopicRange(e.getKey()), Map.Entry::getValue));
this.startupMode = StartupMode.SPECIFIC_OFFSETS;
this.specificStartupOffsetsAsBytes = new HashMap<>();
for (Map.Entry<TopicRange, MessageId> entry : this.specificStartupOffsets.entrySet()) {
specificStartupOffsetsAsBytes.put(entry.getKey(), entry.getValue().toByteArray());
}
return this;
}
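/**
* Specifies that the source should start reading from an existing Pulsar subscription
* ({@link StartupMode#EXTERNAL_SUBSCRIPTION}). For topics where the subscription does not yet
* exist, reading starts from {@link MessageId#latest}.
*
* @param externalSubscriptionName The Pulsar subscription to resume from.
* @return The source object, to allow function chaining.
*/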
public FlinkPulsarSource<T> setStartFromSubscription(String externalSubscriptionName) {
this.startupMode = StartupMode.EXTERNAL_SUBSCRIPTION;
this.externalSubscriptionName = checkNotNull(externalSubscriptionName);
return this;
}
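/**
* Same as {@link #setStartFromSubscription(String)}, but with an explicit position to use for
* topics where the subscription does not yet exist.
*
* @param externalSubscriptionName The Pulsar subscription to resume from.
* @param subscriptionPosition The position to start from when the subscription is missing.
* @return The source object, to allow function chaining.
*/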
public FlinkPulsarSource<T> setStartFromSubscription(String externalSubscriptionName,
MessageId subscriptionPosition) {
this.startupMode = StartupMode.EXTERNAL_SUBSCRIPTION;
this.externalSubscriptionName = checkNotNull(externalSubscriptionName);
this.subscriptionPosition = checkNotNull(subscriptionPosition);
return this;
}
// ------------------------------------------------------------------------
// Work methods
// ------------------------------------------------------------------------
private MetricState metricState;
private SourceMetricData sourceMetricData;
@Override
public void open(Configuration parameters) throws Exception {
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
.withAuditAddress(auditHostAndPorts)
.withAuditKeys(auditKeys)
.withRegisterMetric(RegisteredMetric.ALL)
.withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
.withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L)
.build();
if (metricOption != null) {
log.info("init source");
sourceMetricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup());
}
if (this.deserializer != null) {
DynamicPulsarDeserializationSchema dynamicPulsarDeserializationSchema =
(DynamicPulsarDeserializationSchema) deserializer;
dynamicPulsarDeserializationSchema.setMetricData(sourceMetricData);
this.deserializer.open(
RuntimeContextInitializationContextAdapters.deserializationAdapter(
getRuntimeContext(),
metricGroup -> metricGroup.addGroup("user")));
}
this.taskIndex = getRuntimeContext().getIndexOfThisSubtask();
this.numParallelTasks = getRuntimeContext().getNumberOfParallelSubtasks();
this.metadataReader = createMetadataReader();
ownedTopicStarts = new HashMap<>();
excludeStartMessageIds = new HashSet<>();
Set<TopicRange> allTopics = metadataReader.discoverTopicChanges();
if (specificStartupOffsets == null && specificStartupOffsetsAsBytes != null) {
specificStartupOffsets = new HashMap<>();
for (Map.Entry<TopicRange, byte[]> entry : specificStartupOffsetsAsBytes.entrySet()) {
specificStartupOffsets.put(entry.getKey(), MessageId.fromByteArray(entry.getValue()));
}
}
Map<TopicRange, MessageId> allTopicOffsets =
offsetForEachTopic(allTopics, startupMode, specificStartupOffsets);
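// Restored offsets are used unless the startup mode is EXTERNAL_SUBSCRIPTION and the
// subscription recorded in the restored state differs from the configured external
// subscription; in that case the start positions are taken from the subscription instead.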
boolean usingRestoredState = (startupMode != StartupMode.EXTERNAL_SUBSCRIPTION) || stateSubEqualexternalSub;
if (restoredState != null && usingRestoredState) {
allTopicOffsets.entrySet().stream()
.filter(e -> !restoredState.containsKey(e.getKey()))
.forEach(e -> restoredState.put(e.getKey(), e.getValue()));
SerializableRange subTaskRange = metadataReader.getRange();
restoredState.entrySet().stream()
.filter(
e -> SourceSinkUtils.belongsTo(
e.getKey().getTopic(),
subTaskRange,
numParallelTasks,
taskIndex))
.forEach(
e -> {
TopicRange tr =
new TopicRange(
e.getKey().getTopic(),
subTaskRange.getPulsarRange());
ownedTopicStarts.put(tr, e.getValue());
excludeStartMessageIds.add(e.getKey());
});
Set<TopicRange> goneTopics =
Sets.difference(restoredState.keySet(), allTopics).stream()
.filter(
k -> SourceSinkUtils.belongsTo(
k.getTopic(),
subTaskRange,
numParallelTasks,
taskIndex))
.map(k -> new TopicRange(k.getTopic(), subTaskRange.getPulsarRange()))
.collect(Collectors.toSet());
for (TopicRange goneTopic : goneTopics) {
log.warn(goneTopic + " is removed from subscription since " +
"it no longer matches with topics settings.");
ownedTopicStarts.remove(goneTopic);
}
log.info("Source {} will start reading {} topics in restored state {}",
taskIndex, ownedTopicStarts.size(), StringUtils.join(ownedTopicStarts.entrySet()));
} else {
ownedTopicStarts.putAll(
allTopicOffsets.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
if (ownedTopicStarts.isEmpty()) {
log.info("Source {} initially has no topics to read from.", taskIndex);
} else {
log.info("Source {} will start reading {} topics from initialized positions: {}",
taskIndex, ownedTopicStarts.size(), ownedTopicStarts);
}
}
}
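/**
* Returns the subscription name used by this source: the configured external subscription when
* the startup mode is {@link StartupMode#EXTERNAL_SUBSCRIPTION}, otherwise a "flink-pulsar-"
* name derived from this source's random UUID.
*/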
protected String getSubscriptionName() {
if (startupMode == StartupMode.EXTERNAL_SUBSCRIPTION) {
checkNotNull(externalSubscriptionName);
return externalSubscriptionName;
} else {
return "flink-pulsar-" + uuid.toString();
}
}
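/**
* Creates the {@link PulsarMetadataReader} used by this subtask for topic discovery and for
* resolving start positions from Pulsar subscriptions.
*/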
protected PulsarMetadataReader createMetadataReader() throws PulsarClientException {
return new PulsarMetadataReader(
adminUrl,
serverUrl,
clientConfigurationData,
getSubscriptionName(),
caseInsensitiveParams,
taskIndex,
numParallelTasks,
startupMode == StartupMode.EXTERNAL_SUBSCRIPTION);
}
@Override
public void run(SourceContext<T> ctx) throws Exception {
if (ownedTopicStarts == null) {
throw new Exception("The partitions were not set for the source");
}
this.successfulCommits =
this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
this.failedCommits =
this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
this.offsetCommitCallback = new PulsarCommitCallback() {
@Override
public void onSuccess() {
successfulCommits.inc();
}
@Override
public void onException(Throwable cause) {
log.warn("source {} failed commit by {}", taskIndex, cause.toString());
failedCommits.inc();
}
};
if (ownedTopicStarts.isEmpty()) {
ctx.markAsTemporarilyIdle();
}
log.info("Source {} creating fetcher with offsets {}",
taskIndex,
StringUtils.join(ownedTopicStarts.entrySet()));
// from this point forward:
// - 'snapshotState' will draw offsets from the fetcher,
// instead of being built from `subscribedPartitionsToStartOffsets`
// - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
// Pulsar through the fetcher, if configured to do so)
StreamingRuntimeContext streamingRuntime = (StreamingRuntimeContext) getRuntimeContext();
this.pulsarFetcher = createFetcher(
ctx,
ownedTopicStarts,
watermarkStrategy,
streamingRuntime.getProcessingTimeService(),
streamingRuntime.getExecutionConfig().getAutoWatermarkInterval(),
getRuntimeContext().getUserCodeClassLoader(),
streamingRuntime,
useMetrics,
excludeStartMessageIds,
getSubscriptionName());
if (!running) {
return;
}
if (discoveryIntervalMillis < 0) {
pulsarFetcher.runFetchLoop();
} else {
runWithTopicsDiscovery();
}
}
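/**
* Factory for the {@link PulsarFetcher} that performs the actual reading from Pulsar for this
* subtask; protected so that subclasses can customize the fetcher.
*/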
protected PulsarFetcher<T> createFetcher(
SourceContext<T> sourceContext,
Map<TopicRange, MessageId> seedTopicsWithInitialOffsets,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
StreamingRuntimeContext streamingRuntime,
boolean useMetrics,
Set<TopicRange> excludeStartMessageIds,
String subscriptionName) throws Exception {
// readerConf.putIfAbsent(PulsarOptions.SUBSCRIPTION_ROLE_OPTION_KEY, getSubscriptionName());
return new PulsarFetcher<>(
sourceContext,
seedTopicsWithInitialOffsets,
excludeStartMessageIds,
watermarkStrategy,
processingTimeProvider,
autoWatermarkInterval,
userCodeClassLoader,
streamingRuntime,
clientConfigurationData,
readerConf,
subscriptionName,
pollTimeoutMs,
commitMaxRetries,
deserializer,
metadataReader,
streamingRuntime.getMetricGroup().addGroup(PULSAR_SOURCE_METRICS_GROUP),
useMetrics);
}
public void joinDiscoveryLoopThread() throws InterruptedException {
if (discoveryLoopThread != null) {
discoveryLoopThread.join();
}
}
public void runWithTopicsDiscovery() throws Exception {
AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
createAndStartDiscoveryLoop(discoveryLoopErrorRef);
pulsarFetcher.runFetchLoop();
joinDiscoveryLoopThread();
Exception discoveryLoopError = discoveryLoopErrorRef.get();
if (discoveryLoopError != null) {
throw new RuntimeException(discoveryLoopError);
}
}
private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
discoveryLoopThread = new Thread(
() -> {
try {
while (running) {
Set<TopicRange> added = metadataReader.discoverTopicChanges();
if (running && !added.isEmpty()) {
pulsarFetcher.addDiscoveredTopics(added);
}
if (running && discoveryIntervalMillis != -1) {
Thread.sleep(discoveryIntervalMillis);
}
}
} catch (PulsarMetadataReader.ClosedException e) {
// the metadata reader was closed; break out of the loop and do nothing
} catch (InterruptedException e) {
// the discovery thread was interrupted; break out of the loop and do nothing
} catch (Exception e) {
discoveryLoopErrorRef.set(e);
} finally {
if (running) {
// calling cancel will also let the fetcher loop escape
// (if not running, cancel() was already called)
cancel();
}
}
}, "Pulsar topic discovery for source " + taskIndex);
discoveryLoopThread.start();
}
@Override
public void close() throws Exception {
cancel();
joinDiscoveryLoopThread();
Exception exception = null;
if (metadataReader != null) {
try {
metadataReader.close();
} catch (Exception e) {
exception = e;
}
}
try {
super.close();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
if (exception != null) {
throw exception;
}
}
@Override
public void cancel() {
running = false;
if (discoveryLoopThread != null) {
discoveryLoopThread.interrupt();
}
if (pulsarFetcher != null) {
try {
pulsarFetcher.cancel();
} catch (Exception e) {
log.error("Failed to cancel the Pulsar Fetcher {}", ExceptionUtils.stringifyException(e));
throw new RuntimeException(e);
}
}
}
// ------------------------------------------------------------------------
// ResultTypeQueryable methods
// ------------------------------------------------------------------------
@Override
public TypeInformation<T> getProducedType() {
return deserializer.getProducedType();
}
// ------------------------------------------------------------------------
// Checkpoint and restore
// ------------------------------------------------------------------------
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
OperatorStateStore stateStore = context.getOperatorStateStore();
unionOffsetStates = stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME_V3,
createStateSerializer()));
if (context.isRestored()) {
restoredState = new TreeMap<>();
Iterator<Tuple2<TopicSubscription, MessageId>> iterator = unionOffsetStates.get().iterator();
if (!iterator.hasNext()) {
iterator = tryMigrateState(stateStore);
}
while (iterator.hasNext()) {
final Tuple2<TopicSubscription, MessageId> tuple2 = iterator.next();
final SerializableRange range =
tuple2.f0.getRange() != null ? tuple2.f0.getRange() : SerializableRange.ofFullRange();
final TopicRange topicRange =
new TopicRange(tuple2.f0.getTopic(), range.getPulsarRange());
restoredState.put(topicRange, tuple2.f1);
String subscriptionName = tuple2.f0.getSubscriptionName();
if (!stateSubEqualexternalSub && StringUtils.equals(subscriptionName, externalSubscriptionName)) {
stateSubEqualexternalSub = true;
log.info("Source restored state with subscriptionName {}", subscriptionName);
}
}
log.info("Source subtask {} restored state {}",
taskIndex,
StringUtils.join(restoredState.entrySet()));
} else {
log.info("Source subtask {} has no restore state", taskIndex);
}
}
@VisibleForTesting
static TupleSerializer<Tuple2<TopicSubscription, MessageId>> createStateSerializer() {
// an explicit serializer keeps compatibility with GenericTypeInformation and lets users
// call disableGenericTypes()
TypeSerializer<?>[] fieldSerializers =
new TypeSerializer<?>[]{
TopicSubscriptionSerializer.INSTANCE,
MessageIdSerializer.INSTANCE
};
@SuppressWarnings("unchecked")
Class<Tuple2<TopicSubscription, MessageId>> tupleClass =
(Class<Tuple2<TopicSubscription, MessageId>>) (Class<?>) Tuple2.class;
return new TupleSerializer<>(tupleClass, fieldSerializers);
}
/**
* Tries to migrate state that was written by an older savepoint format.
*
* @param stateStore the operator state store
* @return an iterator over the migrated state entries
* @throws Exception if the state types are incompatible or deserialization fails
*/
private Iterator<Tuple2<TopicSubscription, MessageId>> tryMigrateState(OperatorStateStore stateStore)
throws Exception {
log.info("restore old state version {}", oldStateVersion);
PulsarSourceStateSerializer stateSerializer =
new PulsarSourceStateSerializer(getRuntimeContext().getExecutionConfig());
// stateStore.getUnionListState both registers the state descriptor and reads the restored
// data; a descriptor with the same name cannot be registered twice, so only a single old
// state version can be configured for migration.
ListState<?> rawStates = stateStore.getUnionListState(new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
stateSerializer.getSerializer(oldStateVersion)));
ListState<String> oldUnionSubscriptionNameStates =
stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME + "_subName",
TypeInformation.of(new TypeHint<String>() {
})));
final Iterator<String> subNameIterator = oldUnionSubscriptionNameStates.get().iterator();
Iterator<?> tuple2s = rawStates.get().iterator();
log.info("restore old state has data {}", tuple2s.hasNext());
final List<Tuple2<TopicSubscription, MessageId>> records = new ArrayList<>();
while (tuple2s.hasNext()) {
final Object next = tuple2s.next();
Tuple2<TopicSubscription, MessageId> tuple2 = stateSerializer.deserialize(oldStateVersion, next);
String subName = tuple2.f0.getSubscriptionName();
if (subNameIterator.hasNext()) {
subName = subNameIterator.next();
}
final TopicSubscription topicSubscription = TopicSubscription.builder()
.topic(tuple2.f0.getTopic())
.range(tuple2.f0.getRange())
.subscriptionName(subName)
.build();
final Tuple2<TopicSubscription, MessageId> record = Tuple2.of(topicSubscription, tuple2.f1);
log.info("migrationState {}", record);
records.add(record);
}
rawStates.clear();
oldUnionSubscriptionNameStates.clear();
return records.listIterator();
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
if (!running) {
log.debug("snapshotState() called on closed source");
} else {
unionOffsetStates.clear();
PulsarFetcher<T> fetcher = this.pulsarFetcher;
if (fetcher == null) {
// the fetcher has not yet been initialized, which means we need to return the
// originally restored offsets or the assigned partitions
for (Map.Entry<TopicRange, MessageId> entry : ownedTopicStarts.entrySet()) {
final TopicSubscription topicSubscription = TopicSubscription.builder()
.topic(entry.getKey().getTopic())
.range(entry.getKey().getRange())
.subscriptionName(getSubscriptionName())
.build();
unionOffsetStates.add(Tuple2.of(topicSubscription, entry.getValue()));
}
pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
} else {
Map<TopicRange, MessageId> currentOffsets = fetcher.snapshotCurrentState();
pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
for (Map.Entry<TopicRange, MessageId> entry : currentOffsets.entrySet()) {
final TopicSubscription topicSubscription = TopicSubscription.builder()
.topic(entry.getKey().getTopic())
.range(entry.getKey().getRange())
.subscriptionName(getSubscriptionName())
.build();
unionOffsetStates.add(Tuple2.of(topicSubscription, entry.getValue()));
}
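// if more than MAX_NUM_PENDING_CHECKPOINTS are pending, drop the oldest entries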
int exceed = pendingOffsetsToCommit.size() - MAX_NUM_PENDING_CHECKPOINTS;
Iterator<Long> iterator = pendingOffsetsToCommit.keySet().iterator();
while (iterator.hasNext() && exceed > 0) {
iterator.next();
iterator.remove();
}
}
}
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
if (!running) {
log.info("notifyCheckpointComplete() called on closed source");
return;
}
PulsarFetcher<T> fetcher = this.pulsarFetcher;
if (fetcher == null) {
log.info("notifyCheckpointComplete() called on uninitialized source");
return;
}
log.debug("Source {} received confirmation for unknown checkpoint id {}",
taskIndex, checkpointId);
try {
if (!pendingOffsetsToCommit.containsKey(checkpointId)) {
log.warn("Source {} received confirmation for unknown checkpoint id {}",
taskIndex, checkpointId);
return;
}
Map<TopicRange, MessageId> offset = pendingOffsetsToCommit.get(checkpointId);
// remove older checkpoints in map
Iterator<Long> iterator = pendingOffsetsToCommit.keySet().iterator();
while (iterator.hasNext()) {
Long key = iterator.next();
iterator.remove();
if (Objects.equals(key, checkpointId)) {
break;
}
}
if (offset == null || offset.size() == 0) {
log.debug("Source {} has empty checkpoint state", taskIndex);
return;
}
fetcher.commitOffsetToPulsar(offset, offsetCommitCallback);
} catch (Exception e) {
if (running) {
throw e;
}
}
}
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
log.error("checkpoint aborted, checkpointId: {}", checkpointId);
}
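/**
* Computes the starting {@link MessageId} for each of the given topics according to the
* startup mode: earliest/latest for all topics, the user-supplied offsets (falling back to
* {@link MessageId#latest}) for {@link StartupMode#SPECIFIC_OFFSETS}, or the position looked up
* from the external subscription for {@link StartupMode#EXTERNAL_SUBSCRIPTION}.
*
* @param topics The topics owned by this subtask.
* @param mode The configured startup mode.
* @param specificStartupOffsets The user-supplied offsets, if any.
* @return A map from topic range to starting message id, or {@code null} for an unknown mode.
*/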
public Map<TopicRange, MessageId> offsetForEachTopic(
Set<TopicRange> topics,
StartupMode mode,
Map<TopicRange, MessageId> specificStartupOffsets) {
switch (mode) {
case LATEST:
return topics.stream()
.collect(Collectors.toMap(k -> k, k -> MessageId.latest));
case EARLIEST:
return topics.stream()
.collect(Collectors.toMap(k -> k, k -> MessageId.earliest));
case SPECIFIC_OFFSETS:
checkArgument(topics.containsAll(specificStartupOffsets.keySet()),
String.format(
"Topics designated in startingOffsets should appear in %s, topics:" +
"%s, topics in offsets: %s",
StringUtils.join(PulsarOptions.TOPIC_OPTION_KEYS),
StringUtils.join(topics.toArray()),
StringUtils.join(specificStartupOffsets.entrySet().toArray())));
Map<TopicRange, MessageId> specificOffsets = new HashMap<>();
for (TopicRange topic : topics) {
if (specificStartupOffsets.containsKey(topic)) {
specificOffsets.put(topic, specificStartupOffsets.get(topic));
} else {
specificOffsets.put(topic, MessageId.latest);
}
}
return specificOffsets;
case EXTERNAL_SUBSCRIPTION:
Map<TopicRange, MessageId> offsetsFromSubs = new HashMap<>();
for (TopicRange topic : topics) {
offsetsFromSubs.put(
topic,
metadataReader.getPositionFromSubscription(
topic, subscriptionPosition));
}
log.info("offset for each topic: {}", offsetsFromSubs);
return offsetsFromSubs;
}
return null;
}
public Map<Long, Map<TopicRange, MessageId>> getPendingOffsetsToCommit() {
return pendingOffsetsToCommit;
}
public Map<TopicRange, MessageId> getOwnedTopicStarts() {
return ownedTopicStarts;
}
}