Add statement proto
diff --git a/.gitignore b/.gitignore
index 08df579..d2123f1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
+
.idea
*.iml
*.ipr
diff --git a/.idea/encodings.xml b/.idea/encodings.xml
index 5c552d0..7a47235 100644
--- a/.idea/encodings.xml
+++ b/.idea/encodings.xml
@@ -1,10 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
+ <file url="file://$PROJECT_DIR$/health-proto/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/health-proto/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/jdbc-grpc-client/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/jdbc-grpc-client/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/jdbc-grpc-client/target/classes/generated-sources/protobuf/java" charset="UTF-8" />
+ <file url="file://$PROJECT_DIR$/jdbc-proto/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/jdbc-proto/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/kyuubi-health-proto/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/kyuubi-jdbc-proto/src/main/java" charset="UTF-8" />
diff --git a/.idea/scopes/apache.xml b/.idea/scopes/apache.xml
index 1cca035..c9770ea 100644
--- a/.idea/scopes/apache.xml
+++ b/.idea/scopes/apache.xml
@@ -1,3 +1,3 @@
<component name="DependencyValidationManager">
- <scope name="apache" pattern="" />
+ <scope name="apache" pattern="src:*..*||test:*..*" />
</component>
\ No newline at end of file
diff --git a/jdbc-grpc-client/pom.xml b/jdbc-grpc-client/pom.xml
index 95b8d8b..fc684ed 100644
--- a/jdbc-grpc-client/pom.xml
+++ b/jdbc-grpc-client/pom.xml
@@ -68,6 +68,4 @@
<version>2.1.214</version>
</dependency>
</dependencies>
-
-
-</project>
\ No newline at end of file
+</project>
diff --git a/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/GrpcUtils.java b/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/GrpcUtils.java
index 3a72e5d..8e8ab7c 100644
--- a/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/GrpcUtils.java
+++ b/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/GrpcUtils.java
@@ -1,5 +1,89 @@
package org.apache.kyuubi.grpc;
+import org.apache.kyuubi.grpc.jdbc.statement.AutoGeneratedKeys;
+import org.apache.kyuubi.grpc.jdbc.statement.ResultSetConcurrency;
+import org.apache.kyuubi.grpc.jdbc.statement.ResultSetHoldability;
+import org.apache.kyuubi.grpc.jdbc.statement.ResultSetType;
+
public class GrpcUtils {
final static Status OK = Status.newBuilder().setStatusCode(StatusCode.OK).setSqlState("00000").build();
+
+ public static int toJDBC(ResultSetType type) {
+ switch (type) {
+ case TYPE_FORWARD_ONLY:
+ return java.sql.ResultSet.TYPE_FORWARD_ONLY;
+ case TYPE_SCROLL_INSENSITIVE:
+ return java.sql.ResultSet.TYPE_SCROLL_INSENSITIVE;
+ case TYPE_SCROLL_SENSITIVE:
+ return java.sql.ResultSet.TYPE_SCROLL_SENSITIVE;
+ default:
+ throw new IllegalArgumentException("Unknown ResultSetType: " + type);
+ }
+ }
+
+ public static int toJDBC(ResultSetConcurrency concurrency) {
+ switch (concurrency) {
+ case CONCUR_READ_ONLY:
+ return java.sql.ResultSet.CONCUR_READ_ONLY;
+ case CONCUR_UPDATABLE:
+ return java.sql.ResultSet.CONCUR_UPDATABLE;
+ default:
+ throw new IllegalArgumentException("Unknown ResultSetConcurrency: " + concurrency);
+ }
+ }
+
+ public static int toJDBC(ResultSetHoldability holdability) {
+ switch (holdability) {
+ case CLOSE_CURSORS_AT_COMMIT:
+ return java.sql.ResultSet.CLOSE_CURSORS_AT_COMMIT;
+ case HOLD_CURSORS_OVER_COMMIT:
+ return java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT;
+ default:
+ throw new IllegalArgumentException("Unknown ResultSetHoldability: " + holdability);
+ }
+ }
+
+ /**
+ * Convert a {@link AutoGeneratedKeys} to a {@link java.sql.Statement} constant.
+ *
+ * @param autoGeneratedKeys the {@link AutoGeneratedKeys} to convert
+ * @return the converted {@link java.sql.Statement} constant
+ */
+ public static int toJDBC(AutoGeneratedKeys autoGeneratedKeys) {
+ switch (autoGeneratedKeys) {
+ case RETURN_GENERATED_KEYS:
+ return java.sql.Statement.RETURN_GENERATED_KEYS;
+ case NO_GENERATED_KEYS:
+ return java.sql.Statement.NO_GENERATED_KEYS;
+ default:
+ throw new IllegalArgumentException("Unknown AutoGeneratedKeys: " + autoGeneratedKeys);
+ }
+ }
+
+ /**
+ * Convert a {@link java.sql.SQLWarning} to a {@link SQLWarning}.
+ *
+ * @param warning the {@link java.sql.SQLWarning} to convert
+ * @return the converted {@link SQLWarning}
+ */
+ public static SQLWarning toProto(java.sql.SQLWarning warning) {
+ if (warning == null) {
+ return null;
+ }
+ SQLWarning.Builder builder = SQLWarning.newBuilder();
+ if (warning.getMessage() != null) {
+ builder.setReason(warning.getMessage());
+ }
+ if (warning.getSQLState() != null) {
+ builder.setSqlState(warning.getSQLState());
+ }
+ if (warning.getErrorCode() != 0) {
+ builder.setVendorCode(warning.getErrorCode());
+ }
+ if (warning.getNextWarning() != null) {
+ builder.setNextWarning(toProto(warning.getNextWarning()));
+ }
+ return builder.build();
+ }
+
}
diff --git a/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/client/JdbcGrpcClient.java b/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/client/JdbcGrpcClient.java
index e310057..6ab497d 100644
--- a/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/client/JdbcGrpcClient.java
+++ b/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/client/JdbcGrpcClient.java
@@ -3,6 +3,7 @@
import java.util.Map;
import java.util.Optional;
+import org.apache.kyuubi.grpc.GetWarningsResp;
import org.apache.kyuubi.grpc.jdbc.DirectStatusResp;
import org.apache.kyuubi.grpc.jdbc.connection.*;
diff --git a/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/client/SimpleBlockingJdbcClient.java b/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/client/SimpleBlockingJdbcClient.java
index a132594..c22aed1 100644
--- a/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/client/SimpleBlockingJdbcClient.java
+++ b/jdbc-grpc-client/src/main/java/org/apache/kyuubi/grpc/client/SimpleBlockingJdbcClient.java
@@ -1,22 +1,26 @@
package org.apache.kyuubi.grpc.client;
import io.grpc.*;
+import org.apache.kyuubi.grpc.GetWarningsResp;
+import org.apache.kyuubi.grpc.common.ConnectionHandle;
+import org.apache.kyuubi.grpc.common.StatementHandle;
import org.apache.kyuubi.grpc.jdbc.*;
import org.apache.kyuubi.grpc.jdbc.JdbcGrpc.JdbcBlockingStub;
import org.apache.kyuubi.grpc.jdbc.connection.*;
+import org.apache.kyuubi.grpc.jdbc.statement.*;
import java.util.Map;
import java.util.Optional;
-import java.util.logging.Logger;
public class SimpleBlockingJdbcClient implements JdbcGrpcClient {
- private final Logger logger = Logger.getLogger(this.getClass().getName());
private JdbcBlockingStub blockingStub = null;
private ConnectionGrpc.ConnectionBlockingStub connectionBlockingStub = null;
+ private StatementGrpc.StatementBlockingStub statementBlockingStub = null;
public SimpleBlockingJdbcClient(Channel channel) {
blockingStub = JdbcGrpc.newBlockingStub(channel);
connectionBlockingStub = ConnectionGrpc.newBlockingStub(channel);
+ statementBlockingStub = StatementGrpc.newBlockingStub(channel);
}
public SimpleBlockingJdbcClient(ManagedChannelBuilder builder) {
@@ -39,12 +43,10 @@
public DirectStatusResp openConnection(
Map<String, String> configs,
Optional<String> connectionId) {
- OpenConnectionReq.Builder builder = OpenConnectionReq.newBuilder();
- if (connectionId.isPresent()) {
- logger.info("Reconnecting to server with existing connection" + connectionId.get());
- builder.setConnectionId(connectionId.get());
- }
- OpenConnectionReq req = builder
+ ConnectionHandle.Builder builder = ConnectionHandle.newBuilder();
+ connectionId.ifPresent(builder::setId);
+ OpenConnectionReq req = OpenConnectionReq.newBuilder()
+ .setConnectionId(builder.build())
.putAllConfigs(configs)
.build();
return connectionBlockingStub.openConnection(req);
@@ -52,18 +54,18 @@
@Override
public DirectStatusResp closeConnection(String connectionId) {
- CloseConnectionReq.Builder builder = CloseConnectionReq.newBuilder();
- CloseConnectionReq req = builder
- .setConnectionId(connectionId)
+ ConnectionHandle.Builder builder = ConnectionHandle.newBuilder();
+ ConnectionHandle req = builder
+ .setId(connectionId)
.build();
return connectionBlockingStub.closeConnection(req);
}
@Override
public DirectStatusResp abortConnection(String connectionId) {
- AbortConnectionReq.Builder builder = AbortConnectionReq.newBuilder();
- AbortConnectionReq req = builder
- .setConnectionId(connectionId)
+ ConnectionHandle.Builder builder = ConnectionHandle.newBuilder();
+ ConnectionHandle req = builder
+ .setId(connectionId)
.build();
return connectionBlockingStub.abortConnection(req);
}
@@ -72,7 +74,7 @@
public DirectStatusResp setClientInfo(String connectionId, Map<String, String> info) {
SetClientInfoReq.Builder builder = SetClientInfoReq.newBuilder();
SetClientInfoReq req = builder
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.putAllConfigs(info)
.build();
return connectionBlockingStub.setClientInfo(req);
@@ -82,7 +84,7 @@
public DirectStatusResp setClientInfo(String connectionId, String name, String value) {
SetClientInfoReq.Builder builder = SetClientInfoReq.newBuilder();
SetClientInfoReq req = builder
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.putConfigs(name, value)
.build();
return connectionBlockingStub.setClientInfo(req);
@@ -90,9 +92,8 @@
@Override
public GetClientInfoResp getClientInfo(String connectionId) {
- GetClientInfoReq.Builder builder = GetClientInfoReq.newBuilder();
- GetClientInfoReq req = builder
- .setConnectionId(connectionId)
+ ConnectionHandle req = ConnectionHandle.newBuilder()
+ .setId(connectionId)
.build();
return connectionBlockingStub.getClientInfo(req);
}
@@ -101,7 +102,7 @@
public DirectStatusResp setTypeMap(String connectionId, Map<String, String> map) {
SetTypeMapReq.Builder builder = SetTypeMapReq.newBuilder();
SetTypeMapReq req = builder
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.putAllTypeToClass(map)
.build();
return connectionBlockingStub.setTypeMap(req);
@@ -109,9 +110,8 @@
@Override
public GetTypeMapResp getTypeMap(String connectionId) {
- GetTypeMapReq.Builder builder = GetTypeMapReq.newBuilder();
- GetTypeMapReq req = builder
- .setConnectionId(connectionId)
+ ConnectionHandle req = ConnectionHandle.newBuilder()
+ .setId(connectionId)
.build();
return connectionBlockingStub.getTypeMap(req);
}
@@ -120,7 +120,7 @@
public DirectStatusResp setHoldability(String connectionId, int holdability) {
SetHoldabilityReq.Builder builder = SetHoldabilityReq.newBuilder();
SetHoldabilityReq req = builder
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.setHoldability(holdability)
.build();
return connectionBlockingStub.setHoldability(req);
@@ -128,9 +128,8 @@
@Override
public GetHoldabilityResp getHoldability(String connectionId) {
- GetHoldabilityReq.Builder builder = GetHoldabilityReq.newBuilder();
- GetHoldabilityReq req = builder
- .setConnectionId(connectionId)
+ ConnectionHandle req = ConnectionHandle.newBuilder()
+ .setId(connectionId)
.build();
return connectionBlockingStub.getHoldability(req);
}
@@ -139,7 +138,7 @@
public DirectStatusResp setSchema(String connectionId, String schema) {
SetSchemaReq.Builder builder = SetSchemaReq.newBuilder();
SetSchemaReq req = builder
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.setSchema(schema)
.build();
return connectionBlockingStub.setSchema(req);
@@ -147,9 +146,8 @@
@Override
public GetSchemaResp getSchema(String connectionId) {
- GetSchemaReq.Builder builder = GetSchemaReq.newBuilder();
- GetSchemaReq req = builder
- .setConnectionId(connectionId)
+ ConnectionHandle req = ConnectionHandle.newBuilder()
+ .setId(connectionId)
.build();
return connectionBlockingStub.getSchema(req);
}
@@ -158,7 +156,7 @@
public DirectStatusResp setNetworkTimeout(String connectionId, int milliseconds) {
SetNetworkTimeoutReq.Builder builder = SetNetworkTimeoutReq.newBuilder();
SetNetworkTimeoutReq req = builder
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.setMilliseconds(milliseconds)
.build();
return connectionBlockingStub.setNetworkTimeout(req);
@@ -166,9 +164,8 @@
@Override
public GetNetworkTimeoutResp getNetworkTimeout(String connectionId) {
- GetNetworkTimeoutReq.Builder builder = GetNetworkTimeoutReq.newBuilder();
- GetNetworkTimeoutReq req = builder
- .setConnectionId(connectionId)
+ ConnectionHandle req = ConnectionHandle.newBuilder()
+ .setId(connectionId)
.build();
return connectionBlockingStub.getNetworkTimeout(req);
}
@@ -177,7 +174,7 @@
public SetSavepointResp setSavepoint(String connectionId) {
SetSavepointReq.Builder builder = SetSavepointReq.newBuilder();
SetSavepointReq req = builder
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.build();
return connectionBlockingStub.setSavepoint(req);
}
@@ -186,7 +183,7 @@
public SetSavepointResp setSavepoint(String connectionId, String name) {
SetSavepointReq.Builder builder = SetSavepointReq.newBuilder();
SetSavepointReq req = builder
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.setSavepointName(name)
.build();
return connectionBlockingStub.setSavepoint(req);
@@ -196,7 +193,7 @@
public DirectStatusResp releaseSavepoint(String connectionId, Savepoint savepoint) {
ReleaseSavepointReq.Builder builder = ReleaseSavepointReq.newBuilder();
ReleaseSavepointReq req = builder
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.setSavepoint(savepoint)
.build();
return connectionBlockingStub.releaseSavepoint(req);
@@ -207,7 +204,7 @@
public DirectStatusResp setSchema(String connectionId, String schema, String catalog) {
SetSchemaReq.Builder builder = SetSchemaReq.newBuilder();
SetSchemaReq req = builder
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.setSchema(schema)
.build();
return connectionBlockingStub.setSchema(req);
@@ -217,7 +214,7 @@
public IsValidResp isValid(String connectionId, int timeout) {
IsValidReq.Builder builder = IsValidReq.newBuilder();
IsValidReq req = builder
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.setTimeout(timeout)
.build();
return connectionBlockingStub.isValid(req);
@@ -227,7 +224,7 @@
public NativeSQLResp nativeSQL(String connectionId, String sql) {
NativeSQLReq.Builder builder = NativeSQLReq.newBuilder();
NativeSQLReq req = builder
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.setSql(sql)
.build();
return connectionBlockingStub.nativeSQL(req);
@@ -237,7 +234,7 @@
public DirectStatusResp setAutoCommit(String connectionId, boolean autoCommit) {
SetAutoCommitReq.Builder builder = SetAutoCommitReq.newBuilder();
SetAutoCommitReq req = builder
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.setAutoCommit(autoCommit)
.build();
return connectionBlockingStub.setAutoCommit(req);
@@ -245,18 +242,16 @@
@Override
public GetAutoCommitResp getAutoCommit(String connectionId) {
- GetAutoCommitReq.Builder builder = GetAutoCommitReq.newBuilder();
- GetAutoCommitReq req = builder
- .setConnectionId(connectionId)
+ ConnectionHandle req = ConnectionHandle.newBuilder()
+ .setId(connectionId)
.build();
return connectionBlockingStub.getAutoCommit(req);
}
@Override
public DirectStatusResp commit(String connectionId) {
- CommitReq.Builder builder = CommitReq.newBuilder();
- CommitReq req = builder
- .setConnectionId(connectionId)
+ ConnectionHandle req = ConnectionHandle.newBuilder()
+ .setId(connectionId)
.build();
return connectionBlockingStub.commit(req);
}
@@ -265,7 +260,7 @@
public DirectStatusResp rollback(String connectionId) {
RollbackReq.Builder builder = RollbackReq.newBuilder();
RollbackReq req = builder
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.build();
return connectionBlockingStub.rollback(req);
}
@@ -276,7 +271,7 @@
.setSavepointId(savepointId)
.build();
RollbackReq req = RollbackReq.newBuilder()
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.setSavepoint(sp)
.build();
return connectionBlockingStub.rollback(req);
@@ -289,7 +284,7 @@
.setSavepointName(savepointName)
.build();
RollbackReq req = RollbackReq.newBuilder()
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.setSavepoint(sp)
.build();
return connectionBlockingStub.rollback(req);
@@ -301,7 +296,7 @@
.setSavepointName(savepointName)
.build();
RollbackReq req = RollbackReq.newBuilder()
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.setSavepoint(sp)
.build();
return connectionBlockingStub.rollback(req);
@@ -311,7 +306,7 @@
public DirectStatusResp setReadOnly(String connectionId, boolean readOnly) {
SetReadOnlyReq.Builder builder = SetReadOnlyReq.newBuilder();
SetReadOnlyReq req = builder
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.setReadOnly(readOnly)
.build();
return connectionBlockingStub.setReadOnly(req);
@@ -319,9 +314,8 @@
@Override
public IsReadOnlyResp isReadOnly(String connectionId) {
- IsReadOnlyReq.Builder builder = IsReadOnlyReq.newBuilder();
- IsReadOnlyReq req = builder
- .setConnectionId(connectionId)
+ ConnectionHandle req = ConnectionHandle.newBuilder()
+ .setId(connectionId)
.build();
return connectionBlockingStub.isReadOnly(req);
}
@@ -330,7 +324,7 @@
public DirectStatusResp setCatalog(String connectionId, String catalog) {
SetCatalogReq.Builder builder = SetCatalogReq.newBuilder();
SetCatalogReq req = builder
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.setCatalog(catalog)
.build();
return connectionBlockingStub.setCatalog(req);
@@ -338,9 +332,8 @@
@Override
public GetCatalogResp getCatalog(String connectionId) {
- GetCatalogReq.Builder builder = GetCatalogReq.newBuilder();
- GetCatalogReq req = builder
- .setConnectionId(connectionId)
+ ConnectionHandle req = ConnectionHandle.newBuilder()
+ .setId(connectionId)
.build();
return connectionBlockingStub.getCatalog(req);
}
@@ -350,7 +343,7 @@
public DirectStatusResp setTransactionIsolation(String connectionId, int level) {
SetTransactionIsolationReq.Builder builder = SetTransactionIsolationReq.newBuilder();
SetTransactionIsolationReq req = builder
- .setConnectionId(connectionId)
+ .setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build())
.setLevel(level)
.build();
return connectionBlockingStub.setTransactionIsolation(req);
@@ -358,31 +351,58 @@
@Override
public GetTransactionIsolationResp getTransactionIsolation(String connectionId) {
- GetTransactionIsolationReq.Builder builder = GetTransactionIsolationReq.newBuilder();
- GetTransactionIsolationReq req = builder
- .setConnectionId(connectionId)
+ ConnectionHandle req = ConnectionHandle.newBuilder()
+ .setId(connectionId)
.build();
return connectionBlockingStub.getTransactionIsolation(req);
}
@Override
public DirectStatusResp clearWarnings(String connectionId) {
- ClearWarningsReq.Builder builder = ClearWarningsReq.newBuilder();
- ClearWarningsReq req = builder
- .setConnectionId(connectionId)
+ ConnectionHandle req = ConnectionHandle.newBuilder()
+ .setId(connectionId)
.build();
return connectionBlockingStub.clearWarnings(req);
}
@Override
public GetWarningsResp getWarnings(String connectionId) {
- GetWarningsReq.Builder builder = GetWarningsReq.newBuilder();
- GetWarningsReq req = builder
- .setConnectionId(connectionId)
+ ConnectionHandle req = ConnectionHandle.newBuilder()
+ .setId(connectionId)
.build();
return connectionBlockingStub.getWarnings(req);
}
+ public DirectStatusResp createStatement(String connectionId, Optional<String> statementId) {
+ CreateStatementReq.Builder builder = CreateStatementReq.newBuilder();
+ builder.setConnectionId(ConnectionHandle.newBuilder().setId(connectionId).build());
+ if (statementId.isPresent()) {
+ StatementHandle handle = StatementHandle.newBuilder()
+ .setId(statementId.get())
+ .build();
+ builder.setStatementId(handle);
+ }
+ return statementBlockingStub.createStatement(builder.build());
+ }
+
+ public DirectStatusResp closeStatement(String statementId) {
+ StatementHandle req = StatementHandle.newBuilder()
+ .setId(statementId)
+ .build();
+ return statementBlockingStub.closeStatement(req);
+ }
+
+ public DirectStatusResp executeQuery(String statementId, String sql) {
+ StatementHandle handle = StatementHandle.newBuilder()
+ .setId(statementId)
+ .build();
+ ExecuteQueryReq req = ExecuteQueryReq.newBuilder()
+ .setStatementId(handle)
+ .setSql(sql)
+ .build();
+ return statementBlockingStub.executeQuery(req);
+ }
+
@Override
public DirectStatusResp getCatalogs(String connectionId) {
GetCatalogsReq.Builder builder = GetCatalogsReq.newBuilder();
diff --git a/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/DummyJdbcService.java b/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/DummyJdbcService.java
index 6b30546..f6eceda 100644
--- a/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/DummyJdbcService.java
+++ b/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/DummyJdbcService.java
@@ -15,8 +15,6 @@
public DummyJdbcService() throws IOException {
}
-
-
@Override
public void getCatalogs(GetCatalogsReq req, StreamObserver<DirectStatusResp> respOb) {
DirectStatusResp.Builder builder = DirectStatusResp.newBuilder();
diff --git a/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/TestConnectionService.java b/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/TestConnectionService.java
index 1a9617b..71af03f 100644
--- a/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/TestConnectionService.java
+++ b/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/TestConnectionService.java
@@ -2,6 +2,7 @@
import io.grpc.stub.StreamObserver;
+import org.apache.kyuubi.grpc.common.ConnectionHandle;
import org.apache.kyuubi.grpc.jdbc.*;
import org.apache.kyuubi.grpc.jdbc.connection.ConnectionGrpc;
import org.apache.kyuubi.grpc.jdbc.connection.*;
@@ -15,41 +16,56 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import static org.apache.kyuubi.grpc.GrpcUtils.OK;
public class TestConnectionService extends ConnectionGrpc.ConnectionImplBase {
-
private final Path _tempDir = Files.createTempDirectory(getClass().getSimpleName());
-
public String defaultCatalogName = _tempDir.getFileName().toString().toUpperCase();
private final String jdbcUrl = "jdbc:h2:" + _tempDir + ";MODE=DB2;user=testUser;password=testPass";
- private Connection _connection = null;
-
- private final Executor executor = Executors.newSingleThreadExecutor();
-
+ private Map<ConnectionHandle, Connection> connections = new HashMap<>();
+ private Executor executor = Executors.newSingleThreadExecutor();
private final Map<Savepoint, java.sql.Savepoint> savepoints = new HashMap<>();
-
- public void setConnection(Connection _connection) {
- this._connection = _connection;
+ public Connection getConnection(ConnectionHandle connectionId, Properties properties) throws SQLException {
+ if (connections.containsKey(connectionId)) {
+ return connections.get(connectionId);
+ } else {
+ Connection conn = DriverManager.getConnection(jdbcUrl, properties);
+ connections.put(connectionId, conn);
+ return conn;
+ }
}
- public Connection getConnection() throws SQLException {
- if (this._connection == null) {
- Connection conn = DriverManager.getConnection(jdbcUrl);
- setConnection(conn);
- return conn;
+ public Connection getConnection(ConnectionHandle connectionId) throws SQLException {
+ if (connections.containsKey(connectionId)) {
+ return connections.get(connectionId);
} else {
- return _connection;
+ Connection conn = DriverManager.getConnection(jdbcUrl, new Properties());
+ connections.put(connectionId, conn);
+ return conn;
}
}
public TestConnectionService() throws IOException {
}
- private Status errorStatus(Exception e) {
+ public void stop() {
+ for (Connection conn : connections.values()) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ connections.clear();
+ connections = null;
+ executor = null;
+ }
+
+ public Status errorStatus(Exception e) {
return Status.newBuilder()
.setSqlState("38808")
.setErrorMessage(e.getMessage())
@@ -57,14 +73,14 @@
.build();
}
- private DirectStatusResp error(Exception e) {
+ public DirectStatusResp error(Exception e) {
return DirectStatusResp
.newBuilder()
.setStatus(errorStatus(e))
.build();
}
- private DirectStatusResp ok(String id) {
+ public DirectStatusResp ok(String id) {
return DirectStatusResp
.newBuilder()
.setStatus(OK)
@@ -75,16 +91,19 @@
@Override
public void openConnection(OpenConnectionReq req, StreamObserver<DirectStatusResp> respOb) {
DirectStatusResp.Builder builder = DirectStatusResp.newBuilder();
- if (req.getConnectionId().isEmpty()) {
- builder.setIdentifier("hello, kyuubi");
- } else {
- builder.setIdentifier("hello, " + req.getConnectionId());
- }
try {
+ ConnectionHandle connectionId = null;
Properties properties = new Properties();
properties.putAll(req.getConfigsMap());
- setConnection(
- DriverManager.getConnection(jdbcUrl, properties));
+ if (req.getConnectionId() == ConnectionHandle.getDefaultInstance()) {
+ connectionId = ConnectionHandle.newBuilder()
+ .setId(UUID.randomUUID().toString())
+ .build();
+ } else {
+ connectionId = req.getConnectionId();
+ }
+ getConnection(connectionId, properties);
+ builder.setIdentifier(connectionId.getId());
builder.putExtraInfo("apache", "kyuubi");
builder.putExtraInfo("Kyuubi", "Serverless SQL on Lakehouse");
builder.setStatus(OK);
@@ -104,20 +123,21 @@
}
@Override
- public void closeConnection(CloseConnectionReq req, StreamObserver<DirectStatusResp> respOb) {
+ public void closeConnection(ConnectionHandle req, StreamObserver<DirectStatusResp> respOb) {
DirectStatusResp.Builder builder = DirectStatusResp.newBuilder();
try {
- if (req.getConnectionId().isEmpty()) {
- throw new IllegalArgumentException("ConnectionId cannot be empty for CloseConnection");
+ if (!connections.containsKey(req)) {
+ throw new IllegalArgumentException("invalid connection id " + req.getId());
} else {
- builder.setIdentifier(req.getConnectionId());
+ getConnection(req).close();
+ builder.setIdentifier(req.getId());
}
builder.setStatus(OK);
} catch (Exception e) {
Status status = Status.newBuilder()
.setStatusCode(StatusCode.ERROR)
.setSqlState("2E000")
- .setErrorMessage("invalid connection id" + req.getConnectionId())
+ .setErrorMessage(e.getMessage())
.build();
builder.setStatus(status);
}
@@ -129,7 +149,7 @@
public void nativeSQL(NativeSQLReq request, StreamObserver<NativeSQLResp> responseObserver) {
NativeSQLResp.Builder builder = NativeSQLResp.newBuilder();
try {
- String nativeSQL = getConnection().nativeSQL(request.getSql());
+ String nativeSQL = getConnection(request.getConnectionId()).nativeSQL(request.getSql());
NativeSQLResp resp = builder
.setSql(nativeSQL)
.setStatus(OK)
@@ -150,8 +170,8 @@
@Override
public void setAutoCommit(SetAutoCommitReq request, StreamObserver<DirectStatusResp> responseObserver) {
try {
- getConnection().setAutoCommit(request.getAutoCommit());
- responseObserver.onNext(ok(request.getConnectionId()));
+ getConnection(request.getConnectionId()).setAutoCommit(request.getAutoCommit());
+ responseObserver.onNext(ok(request.getConnectionId().getId()));
} catch (SQLException e) {
responseObserver.onNext(error(e));
}
@@ -159,10 +179,10 @@
}
@Override
- public void getAutoCommit(GetAutoCommitReq request, StreamObserver<GetAutoCommitResp> responseObserver) {
+ public void getAutoCommit(ConnectionHandle request, StreamObserver<GetAutoCommitResp> responseObserver) {
GetAutoCommitResp.Builder builder = GetAutoCommitResp.newBuilder();
try {
- boolean autoCommit = getConnection().getAutoCommit();
+ boolean autoCommit = getConnection(request).getAutoCommit();
GetAutoCommitResp resp = builder
.setStatus(OK)
.setAutoCommit(autoCommit)
@@ -175,10 +195,10 @@
}
@Override
- public void commit(CommitReq request, StreamObserver<DirectStatusResp> responseObserver) {
+ public void commit(ConnectionHandle request, StreamObserver<DirectStatusResp> responseObserver) {
try {
- getConnection().commit();
- responseObserver.onNext(ok(request.getConnectionId()));
+ getConnection(request).commit();
+ responseObserver.onNext(ok(request.getId()));
} catch (SQLException e) {
responseObserver.onNext(error(e));
}
@@ -188,14 +208,14 @@
@Override
public void rollback(RollbackReq request, StreamObserver<DirectStatusResp> responseObserver) {
try {
- Connection conn = getConnection();
+ Connection conn = getConnection(request.getConnectionId());
Savepoint savepoint = request.getSavepoint();
if (savepoint == Savepoint.getDefaultInstance()) {
conn.rollback();
} else {
conn.rollback(savepoints.get(savepoint));
}
- responseObserver.onNext(ok(request.getConnectionId()));
+ responseObserver.onNext(ok(request.getConnectionId().getId()));
} catch (SQLException e) {
responseObserver.onNext(error(e));
}
@@ -205,8 +225,8 @@
@Override
public void setReadOnly(SetReadOnlyReq request, StreamObserver<DirectStatusResp> responseObserver) {
try {
- getConnection().setReadOnly(request.getReadOnly());
- responseObserver.onNext(ok(request.getConnectionId()));
+ getConnection(request.getConnectionId()).setReadOnly(request.getReadOnly());
+ responseObserver.onNext(ok(request.getConnectionId().getId()));
} catch (SQLException e) {
responseObserver.onNext(error(e));
}
@@ -214,12 +234,12 @@
}
@Override
- public void isReadOnly(IsReadOnlyReq request, StreamObserver<IsReadOnlyResp> responseObserver) {
+ public void isReadOnly(ConnectionHandle request, StreamObserver<IsReadOnlyResp> responseObserver) {
IsReadOnlyResp.Builder builder = IsReadOnlyResp.newBuilder();
try {
IsReadOnlyResp resp = builder
.setStatus(OK)
- .setReadOnly(getConnection().isReadOnly())
+ .setReadOnly(getConnection(request).isReadOnly())
.build();
responseObserver.onNext(resp);
} catch (SQLException e) {
@@ -234,8 +254,8 @@
@Override
public void setCatalog(SetCatalogReq request, StreamObserver<DirectStatusResp> responseObserver) {
try {
- getConnection().setCatalog(request.getCatalog());
- responseObserver.onNext(ok(request.getConnectionId()));
+ getConnection(request.getConnectionId()).setCatalog(request.getCatalog());
+ responseObserver.onNext(ok(request.getConnectionId().getId()));
} catch (SQLException e) {
responseObserver.onNext(error(e));
}
@@ -243,10 +263,10 @@
}
@Override
- public void getCatalog(GetCatalogReq request, StreamObserver<GetCatalogResp> responseObserver) {
+ public void getCatalog(ConnectionHandle request, StreamObserver<GetCatalogResp> responseObserver) {
GetCatalogResp.Builder builder = GetCatalogResp.newBuilder();
try {
- String catalog = getConnection().getCatalog();
+ String catalog = getConnection(request).getCatalog();
GetCatalogResp resp = builder
.setStatus(OK)
.setCatalog(catalog)
@@ -264,8 +284,8 @@
@Override
public void setTransactionIsolation(SetTransactionIsolationReq request, StreamObserver<DirectStatusResp> responseObserver) {
try {
- getConnection().setTransactionIsolation(request.getLevel());
- responseObserver.onNext(ok(request.getConnectionId()));
+ getConnection(request.getConnectionId()).setTransactionIsolation(request.getLevel());
+ responseObserver.onNext(ok(request.getConnectionId().getId()));
} catch (SQLException e) {
responseObserver.onNext(error(e));
}
@@ -273,10 +293,10 @@
}
@Override
- public void getTransactionIsolation(GetTransactionIsolationReq request, StreamObserver<GetTransactionIsolationResp> responseObserver) {
+ public void getTransactionIsolation(ConnectionHandle request, StreamObserver<GetTransactionIsolationResp> responseObserver) {
GetTransactionIsolationResp.Builder builder = GetTransactionIsolationResp.newBuilder();
try {
- int level = getConnection().getTransactionIsolation();
+ int level = getConnection(request).getTransactionIsolation();
GetTransactionIsolationResp resp = builder
.setStatus(OK)
.setLevel(level)
@@ -291,30 +311,14 @@
responseObserver.onCompleted();
}
- public static SQLWarning buildWarning(java.sql.SQLWarning cur) {
- SQLWarning.Builder builder = SQLWarning.newBuilder();
- if (cur.getMessage() != null) {
- builder.setReason(cur.getMessage());
- }
- if (cur.getSQLState() != null) {
- builder.setSqlState(cur.getSQLState());
- }
- if (cur.getErrorCode() != 0) {
- builder.setVendorCode(cur.getErrorCode());
- }
- if (cur.getNextWarning() != null) {
- builder.setNextWarning(buildWarning(cur.getNextWarning()));
- }
- return builder.build();
- }
@Override
- public void getWarnings(GetWarningsReq request, StreamObserver<GetWarningsResp> responseObserver) {
+ public void getWarnings(ConnectionHandle request, StreamObserver<GetWarningsResp> responseObserver) {
GetWarningsResp.Builder builder = GetWarningsResp.newBuilder();
try {
- java.sql.SQLWarning warnings = getConnection().getWarnings();
+ SQLWarning warnings = GrpcUtils.toProto(getConnection(request).getWarnings());
if (warnings != null) {
- builder.setWarnings(buildWarning(getConnection().getWarnings()));
+ builder.setWarnings(warnings);
}
responseObserver.onNext(builder.setStatus(OK).build());
} catch (SQLException e) {
@@ -324,10 +328,10 @@
}
@Override
- public void clearWarnings(ClearWarningsReq request, StreamObserver<DirectStatusResp> responseObserver) {
+ public void clearWarnings(ConnectionHandle request, StreamObserver<DirectStatusResp> responseObserver) {
try {
- getConnection().clearWarnings();
- responseObserver.onNext(ok(request.getConnectionId()));
+ getConnection(request).clearWarnings();
+ responseObserver.onNext(ok(request.getId()));
} catch (SQLException e) {
responseObserver.onNext(error(e));
}
@@ -342,8 +346,8 @@
for (Map.Entry<String, String> entry : request.getTypeToClassMap().entrySet()) {
classMap.put(entry.getKey(), Class.forName(entry.getValue()));
}
- getConnection().setTypeMap(classMap);
- responseObserver.onNext(ok(request.getConnectionId()));
+ getConnection(request.getConnectionId()).setTypeMap(classMap);
+ responseObserver.onNext(ok(request.getConnectionId().getId()));
} catch (Exception e) {
responseObserver.onNext(error(e));
}
@@ -351,10 +355,10 @@
}
@Override
- public void getTypeMap(GetTypeMapReq request, StreamObserver<GetTypeMapResp> responseObserver) {
+ public void getTypeMap(ConnectionHandle request, StreamObserver<GetTypeMapResp> responseObserver) {
GetTypeMapResp.Builder builder = GetTypeMapResp.newBuilder();
try {
- Map<String, Class<?>> typeMap = getConnection().getTypeMap();
+ Map<String, Class<?>> typeMap = getConnection(request).getTypeMap();
if (typeMap != null) {
Map<String, String> typeToClassMap = new HashMap<String, String>();
for (Map.Entry<String, Class<?>> entry : typeMap.entrySet()) {
@@ -373,8 +377,8 @@
@Override
public void setSchema(SetSchemaReq request, StreamObserver<DirectStatusResp> responseObserver) {
try {
- getConnection().setSchema(request.getSchema());
- responseObserver.onNext(ok(request.getConnectionId()));
+ getConnection(request.getConnectionId()).setSchema(request.getSchema());
+ responseObserver.onNext(ok(request.getConnectionId().getId()));
} catch (SQLException e) {
responseObserver.onNext(error(e));
}
@@ -382,10 +386,10 @@
}
@Override
- public void getSchema(GetSchemaReq request, StreamObserver<GetSchemaResp> responseObserver) {
+ public void getSchema(ConnectionHandle request, StreamObserver<GetSchemaResp> responseObserver) {
GetSchemaResp.Builder builder = GetSchemaResp.newBuilder();
try {
- String schema = getConnection().getSchema();
+ String schema = getConnection(request).getSchema();
// schema can be null, can you verify this?
GetSchemaResp resp = builder
.setStatus(OK)
@@ -404,8 +408,8 @@
@Override
public void setNetworkTimeout(SetNetworkTimeoutReq request, StreamObserver<DirectStatusResp> responseObserver) {
try {
- getConnection().setNetworkTimeout(executor, request.getMilliseconds());
- responseObserver.onNext(ok(request.getConnectionId()));
+ getConnection(request.getConnectionId()).setNetworkTimeout(executor, request.getMilliseconds());
+ responseObserver.onNext(ok(request.getConnectionId().getId()));
} catch (SQLException e) {
responseObserver.onNext(error(e));
}
@@ -413,10 +417,10 @@
}
@Override
- public void getNetworkTimeout(GetNetworkTimeoutReq request, StreamObserver<GetNetworkTimeoutResp> responseObserver) {
+ public void getNetworkTimeout(ConnectionHandle request, StreamObserver<GetNetworkTimeoutResp> responseObserver) {
GetNetworkTimeoutResp.Builder builder = GetNetworkTimeoutResp.newBuilder();
try {
- int timeout = getConnection().getNetworkTimeout();
+ int timeout = getConnection(request).getNetworkTimeout();
GetNetworkTimeoutResp resp = builder
.setStatus(OK)
.setMilliseconds(timeout)
@@ -435,7 +439,7 @@
public void isValid(IsValidReq request, StreamObserver<IsValidResp> responseObserver) {
IsValidResp.Builder builder = IsValidResp.newBuilder();
try {
- boolean valid = getConnection().isValid(request.getTimeout());
+ boolean valid = getConnection(request.getConnectionId()).isValid(request.getTimeout());
IsValidResp resp = builder
.setStatus(OK)
.setValid(valid)
@@ -450,10 +454,10 @@
}
@Override
- public void abortConnection(AbortConnectionReq request, StreamObserver<DirectStatusResp> responseObserver) {
+ public void abortConnection(ConnectionHandle request, StreamObserver<DirectStatusResp> responseObserver) {
try {
- getConnection().abort(executor);
- responseObserver.onNext(ok(request.getConnectionId()));
+ getConnection(request).abort(executor);
+ responseObserver.onNext(ok(request.getId()));
} catch (SQLException e) {
responseObserver.onNext(error(e));
}
@@ -465,8 +469,8 @@
try {
Properties properties = new Properties();
properties.putAll(request.getConfigsMap());
- getConnection().setClientInfo(properties);
- responseObserver.onNext(ok(request.getConnectionId()));
+ getConnection(request.getConnectionId()).setClientInfo(properties);
+ responseObserver.onNext(ok(request.getConnectionId().getId()));
} catch (SQLException e) {
responseObserver.onNext(error(e));
}
@@ -474,10 +478,10 @@
}
@Override
- public void getClientInfo(GetClientInfoReq request, StreamObserver<GetClientInfoResp> responseObserver) {
+ public void getClientInfo(ConnectionHandle request, StreamObserver<GetClientInfoResp> responseObserver) {
GetClientInfoResp.Builder builder = GetClientInfoResp.newBuilder();
try {
- Properties properties = getConnection().getClientInfo();
+ Properties properties = getConnection(request).getClientInfo();
// use a loop to put all properties into the builder
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
builder.putConfigs(entry.getKey().toString(), entry.getValue().toString());
@@ -488,5 +492,4 @@
}
responseObserver.onCompleted();
}
-
}
diff --git a/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/TestStatementService.java b/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/TestStatementService.java
new file mode 100644
index 0000000..8e52570
--- /dev/null
+++ b/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/TestStatementService.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc;
+
+import com.google.protobuf.ProtocolStringList;
+import io.grpc.stub.StreamObserver;
+import org.apache.kyuubi.grpc.common.ConnectionHandle;
+import org.apache.kyuubi.grpc.common.StatementHandle;
+import org.apache.kyuubi.grpc.jdbc.DirectStatusResp;
+import org.apache.kyuubi.grpc.jdbc.statement.*;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.*;
+
+import static org.apache.kyuubi.grpc.GrpcUtils.OK;
+
+public class TestStatementService extends StatementGrpc.StatementImplBase {
+
+ private final TestConnectionService connectionService;
+
+ private final Map<StatementHandle, Statement> statements = new HashMap<>();
+
+ private Statement getStatement(StatementHandle statementId) {
+ Statement statement = statements.get(statementId);
+ if (statement == null) {
+ throw new IllegalArgumentException("Statement Id " + statementId.getId() + " not found");
+ }
+ return statement;
+ }
+
+ public TestStatementService(TestConnectionService connectionService) {
+ this.connectionService = connectionService;
+ }
+
+ @Override
+ public void createStatement(CreateStatementReq request, StreamObserver<DirectStatusResp> responseObserver) {
+ DirectStatusResp.Builder builder = DirectStatusResp.newBuilder();
+ try {
+ ConnectionHandle connectionId = request.getConnectionId();
+ int resultSetType = GrpcUtils.toJDBC(request.getResultSetType());
+ int resultSetConcurrency = GrpcUtils.toJDBC(request.getResultSetConcurrency());
+ int resultSetHoldability = GrpcUtils.toJDBC(request.getResultSetHoldability());
+ StatementHandle statementId = request.getStatementId();
+ if (statementId == StatementHandle.getDefaultInstance()) {
+ statementId = StatementHandle.newBuilder()
+ .setId(UUID.randomUUID().toString())
+ .build();
+ }
+
+ if (!statements.containsKey(statementId)) {
+ Connection connection = connectionService.getConnection(connectionId);
+ Statement statement = connection.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
+ statements.put(statementId, statement);
+ }
+
+ DirectStatusResp resp = builder
+ .setIdentifier(statementId.getId())
+ .setStatus(OK)
+ .build();
+ responseObserver.onNext(resp);
+ } catch (Exception e) {
+ connectionService.error(e);
+ responseObserver.onNext(connectionService.error(e));
+ }
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void closeStatement(StatementHandle request, StreamObserver<DirectStatusResp> responseObserver) {
+ try {
+ Statement stmt = statements.remove(request);
+ if (stmt == null) {
+ throw new IllegalArgumentException("Statement Id " + request.getId() + " not found");
+ }
+ stmt.close();
+ responseObserver.onNext(connectionService.ok(request.getId()));
+ } catch (Exception e) {
+ responseObserver.onNext(connectionService.error(e));
+ }
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void executeQuery(ExecuteQueryReq request, StreamObserver<DirectStatusResp> responseObserver) {
+ try {
+ StatementHandle statementId = request.getStatementId();
+ Statement statement = getStatement(statementId);
+ ExecuteQueryReq.AutoGeneratedKeysCase keysCase = request.getAutoGeneratedKeysCase();
+ switch (keysCase) {
+ case AUTOGENERATEDKEY:
+ int autoGeneratedKeys = GrpcUtils.toJDBC(request.getAutoGeneratedKey());
+ statement.execute(request.getSql(), autoGeneratedKeys);
+ break;
+ case COLUMNINDEXES:
+ List<Integer> indexesList = request.getColumnIndexes().getColumnIndexesList();
+ int[] columnIndexes = indexesList.stream().mapToInt(Integer::intValue).toArray();
+ statement.execute(request.getSql(), columnIndexes);
+ break;
+ case COLUMNNAMES:
+ ProtocolStringList namesList = request.getColumnNames().getColumnNamesList();
+ String[] columnNames = namesList.toArray(new String[namesList.size()]);
+ statement.execute(request.getSql(), columnNames);
+ break;
+ case AUTOGENERATEDKEYS_NOT_SET:
+ statement.executeQuery(request.getSql());
+ break;
+ }
+ responseObserver.onNext(connectionService.ok(statementId.getId()));
+ } catch (Exception e) {
+ responseObserver.onNext(connectionService.error(e));
+ }
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void executeUpdate(ExecuteQueryReq request, StreamObserver<DirectStatusResp> responseObserver) {
+ try {
+ StatementHandle statementId = request.getStatementId();
+ Statement statement = getStatement(statementId);
+ ExecuteQueryReq.AutoGeneratedKeysCase keysCase = request.getAutoGeneratedKeysCase();
+ switch (keysCase) {
+ case AUTOGENERATEDKEY:
+ statement.execute(request.getSql(), request.getAutoGeneratedKey().getNumber());
+ break;
+ case COLUMNINDEXES:
+ List<Integer> indexesList = request.getColumnIndexes().getColumnIndexesList();
+ int[] columnIndexes = indexesList.stream().mapToInt(Integer::intValue).toArray();
+ statement.execute(request.getSql(), columnIndexes);
+ break;
+ case COLUMNNAMES:
+ ProtocolStringList namesList = request.getColumnNames().getColumnNamesList();
+ String[] columnNames = namesList.toArray(new String[namesList.size()]);
+ statement.execute(request.getSql(), columnNames);
+ break;
+ case AUTOGENERATEDKEYS_NOT_SET:
+ statement.executeQuery(request.getSql());
+ break;
+ }
+ responseObserver.onNext(connectionService.ok(statementId.getId()));
+ } catch (Exception e) {
+ responseObserver.onNext(connectionService.error(e));
+ }
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void getMaxFieldSize(StatementHandle request, StreamObserver<GetMaxFieldSizeResp> responseObserver) {
+ GetMaxFieldSizeResp.Builder builder = GetMaxFieldSizeResp.newBuilder();
+ try {
+ Statement statement = getStatement(request);
+ int maxFieldSize = statement.getMaxFieldSize();
+ GetMaxFieldSizeResp resp = builder
+ .setStatus(OK)
+ .setMax(maxFieldSize)
+ .build();
+ responseObserver.onNext(resp);
+ } catch (Exception e) {
+ GetMaxFieldSizeResp resp = builder
+ .setStatus(connectionService.errorStatus(e))
+ .build();
+ responseObserver.onNext(resp);
+ }
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void setMaxFieldSize(SetMaxFieldSizeReq request, StreamObserver<DirectStatusResp> responseObserver) {
+ try {
+ StatementHandle statementId = request.getStatementId();
+ Statement statement = getStatement(statementId);
+ statement.setMaxFieldSize(request.getMax());
+ responseObserver.onNext(connectionService.ok(statementId.getId()));
+ } catch (Exception e) {
+ responseObserver.onNext(connectionService.error(e));
+ }
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void getMaxRows(StatementHandle request, StreamObserver<GetMaxRowsResp> responseObserver) {
+ GetMaxRowsResp.Builder builder = GetMaxRowsResp.newBuilder();
+ try {
+ Statement statement = getStatement(request);
+ int maxRows = statement.getMaxRows();
+ GetMaxRowsResp resp = builder
+ .setStatus(OK)
+ .setMax(maxRows)
+ .build();
+ responseObserver.onNext(resp);
+ } catch (Exception e) {
+ GetMaxRowsResp resp = builder
+ .setStatus(connectionService.errorStatus(e))
+ .build();
+ responseObserver.onNext(resp);
+ }
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void setMaxRows(SetMaxRowsReq request, StreamObserver<DirectStatusResp> responseObserver) {
+ try {
+ StatementHandle statementId = request.getStatementId();
+ Statement statement = getStatement(statementId);
+ statement.setMaxRows(request.getMax());
+ responseObserver.onNext(connectionService.ok(statementId.getId()));
+ } catch (Exception e) {
+ responseObserver.onNext(connectionService.error(e));
+ }
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void setQueryTimeout(SetQueryTimeoutReq request, StreamObserver<DirectStatusResp> responseObserver) {
+ try {
+ StatementHandle statementId = request.getStatementId();
+ Statement statement = getStatement(statementId);
+ statement.setQueryTimeout(request.getTimeout());
+ responseObserver.onNext(connectionService.ok(statementId.getId()));
+ } catch (Exception e) {
+ responseObserver.onNext(connectionService.error(e));
+ }
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void getQueryTimeout(StatementHandle request, StreamObserver<GetQueryTimeoutResp> responseObserver) {
+ GetQueryTimeoutResp.Builder builder = GetQueryTimeoutResp.newBuilder();
+ try {
+ Statement statement = getStatement(request);
+ int timeout = statement.getQueryTimeout();
+ GetQueryTimeoutResp resp = builder
+ .setStatus(OK)
+ .setTimeout(timeout)
+ .build();
+ responseObserver.onNext(resp);
+ } catch (Exception e) {
+ GetQueryTimeoutResp resp = builder
+ .setStatus(connectionService.errorStatus(e))
+ .build();
+ responseObserver.onNext(resp);
+ }
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void getWarnings(StatementHandle request, StreamObserver<GetWarningsResp> responseObserver) {
+ GetWarningsResp.Builder builder = GetWarningsResp.newBuilder();
+ try {
+ Statement statement = getStatement(request);
+ java.sql.SQLWarning warnings = statement.getWarnings();
+ GetWarningsResp resp = builder
+ .setStatus(OK)
+ .setWarnings(GrpcUtils.toProto(warnings))
+ .build();
+ responseObserver.onNext(resp);
+ } catch (Exception e) {
+ GetWarningsResp resp = builder
+ .setStatus(connectionService.errorStatus(e))
+ .build();
+ responseObserver.onNext(resp);
+ }
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void setEscapeProcessing(SetEscapeProcessingReq request, StreamObserver<DirectStatusResp> responseObserver) {
+ try {
+ StatementHandle statementId = request.getStatementId();
+ Statement statement = getStatement(statementId);
+ statement.setEscapeProcessing(request.getEnable());
+ responseObserver.onNext(connectionService.ok(statementId.getId()));
+ } catch (Exception e) {
+ responseObserver.onNext(connectionService.error(e));
+ }
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void cancelStatement(StatementHandle request, StreamObserver<DirectStatusResp> responseObserver) {
+ try {
+ Statement statement = getStatement(request);
+ statement.cancel();
+ responseObserver.onNext(connectionService.ok(request.getId()));
+ } catch (Exception e) {
+ responseObserver.onNext(connectionService.error(e));
+ }
+ responseObserver.onCompleted();
+ }
+}
diff --git a/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/client/SimpleBlockingJdbcServiceClientTest.java b/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/client/SimpleBlockingJdbcServiceClientTest.java
index 098746a..443b458 100644
--- a/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/client/SimpleBlockingJdbcServiceClientTest.java
+++ b/jdbc-grpc-client/src/test/java/org/apache/kyuubi/grpc/client/SimpleBlockingJdbcServiceClientTest.java
@@ -10,10 +10,7 @@
import org.junit.Test;
import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
@@ -26,6 +23,7 @@
private SimpleBlockingJdbcClient client = null;
DummyJdbcService dummyFrontendService = new DummyJdbcService();
TestConnectionService dummyConnectionService = new TestConnectionService();
+ TestStatementService dummyStatementService = new TestStatementService(dummyConnectionService);
public SimpleBlockingJdbcServiceClientTest() throws IOException {
}
@@ -35,6 +33,7 @@
ServerCredentials serverCredentials = InsecureServerCredentials.create();
ServerBuilder<?> builder = Grpc.newServerBuilderForPort(port, serverCredentials)
.addService(dummyConnectionService)
+ .addService(dummyStatementService)
.addService(dummyFrontendService);
server = builder.build();
server.start();
@@ -43,6 +42,7 @@
@After
public void tearDown() {
+ dummyConnectionService.stop();
if (server != null) {
try {
server.awaitTermination(5, TimeUnit.SECONDS);
@@ -63,28 +63,31 @@
Map<String, String> configs = new HashMap<>();
DirectStatusResp resp = client.openConnection(configs, Optional.empty());
Status status = resp.getStatus();
- assertEquals("hello, kyuubi", resp.getIdentifier());
+ String connectionId = resp.getIdentifier();
assertEquals(StatusCode.OK, status.getStatusCode());
assertEquals("00000", status.getSqlState());
assertEquals("Serverless SQL on Lakehouse", resp.getExtraInfoOrThrow("Kyuubi"));
configs.put("apache", "kyuubi");
configs.put("kent", "yao");
- DirectStatusResp resp1 = client.openConnection(configs, Optional.of("20181117"));
+ DirectStatusResp resp1 = client.openConnection(configs, Optional.of(connectionId));
Status status1 = resp1.getStatus();
- assertEquals("hello, 20181117", resp1.getIdentifier());
+ assertEquals(connectionId, resp1.getIdentifier());
assertEquals(StatusCode.OK, status1.getStatusCode());
assertEquals("kyuubi", resp1.getExtraInfoOrThrow("apache"));
}
@Test
public void testCloseConnection() {
- DirectStatusResp resp1 = client.closeConnection("");
+ String connectionId = UUID.randomUUID().toString();
+ DirectStatusResp resp1 = client.closeConnection(connectionId);
Status resp1Status = resp1.getStatus();
assertEquals(StatusCode.ERROR, resp1Status.getStatusCode());
assertEquals("2E000", resp1Status.getSqlState());
- assertEquals("invalid connection id", resp1Status.getErrorMessage());
- DirectStatusResp resp2 = client.closeConnection("apache kyuubi");
+ assertEquals("invalid connection id " + connectionId, resp1Status.getErrorMessage());
+ DirectStatusResp openConn = client.openConnection(Collections.emptyMap(), Optional.empty());
+ connectionId = openConn.getIdentifier();
+ DirectStatusResp resp2 = client.closeConnection(connectionId);
Status resp2Status = resp2.getStatus();
assertEquals(StatusCode.OK, resp2Status.getStatusCode());
assertEquals("00000", resp2Status.getSqlState());
@@ -181,7 +184,7 @@
public void testBuildSQLWarnings() {
java.sql.SQLWarning warning1 = new java.sql.SQLWarning("warning1");
warning1.setNextWarning(new java.sql.SQLWarning("warning2"));
- SQLWarning warning = TestConnectionService.buildWarning(warning1);
+ SQLWarning warning = GrpcUtils.toProto(warning1);
assertEquals("warning1", warning.getReason());
assertEquals("warning2", warning.getNextWarning().getReason());
}
@@ -258,4 +261,17 @@
DirectStatusResp resp = client.abortConnection("kyuubi");
assertEquals(StatusCode.OK, resp.getStatus().getStatusCode());
}
+
+ @Test
+ public void testExecuteQuery() {
+ String sql = "SELECT * FROM UNNEST(ARRAY['a', 'b', 'c'])";
+ DirectStatusResp resp = client.executeQuery("kyuubi", sql);
+ assertEquals(StatusCode.ERROR, resp.getStatus().getStatusCode());
+ assertTrue(resp.getStatus().getErrorMessage().contains("Statement Id kyuubi not found"));
+ DirectStatusResp resp1 = client.createStatement("kyuubi", Optional.empty());
+ String statementId = resp1.getIdentifier();
+ DirectStatusResp resp2 = client.executeQuery(statementId, sql);
+ assertEquals(StatusCode.OK, resp2.getStatus().getStatusCode());
+ client.closeStatement(statementId);
+ }
}
\ No newline at end of file
diff --git a/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/common/errors.proto b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/common/errors.proto
index def741e..6b82683 100644
--- a/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/common/errors.proto
+++ b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/common/errors.proto
@@ -40,4 +40,9 @@
// a database vendor-specific warning code
uint32 vendor_code = 3;
SQLWarning next_warning = 4;
-}
\ No newline at end of file
+}
+
+message GetWarningsResp {
+ Status status = 1;
+ SQLWarning warnings = 2;
+}
diff --git a/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/common.proto b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/common.proto
new file mode 100644
index 0000000..8ce0b13
--- /dev/null
+++ b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/common.proto
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "org.apache.kyuubi.grpc.common";
+
+message ConnectionHandle {
+ string id = 1;
+}
+
+message StatementHandle {
+ string id = 1;
+}
+
+message OperationHandle {
+ string id = 1;
+}
\ No newline at end of file
diff --git a/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/connection.proto b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/connection.proto
index bcde549..0fc2079 100644
--- a/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/connection.proto
+++ b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/connection.proto
@@ -17,6 +17,7 @@
syntax = "proto3";
import "org/apache/kyuubi/grpc/common/errors.proto";
+import "org/apache/kyuubi/grpc/jdbc/common.proto";
import "org/apache/kyuubi/grpc/jdbc/response.proto";
option java_multiple_files = true;
@@ -45,53 +46,43 @@
*/
service Connection {
rpc OpenConnection(OpenConnectionReq) returns (DirectStatusResp);
- rpc CloseConnection(CloseConnectionReq) returns (DirectStatusResp);
- rpc AbortConnection(AbortConnectionReq) returns (DirectStatusResp);
+ rpc CloseConnection(ConnectionHandle) returns (DirectStatusResp);
+ rpc AbortConnection(ConnectionHandle) returns (DirectStatusResp);
rpc NativeSQL(NativeSQLReq) returns (NativeSQLResp);
rpc SetAutoCommit(SetAutoCommitReq) returns (DirectStatusResp);
- rpc GetAutoCommit(GetAutoCommitReq) returns (GetAutoCommitResp);
- rpc Commit(CommitReq) returns (DirectStatusResp);
+ rpc GetAutoCommit(ConnectionHandle) returns (GetAutoCommitResp);
+ rpc Commit(ConnectionHandle) returns (DirectStatusResp);
rpc Rollback(RollbackReq) returns (DirectStatusResp);
rpc SetReadOnly(SetReadOnlyReq) returns (DirectStatusResp);
- rpc IsReadOnly(IsReadOnlyReq) returns (IsReadOnlyResp);
+ rpc IsReadOnly(ConnectionHandle) returns (IsReadOnlyResp);
rpc SetCatalog(SetCatalogReq) returns (DirectStatusResp);
- rpc GetCatalog(GetCatalogReq) returns (GetCatalogResp);
+ rpc GetCatalog(ConnectionHandle) returns (GetCatalogResp);
rpc SetTransactionIsolation(SetTransactionIsolationReq) returns (DirectStatusResp);
- rpc GetTransactionIsolation(GetTransactionIsolationReq) returns (GetTransactionIsolationResp);
- rpc GetWarnings(GetWarningsReq) returns (GetWarningsResp);
- rpc ClearWarnings(ClearWarningsReq) returns (DirectStatusResp);
+ rpc GetTransactionIsolation(ConnectionHandle) returns (GetTransactionIsolationResp);
+ rpc GetWarnings(ConnectionHandle) returns (GetWarningsResp);
+ rpc ClearWarnings(ConnectionHandle) returns (DirectStatusResp);
rpc SetTypeMap(SetTypeMapReq) returns (DirectStatusResp);
- rpc GetTypeMap(GetTypeMapReq) returns (GetTypeMapResp);
+ rpc GetTypeMap(ConnectionHandle) returns (GetTypeMapResp);
rpc SetSchema(SetSchemaReq) returns (DirectStatusResp);
- rpc GetSchema(GetSchemaReq) returns (GetSchemaResp);
+ rpc GetSchema(ConnectionHandle) returns (GetSchemaResp);
rpc SetNetworkTimeout(SetNetworkTimeoutReq) returns (DirectStatusResp);
- rpc GetNetworkTimeout(GetNetworkTimeoutReq) returns (GetNetworkTimeoutResp);
+ rpc GetNetworkTimeout(ConnectionHandle) returns (GetNetworkTimeoutResp);
rpc SetHoldability(SetHoldabilityReq) returns (DirectStatusResp);
- rpc GetHoldability(GetHoldabilityReq) returns (GetHoldabilityResp);
+ rpc GetHoldability(ConnectionHandle) returns (GetHoldabilityResp);
rpc SetSavepoint(SetSavepointReq) returns (SetSavepointResp);
rpc ReleaseSavepoint(ReleaseSavepointReq) returns (DirectStatusResp);
rpc IsValid(IsValidReq) returns (IsValidResp);
rpc SetClientInfo(SetClientInfoReq) returns (DirectStatusResp);
- rpc GetClientInfo(GetClientInfoReq) returns (GetClientInfoResp);
+ rpc GetClientInfo(ConnectionHandle) returns (GetClientInfoResp);
}
message OpenConnectionReq {
- string connection_id = 1;
- map<string, string> configs = 2;
-}
-
-message CloseConnectionReq {
- string connection_id = 1;
- map<string, string> configs = 2;
-}
-
-message AbortConnectionReq {
- string connection_id = 1;
+ ConnectionHandle connection_id = 1;
map<string, string> configs = 2;
}
message NativeSQLReq {
- string connection_id = 1;
+ ConnectionHandle connection_id = 1;
string sql = 2;
}
@@ -101,70 +92,50 @@
}
message SetAutoCommitReq {
- string connection_id = 1;
+ ConnectionHandle connection_id = 1;
bool autoCommit = 2;
}
-message GetAutoCommitReq {
- string connection_id = 1;
-}
-
message GetAutoCommitResp {
Status status = 1;
bool autoCommit = 2;
}
-message CommitReq {
- string connection_id = 1;
-}
-
message Savepoint {
uint32 savepoint_id = 1;
string savepoint_name = 2;
}
message RollbackReq {
- string connection_id = 1;
+ ConnectionHandle connection_id = 1;
Savepoint savepoint = 2;
}
message SetReadOnlyReq {
- string connection_id = 1;
+ ConnectionHandle connection_id = 1;
bool read_only = 2;
}
-message IsReadOnlyReq {
- string connection_id = 1;
-}
-
message IsReadOnlyResp {
Status status = 1;
bool read_only = 2;
}
message SetClientInfoReq {
- string connection_id = 1;
+ ConnectionHandle connection_id = 1;
map<string, string> configs = 2;
}
-message GetClientInfoReq {
- string connection_id = 1;
-}
-
message GetClientInfoResp {
Status status = 1;
map<string, string> configs = 2;
}
message SetCatalogReq {
- string connection_id = 1;
+ ConnectionHandle connection_id = 1;
string catalog = 2;
}
-message GetCatalogReq {
- string connection_id = 1;
-}
-
message GetCatalogResp {
Status status = 1;
string catalog = 2;
@@ -176,7 +147,7 @@
* @see java.sql.Connection
*/
message SetTransactionIsolationReq {
- string connection_id = 1;
+ ConnectionHandle connection_id = 1;
// The possible transaction isolation levels
// level one of the following <code>Connection</code> constants:
// <code>Connection.TRANSACTION_READ_UNCOMMITTED(1)</code>,
@@ -186,101 +157,67 @@
uint32 level = 2;
}
-message GetTransactionIsolationReq {
- string connection_id = 1;
-}
-
message GetTransactionIsolationResp {
Status status = 1;
uint32 level = 2;
}
-message GetWarningsReq {
- string connection_id = 1;
-}
-
-message GetWarningsResp {
- Status status = 1;
- SQLWarning warnings = 2;
-}
-
-
-message ClearWarningsReq {
- string connection_id = 1;
-}
-
message TypeClassMap {
string type_name = 1;
string class_name = 2;
}
message SetTypeMapReq {
- string connection_id = 1;
+ ConnectionHandle connection_id = 1;
map<string, string> type_to_class = 2;
}
-message GetTypeMapReq {
- string connection_id = 1;
-}
-
message GetTypeMapResp {
Status status = 1;
map<string, string> type_to_class = 2;
}
message SetSchemaReq {
- string connection_id = 1;
+ ConnectionHandle connection_id = 1;
string schema = 2;
}
-message GetSchemaReq {
- string connection_id = 1;
-}
-
message GetSchemaResp {
Status status = 1;
string schema = 2;
}
message SetHoldabilityReq {
- string connection_id = 1;
+ ConnectionHandle connection_id = 1;
// one of the following <code>ResultSet</code> constants:
// <code>ResultSet.HOLD_CURSORS_OVER_COMMIT</code> or
// <code>ResultSet.CLOSE_CURSORS_AT_COMMIT</code>
uint32 holdability = 2;
}
-message GetHoldabilityReq {
- string connection_id = 1;
-}
-
message GetHoldabilityResp {
Status status = 1;
uint32 holdability = 2;
}
message SetNetworkTimeoutReq {
- string connection_id = 1;
+ ConnectionHandle connection_id = 1;
// The new network timeout value in milliseconds
uint32 milliseconds = 2;
}
-message GetNetworkTimeoutReq {
- string connection_id = 1;
-}
-
message GetNetworkTimeoutResp {
Status status = 1;
uint32 milliseconds = 2;
}
message ReleaseSavepointReq {
- string connection_id = 1;
+ ConnectionHandle connection_id = 1;
Savepoint savepoint = 2;
}
message SetSavepointReq {
- string connection_id = 1;
+ ConnectionHandle connection_id = 1;
string savepoint_name = 2;
}
@@ -290,7 +227,7 @@
}
message IsValidReq {
- string connection_id = 1;
+ ConnectionHandle connection_id = 1;
uint32 timeout = 2;
}
diff --git a/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/resultset.proto b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/resultset.proto
new file mode 100644
index 0000000..4cc1eaa
--- /dev/null
+++ b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/resultset.proto
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "org/apache/kyuubi/grpc/jdbc/common.proto";
+import "org/apache/kyuubi/grpc/common/errors.proto";
+import "org/apache/kyuubi/grpc/jdbc/response.proto";
+
+option java_multiple_files = true;
+option java_package = "org.apache.kyuubi.grpc.jdbc.resultset";
+
+service ResultSet {
+ rpc Fetch(FetchRequest) returns (FetchResponse) {}
+ rpc Close(CloseRequest) returns (CloseResponse) {}
+}
+
+message FetchResultReq {
+ OperationHandle operation_id = 1;
+ int32 fetch_size = 1;
+}
\ No newline at end of file
diff --git a/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/statment.proto b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/statment.proto
new file mode 100644
index 0000000..4596e3f
--- /dev/null
+++ b/jdbc-proto/src/main/protobuf/org/apache/kyuubi/grpc/jdbc/statment.proto
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "org/apache/kyuubi/grpc/jdbc/common.proto";
+import "org/apache/kyuubi/grpc/common/errors.proto";
+import "org/apache/kyuubi/grpc/jdbc/response.proto";
+
+option java_multiple_files = true;
+option java_package = "org.apache.kyuubi.grpc.jdbc.statement";
+
+/**
+ * The service definition of JDBC statement service, mapping to
+ * java.sql.Statement
+ * TODO: add more methods
+ * - executeLargeBatch
+ * - executeLargeUpdate
+ */
+service Statement {
+ rpc CreateStatement(CreateStatementReq) returns (DirectStatusResp);
+ rpc CloseStatement(StatementHandle) returns (DirectStatusResp);
+ rpc ExecuteQuery(ExecuteQueryReq) returns (DirectStatusResp);
+ rpc ExecuteUpdate(ExecuteQueryReq) returns (DirectStatusResp);
+ rpc ExecuteLargeUpdate(ExecuteQueryReq) returns (DirectStatusResp);
+ rpc GetMaxFieldSize(StatementHandle) returns (GetMaxFieldSizeResp);
+ rpc SetMaxFieldSize(SetMaxFieldSizeReq) returns (DirectStatusResp);
+ rpc GetMaxRows(StatementHandle) returns (GetMaxRowsResp);
+ rpc SetMaxRows(SetMaxRowsReq) returns (DirectStatusResp);
+ rpc SetEscapeProcessing(SetEscapeProcessingReq) returns (DirectStatusResp);
+ rpc GetQueryTimeout(StatementHandle) returns (GetQueryTimeoutResp);
+ rpc SetQueryTimeout(SetQueryTimeoutReq) returns (DirectStatusResp);
+ rpc CancelStatement(StatementHandle) returns (DirectStatusResp);
+ rpc GetWarnings(StatementHandle) returns (GetWarningsResp);
+ rpc ClearWarnings(StatementHandle) returns (DirectStatusResp);
+ rpc SetCursorName(SetCursorNameReq) returns (DirectStatusResp);
+ rpc GetMoreResults(GetMoreResultsReq) returns (GetMoreResultsResp);
+ rpc GetFetchDirection(StatementHandle) returns (GetFetchDirectionResp);
+ rpc SetFetchDirection(SetFetchDirectionReq) returns (DirectStatusResp);
+ rpc GetFetchSize(StatementHandle) returns (GetFetchSizeResp);
+ rpc SetFetchSize(SetFetchSizeReq) returns (DirectStatusResp);
+ rpc AddBatch(AddBatchReq) returns (DirectStatusResp);
+ rpc ClearBatch(StatementHandle) returns (DirectStatusResp);
+ rpc ExecuteBatch(StatementHandle) returns (ExecuteBatchResp);
+ rpc GetResultSet(StatementHandle) returns (DirectStatusResp);
+ rpc GetGeneratedKeys(StatementHandle) returns (DirectStatusResp);
+ rpc IsClosed(StatementHandle) returns (IsClosedResp);
+ rpc SetPoolable(SetPoolableReq) returns (DirectStatusResp);
+ rpc IsPoolable(StatementHandle) returns (IsPoolableResp);
+ rpc CloseOnCompletion(StatementHandle) returns (DirectStatusResp);
+ rpc IsCloseOnCompletion(StatementHandle) returns (IsCloseOnCompletionResp);
+}
+
+enum ResultSetType {
+ TYPE_FORWARD_ONLY = 0; // 1003
+ TYPE_SCROLL_INSENSITIVE = 1; // 1004
+ TYPE_SCROLL_SENSITIVE = 2; // 1005
+}
+
+enum ResultSetConcurrency {
+ CONCUR_READ_ONLY = 0; // 1007
+ CONCUR_UPDATABLE = 1; // 1008
+}
+
+enum ResultSetHoldability {
+ HOLD_CURSORS_OVER_COMMIT = 0; // 1
+ CLOSE_CURSORS_AT_COMMIT = 1; // 2
+}
+
+enum AutoGeneratedKeys {
+ NO_GENERATED_KEYS = 0; // 1
+ RETURN_GENERATED_KEYS = 1; // 2
+}
+
+enum FetchDirection {
+ // The constant indicating that the rows in a result set will be
+ // processed in a forward direction; first-to-last.
+ FETCH_FORWARD = 0; // 1000
+ // The constant indicating that the rows in a result set will be
+ // processed in a reverse direction; last-to-first.
+ FETCH_REVERSE = 1; // 1001
+ FETCH_UNKNOWN = 2; // 1002
+}
+
+enum ResultSetCloseOperation {
+ CLOSE_CURRENT_RESULT = 0; // 1
+ KEEP_CURRENT_RESULT = 1; // 2
+ CLOSE_ALL_RESULTS = 2; // 3
+}
+
+message CreateStatementReq {
+ ConnectionHandle connection_id = 1;
+ // The statement id is optional in the request. If it's present, the server
+ // will try to reuse the existing statement with the same id.
+ StatementHandle statement_id = 2;
+ ResultSetType resultSetType = 3;
+ ResultSetConcurrency resultSetConcurrency = 4;
+ ResultSetHoldability resultSetHoldability = 5;
+}
+
+message ExecuteQueryReq {
+ StatementHandle statement_id = 1;
+ string sql = 2;
+
+ message ColumnIndexes {
+ repeated uint32 columnIndexes = 1;
+ }
+
+ message ColumnNames {
+ repeated string columnNames = 1;
+ }
+
+ oneof autoGeneratedKeys {
+ AutoGeneratedKeys autoGeneratedKey = 3;
+ ColumnNames columnNames = 4;
+ ColumnIndexes columnIndexes = 5;
+ }
+}
+
+message GetMaxFieldSizeResp {
+ Status status = 1;
+ uint32 max = 2;
+}
+
+message SetMaxFieldSizeReq {
+ StatementHandle statement_id = 1;
+ uint32 max = 2;
+}
+
+message GetMaxRowsResp {
+ Status status = 1;
+ uint32 max = 2;
+}
+
+message SetMaxRowsReq {
+ StatementHandle statement_id = 1;
+ uint32 max = 2;
+}
+
+message SetEscapeProcessingReq {
+ StatementHandle statement_id = 1;
+ bool enable = 2;
+}
+
+message GetQueryTimeoutResp {
+ Status status = 1;
+ uint32 timeout = 2;
+}
+
+message SetQueryTimeoutReq {
+ StatementHandle statement_id = 1;
+ uint32 timeout = 2;
+}
+
+message SetCursorNameReq {
+ StatementHandle statement_id = 1;
+ string name = 2;
+}
+
+message GetMoreResultsReq {
+ StatementHandle statement_id = 1;
+ ResultSetCloseOperation close_operation = 2;
+}
+
+message GetMoreResultsResp {
+ Status status = 1;
+ bool has_more_results = 2;
+}
+
+message SetFetchDirectionReq {
+ StatementHandle statement_id = 1;
+ FetchDirection direction = 2;
+}
+
+message GetFetchDirectionResp {
+ Status status = 1;
+ FetchDirection direction = 2;
+}
+
+message GetFetchSizeResp {
+ Status status = 1;
+ uint32 fetch_size = 2;
+}
+
+message SetFetchSizeReq {
+ StatementHandle statement_id = 1;
+ uint32 fetch_size = 2;
+}
+
+message GetResultSetConcurrencyResp {
+ Status status = 1;
+ ResultSetConcurrency concurrency = 2;
+}
+
+message GetResultSetHoldabilityResp {
+ Status status = 1;
+ ResultSetHoldability holdability = 2;
+}
+
+message GetResultSetTypeResp {
+ Status status = 1;
+ ResultSetType type = 2;
+}
+
+message AddBatchReq {
+ StatementHandle statement_id = 1;
+ string sql = 2;
+}
+
+message ClearBatchReq {
+ StatementHandle statement_id = 1;
+}
+
+message ExecuteBatchResp {
+ Status status = 1;
+ repeated uint64 update_counts = 2;
+}
+
+message IsClosedResp {
+ Status status = 1;
+ bool is_closed = 2;
+}
+
+message SetPoolableReq {
+ StatementHandle statement_id = 1;
+ bool poolable = 2;
+}
+
+message IsPoolableResp {
+ Status status = 1;
+ bool is_poolable = 2;
+}
+
+message IsCloseOnCompletionResp {
+ Status status = 1;
+ bool close_on_completion = 2;
+}
diff --git a/pom.xml b/pom.xml
index d2f3797..2f929fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,6 +104,7 @@
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.47.0:exe:${os.detected.classifier}</pluginArtifact>
<protocArtifact>com.google.protobuf:protoc:3.21.1:exe:${os.detected.classifier}</protocArtifact>
<protoSourceRoot>src/main/protobuf/</protoSourceRoot>
+ <pluginParameter>--escape_quotes</pluginParameter>
</configuration>
<executions>
<execution>