[shade] Add shade module of seatunnel.

[shade] Add shade module of seatunnel.
diff --git a/.asf.yaml b/.asf.yaml
new file mode 100644
index 0000000..dcab78f
--- /dev/null
+++ b/.asf.yaml
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+github:
+  description: SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
+  homepage: https://seatunnel.apache.org/
+  labels:
+    - data-integration
+    - change-data-capture
+    - cdc
+    - high-performance
+    - offline
+    - real-time
+    - batch
+    - streaming
+    - data-ingestion
+    - apache
+    - elt
+  enabled_merge_buttons:
+    squash: true
+    merge: false
+    rebase: false
+  protected_branches:
+    dev:
+      required_status_checks:
+        strict: true
+      required_pull_request_reviews:
+        dismiss_stale_reviews: true
+        required_approving_review_count: 2
+
+notifications:
+  commits:      commits@seatunnel.apache.org
+  issues:       commits@seatunnel.apache.org
+  pullrequests: commits@seatunnel.apache.org
+  pullrequests_status:  commits@seatunnel.apache.org
+  pullrequests_comment: commits@seatunnel.apache.org
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..ba6882d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,57 @@
+# Package Files #
+**/*.zip
+**/*.tar.gz
+**/*.jar
+
+# see JDK-8214300
+.attach_pid*
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+
+# build targets
+target/
+
+# Log file
+*.log
+/logs
+logs.zip
+
+# Intellij Idea files
+.idea/
+*.iml
+.idea/*
+
+.DS_Store
+
+metastore_db/
+
+work_dir
+
+all-dependencies.txt
+self-modules.txt
+third-party-dependencies.txt
+
+*.keytab
+/derby.log
+
+dependency-reduced-pom.xml
+
+apidoc
+
+# Python
+*.py[cod]
+
+Test.java
+Test.scala
+test.conf
+spark-warehouse
+**/*.flattened-pom.xml
+
+seatunnel-examples
+
+# vscode
+.vscode
+
+/lib/*
+version.properties
diff --git a/.licenserc.yaml b/.licenserc.yaml
new file mode 100644
index 0000000..f3ff27d
--- /dev/null
+++ b/.licenserc.yaml
@@ -0,0 +1,45 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+header:
+  license:
+    spdx-id: Apache-2.0
+    copyright-owner: Apache Software Foundation
+
+  paths-ignore:
+    - NOTICE
+    - LICENSE
+    - DISCLAIMER
+    - mvnw.cmd
+    - .mvn
+    - .gitmodules
+    - .gitattributes
+    - .github/actions
+    - '**/known-dependencies-*.txt'
+    - '**/*.md'
+    - '**/*.mdx'
+    - '**/*.json'
+    - '**/*.iml'
+    - '**/*.ini'
+    - '**/*.svg'
+    - '**/.gitignore'
+    - '**/LICENSE'
+    - '**/NOTICE'
+    - '**/.gitkeep'
+    - '**/com/typesafe/config/**'
+
+  comment: on-failure
diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties
new file mode 100644
index 0000000..aa1e4df
--- /dev/null
+++ b/.mvn/wrapper/maven-wrapper.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.8.4/apache-maven-3.8.4-bin.zip
+wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar
diff --git a/mvnw b/mvnw
new file mode 100755
index 0000000..5643201
--- /dev/null
+++ b/mvnw
@@ -0,0 +1,316 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# ----------------------------------------------------------------------------
+
+# ----------------------------------------------------------------------------
+# Maven Start Up Batch script
+#
+# Required ENV vars:
+# ------------------
+#   JAVA_HOME - location of a JDK home dir
+#
+# Optional ENV vars
+# -----------------
+#   M2_HOME - location of maven2's installed home dir
+#   MAVEN_OPTS - parameters passed to the Java VM when running Maven
+#     e.g. to debug Maven itself, use
+#       set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+#   MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+# ----------------------------------------------------------------------------
+
+if [ -z "$MAVEN_SKIP_RC" ] ; then
+
+  if [ -f /usr/local/etc/mavenrc ] ; then
+    . /usr/local/etc/mavenrc
+  fi
+
+  if [ -f /etc/mavenrc ] ; then
+    . /etc/mavenrc
+  fi
+
+  if [ -f "$HOME/.mavenrc" ] ; then
+    . "$HOME/.mavenrc"
+  fi
+
+fi
+
+# OS specific support.  $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+mingw=false
+case "`uname`" in
+  CYGWIN*) cygwin=true ;;
+  MINGW*) mingw=true;;
+  Darwin*) darwin=true
+    # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
+    # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
+    if [ -z "$JAVA_HOME" ]; then
+      if [ -x "/usr/libexec/java_home" ]; then
+        export JAVA_HOME="`/usr/libexec/java_home`"
+      else
+        export JAVA_HOME="/Library/Java/Home"
+      fi
+    fi
+    ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+  if [ -r /etc/gentoo-release ] ; then
+    JAVA_HOME=`java-config --jre-home`
+  fi
+fi
+
+if [ -z "$M2_HOME" ] ; then
+  ## resolve links - $0 may be a link to maven's home
+  PRG="$0"
+
+  # need this for relative symlinks
+  while [ -h "$PRG" ] ; do
+    ls=`ls -ld "$PRG"`
+    link=`expr "$ls" : '.*-> \(.*\)$'`
+    if expr "$link" : '/.*' > /dev/null; then
+      PRG="$link"
+    else
+      PRG="`dirname "$PRG"`/$link"
+    fi
+  done
+
+  saveddir=`pwd`
+
+  M2_HOME=`dirname "$PRG"`/..
+
+  # make it fully qualified
+  M2_HOME=`cd "$M2_HOME" && pwd`
+
+  cd "$saveddir"
+  # echo Using m2 at $M2_HOME
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+  [ -n "$M2_HOME" ] &&
+    M2_HOME=`cygpath --unix "$M2_HOME"`
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+  [ -n "$CLASSPATH" ] &&
+    CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# For Mingw, ensure paths are in UNIX format before anything is touched
+if $mingw ; then
+  [ -n "$M2_HOME" ] &&
+    M2_HOME="`(cd "$M2_HOME"; pwd)`"
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
+fi
+
+if [ -z "$JAVA_HOME" ]; then
+  javaExecutable="`which javac`"
+  if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
+    # readlink(1) is not available as standard on Solaris 10.
+    readLink=`which readlink`
+    if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
+      if $darwin ; then
+        javaHome="`dirname \"$javaExecutable\"`"
+        javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
+      else
+        javaExecutable="`readlink -f \"$javaExecutable\"`"
+      fi
+      javaHome="`dirname \"$javaExecutable\"`"
+      javaHome=`expr "$javaHome" : '\(.*\)/bin'`
+      JAVA_HOME="$javaHome"
+      export JAVA_HOME
+    fi
+  fi
+fi
+
+if [ -z "$JAVACMD" ] ; then
+  if [ -n "$JAVA_HOME"  ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+      # IBM's JDK on AIX uses strange locations for the executables
+      JAVACMD="$JAVA_HOME/jre/sh/java"
+    else
+      JAVACMD="$JAVA_HOME/bin/java"
+    fi
+  else
+    JAVACMD="`\\unset -f command; \\command -v java`"
+  fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+  echo "Error: JAVA_HOME is not defined correctly." >&2
+  echo "  We cannot execute $JAVACMD" >&2
+  exit 1
+fi
+
+if [ -z "$JAVA_HOME" ] ; then
+  echo "Warning: JAVA_HOME environment variable is not set."
+fi
+
+CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
+
+# traverses directory structure from process work directory to filesystem root
+# first directory with .mvn subdirectory is considered project base directory
+find_maven_basedir() {
+
+  if [ -z "$1" ]
+  then
+    echo "Path not specified to find_maven_basedir"
+    return 1
+  fi
+
+  basedir="$1"
+  wdir="$1"
+  while [ "$wdir" != '/' ] ; do
+    if [ -d "$wdir"/.mvn ] ; then
+      basedir=$wdir
+      break
+    fi
+    # workaround for JBEAP-8937 (on Solaris 10/Sparc)
+    if [ -d "${wdir}" ]; then
+      wdir=`cd "$wdir/.."; pwd`
+    fi
+    # end of workaround
+  done
+  echo "${basedir}"
+}
+
+# concatenates all lines of a file
+concat_lines() {
+  if [ -f "$1" ]; then
+    echo "$(tr -s '\n' ' ' < "$1")"
+  fi
+}
+
+BASE_DIR=`find_maven_basedir "$(pwd)"`
+if [ -z "$BASE_DIR" ]; then
+  exit 1;
+fi
+
+##########################################################################################
+# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+# This allows using the maven wrapper in projects that prohibit checking in binary data.
+##########################################################################################
+if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then
+    if [ "$MVNW_VERBOSE" = true ]; then
+      echo "Found .mvn/wrapper/maven-wrapper.jar"
+    fi
+else
+    if [ "$MVNW_VERBOSE" = true ]; then
+      echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..."
+    fi
+    if [ -n "$MVNW_REPOURL" ]; then
+      jarUrl="$MVNW_REPOURL/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar"
+    else
+      jarUrl="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar"
+    fi
+    while IFS="=" read key value; do
+      case "$key" in (wrapperUrl) jarUrl="$value"; break ;;
+      esac
+    done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties"
+    if [ "$MVNW_VERBOSE" = true ]; then
+      echo "Downloading from: $jarUrl"
+    fi
+    wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar"
+    if $cygwin; then
+      wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"`
+    fi
+
+    if command -v wget > /dev/null; then
+        if [ "$MVNW_VERBOSE" = true ]; then
+          echo "Found wget ... using wget"
+        fi
+        if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+            wget "$jarUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath"
+        else  # credentials are quoted so spaces or glob characters in them are not word-split
+            wget --http-user="$MVNW_USERNAME" --http-password="$MVNW_PASSWORD" "$jarUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath"
+        fi
+    elif command -v curl > /dev/null; then
+        if [ "$MVNW_VERBOSE" = true ]; then
+          echo "Found curl ... using curl"
+        fi
+        if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+            curl -o "$wrapperJarPath" "$jarUrl" -f
+        else  # credentials are quoted so spaces or glob characters in them are not word-split
+            curl --user "$MVNW_USERNAME:$MVNW_PASSWORD" -o "$wrapperJarPath" "$jarUrl" -f
+        fi
+
+    else
+        if [ "$MVNW_VERBOSE" = true ]; then
+          echo "Falling back to using Java to download"
+        fi
+        javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java"
+        # For Cygwin, switch paths to Windows format before running javac
+        if $cygwin; then
+          javaClass=`cygpath --path --windows "$javaClass"`
+        fi
+        if [ -e "$javaClass" ]; then
+            if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+                if [ "$MVNW_VERBOSE" = true ]; then
+                  echo " - Compiling MavenWrapperDownloader.java ..."
+                fi
+                # Compiling the Java class
+                ("$JAVA_HOME/bin/javac" "$javaClass")
+            fi
+            if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+                # Run the downloader; MAVEN_PROJECTBASEDIR is only exported further down, so derive the base dir here
+                if [ "$MVNW_VERBOSE" = true ]; then
+                  echo " - Running MavenWrapperDownloader.java ..."
+                fi
+                ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "${MAVEN_BASEDIR:-$BASE_DIR}")
+            fi
+        fi
+    fi
+fi
+##########################################################################################
+# End of extension
+##########################################################################################
+
+export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
+if [ "$MVNW_VERBOSE" = true ]; then
+  echo $MAVEN_PROJECTBASEDIR
+fi
+MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+  [ -n "$M2_HOME" ] &&
+    M2_HOME=`cygpath --path --windows "$M2_HOME"`
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+  [ -n "$CLASSPATH" ] &&
+    CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+  [ -n "$MAVEN_PROJECTBASEDIR" ] &&
+    MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
+fi
+
+# Provide a "standardized" way to retrieve the CLI args that will
+# work with both Windows and non-Windows executions.
+MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@"
+export MAVEN_CMD_LINE_ARGS
+
+WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+exec "$JAVACMD" \
+  $MAVEN_OPTS \
+  $MAVEN_DEBUG_OPTS \
+  -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
+  "-Dmaven.home=${M2_HOME}" \
+  "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
+  ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"
diff --git a/mvnw.cmd b/mvnw.cmd
new file mode 100755
index 0000000..8a15b7f
--- /dev/null
+++ b/mvnw.cmd
@@ -0,0 +1,188 @@
+@REM ----------------------------------------------------------------------------
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements.  See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership.  The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License.  You may obtain a copy of the License at
+@REM
+@REM    http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied.  See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM ----------------------------------------------------------------------------
+
+@REM ----------------------------------------------------------------------------
+@REM Maven Start Up Batch script
+@REM
+@REM Required ENV vars:
+@REM JAVA_HOME - location of a JDK home dir
+@REM
+@REM Optional ENV vars
+@REM M2_HOME - location of maven2's installed home dir
+@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
+@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending
+@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
+@REM     e.g. to debug Maven itself, use
+@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+@REM ----------------------------------------------------------------------------
+
+@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
+@echo off
+@REM set title of command window
+title %0
+@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on'
+@if "%MAVEN_BATCH_ECHO%" == "on"  echo %MAVEN_BATCH_ECHO%
+
+@REM set %HOME% to equivalent of $HOME
+if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
+
+@REM Execute a user defined script before this one
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
+@REM check for pre script, once with legacy .bat ending and once with .cmd ending
+if exist "%USERPROFILE%\mavenrc_pre.bat" call "%USERPROFILE%\mavenrc_pre.bat" %*
+if exist "%USERPROFILE%\mavenrc_pre.cmd" call "%USERPROFILE%\mavenrc_pre.cmd" %*
+:skipRcPre
+
+@setlocal
+
+set ERROR_CODE=0
+
+@REM To isolate internal variables from possible post scripts, we use another setlocal
+@setlocal
+
+@REM ==== START VALIDATION ====
+if not "%JAVA_HOME%" == "" goto OkJHome
+
+echo.
+echo Error: JAVA_HOME not found in your environment. >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+:OkJHome
+if exist "%JAVA_HOME%\bin\java.exe" goto init
+
+echo.
+echo Error: JAVA_HOME is set to an invalid directory. >&2
+echo JAVA_HOME = "%JAVA_HOME%" >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+@REM ==== END VALIDATION ====
+
+:init
+
+@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
+@REM Fallback to current working directory if not found.
+
+set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
+IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
+
+set EXEC_DIR=%CD%
+set WDIR=%EXEC_DIR%
+:findBaseDir
+IF EXIST "%WDIR%"\.mvn goto baseDirFound
+cd ..
+IF "%WDIR%"=="%CD%" goto baseDirNotFound
+set WDIR=%CD%
+goto findBaseDir
+
+:baseDirFound
+set MAVEN_PROJECTBASEDIR=%WDIR%
+cd "%EXEC_DIR%"
+goto endDetectBaseDir
+
+:baseDirNotFound
+set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
+cd "%EXEC_DIR%"
+
+:endDetectBaseDir
+
+IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
+
+@setlocal EnableExtensions EnableDelayedExpansion
+for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
+@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
+
+:endReadAdditionalConfig
+
+SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
+set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
+set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar"
+
+FOR /F "usebackq tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
+    IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
+)
+
+@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
+if not "%MVNW_REPOURL%" == "" (
+    SET DOWNLOAD_URL="%MVNW_REPOURL%/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar"
+)
+if exist %WRAPPER_JAR% (
+    if "%MVNW_VERBOSE%" == "true" (
+        echo Found %WRAPPER_JAR%
+    )
+) else (
+    if "%MVNW_VERBOSE%" == "true" (
+        echo Couldn't find %WRAPPER_JAR%, downloading it ...
+        echo Downloading from: %DOWNLOAD_URL%
+    )
+
+    powershell -Command "&{"^
+		"$webclient = new-object System.Net.WebClient;"^
+		"if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^
+		"$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^
+		"}"^
+		"[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^
+		"}"
+    if "%MVNW_VERBOSE%" == "true" (
+        echo Finished downloading %WRAPPER_JAR%
+    )
+)
+@REM End of extension
+
+@REM Provide a "standardized" way to retrieve the CLI args that will
+@REM work with both Windows and non-Windows executions.
+set MAVEN_CMD_LINE_ARGS=%*
+
+%MAVEN_JAVA_EXE% ^
+  %JVM_CONFIG_MAVEN_PROPS% ^
+  %MAVEN_OPTS% ^
+  %MAVEN_DEBUG_OPTS% ^
+  -classpath %WRAPPER_JAR% ^
+  "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" ^
+  %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
+if ERRORLEVEL 1 goto error
+goto end
+
+:error
+set ERROR_CODE=1
+
+:end
+@endlocal & set ERROR_CODE=%ERROR_CODE%
+
+if not "%MAVEN_SKIP_RC%"=="" goto skipRcPost
+@REM check for post script, once with legacy .bat ending and once with .cmd ending
+if exist "%USERPROFILE%\mavenrc_post.bat" call "%USERPROFILE%\mavenrc_post.bat"
+if exist "%USERPROFILE%\mavenrc_post.cmd" call "%USERPROFILE%\mavenrc_post.cmd"
+:skipRcPost
+
+@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
+if "%MAVEN_BATCH_PAUSE%"=="on" pause
+
+if "%MAVEN_TERMINATE_CMD%"=="on" exit %ERROR_CODE%
+
+cmd /C exit /B %ERROR_CODE%
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..b7b00dd
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,210 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.seatunnel</groupId>
+    <artifactId>seatunnel-shade</artifactId>
+    <version>${revision}</version>
+
+    <packaging>pom</packaging>
+
+    <name>SeaTunnel : Shade :</name>
+
+    <modules>
+        <module>seatunnel-hadoop3-3.1.4-uber</module>
+        <module>seatunnel-jackson</module>
+        <module>seatunnel-guava</module>
+        <module>seatunnel-config</module>
+    </modules>
+
+    <properties>
+        <revision>2.3.4</revision>
+        <e2e.dependency.skip>true</e2e.dependency.skip>
+        <spotless.version>2.29.0</spotless.version>
+        <junit5.version>5.9.2</junit5.version>
+        <skip.spotless>false</skip.spotless>
+        <maven-shade-plugin.version>3.3.0</maven-shade-plugin.version>
+        <maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
+        <maven-scm-provider-jgit.version>1.9.5</maven-scm-provider-jgit.version>
+        <seatunnel.shade.package>org.apache.seatunnel.shade</seatunnel.shade.package>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.junit</groupId>
+                <artifactId>junit-bom</artifactId>
+                <version>${junit5.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>com.diffplug.spotless</groupId>
+                <artifactId>spotless-maven-plugin</artifactId>
+                <version>${spotless.version}</version>
+                <configuration>
+                    <skip>${skip.spotless}</skip>
+                    <java>
+                        <excludes>
+                            <exclude>src/main/java/org/apache/seatunnel/antlr4/generated/*.*</exclude>
+                        </excludes>
+                        <googleJavaFormat>
+                            <version>1.7</version>
+                            <style>AOSP</style>
+                        </googleJavaFormat>
+                        <removeUnusedImports />
+                        <formatAnnotations />
+                        <importOrder>
+                            <order>org.apache.seatunnel.shade,org.apache.seatunnel,org.apache,org,,javax,java,\#</order>
+                        </importOrder>
+                        <replaceRegex>
+                            <name>Remove wildcard imports</name>
+                            <searchRegex>import\s+(static)*\s*[^\*\s]+\*;(\r\n|\r|\n)</searchRegex>
+                            <replacement>$1</replacement>
+                        </replaceRegex>
+                        <replaceRegex>
+                            <name>Block powermock</name>
+                            <searchRegex>import\s+org\.powermock\.[^\*\s]*(|\*);(\r\n|\r|\n)</searchRegex>
+                            <replacement>$1</replacement>
+                        </replaceRegex>
+                        <replaceRegex>
+                            <name>Block jUnit4 imports</name>
+                            <searchRegex>import\s+org\.junit\.(?!jupiter\.|platform\.)[^\*\s]*(|\*);(\r\n|\r|\n)</searchRegex>
+                            <replacement>$1</replacement>
+                        </replaceRegex>
+                    </java>
+                    <pom>
+                        <sortPom>
+                            <encoding>UTF-8</encoding>
+                            <nrOfIndentSpace>4</nrOfIndentSpace>
+                            <keepBlankLines>true</keepBlankLines>
+                            <indentBlankLines>false</indentBlankLines>
+                            <indentSchemaLocation>true</indentSchemaLocation>
+                            <spaceBeforeCloseEmptyElement>true</spaceBeforeCloseEmptyElement>
+                            <sortModules>false</sortModules>
+                            <sortExecutions>false</sortExecutions>
+                            <predefinedSortOrder>custom_1</predefinedSortOrder>
+                            <expandEmptyElements>false</expandEmptyElements>
+                            <sortProperties>false</sortProperties>
+                        </sortPom>
+                        <replace>
+                            <name>Leading blank line</name>
+                            <search>project</search>
+                            <replacement>project</replacement>
+                        </replace>
+                    </pom>
+                    <markdown>
+                        <includes>
+                            <include>docs/**/*.md</include>
+                        </includes>
+                        <excludes>
+                            <exclude>**/.github/**/*.md</exclude>
+                        </excludes>
+                        <flexmark />
+                    </markdown>
+                    <upToDateChecking>
+                        <enabled>true</enabled>
+                    </upToDateChecking>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>spotless-check</id>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                        <phase>validate</phase>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>${maven-dependency-plugin.version}</version>
+                <configuration>
+                    <skip>${e2e.dependency.skip}</skip>
+                    <appendOutput>true</appendOutput>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-release-plugin</artifactId>
+                <configuration>
+                    <autoVersionSubmodules>true</autoVersionSubmodules>
+                    <tagNameFormat>@{project.version}</tagNameFormat>
+                    <tagBase>${project.version}</tagBase>
+                </configuration>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.apache.maven.scm</groupId>
+                        <artifactId>maven-scm-provider-jgit</artifactId>
+                        <version>${maven-scm-provider-jgit.version}</version>
+                    </dependency>
+                </dependencies>
+            </plugin>
+
+            <!-- shade -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${maven-shade-plugin.version}</version>
+                <configuration>
+                    <shadedArtifactAttached>false</shadedArtifactAttached>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                    <!-- Make sure the transitive dependencies are written to the generated pom under <dependencies> -->
+                    <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+                    <artifactSet>
+                        <excludes>
+                            <exclude>org.slf4j:*</exclude>
+                            <exclude>ch.qos.logback:*</exclude>
+                            <exclude>log4j:*</exclude>
+                            <exclude>org.apache.logging.log4j:*</exclude>
+                            <exclude>commons-logging:*</exclude>
+                        </excludes>
+                    </artifactSet>
+                    <filters>
+                        <filter>
+                            <artifact>*:*</artifact>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
+                </configuration>
+
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <transformers combine.children="append">
+                                <!-- The service transformer is needed to merge META-INF/services files -->
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/seatunnel-config/README.md b/seatunnel-config/README.md
new file mode 100644
index 0000000..4932a3f
--- /dev/null
+++ b/seatunnel-config/README.md
@@ -0,0 +1,25 @@
+# Introduction
+The `seatunnel-config` module is used to parse `seatunnel.conf` files. It is based on `com.typesafe.config`;
+we have made some enhancements and package them with the Maven Shade plugin. Most of the time you don't need to
+use this module directly, since the shaded artifact can be retrieved from the Maven repository.
+
+# How to modify the config module
+If you want to modify the config module, you can follow the steps below.
+1. Open the `seatunnel-config` module.
+```xml
+<!--
+    We retrieve the config module from maven repository. If you want to change the config module,
+    you need to open this annotation and change the dependency of config-shade to project.
+    <module>seatunnel-config</module>
+-->
+```
+Uncomment this section in the `pom.xml` file to import the `seatunnel-config` module.
+2. Replace the `config-shade` dependency with the project version.
+```xml
+<dependency>
+    <groupId>org.apache.seatunnel</groupId>
+    <artifactId>seatunnel-config-shade</artifactId>
+    <version>${project.version}</version>
+</dependency>
+```
+Add `<version>${project.version}</version>` to the `seatunnel-config-shade` dependency everywhere it is used.
\ No newline at end of file
diff --git a/seatunnel-config/pom.xml b/seatunnel-config/pom.xml
new file mode 100644
index 0000000..9fcba5f
--- /dev/null
+++ b/seatunnel-config/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-shade</artifactId>
+        <version>${revision}</version>
+    </parent>
+    <artifactId>seatunnel-config</artifactId>
+    <packaging>pom</packaging>
+    <!-- Aggregator POM ("pom" packaging): it only groups the two config sub-modules listed below. -->
+    <name>SeaTunnel : Config :</name>
+    <!-- seatunnel-config-shade depends on seatunnel-config-base; Maven's reactor sorts modules by dependency, so the listing order does not decide build order. -->
+    <modules>
+        <module>seatunnel-config-shade</module>
+        <module>seatunnel-config-base</module>
+    </modules>
+</project>
diff --git a/seatunnel-config/seatunnel-config-base/pom.xml b/seatunnel-config/seatunnel-config-base/pom.xml
new file mode 100644
index 0000000..37dfe2d
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-base/pom.xml
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-config</artifactId>
+        <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <artifactId>seatunnel-config-base</artifactId>
+    <name>SeaTunnel : Config : Base</name>
+
+    <properties>
+        <typesafe.version>1.3.3</typesafe.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+            <version>${typesafe.version}</version>
+        </dependency>
+    </dependencies>
+    <build>
+
+        <finalName>${project.artifactId}-${project.version}</finalName>
+        <!-- Shades com.typesafe:config relocated under ${seatunnel.shade.package}; the excluded classes are ones seatunnel-config-shade ships its own copies of, e.g. ConfigParseOptions and ConfigParser (TODO confirm for all eight). -->
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <configuration>
+                    <minimizeJar>true</minimizeJar>
+                    <createSourcesJar>true</createSourcesJar>
+                    <shadeSourcesContent>true</shadeSourcesContent>
+                    <shadedArtifactAttached>false</shadedArtifactAttached>
+                    <createDependencyReducedPom>false</createDependencyReducedPom>
+                    <filters>
+                        <filter>
+                            <artifact>com.typesafe:config</artifact>
+                            <includes>
+                                <include>**</include>
+                            </includes>
+                            <excludes>
+                                <exclude>META-INF/MANIFEST.MF</exclude>
+                                <exclude>META-INF/NOTICE</exclude>
+                                <exclude>com/typesafe/config/ConfigParseOptions.class</exclude>
+                                <exclude>com/typesafe/config/ConfigMergeable.class</exclude>
+                                <exclude>com/typesafe/config/impl/ConfigParser.class</exclude>
+                                <exclude>com/typesafe/config/impl/ConfigNodePath.class</exclude>
+                                <exclude>com/typesafe/config/impl/PathParser.class</exclude>
+                                <exclude>com/typesafe/config/impl/Path.class</exclude>
+                                <exclude>com/typesafe/config/impl/SimpleConfigObject.class</exclude>
+                                <exclude>com/typesafe/config/impl/PropertiesParser.class</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
+                    <relocations>
+                        <relocation>
+                            <pattern>com.typesafe.config</pattern>
+                            <shadedPattern>${seatunnel.shade.package}.com.typesafe.config</shadedPattern>
+                        </relocation>
+                    </relocations>
+                    <transformers>
+                        <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer" />
+                        <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer" />
+                    </transformers>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <!-- Attaches target/${project.artifactId}-${project.version}.jar as an additional artifact with classifier "optional". -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>compile</id>
+                        <goals>
+                            <goal>attach-artifact</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <artifacts>
+                                <artifact>
+                                    <file>${basedir}/target/${project.artifactId}-${project.version}.jar</file>
+                                    <type>jar</type>
+                                    <classifier>optional</classifier>
+                                </artifact>
+                            </artifacts>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+        </plugins>
+    </build>
+</project>
diff --git a/seatunnel-config/seatunnel-config-shade/pom.xml b/seatunnel-config/seatunnel-config-shade/pom.xml
new file mode 100644
index 0000000..903ff75
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-config</artifactId>
+        <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <artifactId>seatunnel-config-shade</artifactId>
+    <name>SeaTunnel : Config : Shade</name>
+
+    <properties>
+        <scala.version>2.12.15</scala.version>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-config-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+    </dependencies>
+    <!-- NOTE(review): scala-library is declared above although the sources visible for this module are Java; confirm it is still required. -->
+    <build>
+
+        <finalName>${project.artifactId}-${project.version}</finalName>
+        <!-- build-helper attaches target/${project.artifactId}-${project.version}.jar as an additional artifact with classifier "optional". -->
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>compile</id>
+                        <goals>
+                            <goal>attach-artifact</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <artifacts>
+                                <artifact>
+                                    <file>${basedir}/target/${project.artifactId}-${project.version}.jar</file>
+                                    <type>jar</type>
+                                    <classifier>optional</classifier>
+                                </artifact>
+                            </artifacts>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                </configuration>
+            </plugin>
+
+        </plugins>
+    </build>
+</project>
diff --git a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigMergeable.java b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigMergeable.java
new file mode 100644
index 0000000..e1855c0
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigMergeable.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.shade.com.typesafe.config;
+
+import java.io.Serializable;
+
+/**
+ * Copy of {@link com.typesafe.config.ConfigMergeable} that additionally extends {@link
+ * Serializable}; it was copied so that {@link Config} can be serialized.
+ */
+public interface ConfigMergeable extends Serializable {
+    ConfigMergeable withFallback(ConfigMergeable configMergeable);
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java
new file mode 100644
index 0000000..f66d4d1
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java
@@ -0,0 +1,257 @@
+/*
+ *   Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
+ */
+
+package org.apache.seatunnel.shade.com.typesafe.config;
+
+/**
+ * Immutable bundle of settings that control how a configuration source is parsed.
+ *
+ * <p>Instances never change; every {@code setXxx} method hands back either {@code this} (when
+ * nothing would change) or a fresh copy carrying the new value.
+ *
+ * <p>Typical usage:
+ * <pre>
+ *     ConfigParseOptions options = ConfigParseOptions.defaults()
+ *         .setSyntax(ConfigSyntax.JSON)
+ *         .setAllowMissing(false)
+ * </pre>
+ */
+public final class ConfigParseOptions {
+
+    /** Text of the path separator token: a path such as a.b.c is written a-&gt;b-&gt;c. */
+    public static final String PATH_TOKEN_SEPARATOR = "->";
+
+    final ConfigSyntax syntax;
+    final String originDescription;
+    final boolean allowMissing;
+    final ConfigIncluder includer;
+    final ClassLoader classLoader;
+
+    private ConfigParseOptions(
+            ConfigSyntax syntax,
+            String originDescription,
+            boolean allowMissing,
+            ConfigIncluder includer,
+            ClassLoader classLoader) {
+        this.syntax = syntax;
+        this.originDescription = originDescription;
+        this.allowMissing = allowMissing;
+        this.includer = includer;
+        this.classLoader = classLoader;
+    }
+
+    /**
+     * Creates the baseline options: no syntax, origin description, includer or class loader
+     * set, and missing sources allowed. Derive customised options from it via the setters.
+     *
+     * @return the default parse options
+     */
+    public static ConfigParseOptions defaults() {
+        return new ConfigParseOptions(null, null, true, null, null);
+    }
+
+    /**
+     * Selects the file format to parse. With {@code null} the format is guessed from any
+     * available filename extension, falling back to {@link ConfigSyntax#CONF} if that fails.
+     *
+     * @param syntax a syntax or {@code null} for best guess
+     * @return options with the syntax set
+     */
+    public ConfigParseOptions setSyntax(ConfigSyntax syntax) {
+        // Unchanged value: immutability lets us reuse this instance.
+        if (this.syntax == syntax) {
+            return this;
+        }
+        return new ConfigParseOptions(
+                syntax,
+                this.originDescription,
+                this.allowMissing,
+                this.includer,
+                this.classLoader);
+    }
+
+    /**
+     * Returns the configured syntax; {@code null} stands for "any".
+     *
+     * @return the current syntax or null
+     */
+    public ConfigSyntax getSyntax() {
+        return this.syntax;
+    }
+
+    /**
+     * Describes the source being parsed; the {@link ConfigOrigin} of the parsed values is based
+     * on this text. Usually something like the filename is filled in for you, but for a bare
+     * input stream you may want to supply a better description. Pass {@code null} to let the
+     * library come up with one automatically.
+     *
+     * @param originDescription description to put in the {@link ConfigOrigin}
+     * @return options with the origin description set
+     */
+    public ConfigParseOptions setOriginDescription(String originDescription) {
+        final String current = this.originDescription;
+        // The reference comparison is deliberate: it is the only check that treats two nulls
+        // as "unchanged". Static analysers may flag it; leave it alone.
+        final boolean sameReference = current == originDescription;
+        final boolean sameText =
+                current != null && originDescription != null && current.equals(originDescription);
+        if (sameReference || sameText) {
+            return this;
+        }
+        return new ConfigParseOptions(
+                this.syntax,
+                originDescription,
+                this.allowMissing,
+                this.includer,
+                this.classLoader);
+    }
+
+    /**
+     * Returns the configured origin description; {@code null} stands for "automatic".
+     *
+     * @return the current origin description or null
+     */
+    public String getOriginDescription() {
+        return this.originDescription;
+    }
+
+    /** Not public API: sets the description only when none has been set yet. */
+    ConfigParseOptions withFallbackOriginDescription(String originDescription) {
+        // A description that is already present always wins over the fallback.
+        if (this.originDescription != null) {
+            return this;
+        }
+        return setOriginDescription(originDescription);
+    }
+
+    /**
+     * Controls what happens when the item being parsed (for example a file) does not exist:
+     * {@code false} throws an exception, {@code true} just returns an empty document. The flag
+     * only applies to fetching the root document; it has no effect on nested includes.
+     *
+     * @param allowMissing true to silently ignore missing item
+     * @return options with the "allow missing" flag set
+     */
+    public ConfigParseOptions setAllowMissing(boolean allowMissing) {
+        // Unchanged flag: reuse this instance.
+        if (this.allowMissing == allowMissing) {
+            return this;
+        }
+        return new ConfigParseOptions(
+                this.syntax,
+                this.originDescription,
+                allowMissing,
+                this.includer,
+                this.classLoader);
+    }
+
+    /**
+     * Returns the current "allow missing" flag.
+     *
+     * @return whether we allow missing files
+     */
+    public boolean getAllowMissing() {
+        return this.allowMissing;
+    }
+
+    /**
+     * Installs the {@link ConfigIncluder} that customizes how includes are handled. Passing
+     * {@code null} selects the default includer.
+     *
+     * @param includer the includer to use or null for default
+     * @return new version of the parse options with different includer
+     */
+    public ConfigParseOptions setIncluder(ConfigIncluder includer) {
+        // Same includer instance: reuse this object.
+        if (this.includer == includer) {
+            return this;
+        }
+        return new ConfigParseOptions(
+                this.syntax,
+                this.originDescription,
+                this.allowMissing,
+                includer,
+                this.classLoader);
+    }
+
+    /**
+     * Puts a {@link ConfigIncluder} in front of the current one. When an includer is already
+     * set, {@link ConfigIncluder#withFallback} is called on the new includer with the existing
+     * one as its fallback; otherwise the new includer is simply installed.
+     *
+     * @param includer the includer to prepend (may not be null)
+     * @return new version of the parse options with different includer
+     */
+    public ConfigParseOptions prependIncluder(ConfigIncluder includer) {
+        if (includer == null) {
+            throw new NullPointerException("null includer passed to prependIncluder");
+        }
+        if (this.includer == includer) {
+            return this;
+        }
+        // With no existing includer there is nothing to chain behind the new one.
+        final ConfigIncluder combined =
+                this.includer == null ? includer : includer.withFallback(this.includer);
+        return setIncluder(combined);
+    }
+
+    /**
+     * Puts a {@link ConfigIncluder} behind the current one: {@link ConfigIncluder#withFallback}
+     * is called on the existing includer, or the new one is installed if none is set.
+     *
+     * @param includer the includer to append (may not be null)
+     * @return new version of the parse options with different includer
+     */
+    public ConfigParseOptions appendIncluder(ConfigIncluder includer) {
+        if (includer == null) {
+            throw new NullPointerException("null includer passed to appendIncluder");
+        }
+        if (this.includer == includer) {
+            return this;
+        }
+        // With no existing includer there is nothing to chain the new one onto.
+        final ConfigIncluder combined =
+                this.includer == null ? includer : this.includer.withFallback(includer);
+        return setIncluder(combined);
+    }
+
+    /**
+     * Returns the current includer; {@code null} stands for the default includer.
+     *
+     * @return current includer or null
+     */
+    public ConfigIncluder getIncluder() {
+        return this.includer;
+    }
+
+    /**
+     * Selects the class loader. With {@code null}, {@link #getClassLoader()} falls back to
+     * {@code Thread.currentThread().getContextClassLoader()}.
+     *
+     * @param loader a class loader or {@code null} to use thread context class loader
+     * @return options with the class loader set
+     */
+    public ConfigParseOptions setClassLoader(ClassLoader loader) {
+        // Same loader instance: reuse this object.
+        if (this.classLoader == loader) {
+            return this;
+        }
+        return new ConfigParseOptions(
+                this.syntax, this.originDescription, this.allowMissing, this.includer, loader);
+    }
+
+    /**
+     * Returns the explicitly set class loader or, when none was set, the result of
+     * {@code Thread.currentThread().getContextClassLoader()}.
+     *
+     * @return class loader to use
+     */
+    public ClassLoader getClassLoader() {
+        // An explicitly set loader takes precedence over the thread's context loader.
+        if (this.classLoader != null) {
+            return this.classLoader;
+        }
+        return Thread.currentThread().getContextClassLoader();
+    }
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigNodePath.java b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigNodePath.java
new file mode 100644
index 0000000..95e1022
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigNodePath.java
@@ -0,0 +1,64 @@
+/*
+ *   Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
+ */
+
+package org.apache.seatunnel.shade.com.typesafe.config.impl;
+
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigException;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigParseOptions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+final class ConfigNodePath extends AbstractConfigNode {
+    private final Path path;
+    final ArrayList<Token> tokens;
+
+    ConfigNodePath(Path path, Collection<Token> tokens) {
+        this.path = path;
+        this.tokens = new ArrayList<>(tokens);
+    }
+
+    @Override
+    protected Collection<Token> tokens() {
+        return tokens;
+    }
+
+    protected Path value() {
+        return path;
+    }
+
+    /** Tells whether the token is unquoted text equal to the path separator. */
+    private static boolean isSeparator(Token token) {
+        return Tokens.isUnquotedText(token)
+                && token.tokenText().equals(ConfigParseOptions.PATH_TOKEN_SEPARATOR);
+    }
+
+    /** Removes the first {@code toRemove} path elements and the tokens that spell them. */
+    protected ConfigNodePath subPath(int toRemove) {
+        final ArrayList<Token> snapshot = new ArrayList<>(tokens);
+        int separatorsSeen = 0;
+        for (int idx = 0; idx < snapshot.size(); idx++) {
+            if (isSeparator(snapshot.get(idx))) {
+                separatorsSeen++;
+            }
+            if (separatorsSeen == toRemove) {
+                Path remaining = path.subPath(toRemove);
+                return new ConfigNodePath(remaining, snapshot.subList(idx + 1, snapshot.size()));
+            }
+        }
+        throw new ConfigException.BugOrBroken("Tried to remove too many elements from a Path node");
+    }
+
+    /** Returns the node for the first path element, or {@code this} if no separator exists. */
+    protected ConfigNodePath first() {
+        final ArrayList<Token> snapshot = new ArrayList<>(tokens);
+        int idx = 0;
+        while (idx < snapshot.size() && !isSeparator(snapshot.get(idx))) {
+            idx++;
+        }
+        return idx == snapshot.size()
+                ? this
+                : new ConfigNodePath(path.subPath(0, 1), snapshot.subList(0, idx));
+    }
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigParser.java b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigParser.java
new file mode 100644
index 0000000..74bff80
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigParser.java
@@ -0,0 +1,613 @@
+/*
+ *   Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
+ */
+
+package org.apache.seatunnel.shade.com.typesafe.config.impl;
+
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigException;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigIncludeContext;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigOrigin;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigParseOptions;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigSyntax;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+
+final class ConfigParser {
+    /**
+     * Converts a node tree into a config value by running a fresh {@code ParseContext} over it.
+     *
+     * @param document root node of the document to convert
+     * @param origin origin used as the base for the origins of the produced values
+     * @param options parse options; supply the syntax and the includer
+     * @param includeContext context passed along when include statements are processed
+     * @return whatever {@code ParseContext.parse()} produces for the document
+     */
+    static AbstractConfigValue parse(
+            ConfigNodeRoot document,
+            ConfigOrigin origin,
+            ConfigParseOptions options,
+            ConfigIncludeContext includeContext) {
+        ConfigSyntax syntax = options.getSyntax();
+        FullIncluder fullIncluder = SimpleIncluder.makeFull(options.getIncluder());
+        return new ParseContext(syntax, origin, document, fullIncluder, includeContext).parse();
+    }
+
+    private static final class ParseContext {
+        // line currently being processed; starts at 1 and is bumped on every newline token
+        private int lineNumber;
+        // root node of the document being converted
+        private final ConfigNodeRoot document;
+        // includer used to load the targets of include statements
+        private final FullIncluder includer;
+        // context handed to the includer; also the source of the parse options used for includes
+        private final ConfigIncludeContext includeContext;
+        // syntax of the document; includes and concatenations are rejected or skipped for JSON
+        private final ConfigSyntax flavor;
+        // origin of the whole document; per-line origins are derived from it
+        private final ConfigOrigin baseOrigin;
+        // paths of the fields whose values are currently being parsed, innermost at the front
+        private final LinkedList<Path> pathStack;
+
+        // the number of lists we are inside; this is used to detect the "cannot
+        // generate a reference to a list element" problem, and once we fix that
+        // problem we should be able to get rid of this variable.
+        int arrayCount;
+
+        /**
+         * Creates a context positioned at line 1, outside of any list and with an empty path
+         * stack.
+         *
+         * @param flavor syntax of the document
+         * @param origin base origin for the produced values
+         * @param document root node to convert
+         * @param includer includer used for include statements
+         * @param includeContext context passed to the includer
+         */
+        ParseContext(
+                ConfigSyntax flavor,
+                ConfigOrigin origin,
+                ConfigNodeRoot document,
+                FullIncluder includer,
+                ConfigIncludeContext includeContext) {
+            // inputs supplied by the caller
+            this.flavor = flavor;
+            this.baseOrigin = origin;
+            this.document = document;
+            this.includer = includer;
+            this.includeContext = includeContext;
+
+            // mutable parsing state
+            this.lineNumber = 1;
+            this.arrayCount = 0;
+            this.pathStack = new LinkedList<>();
+        }
+
+        /**
+         * Merges a run of adjacent values into one value; unquoted text is changed into a string
+         * value. Children that are not value nodes are ignored.
+         *
+         * @param n the concatenation node
+         * @return the result of concatenating the parsed child values
+         * @throws ConfigException.BugOrBroken if a concatenation shows up in a JSON document
+         */
+        private AbstractConfigValue parseConcatenation(ConfigNodeConcatenation n) {
+            if (flavor == ConfigSyntax.JSON) {
+                // concatenation is a trick that is not done in JSON
+                throw new ConfigException.BugOrBroken("Found a concatenation node in JSON");
+            }
+
+            List<AbstractConfigValue> parts = new ArrayList<>();
+            for (AbstractConfigNode child : n.children()) {
+                if (!(child instanceof AbstractConfigNodeValue)) {
+                    continue;
+                }
+                parts.add(parseValue((AbstractConfigNodeValue) child, null));
+            }
+
+            return ConfigConcatenation.concatenate(parts);
+        }
+
+        /**
+         * @return the base origin tagged with the line number currently being processed
+         */
+        private SimpleConfigOrigin lineOrigin() {
+            SimpleConfigOrigin base = (SimpleConfigOrigin) baseOrigin;
+            return base.withLineNumber(lineNumber);
+        }
+
+        /**
+         * Creates (but does not throw) a parse exception for the current line, without a cause.
+         *
+         * @param message description of the problem
+         * @return the exception, for the caller to throw
+         */
+        private ConfigException parseError(String message) {
+            Throwable noCause = null;
+            return parseError(message, noCause);
+        }
+
+        /**
+         * Creates (but does not throw) a parse exception located at the current line.
+         *
+         * @param message description of the problem
+         * @param cause underlying failure, may be {@code null}
+         * @return the exception, for the caller to throw
+         */
+        private ConfigException parseError(String message, Throwable cause) {
+            SimpleConfigOrigin where = lineOrigin();
+            return new ConfigException.Parse(where, message, cause);
+        }
+
+        /**
+         * Joins every path on the path stack into one path.
+         *
+         * @return the concatenation of the stacked paths, starting with the entry pushed first
+         * @throws ConfigException.BugOrBroken if the path stack is empty
+         */
+        private Path fullCurrentPath() {
+            if (!pathStack.isEmpty()) {
+                // the top of the stack sits at the front, so walk it back to front
+                return new Path(pathStack.descendingIterator());
+            }
+            throw new ConfigException.BugOrBroken(
+                    "Bug in parser; tried to get current path when at root");
+        }
+
+        private AbstractConfigValue parseValue(AbstractConfigNodeValue n, List<String> comments) {
+            AbstractConfigValue v;
+
+            int startingArrayCount = arrayCount;
+
+            if (n instanceof ConfigNodeSimpleValue) {
+                v = ((ConfigNodeSimpleValue) n).value();
+            } else if (n instanceof ConfigNodeObject) {
+
+                Path path = pathStack.peekFirst();
+
+                if (path != null
+                        && !ConfigSyntax.JSON.equals(flavor)
+                        && ("source".equals(path.first())
+                                || "transform".equals(path.first())
+                                || "sink".equals(path.first()))) {
+                    v = parseObjectForSeaTunnel((ConfigNodeObject) n);
+                } else {
+                    v = parseObject((ConfigNodeObject) n);
+                }
+
+            } else if (n instanceof ConfigNodeArray) {
+                v = parseArray((ConfigNodeArray) n);
+            } else if (n instanceof ConfigNodeConcatenation) {
+                v = parseConcatenation((ConfigNodeConcatenation) n);
+            } else {
+                throw parseError("Expecting a value but got wrong node type: " + n.getClass());
+            }
+
+            if (comments != null && !comments.isEmpty()) {
+                v = v.withOrigin(v.origin().prependComments(new ArrayList<>(comments)));
+                comments.clear();
+            }
+
+            if (arrayCount != startingArrayCount) {
+                throw new ConfigException.BugOrBroken(
+                        "Bug in config parser: unbalanced array count");
+            }
+
+            return v;
+        }
+
+        /**
+         * Wraps a value in nested single-entry objects, one per element of {@code path}.
+         *
+         * <p>For the path {@code foo.bar} the result is {@code { "foo" : { "bar" : value } }}.
+         *
+         * @param path the path under which the value should live
+         * @param value the value to wrap
+         * @return the outermost of the created objects
+         */
+        private static AbstractConfigObject createValueUnderPath(
+                Path path, AbstractConfigValue value) {
+            // flatten the path into its keys, outermost first
+            List<String> keys = new ArrayList<>();
+            Path cursor = path;
+            for (String key = cursor.first(); key != null; key = cursor.first()) {
+                keys.add(key);
+                cursor = cursor.remainder();
+                if (cursor == null) {
+                    break;
+                }
+            }
+
+            // Build from the innermost key outwards. Every wrapper gets the value's origin with
+            // the comments removed, so that comments stay only on the exact leaf they apply to:
+            // a comment before "foo.bar" applies to the full setting "foo.bar", not also to "foo".
+            ListIterator<String> backwards = keys.listIterator(keys.size());
+            AbstractConfigValue inner = value;
+            AbstractConfigObject wrapped;
+            do {
+                Map<String, AbstractConfigValue> single =
+                        Collections.singletonMap(backwards.previous(), inner);
+                wrapped = new SimpleConfigObject(value.origin().withComments(null), single);
+                inner = wrapped;
+            } while (backwards.hasPrevious());
+
+            return wrapped;
+        }
+
+        /**
+         * Processes one include statement and merges the included object into {@code values}.
+         *
+         * <p>The target is loaded through the includer according to {@code n.kind()}. When the path
+         * stack is not empty, the included object is relativized to the full current path before
+         * it is merged.
+         *
+         * @param values field map of the object being built; modified in place
+         * @param n the include node to process
+         * @throws ConfigException a parse error if a url() include does not name a valid URL, or
+         *     if an include that is not fully resolved appears inside a list value
+         * @throws ConfigException.BugOrBroken if the include kind is not one of the handled ones
+         */
+        private void parseInclude(Map<String, AbstractConfigValue> values, ConfigNodeInclude n) {
+            boolean isRequired = n.isRequired();
+            // a missing target is only allowed when the include is not marked as required
+            ConfigIncludeContext cic =
+                    includeContext.setParseOptions(
+                            includeContext.parseOptions().setAllowMissing(!isRequired));
+
+            AbstractConfigObject obj;
+            switch (n.kind()) {
+                case URL:
+                    URL url;
+                    try {
+                        url = new URL(n.name());
+                    } catch (MalformedURLException e) {
+                        throw parseError("include url() specifies an invalid URL: " + n.name(), e);
+                    }
+                    obj = (AbstractConfigObject) includer.includeURL(cic, url);
+                    break;
+
+                case FILE:
+                    obj = (AbstractConfigObject) includer.includeFile(cic, new File(n.name()));
+                    break;
+
+                case CLASSPATH:
+                    obj = (AbstractConfigObject) includer.includeResources(cic, n.name());
+                    break;
+
+                case HEURISTIC:
+                    obj = (AbstractConfigObject) includer.include(cic, n.name());
+                    break;
+
+                default:
+                    throw new ConfigException.BugOrBroken("should not be reached");
+            }
+
+            // we really should make this work, but for now throwing an
+            // exception is better than producing an incorrect result.
+            // See https://github.com/lightbend/config/issues/160
+            if (arrayCount > 0 && obj.resolveStatus() != ResolveStatus.RESOLVED) {
+                throw parseError(
+                        "Due to current limitations of the config parser, when an include statement is nested inside a list value, "
+                                + "${} substitutions inside the included file cannot be resolved correctly. Either move the include outside of the list value or "
+                                + "remove the ${} statements from the included file.");
+            }
+
+            // when nested inside a field, re-root the included object at the current path
+            if (!pathStack.isEmpty()) {
+                Path prefix = fullCurrentPath();
+                obj = obj.relativized(prefix);
+            }
+
+            // an included value is layered over any existing entry with the same key,
+            // keeping the existing entry as its fallback
+            for (String key : obj.keySet()) {
+                AbstractConfigValue v = obj.get(key);
+                AbstractConfigValue existing = values.get(key);
+                if (existing != null) {
+                    values.put(key, v.withFallback(existing));
+                } else {
+                    values.put(key, v);
+                }
+            }
+        }
+
+        /**
+         * Variant of {@code parseObject} that turns an object node into a list with one element per
+         * field, in the order the fields appear.
+         *
+         * <p>{@code parseValue} picks this method for object nodes whose enclosing path starts with
+         * {@code source}, {@code transform} or {@code sink} in non-JSON documents. Each element gets
+         * a {@code plugin_name} entry, set to the first element of the field's path, as a fallback.
+         * Unlike {@code parseObject}, fields that repeat a key are not merged: every occurrence is
+         * appended to the list as its own element.
+         *
+         * <p>NOTE(review): {@code values} is filled by {@code parseInclude} and by the {@code put}
+         * calls below but is never read when the result is built, so content brought in by an
+         * include does not appear to reach the returned list. Confirm that this is intended.
+         *
+         * @param n the object node to convert
+         * @return a list holding one value per field of the node
+         * @throws ConfigException a parse error if {@code +=} is used while nested inside a list
+         * @throws ConfigException.BugOrBroken if a multi-element path is found in JSON mode
+         */
+        private SimpleConfigList parseObjectForSeaTunnel(ConfigNodeObject n) {
+
+            Map<String, AbstractConfigValue> values = new LinkedHashMap<>();
+            List<AbstractConfigValue> valuesList = new ArrayList<>();
+            SimpleConfigOrigin objectOrigin = lineOrigin();
+            boolean lastWasNewline = false;
+
+            ArrayList<AbstractConfigNode> nodes = new ArrayList<>(n.children());
+            // comments collected since the last field; handed to the next field's value
+            List<String> comments = new ArrayList<>();
+            for (int i = 0; i < nodes.size(); i++) {
+                AbstractConfigNode node = nodes.get(i);
+                if (node instanceof ConfigNodeComment) {
+                    lastWasNewline = false;
+                    comments.add(((ConfigNodeComment) node).commentText());
+                } else if (node instanceof ConfigNodeSingleToken
+                        && Tokens.isNewline(((ConfigNodeSingleToken) node).token())) {
+                    lineNumber++;
+                    if (lastWasNewline) {
+                        // Drop all comments if there was a blank line and start a new comment block
+                        comments.clear();
+                    }
+                    lastWasNewline = true;
+                } else if (flavor != ConfigSyntax.JSON && node instanceof ConfigNodeInclude) {
+                    parseInclude(values, (ConfigNodeInclude) node);
+                    lastWasNewline = false;
+                } else if (node instanceof ConfigNodeField) {
+                    lastWasNewline = false;
+                    Path path = ((ConfigNodeField) node).path().value();
+                    comments.addAll(((ConfigNodeField) node).comments());
+
+                    // path must be on-stack while we parse the value
+                    pathStack.push(path);
+                    if (((ConfigNodeField) node).separator() == Tokens.PLUS_EQUALS) {
+                        // we really should make this work, but for now throwing
+                        // an exception is better than producing an incorrect
+                        // result. See
+                        // https://github.com/lightbend/config/issues/160
+                        if (arrayCount > 0) {
+                            throw parseError(
+                                    "Due to current limitations of the config parser, += does not work nested inside a list. "
+                                            + "+= expands to a ${} substitution and the path in ${} cannot currently refer to list elements. "
+                                            + "You might be able to move the += outside of the list and then refer to it from inside the list with ${}.");
+                        }
+
+                        // because we will put it in an array after the fact so
+                        // we want this to be incremented during the parseValue
+                        // below in order to throw the above exception.
+                        arrayCount += 1;
+                    }
+
+                    AbstractConfigNodeValue valueNode;
+                    AbstractConfigValue newValue;
+
+                    valueNode = ((ConfigNodeField) node).value();
+
+                    // comments from the key token go to the value token
+                    newValue = parseValue(valueNode, comments);
+
+                    if (((ConfigNodeField) node).separator() == Tokens.PLUS_EQUALS) {
+                        arrayCount -= 1;
+
+                        // "a += b" becomes the concatenation of an optional reference to the
+                        // field's own path and a one-element list holding the new value
+                        List<AbstractConfigValue> concat = new ArrayList<>(2);
+                        AbstractConfigValue previousRef =
+                                new ConfigReference(
+                                        newValue.origin(),
+                                        new SubstitutionExpression(
+                                                fullCurrentPath(), true /* optional */));
+                        AbstractConfigValue list =
+                                new SimpleConfigList(
+                                        newValue.origin(), Collections.singletonList(newValue));
+                        concat.add(previousRef);
+                        concat.add(list);
+                        newValue = ConfigConcatenation.concatenate(concat);
+                    }
+
+                    // Grab any trailing comments on the same line
+                    if (i < nodes.size() - 1) {
+                        i++;
+                        while (i < nodes.size()) {
+                            if (nodes.get(i) instanceof ConfigNodeComment) {
+                                ConfigNodeComment comment = (ConfigNodeComment) nodes.get(i);
+                                newValue =
+                                        newValue.withOrigin(
+                                                newValue.origin()
+                                                        .appendComments(
+                                                                Collections.singletonList(
+                                                                        comment.commentText())));
+                                break;
+                            } else if (nodes.get(i) instanceof ConfigNodeSingleToken) {
+                                ConfigNodeSingleToken curr = (ConfigNodeSingleToken) nodes.get(i);
+                                if (curr.token() == Tokens.COMMA
+                                        || Tokens.isIgnoredWhitespace(curr.token())) {
+                                    // keep searching, as there could still be a comment
+                                } else {
+                                    // step back so the outer loop handles this node itself
+                                    i--;
+                                    break;
+                                }
+                            } else {
+                                // step back so the outer loop handles this node itself
+                                i--;
+                                break;
+                            }
+                            i++;
+                        }
+                    }
+
+                    pathStack.pop();
+
+                    String key = path.first();
+                    Path remaining = path.remainder();
+
+                    if (remaining == null) {
+
+                        // single-element path: the key doubles as the plugin name
+                        Map<String, String> m = Collections.singletonMap("plugin_name", key);
+                        newValue = newValue.withFallback(ConfigValueFactory.fromMap(m));
+
+                        values.put(key, newValue);
+                        valuesList.add(newValue);
+                    } else {
+                        if (flavor == ConfigSyntax.JSON) {
+                            throw new ConfigException.BugOrBroken(
+                                    "somehow got multi-element path in JSON mode");
+                        }
+
+                        // multi-element path: nest the value under the rest of the path
+                        AbstractConfigObject obj = createValueUnderPath(remaining, newValue);
+
+                        Map<String, String> m = Collections.singletonMap("plugin_name", key);
+                        obj = obj.withFallback(ConfigValueFactory.fromMap(m));
+
+                        values.put(key, obj);
+                        valuesList.add(obj);
+                    }
+                }
+            }
+
+            return new SimpleConfigList(objectOrigin, valuesList);
+        }
+
+        /**
+         * Converts an object node into a config object.
+         *
+         * <p>The children are walked in order: comments and newlines are tracked so that comments
+         * can be attached to the following field, include statements are merged in (non-JSON
+         * only), and every field's value is parsed while the field's path sits on the path stack.
+         * A field that repeats a key is merged with the earlier value in the custom config
+         * language and rejected in JSON.
+         *
+         * @param n the object node to convert
+         * @return an object holding the collected fields in insertion order
+         * @throws ConfigException a parse error for a duplicate field in JSON, or if {@code +=} is
+         *     used while nested inside a list
+         * @throws ConfigException.BugOrBroken if a multi-element path is found in JSON mode
+         */
+        private AbstractConfigObject parseObject(ConfigNodeObject n) {
+            Map<String, AbstractConfigValue> values = new LinkedHashMap<>();
+            SimpleConfigOrigin objectOrigin = lineOrigin();
+            boolean lastWasNewline = false;
+
+            ArrayList<AbstractConfigNode> nodes = new ArrayList<>(n.children());
+            // comments collected since the last field; handed to the next field's value
+            List<String> comments = new ArrayList<>();
+            for (int i = 0; i < nodes.size(); i++) {
+                AbstractConfigNode node = nodes.get(i);
+                if (node instanceof ConfigNodeComment) {
+                    lastWasNewline = false;
+                    comments.add(((ConfigNodeComment) node).commentText());
+                } else if (node instanceof ConfigNodeSingleToken
+                        && Tokens.isNewline(((ConfigNodeSingleToken) node).token())) {
+                    lineNumber++;
+                    if (lastWasNewline) {
+                        // Drop all comments if there was a blank line and start a new comment block
+                        comments.clear();
+                    }
+                    lastWasNewline = true;
+                } else if (flavor != ConfigSyntax.JSON && node instanceof ConfigNodeInclude) {
+                    parseInclude(values, (ConfigNodeInclude) node);
+                    lastWasNewline = false;
+                } else if (node instanceof ConfigNodeField) {
+                    lastWasNewline = false;
+                    Path path = ((ConfigNodeField) node).path().value();
+                    comments.addAll(((ConfigNodeField) node).comments());
+
+                    // path must be on-stack while we parse the value
+                    pathStack.push(path);
+                    if (((ConfigNodeField) node).separator() == Tokens.PLUS_EQUALS) {
+                        // we really should make this work, but for now throwing
+                        // an exception is better than producing an incorrect
+                        // result. See
+                        // https://github.com/lightbend/config/issues/160
+                        if (arrayCount > 0) {
+                            throw parseError(
+                                    "Due to current limitations of the config parser, += does not work nested inside a list. "
+                                            + "+= expands to a ${} substitution and the path in ${} cannot currently refer to list elements. "
+                                            + "You might be able to move the += outside of the list and then refer to it from inside the list with ${}.");
+                        }
+
+                        // because we will put it in an array after the fact so
+                        // we want this to be incremented during the parseValue
+                        // below in order to throw the above exception.
+                        arrayCount += 1;
+                    }
+
+                    AbstractConfigNodeValue valueNode;
+                    AbstractConfigValue newValue;
+
+                    valueNode = ((ConfigNodeField) node).value();
+
+                    // comments from the key token go to the value token
+                    newValue = parseValue(valueNode, comments);
+
+                    if (((ConfigNodeField) node).separator() == Tokens.PLUS_EQUALS) {
+                        arrayCount -= 1;
+
+                        // "a += b" becomes the concatenation of an optional reference to the
+                        // field's own path and a one-element list holding the new value
+                        List<AbstractConfigValue> concat = new ArrayList<>(2);
+                        AbstractConfigValue previousRef =
+                                new ConfigReference(
+                                        newValue.origin(),
+                                        new SubstitutionExpression(
+                                                fullCurrentPath(), true /* optional */));
+                        AbstractConfigValue list =
+                                new SimpleConfigList(
+                                        newValue.origin(), Collections.singletonList(newValue));
+                        concat.add(previousRef);
+                        concat.add(list);
+                        newValue = ConfigConcatenation.concatenate(concat);
+                    }
+
+                    // Grab any trailing comments on the same line
+                    if (i < nodes.size() - 1) {
+                        i++;
+                        while (i < nodes.size()) {
+                            if (nodes.get(i) instanceof ConfigNodeComment) {
+                                ConfigNodeComment comment = (ConfigNodeComment) nodes.get(i);
+                                newValue =
+                                        newValue.withOrigin(
+                                                newValue.origin()
+                                                        .appendComments(
+                                                                Collections.singletonList(
+                                                                        comment.commentText())));
+                                break;
+                            } else if (nodes.get(i) instanceof ConfigNodeSingleToken) {
+                                ConfigNodeSingleToken curr = (ConfigNodeSingleToken) nodes.get(i);
+                                if (curr.token() == Tokens.COMMA
+                                        || Tokens.isIgnoredWhitespace(curr.token())) {
+                                    // keep searching, as there could still be a comment
+                                } else {
+                                    // step back so the outer loop handles this node itself
+                                    i--;
+                                    break;
+                                }
+                            } else {
+                                // step back so the outer loop handles this node itself
+                                i--;
+                                break;
+                            }
+                            i++;
+                        }
+                    }
+
+                    pathStack.pop();
+
+                    String key = path.first();
+                    Path remaining = path.remainder();
+
+                    if (remaining == null) {
+                        AbstractConfigValue existing = values.get(key);
+                        if (existing != null) {
+                            // In strict JSON, dups should be an error; while in
+                            // our custom config language, they should be merged
+                            // if the value is an object (or substitution that
+                            // could become an object).
+
+                            if (flavor == ConfigSyntax.JSON) {
+                                throw parseError(
+                                        "JSON does not allow duplicate fields: '"
+                                                + key
+                                                + "' was already seen at "
+                                                + existing.origin().description());
+                            } else {
+                                newValue = newValue.withFallback(existing);
+                            }
+                        }
+                        values.put(key, newValue);
+                    } else {
+                        if (flavor == ConfigSyntax.JSON) {
+                            throw new ConfigException.BugOrBroken(
+                                    "somehow got multi-element path in JSON mode");
+                        }
+
+                        // multi-element path: nest the value under the rest of the path and
+                        // merge it with whatever is already stored for the first element
+                        AbstractConfigObject obj = createValueUnderPath(remaining, newValue);
+                        AbstractConfigValue existing = values.get(key);
+                        if (existing != null) {
+                            obj = obj.withFallback(existing);
+                        }
+                        values.put(key, obj);
+                    }
+                }
+            }
+
+            return new SimpleConfigObject(objectOrigin, values);
+        }
+
+        /**
+         * Converts an array node into a config list.
+         *
+         * <p>{@code arrayCount} is raised for the duration of the call so that nested parsing can
+         * tell it is inside a list. Each parsed element is held back in {@code v} until the next
+         * newline, the next value or the end of the node, so that comments seen in between can be
+         * appended to its origin.
+         *
+         * @param n the array node to convert
+         * @return a list holding the parsed elements in order
+         */
+        private SimpleConfigList parseArray(ConfigNodeArray n) {
+            arrayCount += 1;
+
+            SimpleConfigOrigin arrayOrigin = lineOrigin();
+            List<AbstractConfigValue> values = new ArrayList<>();
+
+            boolean lastWasNewLine = false;
+            List<String> comments = new ArrayList<>();
+
+            // most recently parsed element that has not been added to values yet
+            AbstractConfigValue v = null;
+
+            for (AbstractConfigNode node : n.children()) {
+                if (node instanceof ConfigNodeComment) {
+                    comments.add(((ConfigNodeComment) node).commentText());
+                    lastWasNewLine = false;
+                } else if (node instanceof ConfigNodeSingleToken
+                        && Tokens.isNewline(((ConfigNodeSingleToken) node).token())) {
+                    lineNumber++;
+                    if (lastWasNewLine && v == null) {
+                        // blank line with no pending element: the collected comments are dropped
+                        comments.clear();
+                    } else if (v != null) {
+                        // end of the element's line: store it with the comments seen since
+                        values.add(
+                                v.withOrigin(v.origin().appendComments(new ArrayList<>(comments))));
+                        comments.clear();
+                        v = null;
+                    }
+                    lastWasNewLine = true;
+                } else if (node instanceof AbstractConfigNodeValue) {
+                    lastWasNewLine = false;
+                    if (v != null) {
+                        // another value on the same line: store the previous one first
+                        values.add(
+                                v.withOrigin(v.origin().appendComments(new ArrayList<>(comments))));
+                        comments.clear();
+                    }
+                    v = parseValue((AbstractConfigNodeValue) node, comments);
+                }
+            }
+            // There shouldn't be any comments at this point, but add them just in case
+            if (v != null) {
+                values.add(v.withOrigin(v.origin().appendComments(new ArrayList<>(comments))));
+            }
+            arrayCount -= 1;
+            return new SimpleConfigList(arrayOrigin, values);
+        }
+
+        /**
+         * Converts the document held by this context into a config value.
+         *
+         * <p>Comments that precede a complex value node are passed to {@code parseValue} along with
+         * it. Once a value has been parsed, the first newline token that follows appends the
+         * comments collected since then to the value's origin and ends the walk.
+         *
+         * @return the parsed value, or {@code null} if the document contains no complex value node
+         */
+        AbstractConfigValue parse() {
+            AbstractConfigValue result = null;
+            ArrayList<String> comments = new ArrayList<>();
+            boolean lastWasNewLine = false;
+            for (AbstractConfigNode node : document.children()) {
+                if (node instanceof ConfigNodeComment) {
+                    comments.add(((ConfigNodeComment) node).commentText());
+                    lastWasNewLine = false;
+                } else if (node instanceof ConfigNodeSingleToken) {
+                    Token t = ((ConfigNodeSingleToken) node).token();
+                    if (Tokens.isNewline(t)) {
+                        lineNumber++;
+                        if (lastWasNewLine && result == null) {
+                            // blank line before any value: the collected comments are dropped
+                            comments.clear();
+                        } else if (result != null) {
+                            // end of the value's last line: attach trailing comments and stop
+                            result =
+                                    result.withOrigin(
+                                            result.origin()
+                                                    .appendComments(new ArrayList<>(comments)));
+                            comments.clear();
+                            break;
+                        }
+                        lastWasNewLine = true;
+                    }
+                } else if (node instanceof ConfigNodeComplexValue) {
+                    result = parseValue((ConfigNodeComplexValue) node, comments);
+                    lastWasNewLine = false;
+                }
+            }
+            return result;
+        }
+    }
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/Path.java b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/Path.java
new file mode 100644
index 0000000..33e8568
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/Path.java
@@ -0,0 +1,236 @@
+/*
+ *   Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
+ */
+
+package org.apache.seatunnel.shade.com.typesafe.config.impl;
+
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigException;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigParseOptions;
+
+import java.util.Iterator;
+import java.util.List;
+
+final class Path {
+
+    private final String first;
+    private final Path remainder;
+    private static final int DEFAULT_VALUE = 41;
+
+    Path(String first, Path remainder) {
+        this.first = first;
+        this.remainder = remainder;
+    }
+
+    /**
+     * Creates a path from one or more keys, in the order given.
+     *
+     * @param elements the keys of the path; must not be empty
+     * @throws ConfigException.BugOrBroken if no keys are supplied
+     */
+    Path(String... elements) {
+        if (elements.length == 0) {
+            throw new ConfigException.BugOrBroken("empty path");
+        }
+        this.first = elements[0];
+        if (elements.length == 1) {
+            // a single key has no tail
+            this.remainder = null;
+        } else {
+            PathBuilder tail = new PathBuilder();
+            for (int idx = 1; idx < elements.length; idx++) {
+                tail.appendKey(elements[idx]);
+            }
+            this.remainder = tail.result();
+        }
+    }
+
+    // append all the paths in the list together into one path
+    Path(List<Path> pathsToConcat) {
+        this(pathsToConcat.iterator());
+    }
+
+    /**
+     * Appends all the paths produced by the iterator together into one path.
+     *
+     * @param i iterator over the paths to join, in order; must yield at least one path
+     * @throws ConfigException.BugOrBroken if the iterator yields nothing
+     */
+    Path(Iterator<Path> i) {
+        if (!i.hasNext()) {
+            throw new ConfigException.BugOrBroken("empty path");
+        }
+
+        Path head = i.next();
+        PathBuilder rest = new PathBuilder();
+        if (head.remainder != null) {
+            rest.appendPath(head.remainder);
+        }
+        // every following path is appended whole
+        i.forEachRemaining(rest::appendPath);
+
+        this.first = head.first;
+        this.remainder = rest.result();
+    }
+
+    String first() {
+        return first;
+    }
+
+    /** @return path minus the first element or null if no more elements */
+    Path remainder() {
+        return remainder;
+    }
+
+    /**
+     * Builds a copy of this path without its final element.
+     *
+     * @return path minus the last element or null if we have just one element
+     */
+    Path parent() {
+        if (remainder == null) {
+            return null;
+        }
+
+        PathBuilder prefix = new PathBuilder();
+        // copy every key that still has a successor, i.e. all but the last one
+        for (Path cursor = this; cursor.remainder != null; cursor = cursor.remainder) {
+            prefix.appendKey(cursor.first);
+        }
+        return prefix.result();
+    }
+
+    /** @return last element in the path */
+    String last() {
+        Path tail = this;
+        // follow the chain until there is no further remainder
+        for (Path next = remainder; next != null; next = next.remainder) {
+            tail = next;
+        }
+        return tail.first;
+    }
+
+    /**
+     * Builds a new path made of {@code toPrepend} followed by this path; neither input path is
+     * modified.
+     *
+     * @param toPrepend the path that should come first
+     * @return the result of the builder after appending both paths
+     */
+    Path prepend(Path toPrepend) {
+        PathBuilder pb = new PathBuilder();
+        pb.appendPath(toPrepend);
+        pb.appendPath(this);
+        return pb.result();
+    }
+
+    /** @return number of elements in this path (always at least one); computed by walking it */
+    int length() {
+        int elements = 0;
+        for (Path cursor = this; cursor != null; cursor = cursor.remainder) {
+            elements++;
+        }
+        return elements;
+    }
+
+    /**
+     * Skips leading elements of this path.
+     *
+     * @param removeFromFront how many elements to drop; zero or negative drops nothing
+     * @return the path that remains, or null if every element was dropped
+     */
+    Path subPath(int removeFromFront) {
+        Path cursor = this;
+        for (int skipped = 0; skipped < removeFromFront && cursor != null; skipped++) {
+            cursor = cursor.remainder;
+        }
+        return cursor;
+    }
+
+    /**
+     * Copies a run of elements into a new path.
+     *
+     * @param firstIndex index of the first element to copy
+     * @param lastIndex index one past the last element to copy; an element must still exist at
+     *     this index, otherwise the call is rejected
+     * @return the builder result for the elements in {@code [firstIndex, lastIndex)}
+     * @throws ConfigException.BugOrBroken if {@code lastIndex < firstIndex} or either index is
+     *     out of range
+     */
+    Path subPath(int firstIndex, int lastIndex) {
+        if (lastIndex < firstIndex) {
+            throw new ConfigException.BugOrBroken("bad call to subPath");
+        }
+
+        Path from = subPath(firstIndex);
+        int count = lastIndex - firstIndex;
+        if (from == null && count > 0) {
+            // firstIndex is past the end of the path: report it explicitly instead of
+            // failing with a NullPointerException on from.first() below
+            throw new ConfigException.BugOrBroken(
+                    "subPath firstIndex out of range " + firstIndex);
+        }
+        PathBuilder pb = new PathBuilder();
+        while (count > 0) {
+            count -= 1;
+            pb.appendKey(from.first());
+            from = from.remainder();
+            if (from == null) {
+                throw new ConfigException.BugOrBroken(
+                        "subPath lastIndex out of range " + lastIndex);
+            }
+        }
+        return pb.result();
+    }
+
+    /**
+     * Tests whether {@code other} is a prefix of this path, comparing element by element.
+     *
+     * @param other candidate prefix
+     * @return true if every element of {@code other} equals the element at the same position here
+     */
+    boolean startsWith(Path other) {
+        if (other.length() > this.length()) {
+            // a longer path can never be a prefix
+            return false;
+        }
+        Path mine = this;
+        for (Path theirs = other; theirs != null; theirs = theirs.remainder()) {
+            if (!theirs.first().equals(mine.first())) {
+                return false;
+            }
+            mine = mine.remainder();
+        }
+        return true;
+    }
+
+    /** Two paths are equal when their first keys match and their remainders are equal. */
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof Path)) {
+            return false;
+        }
+        Path that = (Path) other;
+        return this.first.equals(that.first)
+                && ConfigImplUtil.equalsHandlingNull(this.remainder, that.remainder);
+    }
+
+    @Override
+    public int hashCode() {
+        return DEFAULT_VALUE * (DEFAULT_VALUE + first.hashCode())
+                + (remainder == null ? 0 : remainder.hashCode());
+    }
+
+    /**
+     * Reports whether a key contains any character other than a letter, a digit, '-', '_' or '.'.
+     * This has no very precise meaning; it only reduces noise from quotes in the rendered path
+     * for average cases.
+     *
+     * @param s key text to inspect
+     * @return true if at least one such character is present; false for the empty string
+     */
+    static boolean hasFunkyChars(String s) {
+        for (char c : s.toCharArray()) {
+            boolean plain = Character.isLetterOrDigit(c) || c == '-' || c == '_' || c == '.';
+            if (!plain) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Appends the rendered form of this path to {@code sb}: keys joined by
+     * {@code ConfigParseOptions.PATH_TOKEN_SEPARATOR}, with keys that are empty or contain funky
+     * characters rendered through {@code ConfigImplUtil.renderJsonString}.
+     */
+    private void appendToStringBuilder(StringBuilder sb) {
+        for (Path cursor = this; cursor != null; cursor = cursor.remainder) {
+            String key = cursor.first;
+            if (hasFunkyChars(key) || key.isEmpty()) {
+                sb.append(ConfigImplUtil.renderJsonString(key));
+            } else {
+                sb.append(key);
+            }
+            // separator only between elements, never after the last one
+            if (cursor.remainder != null) {
+                sb.append(ConfigParseOptions.PATH_TOKEN_SEPARATOR);
+            }
+        }
+    }
+
+    /** Debugging-oriented form: the rendered path wrapped in {@code Path(...)}. */
+    @Override
+    public String toString() {
+        return "Path(" + render() + ")";
+    }
+
+    /**
+     * toString() is a debugging-oriented version while this is an error-message-oriented
+     * human-readable one.
+     */
+    String render() {
+        StringBuilder sb = new StringBuilder();
+        appendToStringBuilder(sb);
+        return sb.toString();
+    }
+
+    static Path newKey(String key) {
+        return new Path(key, null);
+    }
+
+    static Path newPath(String path) {
+        return PathParser.parsePath(path);
+    }
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PathParser.java b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PathParser.java
new file mode 100644
index 0000000..a0cb50b
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PathParser.java
@@ -0,0 +1,312 @@
+/*
+ *   Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
+ */
+
+package org.apache.seatunnel.shade.com.typesafe.config.impl;
+
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigException;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigOrigin;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigParseOptions;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigSyntax;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueType;
+
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+final class PathParser {
+
+    static ConfigOrigin API_ORIGIN = SimpleConfigOrigin.newSimple("path parameter");
+
+    static ConfigNodePath parsePathNode(String path) {
+        return parsePathNode(path, ConfigSyntax.CONF);
+    }
+
+    static ConfigNodePath parsePathNode(String path, ConfigSyntax flavor) {
+        try (StringReader reader = new StringReader(path)) {
+            Iterator<Token> tokens = Tokenizer.tokenize(API_ORIGIN, reader, flavor);
+            tokens.next(); // drop START
+            return parsePathNodeExpression(tokens, API_ORIGIN, path, flavor);
+        }
+    }
+
+    /**
+     * Parses a path expression given as text. The speculative fast parser is tried first; when it
+     * declines (returns null) the text is tokenized with CONF syntax and run through the full
+     * path-expression parser.
+     *
+     * @param path path expression text
+     * @return the parsed path
+     */
+    static Path parsePath(String path) {
+        Path quick = speculativeFastParsePath(path);
+        if (quick == null) {
+            try (StringReader in = new StringReader(path)) {
+                Iterator<Token> stream = Tokenizer.tokenize(API_ORIGIN, in, ConfigSyntax.CONF);
+                stream.next(); // drop START
+                return parsePathExpression(stream, API_ORIGIN, path);
+            }
+        }
+        return quick;
+    }
+
+    protected static Path parsePathExpression(Iterator<Token> expression, ConfigOrigin origin) {
+        return parsePathExpression(expression, origin, null, null, ConfigSyntax.CONF);
+    }
+
+    protected static Path parsePathExpression(
+            Iterator<Token> expression, ConfigOrigin origin, String originalText) {
+        return parsePathExpression(expression, origin, originalText, null, ConfigSyntax.CONF);
+    }
+
+    protected static ConfigNodePath parsePathNodeExpression(
+            Iterator<Token> expression, ConfigOrigin origin) {
+        return parsePathNodeExpression(expression, origin, null, ConfigSyntax.CONF);
+    }
+
+    protected static ConfigNodePath parsePathNodeExpression(
+            Iterator<Token> expression,
+            ConfigOrigin origin,
+            String originalText,
+            ConfigSyntax flavor) {
+        ArrayList<Token> pathTokens = new ArrayList<>();
+        Path path = parsePathExpression(expression, origin, originalText, pathTokens, flavor);
+        return new ConfigNodePath(path, pathTokens);
+    }
+
+    /**
+     * Turns a stream of tokens into a {@link Path}.
+     *
+     * <p>String-valued tokens are treated as quoted text, so separators inside them do not split
+     * the path. Other value tokens and unquoted text are split on the path separator by
+     * {@code addPathText}. The END token is ignored, and any other kind of token is rejected.
+     *
+     * @param expression tokens making up the path expression
+     * @param origin origin reported in error messages
+     * @param originalText original text of the expression for error messages; may be null if not
+     *     available
+     * @param pathTokens if non-null, receives every token read (including ignored whitespace),
+     *     with value and unquoted-text tokens replaced by the pieces from
+     *     {@code splitTokenOnPeriod}
+     * @param flavor syntax flavor passed on to {@code splitTokenOnPeriod}
+     * @return the parsed path
+     * @throws ConfigException.BadPath if there are no tokens, a token is not allowed in a path, or
+     *     an element is empty without having come from a quoted empty string
+     */
+    protected static Path parsePathExpression(
+            Iterator<Token> expression,
+            ConfigOrigin origin,
+            String originalText,
+            ArrayList<Token> pathTokens,
+            ConfigSyntax flavor) {
+        // each builder in "buf" is an element in the path.
+        List<Element> buf = new ArrayList<>();
+        buf.add(new Element("", false));
+
+        if (!expression.hasNext()) {
+            throw new ConfigException.BadPath(
+                    origin, originalText, "Expecting a field name or path here, but got nothing");
+        }
+
+        while (expression.hasNext()) {
+            Token t = expression.next();
+
+            // record the raw token first; it may be replaced by split tokens below
+            if (pathTokens != null) {
+                pathTokens.add(t);
+            }
+
+            // Ignore all IgnoredWhitespace tokens
+            if (Tokens.isIgnoredWhitespace(t)) {
+                continue;
+            }
+
+            if (Tokens.isValueWithType(t, ConfigValueType.STRING)) {
+                AbstractConfigValue v = Tokens.getValue(t);
+                // this is a quoted string; so any periods
+                // in here don't count as path separators
+                String s = v.transformToString();
+
+                addPathText(buf, true, s);
+            } else if (t == Tokens.END) {
+                // ignore this; when parsing a file, it should not happen
+                // since we're parsing a token list rather than the main
+                // token iterator, and when parsing a path expression from the
+                // API, it's expected to have an END.
+            } else {
+                // any periods outside of a quoted string count as
+                // separators
+                String text;
+                if (Tokens.isValue(t)) {
+                    // appending a number here may add
+                    // a period, but we _do_ count those as path
+                    // separators, because we basically want
+                    // "foo 3.0bar" to parse as a string even
+                    // though there's a number in it. The fact that
+                    // we tokenize non-string values is largely an
+                    // implementation detail.
+                    AbstractConfigValue v = Tokens.getValue(t);
+
+                    // We need to split the tokens on a . so that we can get sub-paths
+                    // but still preserve the original path text when doing an
+                    // insertion
+                    if (pathTokens != null) {
+                        pathTokens.remove(pathTokens.size() - 1);
+                        pathTokens.addAll(splitTokenOnPeriod(t, flavor));
+                    }
+                    text = v.transformToString();
+                } else if (Tokens.isUnquotedText(t)) {
+                    // We need to split the tokens on a . so that we can get sub-paths
+                    // but still preserve the original path text when doing an
+                    // insertion on ConfigNodeObjects
+                    if (pathTokens != null) {
+                        pathTokens.remove(pathTokens.size() - 1);
+                        pathTokens.addAll(splitTokenOnPeriod(t, flavor));
+                    }
+                    text = Tokens.getUnquotedText(t);
+                } else {
+                    throw new ConfigException.BadPath(
+                            origin,
+                            originalText,
+                            "Token not allowed in path expression: "
+                                    + t
+                                    + " (you can double-quote this token if you really want it here)");
+                }
+
+                addPathText(buf, false, text);
+            }
+        }
+
+        // every collected element becomes one key; an empty element is only legal
+        // when it came from a quoted empty string
+        PathBuilder pb = new PathBuilder();
+        for (Element e : buf) {
+            if (e.sb.length() == 0 && !e.canBeEmpty) {
+                throw new ConfigException.BadPath(
+                        origin,
+                        originalText,
+                        "path has a leading, trailing, or two adjacent period '.' (use quoted \"\" empty string if you want an empty element)");
+            } else {
+                pb.appendKey(e.sb.toString());
+            }
+        }
+
+        return pb.result();
+    }
+
+    private static Collection<Token> splitTokenOnPeriod(Token t, ConfigSyntax flavor) {
+
+        String tokenText = t.tokenText();
+        if (tokenText.equals(ConfigParseOptions.PATH_TOKEN_SEPARATOR)) {
+            return Collections.singletonList(t);
+        }
+        String[] splitToken = tokenText.split(ConfigParseOptions.PATH_TOKEN_SEPARATOR);
+        ArrayList<Token> splitTokens = new ArrayList<>();
+        for (String s : splitToken) {
+            if (flavor == ConfigSyntax.CONF) {
+                splitTokens.add(Tokens.newUnquotedText(t.origin(), s));
+            } else {
+                splitTokens.add(Tokens.newString(t.origin(), s, "\"" + s + "\""));
+            }
+            splitTokens.add(
+                    Tokens.newUnquotedText(t.origin(), ConfigParseOptions.PATH_TOKEN_SEPARATOR));
+        }
+
+        if (!tokenText.startsWith(
+                ConfigParseOptions.PATH_TOKEN_SEPARATOR,
+                tokenText.length() - ConfigParseOptions.PATH_TOKEN_SEPARATOR.length())) {
+            splitTokens.remove(splitTokens.size() - 1);
+        }
+
+        return splitTokens;
+    }
+
+    private static void addPathText(List<Element> buf, boolean wasQuoted, String newText) {
+
+        int i = wasQuoted ? -1 : newText.indexOf(ConfigParseOptions.PATH_TOKEN_SEPARATOR);
+        Element current = buf.get(buf.size() - 1);
+        if (i < 0) {
+            // add to current path element
+            current.sb.append(newText);
+            // any empty quoted string means this element can
+            // now be empty.
+            if (wasQuoted && current.sb.length() == 0) {
+                current.canBeEmpty = true;
+            }
+        } else {
+            // "buf" plus up to the period is an element
+            current.sb.append(newText, 0, i);
+            // then start a new element
+            buf.add(new Element("", false));
+            // recurse to consume remainder of newText
+            addPathText(
+                    buf,
+                    false,
+                    newText.substring(i + ConfigParseOptions.PATH_TOKEN_SEPARATOR.length()));
+        }
+    }
+
+    /**
+     * Decides whether the string has any characters or features that might require the full
+     * parser. Only ASCII letters, '_', '-' (not at the start of an element) and single '.'
+     * characters between elements are accepted.
+     *
+     * @param s trimmed path text
+     * @return true if the fast parser must not be used for {@code s}
+     */
+    private static boolean looksUnsafeForFastParser(String s) {
+        // TODO: maybe we should rewrite this function using ConfigParseOptions.pathTokenSeparator
+        if (s.isEmpty() || s.charAt(0) == '.' || s.charAt(s.length() - 1) == '.') {
+            return true;
+        }
+
+        boolean afterDot = true; // start of path is also a "dot"
+        for (char c : s.toCharArray()) {
+            boolean nameChar = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '_';
+            if (nameChar) {
+                afterDot = false;
+            } else if (c == '.') {
+                if (afterDot) {
+                    return true; // ".." means we need to throw an error
+                }
+                afterDot = true;
+            } else if (c != '-' || afterDot) {
+                // anything else is unsafe, and so is a '-' that starts an element
+                return true;
+            }
+        }
+
+        return afterDot;
+    }
+
+    private static Path fastPathBuild(Path tail, String s, int end) {
+
+        // lastIndexOf takes last index it should look at, end - 1 not end
+        int splitAt = s.lastIndexOf(ConfigParseOptions.PATH_TOKEN_SEPARATOR, end - 1);
+        ArrayList<Token> tokens = new ArrayList<>();
+        tokens.add(Tokens.newUnquotedText(null, s));
+        // this works even if splitAt is -1; then we start the substring at 0
+
+        if (splitAt < 0) {
+            Path withOneMoreElement = new Path(s.substring(0, end), tail);
+            return withOneMoreElement;
+        } else {
+            Path withOneMoreElement =
+                    new Path(
+                            s.substring(
+                                    splitAt + ConfigParseOptions.PATH_TOKEN_SEPARATOR.length(),
+                                    end),
+                            tail);
+            return fastPathBuild(withOneMoreElement, s, splitAt);
+        }
+    }
+
+    // Fast route, much cheaper than the full parser, for plain paths such as
+    // "foo" or "foo.bar". Returns null when the text needs the full parser.
+    private static Path speculativeFastParsePath(String path) {
+        String trimmed = ConfigImplUtil.unicodeTrim(path);
+        return looksUnsafeForFastParser(trimmed)
+                ? null
+                : fastPathBuild(null, trimmed, trimmed.length());
+    }
+
+    static class Element {
+        StringBuilder sb;
+        // an element can be empty if it has a quoted empty string "" in it
+        boolean canBeEmpty;
+
+        Element(String initial, boolean canBeEmpty) {
+            this.canBeEmpty = canBeEmpty;
+            this.sb = new StringBuilder(initial);
+        }
+
+        @Override
+        public String toString() {
+            return "Element(" + sb.toString() + "," + canBeEmpty + ")";
+        }
+    }
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PropertiesParser.java b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PropertiesParser.java
new file mode 100644
index 0000000..3cfdb7d
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PropertiesParser.java
@@ -0,0 +1,217 @@
+/** Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com> */
+package org.apache.seatunnel.shade.com.typesafe.config.impl;
+
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigException;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigOrigin;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+final class PropertiesParser {
+    static AbstractConfigObject parse(Reader reader, ConfigOrigin origin) throws IOException {
+        Properties props = new Properties();
+        props.load(reader);
+        return fromProperties(origin, props);
+    }
+
+    /** @return the text after the last '.' in {@code path}, or the whole text if it has no '.' */
+    static String lastElement(String path) {
+        int dot = path.lastIndexOf('.');
+        return dot < 0 ? path : path.substring(dot + 1);
+    }
+
+    /** @return the text before the last '.' in {@code path}, or null if it has no '.' */
+    static String exceptLastElement(String path) {
+        int dot = path.lastIndexOf('.');
+        return dot < 0 ? null : path.substring(0, dot);
+    }
+
+    /**
+     * Converts a property key into a path by splitting it on every '.' character; no quoting or
+     * escaping is interpreted.
+     *
+     * @param key property key such as {@code a.b.c}
+     * @return path with one element per dot-separated piece of the key
+     */
+    static Path pathFromPropertyKey(String key) {
+        // build from the last piece backwards, prepending each earlier piece
+        Path path = new Path(lastElement(key), null);
+        String prefix = exceptLastElement(key);
+        while (prefix != null) {
+            path = new Path(lastElement(prefix), path);
+            prefix = exceptLastElement(prefix);
+        }
+        return path;
+    }
+
+    static AbstractConfigObject fromProperties(ConfigOrigin origin, Properties props) {
+        return fromEntrySet(origin, props.entrySet());
+    }
+
+    private static <K, V> AbstractConfigObject fromEntrySet(
+            ConfigOrigin origin, Set<Map.Entry<K, V>> entries) {
+        final Map<Path, Object> pathMap = getPathMap(entries);
+        return fromPathMap(origin, pathMap, true /* from properties */);
+    }
+
+    /**
+     * Builds an insertion-ordered map from path to raw value.
+     *
+     * <p>The map is first seeded with every JVM system property that has a String key, then the
+     * supplied entries are added; an entry whose key maps to the same path as a system property
+     * replaces that property's value. Keys that are not Strings are skipped.
+     *
+     * <p>NOTE(review): because of the seeding step, every result contains all system properties
+     * even when the caller passed an unrelated map - confirm this is intended.
+     *
+     * @param entries entries to convert; keys are split into paths by {@code pathFromPropertyKey}
+     * @return the combined path map
+     */
+    private static <K, V> Map<Path, Object> getPathMap(Set<Map.Entry<K, V>> entries) {
+        Map<Path, Object> pathMap = new LinkedHashMap<>();
+        // seed with the JVM system properties first so that entries can override them
+        System.getProperties()
+                .forEach(
+                        (key, value) -> {
+                            if (key instanceof String) {
+                                Path path = pathFromPropertyKey((String) key);
+                                pathMap.put(path, value);
+                            }
+                        });
+        for (Map.Entry<K, V> entry : entries) {
+            Object key = entry.getKey();
+            if (key instanceof String) {
+                Path path = pathFromPropertyKey((String) key);
+                pathMap.put(path, entry.getValue());
+            }
+        }
+        return pathMap;
+    }
+
+    static AbstractConfigObject fromStringMap(ConfigOrigin origin, Map<String, String> stringMap) {
+        return fromEntrySet(origin, stringMap.entrySet());
+    }
+
+    /**
+     * Builds a config object from a map whose keys are path expressions.
+     *
+     * @param origin origin to attach to the created values
+     * @param pathExpressionMap map from path expression (String) to raw value
+     * @return the resulting config object
+     * @throws ConfigException.BugOrBroken if any key is not a String
+     */
+    static AbstractConfigObject fromPathMap(ConfigOrigin origin, Map<?, ?> pathExpressionMap) {
+        Map<Path, Object> parsed = new LinkedHashMap<>();
+        for (Map.Entry<?, ?> item : pathExpressionMap.entrySet()) {
+            Object rawKey = item.getKey();
+            if (rawKey instanceof String) {
+                parsed.put(Path.newPath((String) rawKey), item.getValue());
+            } else {
+                throw new ConfigException.BugOrBroken(
+                        "Map has a non-string as a key, expecting a path expression as a String");
+            }
+        }
+        return fromPathMap(origin, parsed, false /* from properties */);
+    }
+
+    /**
+     * Converts a flat map of paths to raw values into a nested config object.
+     *
+     * @param origin origin attached to every value and object that is created
+     * @param pathMap raw values keyed by their full path
+     * @param convertedFromProperties when true, a path that is both a value and the parent of
+     *     other values keeps only the object, and only String raw values are kept (a String of
+     *     the form {@code [a,b]} becomes a list); when false, such a collision is an error and
+     *     raw values are converted with {@code ConfigImpl.fromAnyRef}
+     * @return the root config object
+     * @throws ConfigException.BugOrBroken if {@code convertedFromProperties} is false and a path
+     *     occurs both as a value and as the parent of a value
+     */
+    private static AbstractConfigObject fromPathMap(
+            ConfigOrigin origin, Map<Path, Object> pathMap, boolean convertedFromProperties) {
+        /*
+         * First, build a list of paths that will have values, either string or
+         * object values.
+         */
+        Set<Path> scopePaths = new LinkedHashSet<>();
+        Set<Path> valuePaths = new LinkedHashSet<>();
+        for (Path path : pathMap.keySet()) {
+            // add value's path
+            valuePaths.add(path);
+
+            // all parent paths are objects
+            Path next = path.parent();
+            while (next != null) {
+                scopePaths.add(next);
+                next = next.parent();
+            }
+        }
+
+        if (convertedFromProperties) {
+            /*
+             * If any string values are also objects containing other values,
+             * drop those string values - objects "win".
+             */
+            valuePaths.removeAll(scopePaths);
+        } else {
+            /* If we didn't start out as properties, then this is an error. */
+            for (Path path : valuePaths) {
+                if (scopePaths.contains(path)) {
+                    throw new ConfigException.BugOrBroken(
+                            "In the map, path '"
+                                    + path.render()
+                                    + "' occurs as both the parent object of a value and as a value. "
+                                    + "Because Map has no defined ordering, this is a broken situation.");
+                }
+            }
+        }
+
+        /*
+         * Create maps for the object-valued values.
+         */
+        Map<String, AbstractConfigValue> root = new LinkedHashMap<>();
+        Map<Path, Map<String, AbstractConfigValue>> scopes = new LinkedHashMap<>();
+
+        for (Path path : scopePaths) {
+            Map<String, AbstractConfigValue> scope = new LinkedHashMap<>();
+            scopes.put(path, scope);
+        }
+
+        /* Store string values in the associated scope maps */
+        for (Path path : valuePaths) {
+            Path parentPath = path.parent();
+            Map<String, AbstractConfigValue> parent =
+                    parentPath != null ? scopes.get(parentPath) : root;
+
+            String last = path.last();
+            Object rawValue = pathMap.get(path);
+            AbstractConfigValue value;
+            if (convertedFromProperties) {
+                if (rawValue instanceof String) {
+                    // a value written as "[a,b,c]" is turned into a list: the brackets are
+                    // stripped and the rest is split on ','; the pieces are not trimmed
+                    if (((String) rawValue).startsWith("[") && ((String) rawValue).endsWith("]")) {
+                        List<String> list =
+                                Arrays.asList(
+                                        ((String) rawValue)
+                                                .substring(1, ((String) rawValue).length() - 1)
+                                                .split(","));
+                        value = ConfigImpl.fromAnyRef(list, origin, FromMapMode.KEYS_ARE_PATHS);
+                    } else {
+                        value = new ConfigString.Quoted(origin, (String) rawValue);
+                    }
+
+                } else {
+                    // silently ignore non-string values in Properties
+                    value = null;
+                }
+            } else {
+                value =
+                        ConfigImpl.fromAnyRef(
+                                pathMap.get(path), origin, FromMapMode.KEYS_ARE_PATHS);
+            }
+            if (value != null) parent.put(last, value);
+        }
+
+        /*
+         * Make a list of scope paths from longest to shortest, so children go
+         * before parents.
+         */
+        List<Path> sortedScopePaths = new ArrayList<>(scopePaths);
+        // sort descending by length
+        sortedScopePaths.sort(
+                (a, b) -> {
+                    // Path.length() is O(n) so in theory this sucks
+                    // but in practice we can make Path precompute length
+                    // if it ever matters.
+                    return b.length() - a.length();
+                });
+
+        /*
+         * Create ConfigObject for each scope map, working from children to
+         * parents to avoid modifying any already-created ConfigObject. This is
+         * where we need the sorted list.
+         */
+        for (Path scopePath : sortedScopePaths) {
+            Map<String, AbstractConfigValue> scope = scopes.get(scopePath);
+
+            Path parentPath = scopePath.parent();
+            Map<String, AbstractConfigValue> parent =
+                    parentPath != null ? scopes.get(parentPath) : root;
+
+            AbstractConfigObject o =
+                    new SimpleConfigObject(
+                            origin, scope, ResolveStatus.RESOLVED, false /* ignoresFallbacks */);
+            // a scope object replaces any value already stored under the same key
+            parent.put(scopePath.last(), o);
+        }
+
+        // return root config object
+        return new SimpleConfigObject(
+                origin, root, ResolveStatus.RESOLVED, false /* ignoresFallbacks */);
+    }
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/SimpleConfigObject.java b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/SimpleConfigObject.java
new file mode 100644
index 0000000..735df68
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/SimpleConfigObject.java
@@ -0,0 +1,644 @@
+/*
+ *   Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
+ */
+
+package org.apache.seatunnel.shade.com.typesafe.config.impl;
+
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigException;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigOrigin;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
+
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+final class SimpleConfigObject extends AbstractConfigObject implements Serializable {
+    private static final long serialVersionUID = 2L;
+    // Backing key -> value map; the constructor stores the map it is given without copying it.
+    private final Map<String, AbstractConfigValue> value;
+    // True when the status handed to the constructor was ResolveStatus.RESOLVED.
+    private final boolean resolved;
+    // Flag reported by ignoresFallbacks().
+    private final boolean ignoresFallbacks;
+    // Shared instance returned by empty(); its origin is described as "empty config".
+    private static final SimpleConfigObject EMPTY_INSTANCE =
+            empty(SimpleConfigOrigin.newSimple("empty config"));
+    // Constant used as seed and multiplier by mapHash().
+    private static final int HASH_CODE = 41;
+
+    /**
+     * Creates an object backed directly by {@code value} (the map is not copied).
+     *
+     * @param origin origin passed to the superclass
+     * @param value key/value map; must not be null
+     * @param status resolve status claimed for {@code value}; must match the status computed
+     *     from the map's values
+     * @param ignoresFallbacks flag later reported by {@link #ignoresFallbacks()}
+     * @throws ConfigException.BugOrBroken if {@code value} is null or {@code status} disagrees
+     *     with the values
+     */
+    SimpleConfigObject(
+            ConfigOrigin origin,
+            Map<String, AbstractConfigValue> value,
+            ResolveStatus status,
+            boolean ignoresFallbacks) {
+        super(origin);
+        if (value == null) {
+            throw new ConfigException.BugOrBroken("creating config object with null map");
+        }
+
+        this.value = value;
+        this.resolved = status == ResolveStatus.RESOLVED;
+        this.ignoresFallbacks = ignoresFallbacks;
+
+        // Sanity check: the caller-supplied status has to agree with the actual children.
+        ResolveStatus computed = ResolveStatus.fromValues(value.values());
+        if (status != computed) {
+            throw new ConfigException.BugOrBroken("Wrong resolved status on " + this);
+        }
+    }
+
+    /**
+     * Convenience constructor: derives the resolve status from the map's values and does not
+     * ignore fallbacks.
+     */
+    SimpleConfigObject(ConfigOrigin origin, Map<String, AbstractConfigValue> value) {
+        this(origin, value, ResolveStatus.fromValues(value.values()), false);
+    }
+
+    public SimpleConfigObject withOnlyKey(String key) {
+        return this.withOnlyPath(Path.newKey(key));
+    }
+
+    public SimpleConfigObject withoutKey(String key) {
+        return this.withoutPath(Path.newKey(key));
+    }
+
+    /**
+     * Builds an object containing only the entry reachable through {@code path}.
+     *
+     * @param path path to keep; its first element is looked up in this object
+     * @return a single-entry object, or {@code null} when nothing exists at {@code path}
+     */
+    protected SimpleConfigObject withOnlyPathOrNull(Path path) {
+        String head = path.first();
+        Path tail = path.remainder();
+        AbstractConfigValue kept = value.get(head);
+
+        if (tail != null) {
+            // A deeper path can only be followed through a nested object.
+            if (kept instanceof AbstractConfigObject) {
+                kept = ((AbstractConfigObject) kept).withOnlyPathOrNull(tail);
+            } else {
+                kept = null;
+            }
+        }
+
+        if (kept == null) {
+            return null;
+        }
+        return new SimpleConfigObject(
+                origin(), Collections.singletonMap(head, kept), kept.resolveStatus(), ignoresFallbacks);
+    }
+
+    /**
+     * Like {@link #withOnlyPathOrNull(Path)}, but returns an empty, resolved object (same origin
+     * and fallback flag) instead of {@code null} when nothing exists at {@code path}.
+     */
+    SimpleConfigObject withOnlyPath(Path path) {
+        SimpleConfigObject narrowed = withOnlyPathOrNull(path);
+        if (narrowed != null) {
+            return narrowed;
+        }
+        return new SimpleConfigObject(
+                origin(), Collections.emptyMap(), ResolveStatus.RESOLVED, ignoresFallbacks);
+    }
+
+    /**
+     * Returns an object without the entry at {@code path}.
+     *
+     * <p>If the path continues into a nested object, the removal is delegated to that child and
+     * the child is replaced in a copy of this map. If the path ends here and the key exists, a
+     * copy without that key is returned. In every other case this instance is returned as is.
+     */
+    SimpleConfigObject withoutPath(Path path) {
+        String head = path.first();
+        Path tail = path.remainder();
+        AbstractConfigValue current = value.get(head);
+
+        if (tail != null && current instanceof AbstractConfigObject) {
+            AbstractConfigValue trimmedChild = ((AbstractConfigObject) current).withoutPath(tail);
+            Map<String, AbstractConfigValue> updated = new LinkedHashMap<>(value);
+            updated.put(head, trimmedChild);
+            return new SimpleConfigObject(
+                    origin(), updated, ResolveStatus.fromValues(updated.values()), ignoresFallbacks);
+        }
+
+        if (tail == null && current != null) {
+            // Copy everything, then drop the one key; insertion order of the rest is kept.
+            Map<String, AbstractConfigValue> remaining = new LinkedHashMap<>(value);
+            remaining.remove(head);
+            return new SimpleConfigObject(
+                    origin(),
+                    remaining,
+                    ResolveStatus.fromValues(remaining.values()),
+                    ignoresFallbacks);
+        }
+
+        return this;
+    }
+
+    /**
+     * Returns a copy of this object in which {@code key} maps to {@code v}.
+     *
+     * @param key key to set
+     * @param v value to store; must be non-null and an {@code AbstractConfigValue}
+     * @return a new object with the same origin and fallback flag
+     * @throws ConfigException.BugOrBroken if {@code v} is null
+     */
+    public SimpleConfigObject withValue(String key, ConfigValue v) {
+        if (v == null) {
+            throw new ConfigException.BugOrBroken(
+                    "Trying to store null ConfigValue in a ConfigObject");
+        }
+
+        // Cast once up front so the map below can be fully typed instead of raw.
+        AbstractConfigValue newValue = (AbstractConfigValue) v;
+        Map<String, AbstractConfigValue> newMap;
+        if (this.value.isEmpty()) {
+            newMap = Collections.singletonMap(key, newValue);
+        } else {
+            newMap = new LinkedHashMap<>(this.value);
+            newMap.put(key, newValue);
+        }
+
+        return new SimpleConfigObject(
+                this.origin(),
+                newMap,
+                ResolveStatus.fromValues(newMap.values()),
+                this.ignoresFallbacks);
+    }
+
+    /**
+     * Returns a copy of this object with {@code v} stored at {@code path}.
+     *
+     * <p>A single-element path is stored directly. Otherwise the remainder is applied to the
+     * existing child object if there is one; if the child is missing or not an object, it is
+     * replaced by a subtree created from {@code v} at the remaining path.
+     */
+    SimpleConfigObject withValue(Path path, ConfigValue v) {
+        String head = path.first();
+        Path tail = path.remainder();
+        if (tail == null) {
+            return withValue(head, v);
+        }
+
+        AbstractConfigValue existing = value.get(head);
+        if (existing instanceof AbstractConfigObject) {
+            AbstractConfigObject nested = (AbstractConfigObject) existing;
+            return withValue(head, nested.withValue(tail, v));
+        }
+
+        SimpleConfig subtree =
+                ((AbstractConfigValue) v)
+                        .atPath(
+                                SimpleConfigOrigin.newSimple("withValue(" + tail.render() + ")"),
+                                tail);
+        return withValue(head, subtree.root());
+    }
+
+    /** Looks {@code key} up in the backing map; returns {@code null} when it is absent. */
+    protected AbstractConfigValue attemptPeekWithPartialResolve(String key) {
+        AbstractConfigValue peeked = value.get(key);
+        return peeked;
+    }
+
+    /** Creates an object sharing this backing map but with the given status, origin and flag. */
+    private SimpleConfigObject newCopy(
+            ResolveStatus newStatus, ConfigOrigin newOrigin, boolean newIgnoresFallbacks) {
+        Map<String, AbstractConfigValue> sharedMap = value;
+        return new SimpleConfigObject(newOrigin, sharedMap, newStatus, newIgnoresFallbacks);
+    }
+
+    /** Copies this object with a new status and origin, keeping the current fallback flag. */
+    protected SimpleConfigObject newCopy(ResolveStatus newStatus, ConfigOrigin newOrigin) {
+        boolean keepFlag = ignoresFallbacks;
+        return newCopy(newStatus, newOrigin, keepFlag);
+    }
+
+    /**
+     * Returns this instance if it already ignores fallbacks; otherwise a copy with the flag set
+     * to {@code true} and the same status and origin.
+     */
+    protected SimpleConfigObject withFallbacksIgnored() {
+        if (ignoresFallbacks) {
+            return this;
+        }
+        return newCopy(resolveStatus(), origin(), true);
+    }
+
+    /** Converts the stored {@code resolved} flag back into a {@code ResolveStatus}. */
+    ResolveStatus resolveStatus() {
+        ResolveStatus status = ResolveStatus.fromBoolean(resolved);
+        return status;
+    }
+
+    /**
+     * Returns a copy in which the entry whose value is the very same instance as {@code child}
+     * is replaced by {@code replacement}, or removed when {@code replacement} is null.
+     *
+     * @throws ConfigException.BugOrBroken if no entry holds {@code child}
+     */
+    public SimpleConfigObject replaceChild(
+            AbstractConfigValue child, AbstractConfigValue replacement) {
+        Map<String, AbstractConfigValue> newChildren = new LinkedHashMap<>(value);
+
+        for (Entry<String, AbstractConfigValue> candidate : newChildren.entrySet()) {
+            // Identity comparison on purpose: we look for this exact child instance.
+            if (candidate.getValue() != child) {
+                continue;
+            }
+
+            if (replacement == null) {
+                // Safe: we return right away and never advance the iterator again.
+                newChildren.remove(candidate.getKey());
+            } else {
+                candidate.setValue(replacement);
+            }
+
+            return new SimpleConfigObject(
+                    origin(),
+                    newChildren,
+                    ResolveStatus.fromValues(newChildren.values()),
+                    ignoresFallbacks);
+        }
+
+        throw new ConfigException.BugOrBroken(
+                "SimpleConfigObject.replaceChild did not find " + child + " in " + this);
+    }
+
+    /**
+     * Tells whether {@code descendant} (compared by identity) is a direct child of this object
+     * or is reported as a descendant by any child that is a {@code Container}.
+     */
+    public boolean hasDescendant(AbstractConfigValue descendant) {
+        // First pass: direct children only.
+        for (AbstractConfigValue directChild : value.values()) {
+            if (directChild == descendant) {
+                return true;
+            }
+        }
+
+        // Second pass: ask container children to search their own subtrees.
+        for (AbstractConfigValue directChild : value.values()) {
+            if (directChild instanceof Container
+                    && ((Container) directChild).hasDescendant(descendant)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /** Returns the fallback flag this object was constructed with. */
+    protected boolean ignoresFallbacks() {
+        return this.ignoresFallbacks;
+    }
+
+    /**
+     * Returns a new insertion-ordered map holding the {@code unwrapped()} form of every child,
+     * under the same keys and in the backing map's iteration order.
+     */
+    public Map<String, Object> unwrapped() {
+        Map<String, Object> plain = new LinkedHashMap<>();
+        value.forEach((childKey, childValue) -> plain.put(childKey, childValue.unwrapped()));
+        return plain;
+    }
+
+    /**
+     * Merges this object with {@code abstractFallback}. A key present on both sides keeps this
+     * object's value combined with the fallback's via {@code withFallback}; a key present on
+     * one side only keeps that side's value.
+     *
+     * <p>Keys are processed in a deterministic order: this object's keys first, in their
+     * existing order, followed by keys that exist only in the fallback. Collecting them in a
+     * hash set would fill the insertion-ordered result map in arbitrary order.
+     *
+     * @param abstractFallback fallback to merge in; must be a {@code SimpleConfigObject}
+     * @return a new merged object when any value changed; otherwise this instance, or a copy of
+     *     it when only the resolve status or the fallback flag differs
+     * @throws ConfigException.BugOrBroken if the fallback is not a {@code SimpleConfigObject}
+     */
+    protected SimpleConfigObject mergedWithObject(AbstractConfigObject abstractFallback) {
+        this.requireNotIgnoringFallbacks();
+        if (!(abstractFallback instanceof SimpleConfigObject)) {
+            throw new ConfigException.BugOrBroken(
+                    "should not be reached (merging non-SimpleConfigObject)");
+        } else {
+            SimpleConfigObject fallback = (SimpleConfigObject) abstractFallback;
+            boolean changed = false;
+            boolean allResolved = true;
+            Map<String, AbstractConfigValue> merged = new LinkedHashMap<>();
+
+            // Ordered union of both key sets: own keys, then fallback-only keys.
+            List<String> allKeys = new ArrayList<>(this.keySet());
+            for (String fallbackKey : fallback.keySet()) {
+                if (!this.value.containsKey(fallbackKey)) {
+                    allKeys.add(fallbackKey);
+                }
+            }
+
+            for (String key : allKeys) {
+                AbstractConfigValue first = this.value.get(key);
+                AbstractConfigValue second = fallback.value.get(key);
+                AbstractConfigValue kept;
+                if (first == null) {
+                    kept = second;
+                } else if (second == null) {
+                    kept = first;
+                } else {
+                    kept = first.withFallback(second);
+                }
+
+                merged.put(key, kept);
+                // Identity check: any value that is not our own instance counts as a change.
+                if (first != kept) {
+                    changed = true;
+                }
+
+                if (kept.resolveStatus() == ResolveStatus.UNRESOLVED) {
+                    allResolved = false;
+                }
+            }
+
+            ResolveStatus newResolveStatus = ResolveStatus.fromBoolean(allResolved);
+            boolean newIgnoresFallbacks = fallback.ignoresFallbacks();
+            if (changed) {
+                return new SimpleConfigObject(
+                        mergeOrigins(this, fallback),
+                        merged,
+                        newResolveStatus,
+                        newIgnoresFallbacks);
+            } else if (newResolveStatus == this.resolveStatus()
+                    && newIgnoresFallbacks == this.ignoresFallbacks()) {
+                return this;
+            } else {
+                return this.newCopy(newResolveStatus, this.origin(), newIgnoresFallbacks);
+            }
+        }
+    }
+
+    /**
+     * Runs {@link #modifyMayThrow(Modifier)} for a modifier that is not expected to throw
+     * checked exceptions; runtime exceptions pass through, any checked exception is wrapped in
+     * {@code ConfigException.BugOrBroken}.
+     */
+    private SimpleConfigObject modify(NoExceptionsModifier modifier) {
+        try {
+            return modifyMayThrow(modifier);
+        } catch (RuntimeException unchecked) {
+            throw unchecked;
+        } catch (Exception checked) {
+            throw new ConfigException.BugOrBroken("unexpected checked exception", checked);
+        }
+    }
+
+    private SimpleConfigObject modifyMayThrow(Modifier modifier) throws Exception {
+        Map<String, AbstractConfigValue> changes = null;
+
+        for (String k : this.keySet()) {
+            AbstractConfigValue v = this.value.get(k);
+            AbstractConfigValue modified = modifier.modifyChildMayThrow(k, v);
+            if (modified != v) {
+                if (changes == null) {
+                    changes = new LinkedHashMap<>();
+                }
+
+                changes.put(k, modified);
+            }
+        }
+
+        if (changes == null) {
+            return this;
+        } else {
+            Map<String, AbstractConfigValue> modified = new LinkedHashMap<>();
+            boolean sawUnresolved = false;
+
+            for (String k : this.keySet()) {
+                AbstractConfigValue newValue;
+                if (changes.containsKey(k)) {
+                    newValue = changes.get(k);
+                    if (newValue != null) {
+                        modified.put(k, newValue);
+                        if (newValue.resolveStatus() == ResolveStatus.UNRESOLVED) {
+                            sawUnresolved = true;
+                        }
+                    }
+                } else {
+                    newValue = this.value.get(k);
+                    modified.put(k, newValue);
+                    if (newValue.resolveStatus() == ResolveStatus.UNRESOLVED) {
+                        sawUnresolved = true;
+                    }
+                }
+            }
+
+            return new SimpleConfigObject(
+                    this.origin(),
+                    modified,
+                    sawUnresolved ? ResolveStatus.UNRESOLVED : ResolveStatus.RESOLVED,
+                    this.ignoresFallbacks());
+        }
+    }
+
+    /**
+     * Resolves the children of this object.
+     *
+     * <p>An already resolved object is returned unchanged. Otherwise every child is run through
+     * a {@code ResolveModifier} whose source has this object pushed as parent, and the result
+     * carries the context the modifier ended up with.
+     *
+     * @throws NotPossibleToResolve propagated from resolving a child
+     * @throws ConfigException.BugOrBroken if any other checked exception surfaces
+     */
+    ResolveResult<? extends AbstractConfigObject> resolveSubstitutions(
+            ResolveContext context, ResolveSource source) throws NotPossibleToResolve {
+        if (resolveStatus() == ResolveStatus.RESOLVED) {
+            return ResolveResult.make(context, this);
+        }
+
+        ResolveSource sourceWithParent = source.pushParent(this);
+        try {
+            SimpleConfigObject.ResolveModifier modifier =
+                    new SimpleConfigObject.ResolveModifier(context, sourceWithParent);
+            AbstractConfigValue resolvedObject = modifyMayThrow(modifier);
+            return ResolveResult.make(modifier.context, resolvedObject).asObjectResult();
+        } catch (NotPossibleToResolve | RuntimeException passThrough) {
+            throw passThrough;
+        } catch (Exception unexpected) {
+            throw new ConfigException.BugOrBroken("unexpected checked exception", unexpected);
+        }
+    }
+
+    /**
+     * Returns an object whose children are the result of calling {@code relativized(prefix)} on
+     * each child of this object; this instance is returned when no child changes.
+     */
+    SimpleConfigObject relativized(final Path prefix) {
+        return this.modify(
+                new NoExceptionsModifier() {
+                    public AbstractConfigValue modifyChild(String key, AbstractConfigValue v) {
+                        return v.relativized(prefix);
+                    }
+                });
+    }
+
+    /**
+     * Appends the textual form of this object to {@code sb}.
+     *
+     * <p>An empty object is written as <code>{}</code>. Otherwise the children are written in the
+     * iteration order of {@link #keySet()}, optionally preceded by origin and comment lines,
+     * and wrapped in braces when JSON output is requested or this is not the root.
+     *
+     * @param sb buffer to append to
+     * @param indent indentation level of this object
+     * @param atRoot whether this object is the root being rendered
+     * @param options render options consulted for JSON, formatting and comment output
+     */
+    protected void render(
+            StringBuilder sb, int indent, boolean atRoot, ConfigRenderOptions options) {
+        if (this.isEmpty()) {
+            sb.append("{}");
+        } else {
+            // Braces are left out only for a non-JSON root object.
+            boolean outerBraces = options.getJson() || !atRoot;
+            int innerIndent;
+            if (outerBraces) {
+                innerIndent = indent + 1;
+                sb.append("{");
+                if (options.getFormatted()) {
+                    sb.append('\n');
+                }
+            } else {
+                innerIndent = indent;
+            }
+
+            // Number of trailing characters written after the last child that must be removed.
+            int separatorCount = 0;
+            String[] keys = this.keySet().toArray(new String[0]);
+
+            for (String k : keys) {
+                AbstractConfigValue v = this.value.get(k);
+                if (options.getOriginComments()) {
+                    String[] lines = v.origin().description().split("\n");
+
+                    // NOTE(review): origin lines use indent + 1 rather than innerIndent.
+                    for (String l : lines) {
+                        indent(sb, indent + 1, options);
+                        sb.append('#');
+                        if (!l.isEmpty()) {
+                            sb.append(' ');
+                        }
+
+                        sb.append(l);
+                        sb.append("\n");
+                    }
+                }
+
+                if (options.getComments()) {
+
+                    for (String comment : v.origin().comments()) {
+                        indent(sb, innerIndent, options);
+                        sb.append("#");
+                        // Add a space after '#' unless the comment already starts with one.
+                        if (!comment.startsWith(" ")) {
+                            sb.append(' ');
+                        }
+
+                        sb.append(comment);
+                        sb.append("\n");
+                    }
+                }
+
+                indent(sb, innerIndent, options);
+                v.render(sb, innerIndent, false, k, options);
+                if (options.getFormatted()) {
+                    if (options.getJson()) {
+                        // Formatted JSON: children end with ",\n" (two characters).
+                        sb.append(",");
+                        separatorCount = 2;
+                    } else {
+                        // Formatted non-JSON: children end with "\n" only.
+                        separatorCount = 1;
+                    }
+
+                    sb.append('\n');
+                } else {
+                    // Compact output: children are separated by a single comma.
+                    sb.append(",");
+                    separatorCount = 1;
+                }
+            }
+
+            // Strip the separator that was written after the last child.
+            sb.setLength(sb.length() - separatorCount);
+            if (outerBraces) {
+                if (options.getFormatted()) {
+                    sb.append('\n');
+                    indent(sb, indent, options);
+                }
+
+                sb.append("}");
+            }
+        }
+
+        // A formatted root rendering always ends with a newline.
+        if (atRoot && options.getFormatted()) {
+            sb.append('\n');
+        }
+    }
+
+    public AbstractConfigValue get(Object key) {
+        return this.value.get(key);
+    }
+
+    /**
+     * Compares two maps: they are equal when they are the same instance, or when their key sets
+     * are equal and every key maps to equal values in both.
+     */
+    private static boolean mapEquals(Map<String, ConfigValue> a, Map<String, ConfigValue> b) {
+        if (a == b) {
+            return true;
+        }
+
+        Set<String> leftKeys = a.keySet();
+        if (!leftKeys.equals(b.keySet())) {
+            return false;
+        }
+
+        for (String key : leftKeys) {
+            if (!a.get(key).equals(b.get(key))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Hashes a map from the hash of its sorted key list and the sum of its value hashes, so the
+     * result does not depend on the map's iteration order.
+     */
+    @SuppressWarnings("magicnumber")
+    private static int mapHash(Map<String, ConfigValue> m) {
+        List<String> sortedKeys = new ArrayList<>(m.keySet());
+        Collections.sort(sortedKeys);
+
+        int valuesHash = 0;
+        for (String key : sortedKeys) {
+            valuesHash += m.get(key).hashCode();
+        }
+
+        return HASH_CODE * (HASH_CODE + sortedKeys.hashCode()) + valuesHash;
+    }
+
+    /** Accepts any {@code ConfigObject} as a candidate for equality. */
+    protected boolean canEqual(Object other) {
+        return other instanceof ConfigObject;
+    }
+
+    /**
+     * Equal to any {@code ConfigObject} that passes {@link #canEqual(Object)} and has the same
+     * keys mapped to equal values.
+     */
+    public boolean equals(Object other) {
+        return other instanceof ConfigObject
+                && canEqual(other)
+                && mapEquals(this, (ConfigObject) other);
+    }
+
+    /** Hash computed by {@code mapHash} over this object's entries. */
+    public int hashCode() {
+        return mapHash(this);
+    }
+
+    public boolean containsKey(Object key) {
+        return this.value.containsKey(key);
+    }
+
+    /** Returns the key set of the backing map itself (not a copy). */
+    public Set<String> keySet() {
+        Set<String> keys = value.keySet();
+        return keys;
+    }
+
+    public boolean containsValue(Object v) {
+        return this.value.containsValue(v);
+    }
+
+    /**
+     * Returns a freshly built hash set of immutable key/value snapshots, one per entry of the
+     * backing map. The set is a copy; changing it does not affect this object.
+     */
+    public Set<Entry<String, ConfigValue>> entrySet() {
+        HashSet<Entry<String, ConfigValue>> snapshot = new HashSet<>();
+        for (Entry<String, AbstractConfigValue> source : value.entrySet()) {
+            String entryKey = source.getKey();
+            AbstractConfigValue entryValue = source.getValue();
+            snapshot.add(new AbstractMap.SimpleImmutableEntry<>(entryKey, entryValue));
+        }
+        return snapshot;
+    }
+
+    public boolean isEmpty() {
+        return this.value.isEmpty();
+    }
+
+    public int size() {
+        return this.value.size();
+    }
+
+    /**
+     * Returns a copy of this object's values in the backing map's iteration order.
+     *
+     * <p>A list is used rather than a hash set so that equal values stored under different keys
+     * are all reported, as {@code Map.values()} requires, and so that order is not lost.
+     */
+    public Collection<ConfigValue> values() {
+        return new ArrayList<>(this.value.values());
+    }
+
+    /** Returns the shared empty instance. */
+    static SimpleConfigObject empty() {
+        return EMPTY_INSTANCE;
+    }
+
+    /**
+     * Returns an empty object with the given origin, or the shared empty instance when
+     * {@code origin} is null.
+     */
+    static SimpleConfigObject empty(ConfigOrigin origin) {
+        if (origin == null) {
+            return empty();
+        }
+        return new SimpleConfigObject(origin, Collections.emptyMap());
+    }
+
+    /**
+     * Returns an empty object whose origin description is {@code baseOrigin}'s description
+     * followed by " (not found)".
+     */
+    static SimpleConfigObject emptyMissing(ConfigOrigin baseOrigin) {
+        String missingDescription = baseOrigin.description() + " (not found)";
+        return new SimpleConfigObject(
+                SimpleConfigOrigin.newSimple(missingDescription), Collections.emptyMap());
+    }
+
+    /** Java serialization hook: a {@code SerializedConfigValue} wrapping this object is written instead. */
+    private Object writeReplace() throws ObjectStreamException {
+        return new SerializedConfigValue(this);
+    }
+
+    /**
+     * Modifier used while resolving an object's children. It threads the resolve context from
+     * one child to the next and re-applies the restriction the context had at construction.
+     */
+    private static final class ResolveModifier implements Modifier {
+        final Path originalRestrict;
+        ResolveContext context;
+        final ResolveSource source;
+
+        ResolveModifier(ResolveContext context, ResolveSource source) {
+            this.context = context;
+            this.source = source;
+            this.originalRestrict = context.restrictToChild();
+        }
+
+        /**
+         * Resolves one child. Under a restricted context only the child named by the first
+         * element of the restriction is resolved, and only when the restriction continues below
+         * it; every other child is returned untouched.
+         */
+        public AbstractConfigValue modifyChildMayThrow(String key, AbstractConfigValue v)
+                throws NotPossibleToResolve {
+            if (!context.isRestrictedToChild()) {
+                return adopt(context.unrestricted().resolve(v, source));
+            }
+
+            Path restriction = context.restrictToChild();
+            if (!key.equals(restriction.first())) {
+                return v;
+            }
+
+            Path remainder = restriction.remainder();
+            if (remainder == null) {
+                return v;
+            }
+            return adopt(context.restrict(remainder).resolve(v, source));
+        }
+
+        /** Takes over the context carried by {@code result} and returns the resolved value. */
+        private AbstractConfigValue adopt(ResolveResult<? extends AbstractConfigValue> result) {
+            context = result.context.unrestricted().restrict(originalRestrict);
+            return result.value;
+        }
+    }
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/CompleteTest.java b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/CompleteTest.java
new file mode 100644
index 0000000..63e4a51
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/CompleteTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import org.apache.seatunnel.config.utils.FileUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Verifies variable substitution when resolving a config against an explicit map. */
+public class CompleteTest {
+
+    /**
+     * Resolves {@code /seatunnel/variables.conf} against a map of values and checks that the
+     * SQL of the second and third transform contains those values.
+     */
+    @Test
+    public void testVariables() throws URISyntaxException {
+        // A plain map stands in for the system properties, because the system properties are
+        // loaded only once. See issue #1670.
+        Map<String, String> variables = new HashMap<>();
+        variables.put("dt", "20190318");
+        variables.put("city2", "shanghai");
+
+        Config parsed =
+                ConfigFactory.parseFile(FileUtils.getFileFromResources("/seatunnel/variables.conf"));
+        Config resolved =
+                parsed.resolveWith(
+                        ConfigFactory.parseMap(variables),
+                        ConfigResolveOptions.defaults().setAllowUnresolved(true));
+
+        String citySql = resolved.getConfigList("transform").get(1).getString("sql");
+        String dateSql = resolved.getConfigList("transform").get(2).getString("sql");
+
+        Assertions.assertTrue(citySql.contains("shanghai"));
+        Assertions.assertTrue(dateSql.contains("20190318"));
+    }
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/ConfigFactoryTest.java b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/ConfigFactoryTest.java
new file mode 100644
index 0000000..2a90bef
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/ConfigFactoryTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.config.utils.FileUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+
+/** Parsing tests based on {@code /factory/config.conf}. */
+public class ConfigFactoryTest {
+
+    /** Parses the shared test resource used by every test in this class. */
+    private static Config parseFactoryConfig() throws URISyntaxException {
+        return ConfigFactory.parseFile(FileUtils.getFileFromResources("/factory/config.conf"));
+    }
+
+    /** Checks the top-level sections, the env values and the name of the second sink. */
+    @Test
+    public void testBasicParseAppConf() throws URISyntaxException {
+        Config config = parseFactoryConfig();
+
+        for (String section : Arrays.asList("env", "source", "transform", "sink")) {
+            Assertions.assertTrue(config.hasPath(section));
+        }
+
+        // check env config
+        Config env = config.getConfig("env");
+        Assertions.assertEquals("SeaTunnel", env.getString("spark.app.name"));
+        Assertions.assertEquals("2", env.getString("spark.executor.instances"));
+        Assertions.assertEquals("1", env.getString("spark.executor.cores"));
+        Assertions.assertEquals("1g", env.getString("spark.executor.memory"));
+        Assertions.assertEquals("5", env.getString("spark.stream.batchDuration"));
+
+        // check custom plugin
+        String secondSinkName = config.getConfigList("sink").get(1).getString("plugin_name");
+        Assertions.assertEquals("c.Console", secondSinkName);
+    }
+
+    /** Checks that the transform list keeps the expected plugin order. */
+    @Test
+    public void testTransformOrder() throws URISyntaxException {
+        Config config = parseFactoryConfig();
+
+        String[] expectedNames = {"split", "sql1", "sql2", "sql3", "json"};
+
+        List<? extends Config> transforms = config.getConfigList("transform");
+        Assertions.assertEquals(expectedNames.length, transforms.size());
+
+        int index = 0;
+        for (Config transform : transforms) {
+            Object rawName = transform.root().get("plugin_name").unwrapped();
+            Assertions.assertEquals(expectedNames[index], String.valueOf(rawName));
+            index++;
+        }
+    }
+
+    /** Checks that every env entry key is one of the expected dotted keys. */
+    @Test
+    public void testQuotedString() throws URISyntaxException {
+        List<String> expectedKeys =
+                Arrays.asList(
+                        "spark.app.name",
+                        "spark.executor.instances",
+                        "spark.executor.cores",
+                        "spark.executor.memory",
+                        "spark.stream.batchDuration");
+
+        Config envConfig = parseFactoryConfig().getConfig("env");
+        envConfig
+                .entrySet()
+                .forEach(
+                        entry -> {
+                            boolean known = expectedKeys.contains(entry.getKey());
+                            Assertions.assertTrue(known);
+                        });
+    }
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/JsonFormatTest.java b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/JsonFormatTest.java
new file mode 100644
index 0000000..6cd3c1a
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/JsonFormatTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import org.apache.seatunnel.config.utils.FileUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+
+public class JsonFormatTest {
+
+    @Test
+    public void testJsonFormat() throws URISyntaxException {
+
+        Config json =
+                ConfigFactory.parseFile(FileUtils.getFileFromResources("/json/spark.batch.json"))
+                        .resolveWith(
+                                ConfigFactory.systemProperties(),
+                                ConfigResolveOptions.defaults().setAllowUnresolved(true));
+
+        Config config =
+                ConfigFactory.parseFile(FileUtils.getFileFromResources("/json/spark.batch.conf"))
+                        .resolveWith(
+                                ConfigFactory.systemProperties(),
+                                ConfigResolveOptions.defaults().setAllowUnresolved(true));
+
+        Assertions.assertEquals(config.atPath("transform"), json.atPath("transform"));
+        Assertions.assertEquals(config.atPath("sink"), json.atPath("sink"));
+        Assertions.assertEquals(config.atPath("source"), json.atPath("source"));
+        Assertions.assertEquals(config.atPath("env"), json.atPath("env"));
+    }
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/SerializeTest.java b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/SerializeTest.java
new file mode 100644
index 0000000..457908e
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/SerializeTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.config.utils.FileUtils;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+/** Test if {@link Config} can be serialized and deserialized; streams always get closed. */
+public class SerializeTest {
+
+    @Test
+    void testSerialize(@TempDir Path tempDir)
+            throws URISyntaxException, IOException, ClassNotFoundException {
+        Config config =
+                ConfigFactory.parseFile(
+                        FileUtils.getFileFromResources("/seatunnel/serialize.conf"));
+        Path path = tempDir.resolve("test.config.ser");
+        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(path))) {
+            out.writeObject(config);
+        }
+        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(path))) {
+            in.readObject();
+        }
+    }
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/utils/FileUtils.java b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/utils/FileUtils.java
new file mode 100644
index 0000000..adfbc20
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/utils/FileUtils.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.config.utils;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+
+public final class FileUtils {
+    private FileUtils() {}
+
+    // Resolve a classpath resource (name as accepted by Class#getResource) to a File.
+    // Throws IllegalArgumentException naming the resource when it cannot be found.
+    public static File getFileFromResources(String fileName) throws URISyntaxException {
+        URL resource = FileUtils.class.getResource(fileName);
+        if (resource == null) {
+            throw new IllegalArgumentException("file is not found: " + fileName);
+        }
+        return Paths.get(resource.toURI()).toFile();
+    }
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/test/resources/factory/config.conf b/seatunnel-config/seatunnel-config-shade/src/test/resources/factory/config.conf
new file mode 100644
index 0000000..af8c82b
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/test/resources/factory/config.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  "spark.executor.cores" = 1
+  "spark.executor.memory" = "1g"
+  "spark.stream.batchDuration" = 5
+}
+
+source {
+
+  fakeStream {
+    content = ["Hello World, SeaTunnel"]
+  }
+
+}
+
+transform {
+
+  split {
+    fields = ["msg", "name"]
+    delimiter = ","
+  }
+
+  sql1 {
+    sql = "sql1"
+  }
+
+  sql2 {
+    sql = "sql2"
+  }
+
+  sql3 {
+    sql = "sql3"
+  }
+
+  json {
+    sql = "sql3"
+  }
+
+}
+
+sink {
+  Console {}
+  c.Console {}
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/test/resources/json/spark.batch.conf b/seatunnel-config/seatunnel-config-shade/src/test/resources/json/spark.batch.conf
new file mode 100644
index 0000000..27ad42b
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/test/resources/json/spark.batch.conf
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  # You can set spark configuration here
+  # see available properties defined by spark: https://spark.apache.org/docs/latest/configuration.html#available-properties
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+}
+
+source {
+  # This is an example input plugin **only for testing and demonstrating the input plugin feature**
+  Fake {
+    result_table_name = "my_dataset"
+  }
+
+  # You can also use other input plugins, such as hdfs
+  # hdfs {
+  #   result_table_name = "accesslog"
+  #   path = "hdfs://hadoop-cluster-01/nginx/accesslog"
+  #   format = "json"
+  # }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of input plugins,
+  # please go to https://seatunnel.apache.org/docs/spark/configuration/source-plugins/Fake
+}
+
+transform {
+  # split data by specific delimiter
+
+  # you can also use other transform plugins, such as sql
+  # sql {
+  #   sql = "select * from accesslog where request_time > 1000"
+  # }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/spark/configuration/transform-plugins/Split
+}
+
+sink {
+  # choose stdout output plugin to output data to console
+  Console {}
+
+  # you can also use other output plugins, such as hdfs
+  # hdfs {
+  #   path = "hdfs://hadoop-cluster-01/nginx/accesslog_processed"
+  #   save_mode = "append"
+  # }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of output plugins,
+  # please go to https://seatunnel.apache.org/docs/spark/configuration/sink-plugins/Console
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/test/resources/json/spark.batch.json b/seatunnel-config/seatunnel-config-shade/src/test/resources/json/spark.batch.json
new file mode 100644
index 0000000..f0f68ae
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/test/resources/json/spark.batch.json
@@ -0,0 +1,20 @@
+{
+  "env" : {
+    "spark.app.name" : "SeaTunnel",
+    "spark.executor.cores" : 1,
+    "spark.executor.instances" : 2,
+    "spark.executor.memory" : "1g"
+  },
+  "sink" : [
+    {
+      "plugin_name" : "Console"
+    }
+  ],
+  "source" : [
+    {
+      "plugin_name" : "Fake",
+      "result_table_name" : "my_dataset"
+    }
+  ],
+  "transform" : []
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/test/resources/seatunnel/serialize.conf b/seatunnel-config/seatunnel-config-shade/src/test/resources/seatunnel/serialize.conf
new file mode 100644
index 0000000..39ebfe9
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/test/resources/seatunnel/serialize.conf
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+env {
+  job.mode = BATCH
+}
+
+source {
+  FakeSource {
+    row.num = 100
+    schema {
+      fields {
+        name = string
+        age = int
+      }
+    }
+  }
+}
+
+sink {
+  Console {}
+}
\ No newline at end of file
diff --git a/seatunnel-config/seatunnel-config-shade/src/test/resources/seatunnel/variables.conf b/seatunnel-config/seatunnel-config-shade/src/test/resources/seatunnel/variables.conf
new file mode 100644
index 0000000..f06fb1c
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/test/resources/seatunnel/variables.conf
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+spark {
+  spark.stream.batchDuration = 5
+
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+}
+
+source {
+  fakestream {
+    content = [
+      "20190318, beijing, first message",
+      "20190319, shanghai, second message",
+      "20190318, shanghai, third message"
+    ]
+    rate = 1
+  }
+}
+
+transform {
+  split {
+    fields = ["dt", "city", "msg"]
+    delimiter = ","
+  }
+
+  sql {
+    table_name = "user_view"
+    sql = "select * from user_view where city = '"${city2}"'"
+    result_table_name = "result1"
+  }
+
+  sql {
+    table_name = "user_view"
+    sql = "select * from user_view where dt = '"${dt}"'"
+    result_table_name = "result2"
+  }
+}
+
+sink {
+  stdout {
+    source_table_name="result1"
+  }
+
+  stdout {
+  }
+}
diff --git a/seatunnel-guava/pom.xml b/seatunnel-guava/pom.xml
new file mode 100644
index 0000000..741a1be
--- /dev/null
+++ b/seatunnel-guava/pom.xml
@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-shade</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>seatunnel-guava</artifactId>
+    <name>SeaTunnel : Shade : Guava</name>
+
+    <properties>
+        <guava.version>27.0-jre</guava.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <finalName>seatunnel-guava</finalName>
+                            <createSourcesJar>true</createSourcesJar>
+                            <shadeSourcesContent>true</shadeSourcesContent>
+                            <shadedArtifactAttached>false</shadedArtifactAttached>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.google</pattern>
+                                    <shadedPattern>${seatunnel.shade.package}.com.google</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>attach-artifacts</id>
+                        <goals>
+                            <goal>attach-artifact</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <artifacts>
+                                <artifact>
+                                    <file>${basedir}/target/seatunnel-guava.jar</file>
+                                    <type>jar</type>
+                                    <classifier>optional</classifier>
+                                </artifact>
+                            </artifacts>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/seatunnel-hadoop3-3.1.4-uber/pom.xml b/seatunnel-hadoop3-3.1.4-uber/pom.xml
new file mode 100644
index 0000000..b80f07e
--- /dev/null
+++ b/seatunnel-hadoop3-3.1.4-uber/pom.xml
@@ -0,0 +1,132 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-shade</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
+    <name>SeaTunnel : Shade : Hadoop3</name>
+
+    <properties>
+        <hadoop3.version>3.1.4</hadoop3.version>
+        <guava.version>27.0-jre</guava.version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>com.google.guava</groupId>
+                <artifactId>guava</artifactId>
+                <version>${guava.version}</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop3.version}</version>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <finalName>seatunnel-hadoop3-3.1.4-uber</finalName>
+                            <createSourcesJar>true</createSourcesJar>
+                            <shadeSourcesContent>true</shadeSourcesContent>
+                            <shadedArtifactAttached>false</shadedArtifactAttached>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation> <!-- NOTE(review): literal "shade." prefix here, while the guava/jackson relocations below use ${seatunnel.shade.package}; confirm intended -->
+                                    <pattern>org.apache.commons.io</pattern>
+                                    <shadedPattern>shade.org.apache.commons.io</shadedPattern>
+                                </relocation>
+                                <relocation> <!-- same literal "shade." prefix as the commons-io relocation -->
+                                    <pattern>org.apache.commons.lang3</pattern>
+                                    <shadedPattern>shade.org.apache.commons.lang3</shadedPattern>
+                                </relocation>
+                                <relocation> <!-- relocation is limited to the base, cache and collect patterns listed in includes -->
+                                    <pattern>com.google.common</pattern>
+                                    <shadedPattern>${seatunnel.shade.package}.hadoop.com.google.common</shadedPattern>
+                                    <includes>
+                                        <include>com.google.common.base.*</include>
+                                        <include>com.google.common.cache.*</include>
+                                        <include>com.google.common.collect.*</include>
+                                    </includes>
+                                </relocation>
+                                <relocation> <!-- Jackson is moved under the hadoop-specific shaded package -->
+                                    <pattern>com.fasterxml.jackson</pattern>
+                                    <shadedPattern>${seatunnel.shade.package}.hadoop.com.fasterxml.jackson</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>attach-artifacts</id>
+                        <goals>
+                            <goal>attach-artifact</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <artifacts>
+                                <artifact>
+                                    <file>${basedir}/target/seatunnel-hadoop3-3.1.4-uber.jar</file>
+                                    <type>jar</type>
+                                    <classifier>optional</classifier>
+                                </artifact>
+                            </artifacts>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/seatunnel-jackson/pom.xml b/seatunnel-jackson/pom.xml
new file mode 100644
index 0000000..3d515b4
--- /dev/null
+++ b/seatunnel-jackson/pom.xml
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-shade</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>seatunnel-jackson</artifactId>
+    <name>SeaTunnel : Shade : Jackson</name>
+
+    <properties>
+        <jackson.version>2.13.3</jackson.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-properties</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jsr310</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <finalName>seatunnel-jackson</finalName>
+                            <createSourcesJar>true</createSourcesJar>
+                            <shadeSourcesContent>true</shadeSourcesContent>
+                            <shadedArtifactAttached>false</shadedArtifactAttached>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.fasterxml.jackson</pattern>
+                                    <shadedPattern>${seatunnel.shade.package}.com.fasterxml.jackson</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>attach-artifacts</id>
+                        <goals>
+                            <goal>attach-artifact</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <artifacts>
+                                <artifact>
+                                    <file>${basedir}/target/seatunnel-jackson.jar</file>
+                                    <type>jar</type>
+                                    <classifier>optional</classifier>
+                                </artifact>
+                            </artifacts>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>