[Improve][Connector-V2] Clean key name in catalog table (#6942)

diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
index a357676..6e3b386 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
@@ -35,7 +35,9 @@
 import com.mysql.cj.MysqlType;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
@@ -150,8 +152,9 @@
 
     private String buildColumnsIdentifySql(String catalogName) {
         List<String> columnSqls = new ArrayList<>();
+        Map<String, String> columnTypeMap = new HashMap<>();
         for (Column column : columns) {
-            columnSqls.add("\t" + buildColumnIdentifySql(column, catalogName));
+            columnSqls.add("\t" + buildColumnIdentifySql(column, catalogName, columnTypeMap));
         }
         if (primaryKey != null) {
             columnSqls.add("\t" + buildPrimaryKeySql());
@@ -161,28 +164,34 @@
                 if (StringUtils.isBlank(constraintKey.getConstraintName())) {
                     continue;
                 }
-                //                columnSqls.add("\t" + buildConstraintKeySql(constraintKey));
+                String constraintKeyStr = buildConstraintKeySql(constraintKey, columnTypeMap);
+                if (StringUtils.isNotBlank(constraintKeyStr)) {
+                    columnSqls.add("\t" + constraintKeyStr);
+                }
             }
         }
         return String.join(", \n", columnSqls);
     }
 
-    private String buildColumnIdentifySql(Column column, String catalogName) {
+    private String buildColumnIdentifySql(
+            Column column, String catalogName, Map<String, String> columnTypeMap) {
         final List<String> columnSqls = new ArrayList<>();
         columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "`"));
-        boolean isSupportDef = true;
-
+        String type;
         if ((SqlType.TIME.equals(column.getDataType().getSqlType())
                         || SqlType.TIMESTAMP.equals(column.getDataType().getSqlType()))
                 && column.getScale() != null) {
             BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column);
-            columnSqls.add(typeDefine.getColumnType());
-        } else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)) {
-            columnSqls.add(column.getSourceType());
+            type = typeDefine.getColumnType();
+        } else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)
+                && StringUtils.isNotBlank(column.getSourceType())) {
+            type = column.getSourceType();
         } else {
             BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column);
-            columnSqls.add(typeDefine.getColumnType());
+            type = typeDefine.getColumnType();
         }
+        columnSqls.add(type);
+        columnTypeMap.put(column.getName(), type);
         // nullable
         if (column.isNullable()) {
             columnSqls.add("NULL");
@@ -206,19 +215,32 @@
         return String.format("PRIMARY KEY (%s)", CatalogUtils.quoteIdentifier(key, fieldIde));
     }
 
