[FLINK-22358][connector base] Add stability annotations to connector base and iterator sources.
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordEmitter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordEmitter.java
index 99b40ae..9c2772f 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordEmitter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordEmitter.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.base.source.reader;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -28,6 +29,7 @@
* @param <T> the type of records that are eventually emitted to the {@link SourceOutput}.
* @param <SplitStateT> the mutable type of split state.
*/
+@PublicEvolving
public interface RecordEmitter<E, T, SplitStateT> {
/**
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
index e5e18dc..293de7c 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.base.source.reader;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.SourceSplit;
import javax.annotation.Nullable;
@@ -34,6 +35,7 @@
import static org.apache.flink.util.Preconditions.checkNotNull;
/** An implementation of RecordsWithSplitIds to host all the records by splits. */
+@PublicEvolving
public class RecordsBySplits<E> implements RecordsWithSplitIds<E> {
private final Set<String> finishedSplits;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
index 88174fd..5233da0 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
@@ -18,11 +18,14 @@
package org.apache.flink.connector.base.source.reader;
+import org.apache.flink.annotation.PublicEvolving;
+
import javax.annotation.Nullable;
import java.util.Set;
/** An interface for the elements passed from the fetchers to the source reader. */
+@PublicEvolving
public interface RecordsWithSplitIds<E> {
/**
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
index e3b8d43..67632e5 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.base.source.reader;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
@@ -55,6 +56,7 @@
* @param <SplitT> The type of the splits processed by the source.
* @param <SplitStateT> The type of the mutable state per split.
*/
+@PublicEvolving
public abstract class SingleThreadMultiplexSourceReaderBase<
E, T, SplitT extends SourceSplit, SplitStateT>
extends SourceReaderBase<E, T, SplitT, SplitStateT> {
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 696e21d..416dbb2 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.base.source.reader;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
@@ -63,6 +64,7 @@
* @param <SplitT> the immutable split type.
* @param <SplitStateT> the mutable type of split state.
*/
+@PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
implements SourceReader<T, SplitT> {
private static final Logger LOG = LoggerFactory.getLogger(SourceReaderBase.class);
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java
index fa20439..641acf0 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java
@@ -18,11 +18,13 @@
package org.apache.flink.connector.base.source.reader;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
/** The options that can be set for the {@link SourceReaderBase}. */
+@PublicEvolving
public class SourceReaderOptions {
public static final ConfigOption<Long> SOURCE_READER_CLOSE_TIMEOUT =
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
index fade54f..97a072b 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.base.source.reader.fetcher;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
@@ -26,6 +27,7 @@
import java.util.Map;
/** The task to add splits. */
+@Internal
class AddSplitsTask<SplitT extends SourceSplit> implements SplitFetcherTask {
private final SplitReader<?, SplitT> splitReader;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
index 38140b8..34ac618 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.base.source.reader.fetcher;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -28,6 +29,7 @@
import java.util.function.Consumer;
/** The default fetch task that fetches the records into the element queue. */
+@Internal
class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
private final SplitReader<E, SplitT> splitReader;
private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
index 2abed2e..f7b49381 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.base.source.reader.fetcher;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -39,6 +40,7 @@
* via the same client. In the example of the file source, there is a single thread that reads the
* files after another.
*/
+@Internal
public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
extends SplitFetcherManager<E, SplitT> {
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 78b37a2..f2aa237 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.base.source.reader.fetcher;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -38,6 +39,7 @@
import java.util.function.Consumer;
/** The internal fetcher runnable responsible for polling message from the external system. */
+@Internal
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(SplitFetcher.class);
private static final SplitFetcherTask WAKEUP_TASK = new DummySplitFetcherTask("WAKEUP_TASK");
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index a90c69d..3184386 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.base.source.reader.fetcher;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -50,6 +51,7 @@
* manager would only start a single fetcher and assign all the splits to it. A one-thread-per-split
* fetcher may spawn a new thread every time a new split is assigned.
*/
+@Internal
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
private static final Logger LOG = LoggerFactory.getLogger(SplitFetcherManager.class);
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
index ceb5ec1..62a21d3 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
@@ -18,9 +18,12 @@
package org.apache.flink.connector.base.source.reader.fetcher;
+import org.apache.flink.annotation.Internal;
+
import java.io.IOException;
/** An interface similar to {@link Runnable} but allows throwing exceptions and wakeup. */
+@Internal
public interface SplitFetcherTask {
/**
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
index 97b2125..4f2ff6a 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.base.source.reader.splitreader;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -30,6 +31,7 @@
* @param <E> the element type.
* @param <SplitT> the split type.
*/
+@PublicEvolving
public interface SplitReader<E, SplitT extends SourceSplit> {
/**
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java
index b26d564..ccc8e19 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java
@@ -18,6 +18,8 @@
package org.apache.flink.connector.base.source.reader.splitreader;
+import org.apache.flink.annotation.PublicEvolving;
+
import java.util.List;
/**
@@ -25,6 +27,7 @@
*
* @param <SplitT> the split type.
*/
+@PublicEvolving
public class SplitsAddition<SplitT> extends SplitsChange<SplitT> {
public SplitsAddition(List<SplitT> splits) {
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsChange.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsChange.java
index 4c89a4f..ce9f05f 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsChange.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsChange.java
@@ -18,10 +18,13 @@
package org.apache.flink.connector.base.source.reader.splitreader;
+import org.apache.flink.annotation.PublicEvolving;
+
import java.util.Collections;
import java.util.List;
/** An abstract class to host splits change. */
+@PublicEvolving
public abstract class SplitsChange<SplitT> {
private final List<SplitT> splits;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index bea5709..9977bcd 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.base.source.reader.synchronization;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.FlinkRuntimeException;
@@ -69,6 +70,7 @@
*
* @param <T> the type of the elements in the queue.
*/
+@Internal
public class FutureCompletingBlockingQueue<T> {
/**
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/utils/SerdeUtils.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/utils/SerdeUtils.java
index 2bf219d..6b2de99 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/utils/SerdeUtils.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/utils/SerdeUtils.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.base.source.utils;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -32,6 +33,7 @@
import java.util.function.Function;
/** A util class with some helper method for serde in the sources. */
+@Internal
public class SerdeUtils {
/** Private constructor for util class. */