[Hotfix][Connector-V1][Kafka] When json or avro are selected for kafka schema, a ClassCastException error is reported (#3423)

* [Bug] [seatunnel-connectors-flink-kafka] When json or avro are selected for kafka schema, a ClassCastException error is reported (#3243)

* Fix missing logger

Co-authored-by: brave kong <41284456+bravekong@users.noreply.github.com>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
index dbb60c7..2ff6ad9 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
@@ -31,6 +31,7 @@
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.auto.service.AutoService;
 import org.apache.commons.lang3.StringUtils;
@@ -111,7 +112,12 @@
         }
         String schemaContent = config.getString(SCHEMA);
         format = FormatType.from(config.getString(SOURCE_FORMAT).trim().toLowerCase());
-        schemaInfo = JsonUtils.parseArray(schemaContent);
+        try {
+            schemaInfo = JsonUtils.stringToJsonNode(schemaContent);
+        } catch (JsonProcessingException e) {
+            LOGGER.error("schema parse error", e);
+            throw new ClassCastException(String.format("%s, cannot be cast to com.fasterxml.jackson.databind.JsonNode.", schemaContent));
+        }
     }
 
     @Override