-    private String buildConstraintKeySql(ConstraintKey constraintKey) {
+    private String buildConstraintKeySql(
+            ConstraintKey constraintKey, Map<String, String> columnTypeMap) {
         ConstraintKey.ConstraintType constraintType = constraintKey.getConstraintType();
         String indexColumns =
                 constraintKey.getColumnNames().stream()
                         .map(
                                 constraintKeyColumn -> {
+                                    String columnName = constraintKeyColumn.getColumnName();
+                                    boolean withLength = false;
+                                    if (columnTypeMap.containsKey(columnName)) {
+                                        String columnType = columnTypeMap.get(columnName);
+                                        if (columnType.endsWith("BLOB")
+                                                || columnType.endsWith("TEXT")) {
+                                            withLength = true;
+                                        }
+                                    }
                                     if (constraintKeyColumn.getSortType() == null) {
                                         return String.format(
-                                                "`%s`", constraintKeyColumn.getColumnName());
+                                                "`%s`%s",
+                                                CatalogUtils.getFieldIde(columnName, fieldIde),
+                                                withLength ? "(255)" : "");
                                     }
                                     return String.format(
-                                            "`%s` %s",
-                                            constraintKeyColumn.getColumnName(),
+                                            "`%s`%s %s",
+                                            CatalogUtils.getFieldIde(columnName, fieldIde),
+                                            withLength ? "(255)" : "",
                                             constraintKeyColumn.getSortType().name());
                                 })
                         .collect(Collectors.joining(", "));
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
index f45d038..ed44900 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
@@ -118,7 +118,7 @@
         while (rs.next()) {
             String columnName = rs.getString("COLUMN_NAME");
             // all the PK_NAME should be the same
-            pkName = rs.getString("PK_NAME");
+            pkName = cleanKeyName(rs.getString("PK_NAME"));
             int keySeq = rs.getInt("KEY_SEQ");
             // KEY_SEQ is 1-based index
             primaryKeyColumns.add(Pair.of(keySeq, columnName));
@@ -152,7 +152,7 @@
             if (columnName == null) {
                 continue;
             }
-            String indexName = resultSet.getString("INDEX_NAME");
+            String indexName = cleanKeyName(resultSet.getString("INDEX_NAME"));
             boolean noUnique = resultSet.getBoolean("NON_UNIQUE");
 
             ConstraintKey constraintKey =
@@ -179,6 +179,15 @@
         return new ArrayList<>(constraintKeyMap.values());
     }
 
+    private static String cleanKeyName(String keyName) {
+        if (keyName != null) {
+            // only keep the characters that are valid in an index name
+            keyName = keyName.replaceAll("[^a-zA-Z0-9_]", "");
+            keyName = keyName.replaceAll("^_+", "");
+        }
+        return keyName;
+    }
+
     public static TableSchema getTableSchema(
             DatabaseMetaData metadata, TablePath tablePath, JdbcDialectTypeMapper typeMapper)
             throws SQLException {
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
index 2e7a725..745c703 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
@@ -26,6 +26,7 @@
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlCreateTableSqlBuilder;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
@@ -37,6 +38,7 @@
 
 import java.io.PrintStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 
 public class MysqlCreateTableSqlBuilderTest {
@@ -60,6 +62,14 @@
                                         "age", BasicType.INT_TYPE, (Long) null, true, null, "age"))
                         .column(
                                 PhysicalColumn.of(
+                                        "blob_v",
+                                        PrimitiveByteArrayType.INSTANCE,
+                                        Long.MAX_VALUE,
+                                        true,
+                                        null,
+                                        "blob_v"))
+                        .column(
+                                PhysicalColumn.of(
                                         "createTime",
                                         LocalTimeType.LOCAL_DATE_TIME_TYPE,
                                         3,
@@ -76,12 +86,19 @@
                                         "lastUpdateTime"))
                         .primaryKey(PrimaryKey.of("id", Lists.newArrayList("id")))
                         .constraintKey(
-                                ConstraintKey.of(
-                                        ConstraintKey.ConstraintType.INDEX_KEY,
-                                        "name",
-                                        Lists.newArrayList(
-                                                ConstraintKey.ConstraintKeyColumn.of(
-                                                        "name", null))))
+                                Arrays.asList(
+                                        ConstraintKey.of(
+                                                ConstraintKey.ConstraintType.INDEX_KEY,
+                                                "name",
+                                                Lists.newArrayList(
+                                                        ConstraintKey.ConstraintKeyColumn.of(
+                                                                "name", null))),
+                                        ConstraintKey.of(
+                                                ConstraintKey.ConstraintType.INDEX_KEY,
+                                                "blob_v",
+                                                Lists.newArrayList(
+                                                        ConstraintKey.ConstraintKeyColumn.of(
+                                                                "blob_v", null)))))
                         .build();
         CatalogTable catalogTable =
                 CatalogTable.of(
@@ -98,12 +115,15 @@
         // create table sql is change; The old unit tests are no longer applicable
         String expect =
                 "CREATE TABLE `test_table` (\n"
-                        + "\t`id` null NOT NULL COMMENT 'id', \n"
-                        + "\t`name` null NOT NULL COMMENT 'name', \n"
-                        + "\t`age` null NULL COMMENT 'age', \n"
-                        + "\t`createTime` null NULL COMMENT 'createTime', \n"
-                        + "\t`lastUpdateTime` null NULL COMMENT 'lastUpdateTime', \n"
-                        + "\tPRIMARY KEY (`id`)\n"
+                        + "\t`id` BIGINT NOT NULL COMMENT 'id', \n"
+                        + "\t`name` VARCHAR(128) NOT NULL COMMENT 'name', \n"
+                        + "\t`age` INT NULL COMMENT 'age', \n"
+                        + "\t`blob_v` LONGBLOB NULL COMMENT 'blob_v', \n"
+                        + "\t`createTime` DATETIME NULL COMMENT 'createTime', \n"
+                        + "\t`lastUpdateTime` DATETIME NULL COMMENT 'lastUpdateTime', \n"
+                        + "\tPRIMARY KEY (`id`), \n"
+                        + "\tKEY `name` (`name`), \n"
+                        + "\tKEY `blob_v` (`blob_v`(255))\n"
                         + ") COMMENT = 'User table';";
         CONSOLE.println(expect);
         Assertions.assertEquals(expect, createTableSql);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
new file mode 100644
index 0000000..25f256f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils;
+
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Optional;
+
+public class CatalogUtilsTest {
+
+    @Test
+    void testPrimaryKeysNameWithOutSpecialChar() throws SQLException {
+        Optional<PrimaryKey> primaryKey =
+                CatalogUtils.getPrimaryKey(new TestDatabaseMetaData(), TablePath.of("test.test"));
+        Assertions.assertEquals("testfdawe_", primaryKey.get().getPrimaryKey());
+    }
+
+    @Test
+    void testConstraintKeysNameWithOutSpecialChar() throws SQLException {
+        List<ConstraintKey> constraintKeys =
+                CatalogUtils.getConstraintKeys(
+                        new TestDatabaseMetaData(), TablePath.of("test.test"));
+        Assertions.assertEquals("testfdawe_", constraintKeys.get(0).getConstraintName());
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestDatabaseMetaData.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestDatabaseMetaData.java
new file mode 100644
index 0000000..c0ea1c9
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestDatabaseMetaData.java
@@ -0,0 +1,974 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.RowIdLifetime;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestDatabaseMetaData implements DatabaseMetaData {
+    @Override
+    public boolean allProceduresAreCallable() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean allTablesAreSelectable() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public String getURL() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public String getUserName() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean isReadOnly() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean nullsAreSortedHigh() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean nullsAreSortedLow() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean nullsAreSortedAtStart() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean nullsAreSortedAtEnd() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public String getDatabaseProductName() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public String getDatabaseProductVersion() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public String getDriverName() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public String getDriverVersion() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public int getDriverMajorVersion() {
+        return 0;
+    }
+
+    @Override
+    public int getDriverMinorVersion() {
+        return 0;
+    }
+
+    @Override
+    public boolean usesLocalFiles() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean usesLocalFilePerTable() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMixedCaseIdentifiers() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean storesUpperCaseIdentifiers() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean storesLowerCaseIdentifiers() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean storesMixedCaseIdentifiers() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMixedCaseQuotedIdentifiers() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean storesUpperCaseQuotedIdentifiers() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean storesLowerCaseQuotedIdentifiers() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean storesMixedCaseQuotedIdentifiers() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public String getIdentifierQuoteString() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public String getSQLKeywords() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public String getNumericFunctions() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public String getStringFunctions() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public String getSystemFunctions() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public String getTimeDateFunctions() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public String getSearchStringEscape() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public String getExtraNameCharacters() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean supportsAlterTableWithAddColumn() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsAlterTableWithDropColumn() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsColumnAliasing() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean nullPlusNonNullIsNull() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsConvert() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsConvert(int fromType, int toType) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsTableCorrelationNames() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsDifferentTableCorrelationNames() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsExpressionsInOrderBy() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsOrderByUnrelated() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsGroupBy() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsGroupByUnrelated() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsGroupByBeyondSelect() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsLikeEscapeClause() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleResultSets() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleTransactions() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsNonNullableColumns() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMinimumSQLGrammar() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsCoreSQLGrammar() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsExtendedSQLGrammar() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsANSI92EntryLevelSQL() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsANSI92IntermediateSQL() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsANSI92FullSQL() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsIntegrityEnhancementFacility() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsOuterJoins() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsFullOuterJoins() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsLimitedOuterJoins() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public String getSchemaTerm() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public String getProcedureTerm() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public String getCatalogTerm() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean isCatalogAtStart() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public String getCatalogSeparator() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean supportsSchemasInDataManipulation() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsSchemasInProcedureCalls() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsSchemasInTableDefinitions() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsSchemasInIndexDefinitions() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsSchemasInPrivilegeDefinitions() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsCatalogsInDataManipulation() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsCatalogsInProcedureCalls() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsCatalogsInTableDefinitions() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsCatalogsInIndexDefinitions() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsCatalogsInPrivilegeDefinitions() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsPositionedDelete() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsPositionedUpdate() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsSelectForUpdate() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsStoredProcedures() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsSubqueriesInComparisons() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsSubqueriesInExists() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsSubqueriesInIns() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsSubqueriesInQuantifieds() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsCorrelatedSubqueries() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsUnion() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsUnionAll() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsOpenCursorsAcrossCommit() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsOpenCursorsAcrossRollback() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsOpenStatementsAcrossCommit() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsOpenStatementsAcrossRollback() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public int getMaxBinaryLiteralLength() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getMaxCharLiteralLength() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getMaxColumnNameLength() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getMaxColumnsInGroupBy() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getMaxColumnsInIndex() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getMaxColumnsInOrderBy() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getMaxColumnsInSelect() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getMaxColumnsInTable() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getMaxConnections() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getMaxCursorNameLength() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getMaxIndexLength() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getMaxSchemaNameLength() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getMaxProcedureNameLength() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getMaxCatalogNameLength() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getMaxRowSize() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public boolean doesMaxRowSizeIncludeBlobs() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public int getMaxStatementLength() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getMaxStatements() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getMaxTableNameLength() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getMaxTablesInSelect() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getMaxUserNameLength() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getDefaultTransactionIsolation() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public boolean supportsTransactions() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsTransactionIsolationLevel(int level) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsDataDefinitionAndDataManipulationTransactions() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsDataManipulationTransactionsOnly() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean dataDefinitionCausesTransactionCommit() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean dataDefinitionIgnoredInTransactions() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public ResultSet getProcedures(
+            String catalog, String schemaPattern, String procedureNamePattern) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getProcedureColumns(
+            String catalog,
+            String schemaPattern,
+            String procedureNamePattern,
+            String columnNamePattern)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getTables(
+            String catalog, String schemaPattern, String tableNamePattern, String[] types)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getSchemas() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getCatalogs() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getTableTypes() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getColumns(
+            String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getColumnPrivileges(
+            String catalog, String schema, String table, String columnNamePattern)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getTablePrivileges(
+            String catalog, String schemaPattern, String tableNamePattern) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getBestRowIdentifier(
+            String catalog, String schema, String table, int scope, boolean nullable)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getVersionColumns(String catalog, String schema, String table)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getPrimaryKeys(String catalog, String schema, String table)
+            throws SQLException {
+        List<Map<String, Object>> value = new ArrayList<>();
+        value.add(
+                new HashMap<String, Object>() {
+                    {
+                        put("COLUMN_NAME", "id");
+                        put("PK_NAME", "_test!#$#@fdawe_");
+                        put("KEY_SEQ", 1);
+                    }
+                });
+        return new TestResultSet(value);
+    }
+
+    @Override
+    public ResultSet getImportedKeys(String catalog, String schema, String table)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getExportedKeys(String catalog, String schema, String table)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getCrossReference(
+            String parentCatalog,
+            String parentSchema,
+            String parentTable,
+            String foreignCatalog,
+            String foreignSchema,
+            String foreignTable)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getTypeInfo() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getIndexInfo(
+            String catalog, String schema, String table, boolean unique, boolean approximate)
+            throws SQLException {
+        List<Map<String, Object>> value = new ArrayList<>();
+        value.add(
+                new HashMap<String, Object>() {
+                    {
+                        put("COLUMN_NAME", "id");
+                        put("INDEX_NAME", "_test!#$#@fdawe_");
+                        put("NON_UNIQUE", true);
+                        put("ASC_OR_DESC", "A");
+                    }
+                });
+        return new TestResultSet(value);
+    }
+
+    @Override
+    public boolean supportsResultSetType(int type) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsResultSetConcurrency(int type, int concurrency) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean ownUpdatesAreVisible(int type) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean ownDeletesAreVisible(int type) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean ownInsertsAreVisible(int type) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean othersUpdatesAreVisible(int type) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean othersDeletesAreVisible(int type) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean othersInsertsAreVisible(int type) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean updatesAreDetected(int type) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean deletesAreDetected(int type) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean insertsAreDetected(int type) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsBatchUpdates() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public ResultSet getUDTs(
+            String catalog, String schemaPattern, String typeNamePattern, int[] types)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Connection getConnection() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean supportsSavepoints() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsNamedParameters() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOpenResults() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsGetGeneratedKeys() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public ResultSet getSuperTypes(String catalog, String schemaPattern, String typeNamePattern)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getSuperTables(String catalog, String schemaPattern, String tableNamePattern)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getAttributes(
+            String catalog,
+            String schemaPattern,
+            String typeNamePattern,
+            String attributeNamePattern)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean supportsResultSetHoldability(int holdability) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public int getResultSetHoldability() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getDatabaseMajorVersion() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getDatabaseMinorVersion() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getJDBCMajorVersion() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getJDBCMinorVersion() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getSQLStateType() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public boolean locatorsUpdateCopy() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsStatementPooling() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public RowIdLifetime getRowIdLifetime() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean supportsStoredFunctionsUsingCallSyntax() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean autoCommitFailureClosesAllResultSets() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public ResultSet getClientInfoProperties() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getFunctions(String catalog, String schemaPattern, String functionNamePattern)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getFunctionColumns(
+            String catalog,
+            String schemaPattern,
+            String functionNamePattern,
+            String columnNamePattern)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSet getPseudoColumns(
+            String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean generatedKeyAlwaysReturned() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public <T> T unwrap(Class<T> iface) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        return false;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestResultSet.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestResultSet.java
new file mode 100644
index 0000000..f3a67a4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestResultSet.java
@@ -0,0 +1,830 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+
+public class TestResultSet implements ResultSet {
+
+    private final List<Map<String, Object>> value;
+
+    private int index = -1;
+
+    public TestResultSet(List<Map<String, Object>> value) {
+        this.value = value;
+    }
+
+    @Override
+    public boolean next() throws SQLException {
+        return value.size() > ++index;
+    }
+
+    @Override
+    public void close() throws SQLException {}
+
+    @Override
+    public boolean wasNull() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public String getString(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean getBoolean(int columnIndex) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public byte getByte(int columnIndex) throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public short getShort(int columnIndex) throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getInt(int columnIndex) throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public long getLong(int columnIndex) throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public float getFloat(int columnIndex) throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public double getDouble(int columnIndex) throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public byte[] getBytes(int columnIndex) throws SQLException {
+        return new byte[0];
+    }
+
+    @Override
+    public Date getDate(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Time getTime(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Timestamp getTimestamp(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public InputStream getAsciiStream(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public InputStream getUnicodeStream(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public InputStream getBinaryStream(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public String getString(String columnLabel) throws SQLException {
+        return value.get(index).get(columnLabel).toString();
+    }
+
+    @Override
+    public boolean getBoolean(String columnLabel) throws SQLException {
+        return (boolean) value.get(index).get(columnLabel);
+    }
+
+    @Override
+    public byte getByte(String columnLabel) throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public short getShort(String columnLabel) throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getInt(String columnLabel) throws SQLException {
+        return (int) value.get(index).get(columnLabel);
+    }
+
+    @Override
+    public long getLong(String columnLabel) throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public float getFloat(String columnLabel) throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public double getDouble(String columnLabel) throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public BigDecimal getBigDecimal(String columnLabel, int scale) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public byte[] getBytes(String columnLabel) throws SQLException {
+        return new byte[0];
+    }
+
+    @Override
+    public Date getDate(String columnLabel) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Time getTime(String columnLabel) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Timestamp getTimestamp(String columnLabel) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public InputStream getAsciiStream(String columnLabel) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public InputStream getUnicodeStream(String columnLabel) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public InputStream getBinaryStream(String columnLabel) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public SQLWarning getWarnings() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public void clearWarnings() throws SQLException {}
+
+    @Override
+    public String getCursorName() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ResultSetMetaData getMetaData() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Object getObject(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Object getObject(String columnLabel) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public int findColumn(String columnLabel) throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public Reader getCharacterStream(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Reader getCharacterStream(String columnLabel) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public BigDecimal getBigDecimal(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public BigDecimal getBigDecimal(String columnLabel) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean isBeforeFirst() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean isAfterLast() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean isFirst() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean isLast() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public void beforeFirst() throws SQLException {}
+
+    @Override
+    public void afterLast() throws SQLException {}
+
+    @Override
+    public boolean first() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean last() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public int getRow() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public boolean absolute(int row) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean relative(int rows) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean previous() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public void setFetchDirection(int direction) throws SQLException {}
+
+    @Override
+    public int getFetchDirection() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public void setFetchSize(int rows) throws SQLException {}
+
+    @Override
+    public int getFetchSize() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getType() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public int getConcurrency() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public boolean rowUpdated() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean rowInserted() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean rowDeleted() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public void updateNull(int columnIndex) throws SQLException {}
+
+    @Override
+    public void updateBoolean(int columnIndex, boolean x) throws SQLException {}
+
+    @Override
+    public void updateByte(int columnIndex, byte x) throws SQLException {}
+
+    @Override
+    public void updateShort(int columnIndex, short x) throws SQLException {}
+
+    @Override
+    public void updateInt(int columnIndex, int x) throws SQLException {}
+
+    @Override
+    public void updateLong(int columnIndex, long x) throws SQLException {}
+
+    @Override
+    public void updateFloat(int columnIndex, float x) throws SQLException {}
+
+    @Override
+    public void updateDouble(int columnIndex, double x) throws SQLException {}
+
+    @Override
+    public void updateBigDecimal(int columnIndex, BigDecimal x) throws SQLException {}
+
+    @Override
+    public void updateString(int columnIndex, String x) throws SQLException {}
+
+    @Override
+    public void updateBytes(int columnIndex, byte[] x) throws SQLException {}
+
+    @Override
+    public void updateDate(int columnIndex, Date x) throws SQLException {}
+
+    @Override
+    public void updateTime(int columnIndex, Time x) throws SQLException {}
+
+    @Override
+    public void updateTimestamp(int columnIndex, Timestamp x) throws SQLException {}
+
+    @Override
+    public void updateAsciiStream(int columnIndex, InputStream x, int length) throws SQLException {}
+
+    @Override
+    public void updateBinaryStream(int columnIndex, InputStream x, int length)
+            throws SQLException {}
+
+    @Override
+    public void updateCharacterStream(int columnIndex, Reader x, int length) throws SQLException {}
+
+    @Override
+    public void updateObject(int columnIndex, Object x, int scaleOrLength) throws SQLException {}
+
+    @Override
+    public void updateObject(int columnIndex, Object x) throws SQLException {}
+
+    @Override
+    public void updateNull(String columnLabel) throws SQLException {}
+
+    @Override
+    public void updateBoolean(String columnLabel, boolean x) throws SQLException {}
+
+    @Override
+    public void updateByte(String columnLabel, byte x) throws SQLException {}
+
+    @Override
+    public void updateShort(String columnLabel, short x) throws SQLException {}
+
+    @Override
+    public void updateInt(String columnLabel, int x) throws SQLException {}
+
+    @Override
+    public void updateLong(String columnLabel, long x) throws SQLException {}
+
+    @Override
+    public void updateFloat(String columnLabel, float x) throws SQLException {}
+
+    @Override
+    public void updateDouble(String columnLabel, double x) throws SQLException {}
+
+    @Override
+    public void updateBigDecimal(String columnLabel, BigDecimal x) throws SQLException {}
+
+    @Override
+    public void updateString(String columnLabel, String x) throws SQLException {}
+
+    @Override
+    public void updateBytes(String columnLabel, byte[] x) throws SQLException {}
+
+    @Override
+    public void updateDate(String columnLabel, Date x) throws SQLException {}
+
+    @Override
+    public void updateTime(String columnLabel, Time x) throws SQLException {}
+
+    @Override
+    public void updateTimestamp(String columnLabel, Timestamp x) throws SQLException {}
+
+    @Override
+    public void updateAsciiStream(String columnLabel, InputStream x, int length)
+            throws SQLException {}
+
+    @Override
+    public void updateBinaryStream(String columnLabel, InputStream x, int length)
+            throws SQLException {}
+
+    @Override
+    public void updateCharacterStream(String columnLabel, Reader reader, int length)
+            throws SQLException {}
+
+    @Override
+    public void updateObject(String columnLabel, Object x, int scaleOrLength) throws SQLException {}
+
+    @Override
+    public void updateObject(String columnLabel, Object x) throws SQLException {}
+
+    @Override
+    public void insertRow() throws SQLException {}
+
+    @Override
+    public void updateRow() throws SQLException {}
+
+    @Override
+    public void deleteRow() throws SQLException {}
+
+    @Override
+    public void refreshRow() throws SQLException {}
+
+    @Override
+    public void cancelRowUpdates() throws SQLException {}
+
+    @Override
+    public void moveToInsertRow() throws SQLException {}
+
+    @Override
+    public void moveToCurrentRow() throws SQLException {}
+
+    @Override
+    public Statement getStatement() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Object getObject(int columnIndex, Map<String, Class<?>> map) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Ref getRef(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Blob getBlob(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Clob getClob(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Array getArray(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Object getObject(String columnLabel, Map<String, Class<?>> map) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Ref getRef(String columnLabel) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Blob getBlob(String columnLabel) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Clob getClob(String columnLabel) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Array getArray(String columnLabel) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Date getDate(int columnIndex, Calendar cal) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Date getDate(String columnLabel, Calendar cal) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Time getTime(int columnIndex, Calendar cal) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Time getTime(String columnLabel, Calendar cal) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Timestamp getTimestamp(int columnIndex, Calendar cal) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Timestamp getTimestamp(String columnLabel, Calendar cal) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public URL getURL(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public URL getURL(String columnLabel) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public void updateRef(int columnIndex, Ref x) throws SQLException {}
+
+    @Override
+    public void updateRef(String columnLabel, Ref x) throws SQLException {}
+
+    @Override
+    public void updateBlob(int columnIndex, Blob x) throws SQLException {}
+
+    @Override
+    public void updateBlob(String columnLabel, Blob x) throws SQLException {}
+
+    @Override
+    public void updateClob(int columnIndex, Clob x) throws SQLException {}
+
+    @Override
+    public void updateClob(String columnLabel, Clob x) throws SQLException {}
+
+    @Override
+    public void updateArray(int columnIndex, Array x) throws SQLException {}
+
+    @Override
+    public void updateArray(String columnLabel, Array x) throws SQLException {}
+
+    @Override
+    public RowId getRowId(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public RowId getRowId(String columnLabel) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public void updateRowId(int columnIndex, RowId x) throws SQLException {}
+
+    @Override
+    public void updateRowId(String columnLabel, RowId x) throws SQLException {}
+
+    @Override
+    public int getHoldability() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public boolean isClosed() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public void updateNString(int columnIndex, String nString) throws SQLException {}
+
+    @Override
+    public void updateNString(String columnLabel, String nString) throws SQLException {}
+
+    @Override
+    public void updateNClob(int columnIndex, NClob nClob) throws SQLException {}
+
+    @Override
+    public void updateNClob(String columnLabel, NClob nClob) throws SQLException {}
+
+    @Override
+    public NClob getNClob(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public NClob getNClob(String columnLabel) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public SQLXML getSQLXML(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public SQLXML getSQLXML(String columnLabel) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public void updateSQLXML(int columnIndex, SQLXML xmlObject) throws SQLException {}
+
+    @Override
+    public void updateSQLXML(String columnLabel, SQLXML xmlObject) throws SQLException {}
+
+    @Override
+    public String getNString(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public String getNString(String columnLabel) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Reader getNCharacterStream(int columnIndex) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Reader getNCharacterStream(String columnLabel) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public void updateNCharacterStream(int columnIndex, Reader x, long length)
+            throws SQLException {}
+
+    @Override
+    public void updateNCharacterStream(String columnLabel, Reader reader, long length)
+            throws SQLException {}
+
+    @Override
+    public void updateAsciiStream(int columnIndex, InputStream x, long length)
+            throws SQLException {}
+
+    @Override
+    public void updateBinaryStream(int columnIndex, InputStream x, long length)
+            throws SQLException {}
+
+    @Override
+    public void updateCharacterStream(int columnIndex, Reader x, long length) throws SQLException {}
+
+    @Override
+    public void updateAsciiStream(String columnLabel, InputStream x, long length)
+            throws SQLException {}
+
+    @Override
+    public void updateBinaryStream(String columnLabel, InputStream x, long length)
+            throws SQLException {}
+
+    @Override
+    public void updateCharacterStream(String columnLabel, Reader reader, long length)
+            throws SQLException {}
+
+    @Override
+    public void updateBlob(int columnIndex, InputStream inputStream, long length)
+            throws SQLException {}
+
+    @Override
+    public void updateBlob(String columnLabel, InputStream inputStream, long length)
+            throws SQLException {}
+
+    @Override
+    public void updateClob(int columnIndex, Reader reader, long length) throws SQLException {}
+
+    @Override
+    public void updateClob(String columnLabel, Reader reader, long length) throws SQLException {}
+
+    @Override
+    public void updateNClob(int columnIndex, Reader reader, long length) throws SQLException {}
+
+    @Override
+    public void updateNClob(String columnLabel, Reader reader, long length) throws SQLException {}
+
+    @Override
+    public void updateNCharacterStream(int columnIndex, Reader x) throws SQLException {}
+
+    @Override
+    public void updateNCharacterStream(String columnLabel, Reader reader) throws SQLException {}
+
+    @Override
+    public void updateAsciiStream(int columnIndex, InputStream x) throws SQLException {}
+
+    @Override
+    public void updateBinaryStream(int columnIndex, InputStream x) throws SQLException {}
+
+    @Override
+    public void updateCharacterStream(int columnIndex, Reader x) throws SQLException {}
+
+    @Override
+    public void updateAsciiStream(String columnLabel, InputStream x) throws SQLException {}
+
+    @Override
+    public void updateBinaryStream(String columnLabel, InputStream x) throws SQLException {}
+
+    @Override
+    public void updateCharacterStream(String columnLabel, Reader reader) throws SQLException {}
+
+    @Override
+    public void updateBlob(int columnIndex, InputStream inputStream) throws SQLException {}
+
+    @Override
+    public void updateBlob(String columnLabel, InputStream inputStream) throws SQLException {}
+
+    @Override
+    public void updateClob(int columnIndex, Reader reader) throws SQLException {}
+
+    @Override
+    public void updateClob(String columnLabel, Reader reader) throws SQLException {}
+
+    @Override
+    public void updateNClob(int columnIndex, Reader reader) throws SQLException {}
+
+    @Override
+    public void updateNClob(String columnLabel, Reader reader) throws SQLException {}
+
+    @Override
+    public <T> T getObject(int columnIndex, Class<T> type) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public <T> T getObject(String columnLabel, Class<T> type) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public <T> T unwrap(Class<T> iface) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        return false;
+    }
+}