[FLINK-36076][minor][cdc-runtime] Set isSchemaChangeApplying as volatile for thread safe consideration

This closes #3556.
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index bddd5fc..b3f17b2 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -97,7 +97,8 @@
         // Build Source Operator
         DataSourceTranslator sourceTranslator = new DataSourceTranslator();
         DataStream<Event> stream =
-                sourceTranslator.translate(pipelineDef.getSource(), env, pipelineDef.getConfig());
+                sourceTranslator.translate(
+                        pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism);
 
         // Build PreTransformOperator for processing Schema Event
         TransformTranslator transformTranslator = new TransformTranslator();
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
index 7b631c6..90bbea7 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
@@ -23,7 +23,6 @@
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.factories.DataSourceFactory;
 import org.apache.flink.cdc.common.factories.FactoryHelper;
-import org.apache.flink.cdc.common.pipeline.PipelineOptions;
 import org.apache.flink.cdc.common.source.DataSource;
 import org.apache.flink.cdc.common.source.EventSourceProvider;
 import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider;
@@ -41,12 +40,14 @@
 public class DataSourceTranslator {
 
     public DataStreamSource<Event> translate(
-            SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) {
+            SourceDef sourceDef,
+            StreamExecutionEnvironment env,
+            Configuration pipelineConfig,
+            int sourceParallelism) {
         // Create data source
         DataSource dataSource = createDataSource(sourceDef, env, pipelineConfig);
 
         // Get source provider
-        final int sourceParallelism = pipelineConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
         EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider();
         if (eventSourceProvider instanceof FlinkSourceProvider) {
             // Source
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index 7736016..da88753 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -94,7 +94,7 @@
     private final Set<Integer> flushedSinkWriters;
 
     /** Status of the execution of current schema change request. */
-    private boolean isSchemaChangeApplying;
+    private volatile boolean isSchemaChangeApplying;
     /** Executor service to execute schema change. */
     private final ExecutorService schemaChangeThreadPool;