[FLINK-36076][minor][cdc-runtime] Set isSchemaChangeApplying as volatile for thread safe consideration
This closes #3556.
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index bddd5fc..b3f17b2 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -97,7 +97,8 @@
// Build Source Operator
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
DataStream<Event> stream =
- sourceTranslator.translate(pipelineDef.getSource(), env, pipelineDef.getConfig());
+ sourceTranslator.translate(
+ pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism);
// Build PreTransformOperator for processing Schema Event
TransformTranslator transformTranslator = new TransformTranslator();
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
index 7b631c6..90bbea7 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
@@ -23,7 +23,6 @@
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.factories.DataSourceFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
-import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.common.source.EventSourceProvider;
import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider;
@@ -41,12 +40,14 @@
public class DataSourceTranslator {
public DataStreamSource<Event> translate(
- SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) {
+ SourceDef sourceDef,
+ StreamExecutionEnvironment env,
+ Configuration pipelineConfig,
+ int sourceParallelism) {
// Create data source
DataSource dataSource = createDataSource(sourceDef, env, pipelineConfig);
// Get source provider
- final int sourceParallelism = pipelineConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider();
if (eventSourceProvider instanceof FlinkSourceProvider) {
// Source
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index 7736016..da88753 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -94,7 +94,7 @@
private final Set<Integer> flushedSinkWriters;
/** Status of the execution of current schema change request. */
- private boolean isSchemaChangeApplying;
+ private volatile boolean isSchemaChangeApplying;
/** Executor service to execute schema change. */
private final ExecutorService schemaChangeThreadPool;