Add statement proto
diff --git a/.gitignore b/.gitignore
index 08df579..d2123f1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
+
 .idea
 *.iml
 *.ipr
diff --git a/.idea/encodings.xml b/.idea/encodings.xml
index 5c552d0..7a47235 100644
--- a/.idea/encodings.xml
+++ b/.idea/encodings.xml
@@ -1,10 +1,12 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
   <component name="Encoding">
+    <file url="file://$PROJECT_DIR$/health-proto/src/main/java" charset="UTF-8" />
     <file url="file://$PROJECT_DIR$/health-proto/src/main/resources" charset="UTF-8" />
     <file url="file://$PROJECT_DIR$/jdbc-grpc-client/src/main/java" charset="UTF-8" />
     <file url="file://$PROJECT_DIR$/jdbc-grpc-client/src/main/resources" charset="UTF-8" />
     <file url="file://$PROJECT_DIR$/jdbc-grpc-client/target/classes/generated-sources/protobuf/java" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/jdbc-proto/src/main/java" charset="UTF-8" />
     <file url="file://$PROJECT_DIR$/jdbc-proto/src/main/resources" charset="UTF-8" />
     <file url="file://$PROJECT_DIR$/kyuubi-health-proto/src/main/java" charset="UTF-8" />
     <file url="file://$PROJECT_DIR$/kyuubi-jdbc-proto/src/main/java" charset="UTF-8" />
diff --git a/.idea/scopes/apache.xml b/.idea/scopes/apache.xml
index 1cca035..c9770ea 100644
--- a/.idea/scopes/apache.xml
+++ b/.idea/scopes/apache.xml
@@ -1,3 +1,3 @@
 <component name="DependencyValidationManager">
-  <scope name="apache" pattern="" />
+  <scope name="apache" pattern="src:*..*||test:*..*" />
 </component>
\ No newline at end of file
diff --git a/jdbc-grpc-client/pom.xml b/jdbc-grpc-client/pom.xml
index 95b8d8b..fc684ed 100644
--- a/jdbc-grpc-client/pom.xml
+++ b/jdbc-grpc-client/pom.xml
@@ -68,6 +68,4 @@
             <version>2.1.214</version>
         </dependency>
     </dependencies>
-
-
-</project>
\ No newline at end of file
+</project>
diff --git a/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/GrpcUtils.java b/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/GrpcUtils.java
index 3a72e5d..8e8ab7c 100644
--- a/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/GrpcUtils.java
+++ b/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/GrpcUtils.java
@@ -1,5 +1,89 @@
 package org.apache.kyuubi.grpc;
 
+import org.apache.kyuubi.grpc.jdbc.statement.AutoGeneratedKeys;
+import org.apache.kyuubi.grpc.jdbc.statement.ResultSetConcurrency;
+import org.apache.kyuubi.grpc.jdbc.statement.ResultSetHoldability;
+import org.apache.kyuubi.grpc.jdbc.statement.ResultSetType;
+
 public class GrpcUtils {
   final static Status OK = Status.newBuilder().setStatusCode(StatusCode.OK).setSqlState("00000").build();
+
+  public static int toJDBC(ResultSetType type) {
+    switch (type) {
+      case TYPE_FORWARD_ONLY:
+        return java.sql.ResultSet.TYPE_FORWARD_ONLY;
+      case TYPE_SCROLL_INSENSITIVE:
+        return java.sql.ResultSet.TYPE_SCROLL_INSENSITIVE;
+      case TYPE_SCROLL_SENSITIVE:
+        return java.sql.ResultSet.TYPE_SCROLL_SENSITIVE;
+      default:
+        throw new IllegalArgumentException("Unknown ResultSetType: " + type);
+    }
+  }
+
+  public static int toJDBC(ResultSetConcurrency concurrency) {
+    switch (concurrency) {
+      case CONCUR_READ_ONLY:
+        return java.sql.ResultSet.CONCUR_READ_ONLY;
+      case CONCUR_UPDATABLE:
+        return java.sql.ResultSet.CONCUR_UPDATABLE;
+      default:
+        throw new IllegalArgumentException("Unknown ResultSetConcurrency: " + concurrency);
+    }
+  }
+
+  public static int toJDBC(ResultSetHoldability holdability) {
+    switch (holdability) {
+      case CLOSE_CURSORS_AT_COMMIT:
+        return java.sql.ResultSet.CLOSE_CURSORS_AT_COMMIT;
+      case HOLD_CURSORS_OVER_COMMIT:
+        return java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT;
+      default:
+        throw new IllegalArgumentException("Unknown ResultSetHoldability: " + holdability);
+    }
+  }
+
+  /**
+   * Convert a {@link AutoGeneratedKeys} to a {@link java.sql.Statement} constant.
+   *
+   * @param autoGeneratedKeys the {@link AutoGeneratedKeys} to convert
+   * @return the converted {@link java.sql.Statement} constant
+   */
+  public static int toJDBC(AutoGeneratedKeys autoGeneratedKeys) {
+    switch (autoGeneratedKeys) {
+      case RETURN_GENERATED_KEYS:
+        return java.sql.Statement.RETURN_GENERATED_KEYS;
+      case NO_GENERATED_KEYS:
+        return java.sql.Statement.NO_GENERATED_KEYS;
+      default:
+        throw new IllegalArgumentException("Unknown AutoGeneratedKeys: " + autoGeneratedKeys);
+    }
+  }
+
+  /**
+   * Convert a {@link java.sql.SQLWarning} to a {@link SQLWarning}.
+   *
+   * @param warning the {@link java.sql.SQLWarning} to convert
+   * @return the converted {@link SQLWarning}
+   */
+  public static SQLWarning toProto(java.sql.SQLWarning warning) {
+    if (warning == null) {
+      return null;
+    }
+    SQLWarning.Builder builder = SQLWarning.newBuilder();
+    if (warning.getMessage() != null) {
+      builder.setReason(warning.getMessage());
+    }
+    if (warning.getSQLState() != null) {
+      builder.setSqlState(warning.getSQLState());
+    }
+    if (warning.getErrorCode() != 0) {
+      builder.setVendorCode(warning.getErrorCode());
+    }
+    if (warning.getNextWarning() != null) {
+      builder.setNextWarning(toProto(warning.getNextWarning()));
+    }
+    return builder.build();
+  }
+
 }
diff --git a/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/client/JdbcGrpcClient.java b/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/client/JdbcGrpcClient.java
index e310057..6ab497d 100644
--- a/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/client/JdbcGrpcClient.java
+++ b/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/client/JdbcGrpcClient.java
@@ -3,6 +3,7 @@
 import java.util.Map;
 import java.util.Optional;
 
+import org.apache.kyuubi.grpc.GetWarningsResp;
 import org.apache.kyuubi.grpc.jdbc.DirectStatusResp;
 import org.apache.kyuubi.grpc.jdbc.connection.*;
 
diff --git a/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/client/SimpleBlockingJdbcClient.java b/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/client/SimpleBlockingJdbcClient.java
index a132594..c22aed1 100644
--- a/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/client/SimpleBlockingJdbcClient.java
+++ b/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/client/SimpleBlockingJdbcClient.java
@@ -1,22 +1,26 @@
 package org.apache.kyuubi.grpc.client;
 
 import io.grpc.*;
+import org.apache.kyuubi.grpc.GetWarningsResp;
+import org.apache.kyuubi.grpc.common.ConnectionHandle;
+import org.apache.kyuubi.grpc.common.StatementHandle;
 import org.apache.kyuubi.grpc.jdbc.*;
 import org.apache.kyuubi.grpc.jdbc.JdbcGrpc.JdbcBlockingStub;
 import org.apache.kyuubi.grpc.jdbc.connection.*;
+import org.apache.kyuubi.grpc.jdbc.statement.*;
 
 import java.util.Map;
 import java.util.Optional;
