[FLINK-22358][connector base] Add stability annotations to connector base and iterator sources.
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordEmitter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordEmitter.java
index 99b40ae..9c2772f 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordEmitter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordEmitter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.base.source.reader;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 
@@ -28,6 +29,7 @@
  * @param <T> the type of records that are eventually emitted to the {@link SourceOutput}.
  * @param <SplitStateT> the mutable type of split state.
  */
+@PublicEvolving
 public interface RecordEmitter<E, T, SplitStateT> {
 
     /**
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
index e5e18dc..293de7c 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.base.source.reader;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.connector.source.SourceSplit;
 
 import javax.annotation.Nullable;
@@ -34,6 +35,7 @@
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** An implementation of RecordsWithSplitIds to host all the records by splits. */
+@PublicEvolving
 public class RecordsBySplits<E> implements RecordsWithSplitIds<E> {
 
     private final Set<String> finishedSplits;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
index 88174fd..5233da0 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.connector.base.source.reader;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 import javax.annotation.Nullable;
 
 import java.util.Set;
 
 /** An interface for the elements passed from the fetchers to the source reader. */
+@PublicEvolving
 public interface RecordsWithSplitIds<E> {
 
     /**
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
index e3b8d43..67632e5 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.base.source.reader;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
@@ -55,6 +56,7 @@
  * @param <SplitT> The type of the splits processed by the source.
  * @param <SplitStateT> The type of the mutable state per split.
  */
+@PublicEvolving
 public abstract class SingleThreadMultiplexSourceReaderBase<
                 E, T, SplitT extends SourceSplit, SplitStateT>
         extends SourceReaderBase<E, T, SplitT, SplitStateT> {
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 696e21d..416dbb2 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.base.source.reader;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceOutput;
@@ -63,6 +64,7 @@
  * @param <SplitT> the immutable split type.
  * @param <SplitStateT> the mutable type of split state.
  */
+@PublicEvolving
 public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
         implements SourceReader<T, SplitT> {
     private static final Logger LOG = LoggerFactory.getLogger(SourceReaderBase.class);
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java
index fa20439..641acf0 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.connector.base.source.reader;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 
 /** The options that can be set for the {@link SourceReaderBase}. */
+@PublicEvolving
 public class SourceReaderOptions {
 
     public static final ConfigOption<Long> SOURCE_READER_CLOSE_TIMEOUT =
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
index fade54f..97a072b 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.base.source.reader.fetcher;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
@@ -26,6 +27,7 @@
 import java.util.Map;
 
 /** The task to add splits. */
+@Internal
 class AddSplitsTask<SplitT extends SourceSplit> implements SplitFetcherTask {
 
     private final SplitReader<?, SplitT> splitReader;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
index 38140b8..34ac618 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.base.source.reader.fetcher;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -28,6 +29,7 @@
 import java.util.function.Consumer;
 
 /** The default fetch task that fetches the records into the element queue. */
+@Internal
 class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
     private final SplitReader<E, SplitT> splitReader;
     private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
index 2abed2e..f7b49381 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.base.source.reader.fetcher;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -39,6 +40,7 @@
  * via the same client. In the example of the file source, there is a single thread that reads the
  * files after another.
  */
+@Internal
 public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
         extends SplitFetcherManager<E, SplitT> {
 
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 78b37a2..f2aa237 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.base.source.reader.fetcher;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -38,6 +39,7 @@
 import java.util.function.Consumer;
 
 /** The internal fetcher runnable responsible for polling message from the external system. */
+@Internal
 public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(SplitFetcher.class);
     private static final SplitFetcherTask WAKEUP_TASK = new DummySplitFetcherTask("WAKEUP_TASK");
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index a90c69d..3184386 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.base.source.reader.fetcher;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -50,6 +51,7 @@
  * manager would only start a single fetcher and assign all the splits to it. A one-thread-per-split
  * fetcher may spawn a new thread every time a new split is assigned.
  */
+@Internal
 public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
     private static final Logger LOG = LoggerFactory.getLogger(SplitFetcherManager.class);
 
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
index ceb5ec1..62a21d3 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
@@ -18,9 +18,12 @@
 
 package org.apache.flink.connector.base.source.reader.fetcher;
 
+import org.apache.flink.annotation.Internal;
+
 import java.io.IOException;
 
 /** An interface similar to {@link Runnable} but allows throwing exceptions and wakeup. */
+@Internal
 public interface SplitFetcherTask {
 
     /**
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
index 97b2125..4f2ff6a 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.base.source.reader.splitreader;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 
@@ -30,6 +31,7 @@
  * @param <E> the element type.
  * @param <SplitT> the split type.
  */
+@PublicEvolving
 public interface SplitReader<E, SplitT extends SourceSplit> {
 
     /**
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java
index b26d564..ccc8e19 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.connector.base.source.reader.splitreader;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 import java.util.List;
 
 /**
@@ -25,6 +27,7 @@
  *
  * @param <SplitT> the split type.
  */
+@PublicEvolving
 public class SplitsAddition<SplitT> extends SplitsChange<SplitT> {
 
     public SplitsAddition(List<SplitT> splits) {
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsChange.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsChange.java
index 4c89a4f..ce9f05f 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsChange.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsChange.java
@@ -18,10 +18,13 @@
 
 package org.apache.flink.connector.base.source.reader.splitreader;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 import java.util.Collections;
 import java.util.List;
 
 /** An abstract class to host splits change. */
+@PublicEvolving
 public abstract class SplitsChange<SplitT> {
     private final List<SplitT> splits;
 
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index bea5709..9977bcd 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.base.source.reader.synchronization;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -69,6 +70,7 @@
  *
  * @param <T> the type of the elements in the queue.
  */
+@Internal
 public class FutureCompletingBlockingQueue<T> {
 
     /**
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/utils/SerdeUtils.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/utils/SerdeUtils.java
index 2bf219d..6b2de99 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/utils/SerdeUtils.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/utils/SerdeUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.base.source.utils;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
@@ -32,6 +33,7 @@
 import java.util.function.Function;
 
 /** A util class with some helper method for serde in the sources. */
+@Internal
 public class SerdeUtils {
 
     /** Private constructor for util class. */