-import java.util.logging.Logger;
 
 public class SimpleBlockingJdbcClient implements JdbcGrpcClient {
-  private final Logger logger = Logger.getLogger(this.getClass().getName());
   private JdbcBlockingStub blockingStub = null;
   private ConnectionGrpc.ConnectionBlockingStub connectionBlockingStub = null;
+  private StatementGrpc.StatementBlockingStub statementBlockingStub = null;
 
   public SimpleBlockingJdbcClient(Channel channel) {
     blockingStub = JdbcGrpc.newBlockingStub(channel);
     connectionBlockingStub = ConnectionGrpc.newBlockingStub(channel);
+    statementBlockingStub = StatementGrpc.newBlockingStub(channel);
   }
 
   public SimpleBlockingJdbcClient(ManagedChannelBuilder builder) {
@@ -39,12 +43,10 @@
   public DirectStatusResp openConnection(
     Map<String, String> configs,
     Optional<String> connectionId) {
-    OpenConnectionReq.Builder builder = OpenConnectionReq.newBuilder();
-    if (connectionId.isPresent()) {
-      logger.info("Reconnecting to server with existing connection" + connectionId.get());
-      builder.setConnectionId(connectionId.get());
-    }
-    OpenConnectionReq req = builder
+    ConnectionHandle.Builder builder = ConnectionHandle.newBuilder();
+    connectionId.ifPresent(builder::setId);
+    OpenConnectionReq req = OpenConnectionReq.newBuilder()
+      .setConnectionId(builder.build())
       .putAllConfigs(configs)
       .build();
     return connectionBlockingStub.openConnection(req);
@@ -52,18 +54,18 @@
 
   @Override
   public DirectStatusResp closeConnection(String connectionId) {
-    CloseConnectionReq.Builder builder = CloseConnectionReq.newBuilder();
-    CloseConnectionReq req = builder
-      .setConnectionId(connectionId)
+    ConnectionHandle.Builder builder = ConnectionHandle.newBuilder();
+    ConnectionHandle req = builder
+      .setId(connectionId)
       .build();
     return connectionBlockingStub.closeConnection(req);
   }
 
   @Override
   public DirectStatusResp abortConnection(String connectionId) {
-    AbortConnectionReq.Builder builder = AbortConnectionReq.newBuilder();
-    AbortConnectionReq req = builder
-      .setConnectionId(connectionId)
+    ConnectionHandle.Builder builder = ConnectionHandle.newBuilder();
+    ConnectionHandle req = builder
+      .setId(connectionId)
       .build();
     return connectionBlockingStub.abortConnection(req);
   }
@@ -72,7 +74,7 @@
   public DirectStatusResp setClientInfo(String connectionId, Map<String, String> info) {
     SetClientInfoReq.Builder builder = SetClientInfoReq.newBuilder();
     SetClientInfoReq req = builder
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .putAllConfigs(info)
       .build();
     return connectionBlockingStub.setClientInfo(req);
@@ -82,7 +84,7 @@
   public DirectStatusResp setClientInfo(String connectionId, String name, String value) {
     SetClientInfoReq.Builder builder = SetClientInfoReq.newBuilder();
     SetClientInfoReq req = builder
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .putConfigs(name, value)
       .build();
     return connectionBlockingStub.setClientInfo(req);
@@ -90,9 +92,8 @@
 
   @Override
   public GetClientInfoResp getClientInfo(String connectionId) {
-    GetClientInfoReq.Builder builder = GetClientInfoReq.newBuilder();
-    GetClientInfoReq req = builder
-      .setConnectionId(connectionId)
+    ConnectionHandle req = ConnectionHandle.newBuilder()
+      .setId(connectionId)
       .build();
     return connectionBlockingStub.getClientInfo(req);
   }
@@ -101,7 +102,7 @@
   public DirectStatusResp setTypeMap(String connectionId, Map<String, String> map) {
     SetTypeMapReq.Builder builder = SetTypeMapReq.newBuilder();
     SetTypeMapReq req = builder
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .putAllTypeToClass(map)
       .build();
     return connectionBlockingStub.setTypeMap(req);
@@ -109,9 +110,8 @@
 
   @Override
   public GetTypeMapResp getTypeMap(String connectionId) {
-    GetTypeMapReq.Builder builder = GetTypeMapReq.newBuilder();
-    GetTypeMapReq req = builder
-      .setConnectionId(connectionId)
+    ConnectionHandle req = ConnectionHandle.newBuilder()
+      .setId(connectionId)
       .build();
     return connectionBlockingStub.getTypeMap(req);
   }
@@ -120,7 +120,7 @@
   public DirectStatusResp setHoldability(String connectionId, int holdability) {
     SetHoldabilityReq.Builder builder = SetHoldabilityReq.newBuilder();
     SetHoldabilityReq req = builder
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .setHoldability(holdability)
       .build();
     return connectionBlockingStub.setHoldability(req);
@@ -128,9 +128,8 @@
 
   @Override
   public GetHoldabilityResp getHoldability(String connectionId) {
-    GetHoldabilityReq.Builder builder = GetHoldabilityReq.newBuilder();
-    GetHoldabilityReq req = builder
-      .setConnectionId(connectionId)
+    ConnectionHandle req = ConnectionHandle.newBuilder()
+      .setId(connectionId)
       .build();
     return connectionBlockingStub.getHoldability(req);
   }
@@ -139,7 +138,7 @@
   public DirectStatusResp setSchema(String connectionId, String schema) {
     SetSchemaReq.Builder builder = SetSchemaReq.newBuilder();
     SetSchemaReq req = builder
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .setSchema(schema)
       .build();
     return connectionBlockingStub.setSchema(req);
@@ -147,9 +146,8 @@
 
   @Override
   public GetSchemaResp getSchema(String connectionId) {
-    GetSchemaReq.Builder builder = GetSchemaReq.newBuilder();
-    GetSchemaReq req = builder
-      .setConnectionId(connectionId)
+    ConnectionHandle req = ConnectionHandle.newBuilder()
+      .setId(connectionId)
       .build();
     return connectionBlockingStub.getSchema(req);
   }
@@ -158,7 +156,7 @@
   public DirectStatusResp setNetworkTimeout(String connectionId, int milliseconds) {
     SetNetworkTimeoutReq.Builder builder = SetNetworkTimeoutReq.newBuilder();
     SetNetworkTimeoutReq req = builder
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .setMilliseconds(milliseconds)
       .build();
     return connectionBlockingStub.setNetworkTimeout(req);
@@ -166,9 +164,8 @@
 
   @Override
   public GetNetworkTimeoutResp getNetworkTimeout(String connectionId) {
-    GetNetworkTimeoutReq.Builder builder = GetNetworkTimeoutReq.newBuilder();
-    GetNetworkTimeoutReq req = builder
-      .setConnectionId(connectionId)
+    ConnectionHandle req = ConnectionHandle.newBuilder()
+      .setId(connectionId)
       .build();
     return connectionBlockingStub.getNetworkTimeout(req);
   }
@@ -177,7 +174,7 @@
   public SetSavepointResp setSavepoint(String connectionId) {
     SetSavepointReq.Builder builder = SetSavepointReq.newBuilder();
     SetSavepointReq req = builder
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .build();
     return connectionBlockingStub.setSavepoint(req);
   }
@@ -186,7 +183,7 @@
   public SetSavepointResp setSavepoint(String connectionId, String name) {
     SetSavepointReq.Builder builder = SetSavepointReq.newBuilder();
     SetSavepointReq req = builder
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .setSavepointName(name)
       .build();
     return connectionBlockingStub.setSavepoint(req);
@@ -196,7 +193,7 @@
   public DirectStatusResp releaseSavepoint(String connectionId, Savepoint savepoint) {
     ReleaseSavepointReq.Builder builder = ReleaseSavepointReq.newBuilder();
     ReleaseSavepointReq req = builder
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .setSavepoint(savepoint)
       .build();
     return connectionBlockingStub.releaseSavepoint(req);
@@ -207,7 +204,7 @@
   public DirectStatusResp setSchema(String connectionId, String schema, String catalog) {
     SetSchemaReq.Builder builder = SetSchemaReq.newBuilder();
     SetSchemaReq req = builder
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .setSchema(schema)
       .build();
     return connectionBlockingStub.setSchema(req);
@@ -217,7 +214,7 @@
   public IsValidResp isValid(String connectionId, int timeout) {
     IsValidReq.Builder builder = IsValidReq.newBuilder();
     IsValidReq req = builder
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .setTimeout(timeout)
       .build();
     return connectionBlockingStub.isValid(req);
@@ -227,7 +224,7 @@
   public NativeSQLResp nativeSQL(String connectionId, String sql) {
     NativeSQLReq.Builder builder = NativeSQLReq.newBuilder();
     NativeSQLReq req = builder
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .setSql(sql)
       .build();
     return connectionBlockingStub.nativeSQL(req);
@@ -237,7 +234,7 @@
   public DirectStatusResp setAutoCommit(String connectionId, boolean autoCommit) {
     SetAutoCommitReq.Builder builder = SetAutoCommitReq.newBuilder();
     SetAutoCommitReq req = builder
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .setAutoCommit(autoCommit)
       .build();
     return connectionBlockingStub.setAutoCommit(req);
@@ -245,18 +242,16 @@
 
   @Override
   public GetAutoCommitResp getAutoCommit(String connectionId) {
-    GetAutoCommitReq.Builder builder = GetAutoCommitReq.newBuilder();
-    GetAutoCommitReq req = builder
-      .setConnectionId(connectionId)
+    ConnectionHandle req = ConnectionHandle.newBuilder()
+      .setId(connectionId)
       .build();
     return connectionBlockingStub.getAutoCommit(req);
   }
 
   @Override
   public DirectStatusResp commit(String connectionId) {
-    CommitReq.Builder builder = CommitReq.newBuilder();
-    CommitReq req = builder
-      .setConnectionId(connectionId)
+    ConnectionHandle req = ConnectionHandle.newBuilder()
+      .setId(connectionId)
       .build();
     return connectionBlockingStub.commit(req);
   }
@@ -265,7 +260,7 @@
   public DirectStatusResp rollback(String connectionId) {
     RollbackReq.Builder builder = RollbackReq.newBuilder();
     RollbackReq req = builder
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .build();
     return connectionBlockingStub.rollback(req);
   }
@@ -276,7 +271,7 @@
       .setSavepointId(savepointId)
       .build();
     RollbackReq req = RollbackReq.newBuilder()
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .setSavepoint(sp)
       .build();
     return connectionBlockingStub.rollback(req);
@@ -289,7 +284,7 @@
       .setSavepointName(savepointName)
       .build();
     RollbackReq req = RollbackReq.newBuilder()
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .setSavepoint(sp)
       .build();
     return connectionBlockingStub.rollback(req);
@@ -301,7 +296,7 @@
       .setSavepointName(savepointName)
       .build();
     RollbackReq req = RollbackReq.newBuilder()
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .setSavepoint(sp)
       .build();
     return connectionBlockingStub.rollback(req);
@@ -311,7 +306,7 @@
   public DirectStatusResp setReadOnly(String connectionId, boolean readOnly) {
     SetReadOnlyReq.Builder builder = SetReadOnlyReq.newBuilder();
     SetReadOnlyReq req = builder
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .setReadOnly(readOnly)
       .build();
     return connectionBlockingStub.setReadOnly(req);
@@ -319,9 +314,8 @@
 
   @Override
   public IsReadOnlyResp isReadOnly(String connectionId) {
-    IsReadOnlyReq.Builder builder = IsReadOnlyReq.newBuilder();
-    IsReadOnlyReq req = builder
-      .setConnectionId(connectionId)
+    ConnectionHandle req = ConnectionHandle.newBuilder()
+      .setId(connectionId)
       .build();
     return connectionBlockingStub.isReadOnly(req);
   }
@@ -330,7 +324,7 @@
   public DirectStatusResp setCatalog(String connectionId, String catalog) {
     SetCatalogReq.Builder builder = SetCatalogReq.newBuilder();
     SetCatalogReq req = builder
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .setCatalog(catalog)
       .build();
     return connectionBlockingStub.setCatalog(req);
@@ -338,9 +332,8 @@
 
   @Override
   public GetCatalogResp getCatalog(String connectionId) {
-    GetCatalogReq.Builder builder = GetCatalogReq.newBuilder();
-    GetCatalogReq req = builder
-      .setConnectionId(connectionId)
+    ConnectionHandle req = ConnectionHandle.newBuilder()
+      .setId(connectionId)
       .build();
     return connectionBlockingStub.getCatalog(req);
   }
@@ -350,7 +343,7 @@
   public DirectStatusResp setTransactionIsolation(String connectionId, int level) {
     SetTransactionIsolationReq.Builder builder = SetTransactionIsolationReq.newBuilder();
     SetTransactionIsolationReq req = builder
-      .setConnectionId(connectionId)
+      .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
       .setLevel(level)
       .build();
     return connectionBlockingStub.setTransactionIsolation(req);
@@ -358,31 +351,58 @@
 
   @Override
   public GetTransactionIsolationResp getTransactionIsolation(String connectionId) {
-    GetTransactionIsolationReq.Builder builder = GetTransactionIsolationReq.newBuilder();
-    GetTransactionIsolationReq req = builder
-      .setConnectionId(connectionId)
+    ConnectionHandle req = ConnectionHandle.newBuilder()
+      .setId(connectionId)
       .build();
     return connectionBlockingStub.getTransactionIsolation(req);
   }
 
   @Override
   public DirectStatusResp clearWarnings(String connectionId) {
-    ClearWarningsReq.Builder builder = ClearWarningsReq.newBuilder();
-    ClearWarningsReq req = builder
-      .setConnectionId(connectionId)
+    ConnectionHandle req = ConnectionHandle.newBuilder()
+      .setId(connectionId)
       .build();
     return connectionBlockingStub.clearWarnings(req);
   }
 
   @Override
   public GetWarningsResp getWarnings(String connectionId) {
-    GetWarningsReq.Builder builder = GetWarningsReq.newBuilder();
-    GetWarningsReq req = builder
-      .setConnectionId(connectionId)
+    ConnectionHandle req = ConnectionHandle.newBuilder()
+      .setId(connectionId)
       .build();
     return connectionBlockingStub.getWarnings(req);
   }
 
+  public DirectStatusResp createStatement(String connectionId, Optional<String> statementId) {
+    CreateStatementReq.Builder builder = CreateStatementReq.newBuilder();
+    builder.setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build());
+    if (statementId.isPresent()) {
+      StatementHandle handle = StatementHandle.newBuilder()
+        .setId(statementId.get())
+        .build();
+      builder.setStatementId(handle);
+    }
+    return statementBlockingStub.createStatement(builder.build());
+  }
+
+  public DirectStatusResp closeStatement(String statementId) {
+    StatementHandle req = StatementHandle.newBuilder()
+      .setId(statementId)
+      .build();
+    return statementBlockingStub.closeStatement(req);
+  }
+
+  public DirectStatusResp executeQuery(String statementId, String sql) {
+    StatementHandle handle = StatementHandle.newBuilder()
+      .setId(statementId)
+      .build();
+    ExecuteQueryReq req = ExecuteQueryReq.newBuilder()
+      .setStatementId(handle)
+      .setSql(sql)
+      .build();
+    return statementBlockingStub.executeQuery(req);
+  }
+
   @Override
   public DirectStatusResp getCatalogs(String connectionId) {
     GetCatalogsReq.Builder builder = GetCatalogsReq.newBuilder();
diff --git a/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/DummyJdbcService.java b/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/DummyJdbcService.java
index 6b30546..f6eceda 100644
--- a/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/DummyJdbcService.java
+++ b/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/DummyJdbcService.java
@@ -15,8 +15,6 @@
   public DummyJdbcService() throws IOException {
   }
 
-
-
   @Override
   public void getCatalogs(GetCatalogsReq req, StreamObserver<DirectStatusResp> respOb) {
     DirectStatusResp.Builder builder = DirectStatusResp.newBuilder();
diff --git a/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/TestConnectionService.java b/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/TestConnectionService.java
index 1a9617b..71af03f 100644
--- a/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/TestConnectionService.java
+++ b/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/TestConnectionService.java
@@ -2,6 +2,7 @@
 
 
 import io.grpc.stub.StreamObserver;
+import org.apache.kyuubi.grpc.common.ConnectionHandle;
 import org.apache.kyuubi.grpc.jdbc.*;
 import org.apache.kyuubi.grpc.jdbc.connection.ConnectionGrpc;
 import org.apache.kyuubi.grpc.jdbc.connection.*;
@@ -15,41 +16,56 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 
 import static org.apache.kyuubi.grpc.GrpcUtils.OK;
 
 public class TestConnectionService extends ConnectionGrpc.ConnectionImplBase {
-
   private final Path _tempDir = Files.createTempDirectory(getClass().getSimpleName());
-
   public String defaultCatalogName = _tempDir.getFileName().toString().toUpperCase();
   private final String jdbcUrl = "jdbc:h2:" + _tempDir + ";MODE=DB2;user=testUser;password=testPass";
-  private Connection _connection = null;
-
-  private final Executor executor = Executors.newSingleThreadExecutor();
-
+  private Map<ConnectionHandle, Connection> connections = new HashMap<>();
+  private Executor executor = Executors.newSingleThreadExecutor();
   private final Map<Savepoint, java.sql.Savepoint> savepoints = new HashMap<>();
-
-  public void setConnection(Connection _connection) {
-    this._connection = _connection;
+  public Connection getConnection(ConnectionHandle connectionId, Properties properties) throws SQLException {
+    if (connections.containsKey(connectionId)) {
+      return connections.get(connectionId);
+    } else {
+      Connection conn = DriverManager.getConnection(jdbcUrl, properties);
+      connections.put(connectionId, conn);
+      return conn;
+    }
   }
 
-  public Connection getConnection() throws SQLException {
-    if (this._connection == null) {
-      Connection conn = DriverManager.getConnection(jdbcUrl);
-      setConnection(conn);
-      return conn;
+  public Connection getConnection(ConnectionHandle connectionId) throws SQLException {
+    if (connections.containsKey(connectionId)) {
+      return connections.get(connectionId);
     } else {
-      return _connection;
+      Connection conn = DriverManager.getConnection(jdbcUrl, new Properties());
+      connections.put(connectionId, conn);
+      return conn;
     }
   }
 
   public TestConnectionService() throws IOException {
   }
 
-  private Status errorStatus(Exception e) {
+  public void stop() {
+    for (Connection conn : connections.values()) {
+      try {
+        conn.close();
+      } catch (SQLException e) {
+        e.printStackTrace();
+      }
+    }
+    connections.clear();
+    connections = null;
+    executor = null;
+  }
+
+  public Status errorStatus(Exception e) {
     return Status.newBuilder()
       .setSqlState("38808")
       .setErrorMessage(e.getMessage())
@@ -57,14 +73,14 @@
       .build();
   }
 
-  private DirectStatusResp error(Exception e) {
+  public DirectStatusResp error(Exception e) {
     return DirectStatusResp
       .newBuilder()
       .setStatus(errorStatus(e))
       .build();
   }
 
-  private DirectStatusResp ok(String id) {
+  public DirectStatusResp ok(String id) {
     return DirectStatusResp
       .newBuilder()
       .setStatus(OK)
@@ -75,16 +91,19 @@
   @Override
   public void openConnection(OpenConnectionReq req, StreamObserver<DirectStatusResp> respOb) {
     DirectStatusResp.Builder builder = DirectStatusResp.newBuilder();
-    if (req.getConnectionId().isEmpty()) {
-      builder.setIdentifier("hello, kyuubi");
-    } else {
-      builder.setIdentifier("hello, " + req.getConnectionId());
-    }
     try {
+      ConnectionHandle connectionId = null;
       Properties properties = new Properties();
       properties.putAll(req.getConfigsMap());
-      setConnection(
-        DriverManager.getConnection(jdbcUrl, properties));
+      if (req.getConnectionId() == ConnectionHandle.getDefaultInstance()) {
+        connectionId = ConnectionHandle.newBuilder()
+          .setId(UUID.randomUUID().toString())
+          .build();
+      } else {
+        connectionId = req.getConnectionId();
+      }
+      getConnection(connectionId, properties);
+      builder.setIdentifier(connectionId.getId());
       builder.putExtraInfo("apache", "kyuubi");
       builder.putExtraInfo("Kyuubi", "Serverless SQL on Lakehouse");
       builder.setStatus(OK);
@@ -104,20 +123,21 @@
   }
 
   @Override
-  public void closeConnection(CloseConnectionReq req, StreamObserver<DirectStatusResp> respOb) {
+  public void closeConnection(ConnectionHandle req, StreamObserver<DirectStatusResp> respOb) {
     DirectStatusResp.Builder builder = DirectStatusResp.newBuilder();
     try {
-      if (req.getConnectionId().isEmpty()) {
-        throw new IllegalArgumentException("ConnectionId cannot be empty for CloseConnection");
+      if (!connections.containsKey(req)) {
+        throw new IllegalArgumentException("invalid connection id " + req.getId());
       } else {
-        builder.setIdentifier(req.getConnectionId());
+        getConnection(req).close();
+        builder.setIdentifier(req.getId());
       }
       builder.setStatus(OK);
     } catch (Exception e) {
       Status status = Status.newBuilder()
         .setStatusCode(StatusCode.ERROR)
         .setSqlState("2E000")
-        .setErrorMessage("invalid connection id" + req.getConnectionId())
+        .setErrorMessage(e.getMessage())
         .build();
       builder.setStatus(status);
     }
@@ -129,7 +149,7 @@
   public void nativeSQL(NativeSQLReq request, StreamObserver<NativeSQLResp> responseObserver) {
     NativeSQLResp.Builder builder = NativeSQLResp.newBuilder();
     try {
-      String nativeSQL = getConnection().nativeSQL(request.getSql());
+      String nativeSQL = getConnection(request.getConnectionId()).nativeSQL(request.getSql());
       NativeSQLResp resp = builder
         .setSql(nativeSQL)
         .setStatus(OK)
@@ -150,8 +170,8 @@
   @Override
   public void setAutoCommit(SetAutoCommitReq request, StreamObserver<DirectStatusResp> responseObserver) {
     try {
-      getConnection().setAutoCommit(request.getAutoCommit());
-      responseObserver.onNext(ok(request.getConnectionId()));
+      getConnection(request.getConnectionId()).setAutoCommit(request.getAutoCommit());
+      responseObserver.onNext(ok(request.getConnectionId().getId()));
     } catch (SQLException e) {
       responseObserver.onNext(error(e));
     }
@@ -159,10 +179,10 @@
   }
 
   @Override
-  public void getAutoCommit(GetAutoCommitReq request, StreamObserver<GetAutoCommitResp> responseObserver) {
+  public void getAutoCommit(ConnectionHandle request, StreamObserver<GetAutoCommitResp> responseObserver) {
     GetAutoCommitResp.Builder builder = GetAutoCommitResp.newBuilder();
     try {
-      boolean autoCommit = getConnection().getAutoCommit();
+      boolean autoCommit = getConnection(request).getAutoCommit();
       GetAutoCommitResp resp = builder
         .setStatus(OK)
         .setAutoCommit(autoCommit)
@@ -175,10 +195,10 @@
   }
 
   @Override
-  public void commit(CommitReq request, StreamObserver<DirectStatusResp> responseObserver) {
+  public void commit(ConnectionHandle request, StreamObserver<DirectStatusResp> responseObserver) {
     try {
-      getConnection().commit();
-      responseObserver.onNext(ok(request.getConnectionId()));
+      getConnection(request).commit();
+      responseObserver.onNext(ok(request.getId()));
     } catch (SQLException e) {
       responseObserver.onNext(error(e));
     }
@@ -188,14 +208,14 @@
   @Override
   public void rollback(RollbackReq request, StreamObserver<DirectStatusResp> responseObserver) {
     try {
-      Connection conn = getConnection();
+      Connection conn = getConnection(request.getConnectionId());
       Savepoint savepoint = request.getSavepoint();
       if (savepoint == Savepoint.getDefaultInstance()) {
         conn.rollback();
       } else {
         conn.rollback(savepoints.get(savepoint));
       }
-      responseObserver.onNext(ok(request.getConnectionId()));
+      responseObserver.onNext(ok(request.getConnectionId().getId()));
     } catch (SQLException e) {
       responseObserver.onNext(error(e));
     }
@@ -205,8 +225,8 @@
   @Override
   public void setReadOnly(SetReadOnlyReq request, StreamObserver<DirectStatusResp> responseObserver) {
     try {
-      getConnection().setReadOnly(request.getReadOnly());
-      responseObserver.onNext(ok(request.getConnectionId()));
+      getConnection(request.getConnectionId()).setReadOnly(request.getReadOnly());
+      responseObserver.onNext(ok(request.getConnectionId().getId()));
     } catch (SQLException e) {
       responseObserver.onNext(error(e));
     }
@@ -214,12 +234,12 @@
   }
 
   @Override
-  public void isReadOnly(IsReadOnlyReq request, StreamObserver<IsReadOnlyResp> responseObserver) {
+  public void isReadOnly(ConnectionHandle request, StreamObserver<IsReadOnlyResp> responseObserver) {
     IsReadOnlyResp.Builder builder = IsReadOnlyResp.newBuilder();
     try {
       IsReadOnlyResp resp = builder
         .setStatus(OK)
-        .setReadOnly(getConnection().isReadOnly())
+        .setReadOnly(getConnection(request).isReadOnly())
         .build();
       responseObserver.onNext(resp);
     } catch (SQLException e) {
@@ -234,8 +254,8 @@
   @Override
   public void setCatalog(SetCatalogReq request, StreamObserver<DirectStatusResp> responseObserver) {
     try {
-      getConnection().setCatalog(request.getCatalog());
-      responseObserver.onNext(ok(request.getConnectionId()));
+      getConnection(request.getConnectionId()).setCatalog(request.getCatalog());
+      responseObserver.onNext(ok(request.getConnectionId().getId()));
     } catch (SQLException e) {
       responseObserver.onNext(error(e));
     }
@@ -243,10 +263,10 @@
   }
 
   @Override
-  public void getCatalog(GetCatalogReq request, StreamObserver<GetCatalogResp> responseObserver) {
+  public void getCatalog(ConnectionHandle request, StreamObserver<GetCatalogResp> responseObserver) {
     GetCatalogResp.Builder builder = GetCatalogResp.newBuilder();
     try {
-      String catalog = getConnection().getCatalog();
+      String catalog = getConnection(request).getCatalog();
       GetCatalogResp resp = builder
         .setStatus(OK)
         .setCatalog(catalog)
@@ -264,8 +284,8 @@
   @Override
   public void setTransactionIsolation(SetTransactionIsolationReq request, StreamObserver<DirectStatusResp> responseObserver) {
     try {
-      getConnection().setTransactionIsolation(request.getLevel());
-      responseObserver.onNext(ok(request.getConnectionId()));
+      getConnection(request.getConnectionId()).setTransactionIsolation(request.getLevel());
+      responseObserver.onNext(ok(request.getConnectionId().getId()));
     } catch (SQLException e) {
       responseObserver.onNext(error(e));
     }
@@ -273,10 +293,10 @@
   }
 
   @Override
-  public void getTransactionIsolation(GetTransactionIsolationReq request, StreamObserver<GetTransactionIsolationResp> responseObserver) {
+  public void getTransactionIsolation(ConnectionHandle request, StreamObserver<GetTransactionIsolationResp> responseObserver) {
     GetTransactionIsolationResp.Builder builder = GetTransactionIsolationResp.newBuilder();
     try {
-      int level = getConnection().getTransactionIsolation();
+      int level = getConnection(request).getTransactionIsolation();
       GetTransactionIsolationResp resp = builder
         .setStatus(OK)
         .setLevel(level)
@@ -291,30 +311,14 @@
     responseObserver.onCompleted();
   }
 
-  public static SQLWarning buildWarning(java.sql.SQLWarning cur) {
-    SQLWarning.Builder builder = SQLWarning.newBuilder();
-    if (cur.getMessage() != null) {
-      builder.setReason(cur.getMessage());
-    }
-    if (cur.getSQLState() != null) {
-      builder.setSqlState(cur.getSQLState());
-    }
-    if (cur.getErrorCode() != 0) {
-      builder.setVendorCode(cur.getErrorCode());
-    }
-    if (cur.getNextWarning() != null) {
-      builder.setNextWarning(buildWarning(cur.getNextWarning()));
-    }
-    return builder.build();
-  }
   @Override
-  public void getWarnings(GetWarningsReq request, StreamObserver<GetWarningsResp> responseObserver) {
+  public void getWarnings(ConnectionHandle request, StreamObserver<GetWarningsResp> responseObserver) {
     GetWarningsResp.Builder builder = GetWarningsResp.newBuilder();
     try {
-      java.sql.SQLWarning warnings = getConnection().getWarnings();
+      SQLWarning warnings = GrpcUtils.toProto(getConnection(request).getWarnings());
 
       if (warnings != null) {
-        builder.setWarnings(buildWarning(getConnection().getWarnings()));
+        builder.setWarnings(warnings);
       }
       responseObserver.onNext(builder.setStatus(OK).build());
     } catch (SQLException e) {
@@ -324,10 +328,10 @@
   }
 
   @Override
-  public void clearWarnings(ClearWarningsReq request, StreamObserver<DirectStatusResp> responseObserver) {
+  public void clearWarnings(ConnectionHandle request, StreamObserver<DirectStatusResp> responseObserver) {
     try {
-      getConnection().clearWarnings();
-      responseObserver.onNext(ok(request.getConnectionId()));
+      getConnection(request).clearWarnings();
+      responseObserver.onNext(ok(request.getId()));
     } catch (SQLException e) {
       responseObserver.onNext(error(e));
     }
@@ -342,8 +346,8 @@
       for (Map.Entry<String, String> entry : request.getTypeToClassMap().entrySet()) {
         classMap.put(entry.getKey(), Class.forName(entry.getValue()));
       }
-      getConnection().setTypeMap(classMap);
-      responseObserver.onNext(ok(request.getConnectionId()));
+      getConnection(request.getConnectionId()).setTypeMap(classMap);
+      responseObserver.onNext(ok(request.getConnectionId().getId()));
     } catch (Exception e) {
       responseObserver.onNext(error(e));
     }
@@ -351,10 +355,10 @@
   }
 
   @Override
-  public void getTypeMap(GetTypeMapReq request, StreamObserver<GetTypeMapResp> responseObserver) {
+  public void getTypeMap(ConnectionHandle request, StreamObserver<GetTypeMapResp> responseObserver) {
     GetTypeMapResp.Builder builder = GetTypeMapResp.newBuilder();
     try {
-      Map<String, Class<?>> typeMap = getConnection().getTypeMap();
+      Map<String, Class<?>> typeMap = getConnection(request).getTypeMap();
       if (typeMap != null) {
         Map<String, String> typeToClassMap = new HashMap<String, String>();
         for (Map.Entry<String, Class<?>> entry : typeMap.entrySet()) {
@@ -373,8 +377,8 @@
   @Override
   public void setSchema(SetSchemaReq request, StreamObserver<DirectStatusResp> responseObserver) {
     try {
-      getConnection().setSchema(request.getSchema());
-      responseObserver.onNext(ok(request.getConnectionId()));
+      getConnection(request.getConnectionId()).setSchema(request.getSchema());
+      responseObserver.onNext(ok(request.getConnectionId().getId()));
     } catch (SQLException e) {
       responseObserver.onNext(error(e));
     }
@@ -382,10 +386,10 @@
   }
 
   @Override
-  public void getSchema(GetSchemaReq request, StreamObserver<GetSchemaResp> responseObserver) {
+  public void getSchema(ConnectionHandle request, StreamObserver<GetSchemaResp> responseObserver) {
     GetSchemaResp.Builder builder = GetSchemaResp.newBuilder();
     try {
-      String schema = getConnection().getSchema();
+      String schema = getConnection(request).getSchema();
       // schema can be null, can you verify this?
       GetSchemaResp resp = builder
         .setStatus(OK)
@@ -404,8 +408,8 @@
   @Override
   public void setNetworkTimeout(SetNetworkTimeoutReq request, StreamObserver<DirectStatusResp> responseObserver) {
      try {
-        getConnection().setNetworkTimeout(executor, request.getMilliseconds());
-        responseObserver.onNext(ok(request.getConnectionId()));
+        getConnection(request.getConnectionId()).setNetworkTimeout(executor, request.getMilliseconds());
+        responseObserver.onNext(ok(request.getConnectionId().getId()));
       } catch (SQLException e) {
         responseObserver.onNext(error(e));
       }
@@ -413,10 +417,10 @@
   }
 
   @Override
-  public void getNetworkTimeout(GetNetworkTimeoutReq request, StreamObserver<GetNetworkTimeoutResp> responseObserver) {
+  public void getNetworkTimeout(ConnectionHandle request, StreamObserver<GetNetworkTimeoutResp> responseObserver) {
     GetNetworkTimeoutResp.Builder builder = GetNetworkTimeoutResp.newBuilder();
     try {
-      int timeout = getConnection().getNetworkTimeout();
+      int timeout = getConnection(request).getNetworkTimeout();
       GetNetworkTimeoutResp resp = builder
         .setStatus(OK)
         .setMilliseconds(timeout)
@@ -435,7 +439,7 @@
   public void isValid(IsValidReq request, StreamObserver<IsValidResp> responseObserver) {
     IsValidResp.Builder builder = IsValidResp.newBuilder();
     try {
-      boolean valid = getConnection().isValid(request.getTimeout());
+      boolean valid = getConnection(request.getConnectionId()).isValid(request.getTimeout());
       IsValidResp resp = builder
         .setStatus(OK)
         .setValid(valid)
@@ -450,10 +454,10 @@
   }
 
   @Override
-  public void abortConnection(AbortConnectionReq request, StreamObserver<DirectStatusResp> responseObserver) {
+  public void abortConnection(ConnectionHandle request, StreamObserver<DirectStatusResp> responseObserver) {
     try {
-      getConnection().abort(executor);
-      responseObserver.onNext(ok(request.getConnectionId()));
+      getConnection(request).abort(executor);
+      responseObserver.onNext(ok(request.getId()));
     } catch (SQLException e) {
       responseObserver.onNext(error(e));
     }
@@ -465,8 +469,8 @@
     try {
       Properties properties = new Properties();
       properties.putAll(request.getConfigsMap());
-      getConnection().setClientInfo(properties);
-      responseObserver.onNext(ok(request.getConnectionId()));
+      getConnection(request.getConnectionId()).setClientInfo(properties);
+      responseObserver.onNext(ok(request.getConnectionId().getId()));
     } catch (SQLException e) {
       responseObserver.onNext(error(e));
     }
@@ -474,10 +478,10 @@
   }
 
   @Override
-  public void getClientInfo(GetClientInfoReq request, StreamObserver<GetClientInfoResp> responseObserver) {
+  public void getClientInfo(ConnectionHandle request, StreamObserver<GetClientInfoResp> responseObserver) {
     GetClientInfoResp.Builder builder = GetClientInfoResp.newBuilder();
     try {
-      Properties properties = getConnection().getClientInfo();
+      Properties properties = getConnection(request).getClientInfo();
       // use a loop to put all properties into the builder
       for (Map.Entry<Object, Object> entry : properties.entrySet()) {
         builder.putConfigs(entry.getKey().toString(), entry.getValue().toString());
@@ -488,5 +492,4 @@
     }
     responseObserver.onCompleted();
   }
-
 }
diff --git a/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/TestStatementService.java b/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/TestStatementService.java
new file mode 100644
index 0000000..8e52570
--- /dev/null
+++ b/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/TestStatementService.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc;
+
+import com.google.protobuf.ProtocolStringList;
+import io.grpc.stub.StreamObserver;
+import org.apache.kyuubi.grpc.common.ConnectionHandle;
+import org.apache.kyuubi.grpc.common.StatementHandle;
+import org.apache.kyuubi.grpc.jdbc.DirectStatusResp;
+import org.apache.kyuubi.grpc.jdbc.statement.*;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.*;
+
+import static org.apache.kyuubi.grpc.GrpcUtils.OK;
+
+public class TestStatementService extends StatementGrpc.StatementImplBase {
+
+  private final TestConnectionService connectionService;
+
+  private final Map<StatementHandle, Statement> statements = new HashMap<>();
+
+  private Statement getStatement(StatementHandle statementId) {
+    Statement statement = statements.get(statementId);
+    if (statement == null) {
+      throw new IllegalArgumentException("Statement Id " + statementId.getId() + " not found");
+    }
+    return statement;
+  }
+
+  public TestStatementService(TestConnectionService connectionService) {
+    this.connectionService = connectionService;
+  }
+
+  @Override
+  public void createStatement(CreateStatementReq request, StreamObserver<DirectStatusResp> responseObserver) {
+    DirectStatusResp.Builder builder = DirectStatusResp.newBuilder();
+    try {
+      ConnectionHandle connectionId = request.getConnectionId();
+      int resultSetType = GrpcUtils.toJDBC(request.getResultSetType());
+      int resultSetConcurrency = GrpcUtils.toJDBC(request.getResultSetConcurrency());
+      int resultSetHoldability = GrpcUtils.toJDBC(request.getResultSetHoldability());
+      StatementHandle statementId = request.getStatementId();
+      if (statementId == StatementHandle.getDefaultInstance()) {
+        statementId = StatementHandle.newBuilder()
+          .setId(UUID.randomUUID().toString())
+          .build();
+      }
+
+      if (!statements.containsKey(statementId)) {
+        Connection connection = connectionService.getConnection(connectionId);
+        Statement statement = connection.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
+        statements.put(statementId, statement);
+      }
+
+      DirectStatusResp resp = builder
+        .setIdentifier(statementId.getId())
+        .setStatus(OK)
+        .build();
+      responseObserver.onNext(resp);
+    } catch (Exception e) {
+      connectionService.error(e);
+      responseObserver.onNext(connectionService.error(e));
+    }
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void closeStatement(StatementHandle request, StreamObserver<DirectStatusResp> responseObserver) {
+    try {
+      Statement stmt = statements.remove(request);
+      if (stmt == null) {
+        throw new IllegalArgumentException("Statement Id " + request.getId() + " not found");
+      }
+      stmt.close();
+      responseObserver.onNext(connectionService.ok(request.getId()));
+    } catch (Exception e) {
+      responseObserver.onNext(connectionService.error(e));
+    }
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void executeQuery(ExecuteQueryReq request, StreamObserver<DirectStatusResp> responseObserver) {
+    try {
+      StatementHandle statementId = request.getStatementId();
+      Statement statement = getStatement(statementId);
+      ExecuteQueryReq.AutoGeneratedKeysCase keysCase = request.getAutoGeneratedKeysCase();
+      switch (keysCase) {
+        case AUTOGENERATEDKEY:
+          int autoGeneratedKeys = GrpcUtils.toJDBC(request.getAutoGeneratedKey());
+          statement.execute(request.getSql(), autoGeneratedKeys);
+          break;
+        case COLUMNINDEXES:
+          List<Integer> indexesList = request.getColumnIndexes().getColumnIndexesList();
+          int[] columnIndexes = indexesList.stream().mapToInt(Integer::intValue).toArray();
+          statement.execute(request.getSql(), columnIndexes);
+          break;
+        case COLUMNNAMES:
+          ProtocolStringList namesList = request.getColumnNames().getColumnNamesList();
+          String[] columnNames = namesList.toArray(new String[namesList.size()]);
+          statement.execute(request.getSql(), columnNames);
+          break;
+        case AUTOGENERATEDKEYS_NOT_SET:
+          statement.executeQuery(request.getSql());
+          break;
+      }
+      responseObserver.onNext(connectionService.ok(statementId.getId()));
+    } catch (Exception e) {
+      responseObserver.onNext(connectionService.error(e));
+    }
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void executeUpdate(ExecuteQueryReq request, StreamObserver<DirectStatusResp> responseObserver) {
+    try {
+      StatementHandle statementId = request.getStatementId();
+      Statement statement = getStatement(statementId);
+      ExecuteQueryReq.AutoGeneratedKeysCase keysCase = request.getAutoGeneratedKeysCase();
+      switch (keysCase) {
+        case AUTOGENERATEDKEY:
+          statement.execute(request.getSql(), request.getAutoGeneratedKey().getNumber());
+          break;
+        case COLUMNINDEXES:
+          List<Integer> indexesList = request.getColumnIndexes().getColumnIndexesList();
+          int[] columnIndexes = indexesList.stream().mapToInt(Integer::intValue).toArray();
+          statement.execute(request.getSql(), columnIndexes);
+          break;
+        case COLUMNNAMES:
+          ProtocolStringList namesList = request.getColumnNames().getColumnNamesList();
+          String[] columnNames = namesList.toArray(new String[namesList.size()]);
+          statement.execute(request.getSql(), columnNames);
+          break;
+        case AUTOGENERATEDKEYS_NOT_SET:
+          statement.executeQuery(request.getSql());
+          break;
+      }
+      responseObserver.onNext(connectionService.ok(statementId.getId()));
+    } catch (Exception e) {
+      responseObserver.onNext(connectionService.error(e));
+    }
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void getMaxFieldSize(StatementHandle request, StreamObserver<GetMaxFieldSizeResp> responseObserver) {
+    GetMaxFieldSizeResp.Builder builder = GetMaxFieldSizeResp.newBuilder();
+    try {
+      Statement statement = getStatement(request);
+      int maxFieldSize = statement.getMaxFieldSize();
+      GetMaxFieldSizeResp resp = builder
+        .setStatus(OK)
+        .setMax(maxFieldSize)
+        .build();
+      responseObserver.onNext(resp);
+    } catch (Exception e) {
+      GetMaxFieldSizeResp resp = builder
+        .setStatus(connectionService.errorStatus(e))
+        .build();
+      responseObserver.onNext(resp);
+    }
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void setMaxFieldSize(SetMaxFieldSizeReq request, StreamObserver<DirectStatusResp> responseObserver) {
+    try {
+      StatementHandle statementId = request.getStatementId();
+      Statement statement = getStatement(statementId);
+      statement.setMaxFieldSize(request.getMax());
+      responseObserver.onNext(connectionService.ok(statementId.getId()));
+    } catch (Exception e) {
+      responseObserver.onNext(connectionService.error(e));
+    }
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void getMaxRows(StatementHandle request, StreamObserver<GetMaxRowsResp> responseObserver) {
+    GetMaxRowsResp.Builder builder = GetMaxRowsResp.newBuilder();
+    try {
+      Statement statement = getStatement(request);
+      int maxRows = statement.getMaxRows();
+      GetMaxRowsResp resp = builder
+        .setStatus(OK)
+        .setMax(maxRows)
+        .build();
+      responseObserver.onNext(resp);
+    } catch (Exception e) {
+      GetMaxRowsResp resp = builder
+        .setStatus(connectionService.errorStatus(e))
+        .build();
+      responseObserver.onNext(resp);
+    }
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void setMaxRows(SetMaxRowsReq request, StreamObserver<DirectStatusResp> responseObserver) {
+    try {
+      StatementHandle statementId = request.getStatementId();
+      Statement statement = getStatement(statementId);
+      statement.setMaxRows(request.getMax());
+      responseObserver.onNext(connectionService.ok(statementId.getId()));
+    } catch (Exception e) {
+      responseObserver.onNext(connectionService.error(e));
+    }
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void setQueryTimeout(SetQueryTimeoutReq request, StreamObserver<DirectStatusResp> responseObserver) {
+    try {
+      StatementHandle statementId = request.getStatementId();
+      Statement statement = getStatement(statementId);
+      statement.setQueryTimeout(request.getTimeout());
+      responseObserver.onNext(connectionService.ok(statementId.getId()));
+    } catch (Exception e) {
+      responseObserver.onNext(connectionService.error(e));
+    }
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void getQueryTimeout(StatementHandle request, StreamObserver<GetQueryTimeoutResp> responseObserver) {
+    GetQueryTimeoutResp.Builder builder = GetQueryTimeoutResp.newBuilder();
+    try {
+      Statement statement = getStatement(request);
+      int timeout = statement.getQueryTimeout();
+      GetQueryTimeoutResp resp = builder
+        .setStatus(OK)
+        .setTimeout(timeout)
+        .build();
+      responseObserver.onNext(resp);
+    } catch (Exception e) {
+      GetQueryTimeoutResp resp = builder
+        .setStatus(connectionService.errorStatus(e))
+        .build();
+      responseObserver.onNext(resp);
+    }
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void getWarnings(StatementHandle request, StreamObserver<GetWarningsResp> responseObserver) {
+    GetWarningsResp.Builder builder = GetWarningsResp.newBuilder();
+    try {
+      Statement statement = getStatement(request);
+      java.sql.SQLWarning warnings = statement.getWarnings();
+      GetWarningsResp resp = builder
+        .setStatus(OK)
+        .setWarnings(GrpcUtils.toProto(warnings))
+        .build();
+      responseObserver.onNext(resp);
+    } catch (Exception e) {
+      GetWarningsResp resp = builder
+        .setStatus(connectionService.errorStatus(e))
+        .build();
+      responseObserver.onNext(resp);
+    }
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void setEscapeProcessing(SetEscapeProcessingReq request, StreamObserver<DirectStatusResp> responseObserver) {
+    try {
+      StatementHandle statementId = request.getStatementId();
+      Statement statement = getStatement(statementId);
+      statement.setEscapeProcessing(request.getEnable());
+      responseObserver.onNext(connectionService.ok(statementId.getId()));
+    } catch (Exception e) {
+      responseObserver.onNext(connectionService.error(e));
+    }
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void cancelStatement(StatementHandle request, StreamObserver<DirectStatusResp> responseObserver) {
+    try {
+      Statement statement = getStatement(request);
+      statement.cancel();
+      responseObserver.onNext(connectionService.ok(request.getId()));
+    } catch (Exception e) {
+      responseObserver.onNext(connectionService.error(e));
+    }
+    responseObserver.onCompleted();
+  }
+}
diff --git a/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/client/SimpleBlockingJdbcServiceClientTest.java b/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/client/SimpleBlockingJdbcServiceClientTest.java
index 098746a..443b458 100644
--- a/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/client/SimpleBlockingJdbcServiceClientTest.java
+++ b/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/client/SimpleBlockingJdbcServiceClientTest.java
@@ -10,10 +10,7 @@
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.*;
@@ -26,6 +23,7 @@
   private SimpleBlockingJdbcClient client = null;
   DummyJdbcService dummyFrontendService = new DummyJdbcService();
   TestConnectionService dummyConnectionService = new TestConnectionService();
+  TestStatementService dummyStatementService = new TestStatementService(dummyConnectionService);
 
   public SimpleBlockingJdbcServiceClientTest() throws IOException {
   }
@@ -35,6 +33,7 @@
     ServerCredentials serverCredentials = InsecureServerCredentials.create();
     ServerBuilder<?> builder = Grpc.newServerBuilderForPort(port, serverCredentials)
       .addService(dummyConnectionService)
+      .addService(dummyStatementService)
       .addService(dummyFrontendService);
     server = builder.build();
     server.start();
@@ -43,6 +42,7 @@
 
   @After
   public void tearDown() {
+    dummyConnectionService.stop();
     if (server != null) {
       try {
         server.awaitTermination(5, TimeUnit.SECONDS);
@@ -63,28 +63,31 @@
     Map<String, String> configs = new HashMap<>();
     DirectStatusResp resp = client.openConnection(configs, Optional.empty());
     Status status = resp.getStatus();
-    assertEquals("hello, kyuubi", resp.getIdentifier());
+    String connectionId = resp.getIdentifier();
     assertEquals(StatusCode.OK, status.getStatusCode());
     assertEquals("00000", status.getSqlState());
     assertEquals("Serverless SQL on Lakehouse", resp.getExtraInfoOrThrow("Kyuubi"));
 
     configs.put("apache", "kyuubi");
     configs.put("kent", "yao");
-    DirectStatusResp resp1 = client.openConnection(configs, Optional.of("20181117"));
+    DirectStatusResp resp1 = client.openConnection(configs, Optional.of(connectionId));
     Status status1 = resp1.getStatus();
-    assertEquals("hello, 20181117", resp1.getIdentifier());
+    assertEquals(connectionId, resp1.getIdentifier());
     assertEquals(StatusCode.OK, status1.getStatusCode());
     assertEquals("kyuubi", resp1.getExtraInfoOrThrow("apache"));
   }
 
   @Test
   public void testCloseConnection() {
-    DirectStatusResp resp1 = client.closeConnection("");
+    String connectionId = UUID.randomUUID().toString();
+    DirectStatusResp resp1 = client.closeConnection(connectionId);
     Status resp1Status = resp1.getStatus();
     assertEquals(StatusCode.ERROR, resp1Status.getStatusCode());
     assertEquals("2E000", resp1Status.getSqlState());
-    assertEquals("invalid connection id", resp1Status.getErrorMessage());
-    DirectStatusResp resp2 = client.closeConnection("apache kyuubi");
+    assertEquals("invalid connection id " + connectionId, resp1Status.getErrorMessage());
+    DirectStatusResp openConn = client.openConnection(Collections.emptyMap(), Optional.empty());
+    connectionId = openConn.getIdentifier();
+    DirectStatusResp resp2 = client.closeConnection(connectionId);
     Status resp2Status = resp2.getStatus();
     assertEquals(StatusCode.OK, resp2Status.getStatusCode());
     assertEquals("00000", resp2Status.getSqlState());
@@ -181,7 +184,7 @@
   public void testBuildSQLWarnings() {
     java.sql.SQLWarning warning1 = new java.sql.SQLWarning("warning1");
     warning1.setNextWarning(new java.sql.SQLWarning("warning2"));
-    SQLWarning warning = TestConnectionService.buildWarning(warning1);
+    SQLWarning warning = GrpcUtils.toProto(warning1);
     assertEquals("warning1", warning.getReason());
     assertEquals("warning2", warning.getNextWarning().getReason());
   }
@@ -258,4 +261,17 @@
     DirectStatusResp resp = client.abortConnection("kyuubi");
     assertEquals(StatusCode.OK, resp.getStatus().getStatusCode());
   }
+
+  @Test
+  public void testExecuteQuery() {
+    String sql = "SELECT * FROM UNNEST(ARRAY['a', 'b', 'c'])";
+    DirectStatusResp resp = client.executeQuery("kyuubi", sql);
+    assertEquals(StatusCode.ERROR, resp.getStatus().getStatusCode());
+    assertTrue(resp.getStatus().getErrorMessage().contains("Statement Id kyuubi not found"));
+    DirectStatusResp resp1 = client.createStatement("kyuubi", Optional.empty());
+    String statementId = resp1.getIdentifier();
+    DirectStatusResp resp2 = client.executeQuery(statementId, sql);
+    assertEquals(StatusCode.OK, resp2.getStatus().getStatusCode());
+    client.closeStatement(statementId);
+  }
 }
\ No newline at end of file
diff --git a/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/common/errors.proto b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/common/errors.proto
index def741e..6b82683 100644
--- a/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/common/errors.proto
+++ b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/common/errors.proto
@@ -40,4 +40,9 @@
   // a database vendor-specific warning code
   uint32 vendor_code = 3;
   SQLWarning next_warning = 4;
-}
\ No newline at end of file
+}
+
+message GetWarningsResp {
+  Status status = 1;
+  SQLWarning warnings = 2;
+}
diff --git a/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/common.proto b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/common.proto
new file mode 100644
index 0000000..8ce0b13
--- /dev/null
+++ b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/common.proto
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "org.apache.kyuubi.grpc.common";
+
+message ConnectionHandle {
+  string id = 1;
+}
+
+message StatementHandle {
+  string id = 1;
+}
+
+message OperationHandle {
+  string id = 1;
+}
\ No newline at end of file
diff --git a/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/connection.proto b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/connection.proto
index bcde549..0fc2079 100644
--- a/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/connection.proto
+++ b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/connection.proto
@@ -17,6 +17,7 @@
 syntax = "proto3";
 
 import "org/apache/kyuubi/grpc/common/errors.proto";
+import "org/apache/kyuubi/grpc/jdbc/common.proto";
 import "org/apache/kyuubi/grpc/jdbc/response.proto";
 
 option java_multiple_files = true;
@@ -45,53 +46,43 @@
  */
 service Connection {
   rpc OpenConnection(OpenConnectionReq) returns (DirectStatusResp);
-  rpc CloseConnection(CloseConnectionReq) returns (DirectStatusResp);
-  rpc AbortConnection(AbortConnectionReq) returns (DirectStatusResp);
+  rpc CloseConnection(ConnectionHandle) returns (DirectStatusResp);
+  rpc AbortConnection(ConnectionHandle) returns (DirectStatusResp);
   rpc NativeSQL(NativeSQLReq) returns (NativeSQLResp);
   rpc SetAutoCommit(SetAutoCommitReq) returns (DirectStatusResp);
-  rpc GetAutoCommit(GetAutoCommitReq) returns (GetAutoCommitResp);
-  rpc Commit(CommitReq) returns (DirectStatusResp);
+  rpc GetAutoCommit(ConnectionHandle) returns (GetAutoCommitResp);
+  rpc Commit(ConnectionHandle) returns (DirectStatusResp);
   rpc Rollback(RollbackReq) returns (DirectStatusResp);
   rpc SetReadOnly(SetReadOnlyReq) returns (DirectStatusResp);
-  rpc IsReadOnly(IsReadOnlyReq) returns (IsReadOnlyResp);
+  rpc IsReadOnly(ConnectionHandle) returns (IsReadOnlyResp);
   rpc SetCatalog(SetCatalogReq) returns (DirectStatusResp);
-  rpc GetCatalog(GetCatalogReq) returns (GetCatalogResp);
+  rpc GetCatalog(ConnectionHandle) returns (GetCatalogResp);
   rpc SetTransactionIsolation(SetTransactionIsolationReq) returns (DirectStatusResp);
-  rpc GetTransactionIsolation(GetTransactionIsolationReq) returns (GetTransactionIsolationResp);
-  rpc GetWarnings(GetWarningsReq) returns (GetWarningsResp);
-  rpc ClearWarnings(ClearWarningsReq) returns (DirectStatusResp);
+  rpc GetTransactionIsolation(ConnectionHandle) returns (GetTransactionIsolationResp);
+  rpc GetWarnings(ConnectionHandle) returns (GetWarningsResp);
+  rpc ClearWarnings(ConnectionHandle) returns (DirectStatusResp);
   rpc SetTypeMap(SetTypeMapReq) returns (DirectStatusResp);
-  rpc GetTypeMap(GetTypeMapReq) returns (GetTypeMapResp);
+  rpc GetTypeMap(ConnectionHandle) returns (GetTypeMapResp);
   rpc SetSchema(SetSchemaReq) returns (DirectStatusResp);
-  rpc GetSchema(GetSchemaReq) returns (GetSchemaResp);
+  rpc GetSchema(ConnectionHandle) returns (GetSchemaResp);
   rpc SetNetworkTimeout(SetNetworkTimeoutReq) returns (DirectStatusResp);
-  rpc GetNetworkTimeout(GetNetworkTimeoutReq) returns (GetNetworkTimeoutResp);
+  rpc GetNetworkTimeout(ConnectionHandle) returns (GetNetworkTimeoutResp);
   rpc SetHoldability(SetHoldabilityReq) returns (DirectStatusResp);
-  rpc GetHoldability(GetHoldabilityReq) returns (GetHoldabilityResp);
+  rpc GetHoldability(ConnectionHandle) returns (GetHoldabilityResp);
   rpc SetSavepoint(SetSavepointReq) returns (SetSavepointResp);
   rpc ReleaseSavepoint(ReleaseSavepointReq) returns (DirectStatusResp);
   rpc IsValid(IsValidReq) returns (IsValidResp);
   rpc SetClientInfo(SetClientInfoReq) returns (DirectStatusResp);
-  rpc GetClientInfo(GetClientInfoReq) returns (GetClientInfoResp);
+  rpc GetClientInfo(ConnectionHandle) returns (GetClientInfoResp);
 }
 
 message OpenConnectionReq {
-  string connection_id = 1;
-  map<string, string> configs = 2;
-}
-
-message CloseConnectionReq {
-  string connection_id = 1;
-  map<string, string> configs = 2;
-}
-
-message AbortConnectionReq {
-  string connection_id = 1;
+  ConnectionHandle connection_id = 1;
   map<string, string> configs = 2;
 }
 
 message NativeSQLReq {
-  string connection_id = 1;
+  ConnectionHandle connection_id = 1;
   string sql = 2;
 }
 
@@ -101,70 +92,50 @@
 }
 
 message SetAutoCommitReq {
-  string connection_id = 1;
+  ConnectionHandle connection_id = 1;
   bool autoCommit = 2;
 }
 
-message GetAutoCommitReq {
-  string connection_id = 1;
-}
-
 message GetAutoCommitResp {
   Status status = 1;
   bool autoCommit = 2;
 }
 
-message CommitReq {
-  string connection_id = 1;
-}
-
 message Savepoint {
   uint32 savepoint_id = 1;
   string savepoint_name = 2;
 }
 
 message RollbackReq {
-  string connection_id = 1;
+  ConnectionHandle connection_id = 1;
   Savepoint savepoint = 2;
 }
 
 message SetReadOnlyReq {
-  string connection_id = 1;
+  ConnectionHandle connection_id = 1;
   bool read_only = 2;
 }
 
-message IsReadOnlyReq {
-  string connection_id = 1;
-}
-
 message IsReadOnlyResp {
   Status status = 1;
   bool read_only = 2;
 }
 
 message SetClientInfoReq {
-  string connection_id = 1;
+  ConnectionHandle connection_id = 1;
   map<string, string> configs = 2;
 }
 
-message GetClientInfoReq {
-  string connection_id = 1;
-}
-
 message GetClientInfoResp {
   Status status = 1;
   map<string, string> configs = 2;
 }
 
 message SetCatalogReq {
-  string connection_id = 1;
+  ConnectionHandle connection_id = 1;
   string catalog = 2;
 }
 
-message GetCatalogReq {
-  string connection_id = 1;
-}
-
 message GetCatalogResp {
   Status status = 1;
   string catalog = 2;
@@ -176,7 +147,7 @@
  * @see java.sql.Connection
  */
 message SetTransactionIsolationReq {
-  string connection_id = 1;
+  ConnectionHandle connection_id = 1;
   // The possible transaction isolation levels
   // level one of the following <code>Connection</code> constants:
   // <code>Connection.TRANSACTION_READ_UNCOMMITTED(1)</code>,
@@ -186,101 +157,67 @@
   uint32 level = 2;
 }
 
-message GetTransactionIsolationReq {
-  string connection_id = 1;
-}
-
 message GetTransactionIsolationResp {
   Status status = 1;
   uint32 level = 2;
 }
 
-message GetWarningsReq {
-  string connection_id = 1;
-}
-
-message GetWarningsResp {
-  Status status = 1;
-  SQLWarning warnings = 2;
-}
-
-
-message ClearWarningsReq {
-  string connection_id = 1;
-}
-
 message TypeClassMap {
   string type_name = 1;
   string class_name = 2;
 }
 
 message SetTypeMapReq {
-  string connection_id = 1;
+  ConnectionHandle connection_id = 1;
   map<string, string> type_to_class = 2;
 }
 
-message GetTypeMapReq {
-  string connection_id = 1;
-}
-
 message GetTypeMapResp {
   Status status = 1;
   map<string, string> type_to_class = 2;
 }
 
 message SetSchemaReq {
-  string connection_id = 1;
+  ConnectionHandle connection_id = 1;
   string schema = 2;
 }
 
-message GetSchemaReq {
-  string connection_id = 1;
-}
-
 message GetSchemaResp {
   Status status = 1;
   string schema = 2;
 }
 
 message SetHoldabilityReq {
-  string connection_id = 1;
+  ConnectionHandle connection_id = 1;
   // one of the following <code>ResultSet</code> constants:
   // <code>ResultSet.HOLD_CURSORS_OVER_COMMIT</code> or
   // <code>ResultSet.CLOSE_CURSORS_AT_COMMIT</code>
   uint32 holdability = 2;
 }
 
-message GetHoldabilityReq {
-  string connection_id = 1;
-}
-
 message GetHoldabilityResp {
   Status status = 1;
   uint32 holdability = 2;
 }
 
 message SetNetworkTimeoutReq {
-  string connection_id = 1;
+  ConnectionHandle connection_id = 1;
   // The new network timeout value in milliseconds
   uint32 milliseconds = 2;
 }
 
-message GetNetworkTimeoutReq {
-  string connection_id = 1;
-}
-
 message GetNetworkTimeoutResp {
   Status status = 1;
   uint32 milliseconds = 2;
 }
 
 message ReleaseSavepointReq {
-  string connection_id = 1;
+  ConnectionHandle connection_id = 1;
   Savepoint savepoint = 2;
 }
 
 message SetSavepointReq {
-  string connection_id = 1;
+  ConnectionHandle connection_id = 1;
   string savepoint_name = 2;
 }
 
@@ -290,7 +227,7 @@
 }
 
 message IsValidReq {
-  string connection_id = 1;
+  ConnectionHandle connection_id = 1;
   uint32 timeout = 2;
 }
 
diff --git a/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/resultset.proto b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/resultset.proto
new file mode 100644
index 0000000..4cc1eaa
--- /dev/null
+++ b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/resultset.proto
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "org/apache/kyuubi/grpc/jdbc/common.proto";
+import "org/apache/kyuubi/grpc/common/errors.proto";
+import "org/apache/kyuubi/grpc/jdbc/response.proto";
+
+option java_multiple_files = true;
+option java_package = "org.apache.kyuubi.grpc.jdbc.resultset";
+
+service ResultSet {
+  rpc Fetch(FetchRequest) returns (FetchResponse) {}
+  rpc Close(CloseRequest) returns (CloseResponse) {}
+}
+
+message FetchResultReq {
+  OperationHandle operation_id = 1;
+  int32 fetch_size = 1;
+}
\ No newline at end of file
diff --git a/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/statment.proto b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/statment.proto
new file mode 100644
index 0000000..4596e3f
--- /dev/null
+++ b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/statment.proto
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "org/apache/kyuubi/grpc/jdbc/common.proto";
+import "org/apache/kyuubi/grpc/common/errors.proto";
+import "org/apache/kyuubi/grpc/jdbc/response.proto";
+
+option java_multiple_files = true;
+option java_package = "org.apache.kyuubi.grpc.jdbc.statement";
+
+/**
+ * The service definition of JDBC statement service, mapping to
+ * java.sql.Statement
+ * TODO: add more methods
+ *   - executeLargeBatch
+ *   - executeLargeUpdate
+ */
+service Statement {
+  rpc CreateStatement(CreateStatementReq) returns (DirectStatusResp);
+  rpc CloseStatement(StatementHandle) returns (DirectStatusResp);
+  rpc ExecuteQuery(ExecuteQueryReq) returns (DirectStatusResp);
+  rpc ExecuteUpdate(ExecuteQueryReq) returns (DirectStatusResp);
+  rpc ExecuteLargeUpdate(ExecuteQueryReq) returns (DirectStatusResp);
+  rpc GetMaxFieldSize(StatementHandle) returns (GetMaxFieldSizeResp);
+  rpc SetMaxFieldSize(SetMaxFieldSizeReq) returns (DirectStatusResp);
+  rpc GetMaxRows(StatementHandle) returns (GetMaxRowsResp);
+  rpc SetMaxRows(SetMaxRowsReq) returns (DirectStatusResp);
+  rpc SetEscapeProcessing(SetEscapeProcessingReq) returns (DirectStatusResp);
+  rpc GetQueryTimeout(StatementHandle) returns (GetQueryTimeoutResp);
+  rpc SetQueryTimeout(SetQueryTimeoutReq) returns (DirectStatusResp);
+  rpc CancelStatement(StatementHandle) returns (DirectStatusResp);
+  rpc GetWarnings(StatementHandle) returns (GetWarningsResp);
+  rpc ClearWarnings(StatementHandle) returns (DirectStatusResp);
+  rpc SetCursorName(SetCursorNameReq) returns (DirectStatusResp);
+  rpc GetMoreResults(GetMoreResultsReq) returns (GetMoreResultsResp);
+  rpc GetFetchDirection(StatementHandle) returns (GetFetchDirectionResp);
+  rpc SetFetchDirection(SetFetchDirectionReq) returns (DirectStatusResp);
+  rpc GetFetchSize(StatementHandle) returns (GetFetchSizeResp);
+  rpc SetFetchSize(SetFetchSizeReq) returns (DirectStatusResp);
+  rpc AddBatch(AddBatchReq) returns (DirectStatusResp);
+  rpc ClearBatch(StatementHandle) returns (DirectStatusResp);
+  rpc ExecuteBatch(StatementHandle) returns (ExecuteBatchResp);
+  rpc GetResultSet(StatementHandle) returns (DirectStatusResp);
+  rpc GetGeneratedKeys(StatementHandle) returns (DirectStatusResp);
+  rpc IsClosed(StatementHandle) returns (IsClosedResp);
+  rpc SetPoolable(SetPoolableReq) returns (DirectStatusResp);
+  rpc IsPoolable(StatementHandle) returns (IsPoolableResp);
+  rpc CloseOnCompletion(StatementHandle) returns (DirectStatusResp);
+  rpc IsCloseOnCompletion(StatementHandle) returns (IsCloseOnCompletionResp);
+}
+
+enum ResultSetType {
+  TYPE_FORWARD_ONLY = 0; // 1003
+  TYPE_SCROLL_INSENSITIVE = 1; // 1004
+  TYPE_SCROLL_SENSITIVE = 2; // 1005
+}
+
+enum ResultSetConcurrency {
+  CONCUR_READ_ONLY = 0; // 1007
+  CONCUR_UPDATABLE = 1; // 1008
+}
+
+enum ResultSetHoldability {
+  HOLD_CURSORS_OVER_COMMIT = 0; // 1
+  CLOSE_CURSORS_AT_COMMIT = 1; // 2
+}
+
+enum AutoGeneratedKeys {
+  NO_GENERATED_KEYS = 0; // 1
+  RETURN_GENERATED_KEYS = 1; // 2
+}
+
+enum FetchDirection {
+  // The constant indicating that the rows in a result set will be
+  // processed in a forward direction; first-to-last.
+  FETCH_FORWARD = 0; // 1000
+  // The constant indicating that the rows in a result set will be
+  // processed in a reverse direction; last-to-first.
+  FETCH_REVERSE = 1; // 1001
+  FETCH_UNKNOWN = 2; // 1002
+}
+
+enum ResultSetCloseOperation {
+  CLOSE_CURRENT_RESULT = 0; // 1
+  KEEP_CURRENT_RESULT = 1; // 2
+  CLOSE_ALL_RESULTS = 2; // 3
+}
+
+message CreateStatementReq {
+  ConnectionHandle connection_id = 1;
+  // The statement id is optional in the request. If it's present, the server
+  // will try to reuse the existing statement with the same id.
+  StatementHandle statement_id = 2;
+  ResultSetType resultSetType = 3;
+  ResultSetConcurrency resultSetConcurrency = 4;
+  ResultSetHoldability resultSetHoldability = 5;
+}
+
+message ExecuteQueryReq {
+  StatementHandle statement_id = 1;
+  string sql = 2;
+
+  message ColumnIndexes {
+    repeated uint32 columnIndexes = 1;
+  }
+
+  message ColumnNames {
+    repeated string columnNames = 1;
+  }
+
+  oneof autoGeneratedKeys {
+    AutoGeneratedKeys autoGeneratedKey = 3;
+    ColumnNames columnNames = 4;
+    ColumnIndexes columnIndexes = 5;
+  }
+}
+
+message GetMaxFieldSizeResp {
+  Status status = 1;
+  uint32 max = 2;
+}
+
+message SetMaxFieldSizeReq {
+  StatementHandle statement_id = 1;
+  uint32 max = 2;
+}
+
+message GetMaxRowsResp {
+  Status status = 1;
+  uint32 max = 2;
+}
+
+message SetMaxRowsReq {
+  StatementHandle statement_id = 1;
+  uint32 max = 2;
+}
+
+message SetEscapeProcessingReq {
+  StatementHandle statement_id = 1;
+  bool enable = 2;
+}
+
+message GetQueryTimeoutResp {
+  Status status = 1;
+  uint32 timeout = 2;
+}
+
+message SetQueryTimeoutReq {
+  StatementHandle statement_id = 1;
+  uint32 timeout = 2;
+}
+
+message SetCursorNameReq {
+  StatementHandle statement_id = 1;
+  string name = 2;
+}
+
+message GetMoreResultsReq {
+  StatementHandle statement_id = 1;
+  ResultSetCloseOperation close_operation = 2;
+}
+
+message GetMoreResultsResp {
+  Status status = 1;
+  bool has_more_results = 2;
+}
+
+message SetFetchDirectionReq {
+  StatementHandle statement_id = 1;
+  FetchDirection direction = 2;
+}
+
+message GetFetchDirectionResp {
+  Status status = 1;
+  FetchDirection direction = 2;
+}
+
+message GetFetchSizeResp {
+  Status status = 1;
+  uint32 fetch_size = 2;
+}
+
+message SetFetchSizeReq {
+  StatementHandle statement_id = 1;
+  uint32 fetch_size = 2;
+}
+
+message GetResultSetConcurrencyResp {
+  Status status = 1;
+  ResultSetConcurrency concurrency = 2;
+}
+
+message GetResultSetHoldabilityResp {
+  Status status = 1;
+  ResultSetHoldability holdability = 2;
+}
+
+message GetResultSetTypeResp {
+  Status status = 1;
+  ResultSetType type = 2;
+}
+
+message AddBatchReq {
+  StatementHandle statement_id = 1;
+  string sql = 2;
+}
+
+message ClearBatchReq {
+  StatementHandle statement_id = 1;
+}
+
+message ExecuteBatchResp {
+  Status status = 1;
+  repeated uint64 update_counts = 2;
+}
+
+message IsClosedResp {
+  Status status = 1;
+  bool is_closed = 2;
+}
+
+message SetPoolableReq {
+  StatementHandle statement_id = 1;
+  bool poolable = 2;
+}
+
+message IsPoolableResp {
+  Status status = 1;
+  bool is_poolable = 2;
+}
+
+message IsCloseOnCompletionResp {
+  Status status = 1;
+  bool close_on_completion = 2;
+}
diff --git a/pom.xml b/pom.xml
index d2f3797..2f929fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,6 +104,7 @@
                         <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.47.0:exe:${os.detected.classifier}</pluginArtifact>
                         <protocArtifact>com.google.protobuf:protoc:3.21.1:exe:${os.detected.classifier}</protocArtifact>
                         <protoSourceRoot>src/main/protobuf/</protoSourceRoot>
+                        <pluginParameter>--escape_quotes</pluginParameter>
                     </configuration>
                     <executions>
                         <execution>