/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.client.handler;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler;
import org.apache.ignite.internal.jdbc.proto.event.JdbcFetchQueryResultsRequest;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryCloseRequest;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryCloseResult;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryFetchResult;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQuerySingleResult;
import org.apache.ignite.internal.jdbc.proto.event.Response;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
/**
 * JDBC query cursor event handler implementation.
 *
 * <p>Serves fetch, next-result and close requests for SQL cursors that are stored in the client resource registry under the
 * cursor id carried by the request. A failed cursor lookup is never thrown to the caller: it is converted into a result object with
 * {@link Response#STATUS_FAILED} whose message contains the serialized stack trace.
 */
public class JdbcQueryCursorHandlerImpl extends JdbcHandlerBase implements JdbcQueryCursorHandler {
    /**
     * Constructor.
     *
     * @param resources Client resources.
     */
    JdbcQueryCursorHandlerImpl(ClientResourceRegistry resources) {
        super(resources);
    }

    /**
     * {@inheritDoc}
     *
     * <p>Looks the cursor up by {@code req.cursorId()}, rejects a non-positive fetch size, then requests the next
     * {@code req.fetchSize()} rows and returns them as binary tuple buffers together with a "last page" flag.
     */
    @Override
    public CompletableFuture<JdbcQueryFetchResult> fetchAsync(JdbcFetchQueryResultsRequest req) {
        AsyncSqlCursor<InternalSqlRow> asyncSqlCursor;

        try {
            asyncSqlCursor = resources.get(req.cursorId()).get(AsyncSqlCursor.class);
        } catch (IgniteInternalCheckedException e) {
            StringWriter sw = getWriterWithStackTrace(e);

            return CompletableFuture.completedFuture(new JdbcQueryFetchResult(Response.STATUS_FAILED,
                    "Failed to find query cursor [curId=" + req.cursorId() + "]. Error message:" + sw));
        }

        // The fetch size is validated only after the cursor has been resolved, so an unknown cursor id is reported first.
        if (req.fetchSize() <= 0) {
            return CompletableFuture.completedFuture(new JdbcQueryFetchResult(Response.STATUS_FAILED,
                    "Invalid fetch size [fetchSize=" + req.fetchSize() + ']'));
        }

        return asyncSqlCursor.requestNextAsync(req.fetchSize()).handle((batch, t) -> {
            if (t != null) {
                StringWriter sw = getWriterWithStackTrace(t);

                return new JdbcQueryFetchResult(Response.STATUS_FAILED,
                        "Failed to fetch query results [curId=" + req.cursorId() + "]. Error message: " + sw);
            }

            // Rows are shipped in their binary tuple form; the list is presized to the batch size.
            List<ByteBuffer> rows = new ArrayList<>(batch.items().size());

            for (InternalSqlRow item : batch.items()) {
                rows.add(item.asBinaryTuple().byteBuffer());
            }

            // The second argument is the "last page" flag: true when the batch reports no more rows.
            return new JdbcQueryFetchResult(rows, !batch.hasMore());
        }).toCompletableFuture();
    }

    /**
     * {@inheritDoc}
     *
     * <p>Removes the current cursor from the registry, closes it and, once the close has completed, moves to the next result of the
     * same statement. If anything in that chain fails, the remaining results are closed in the background and an error result is
     * returned.
     */
    @Override
    public CompletableFuture<JdbcQuerySingleResult> getMoreResultsAsync(JdbcFetchQueryResultsRequest req) {
        AsyncSqlCursor<InternalSqlRow> asyncSqlCursor;

        try {
            asyncSqlCursor = resources.remove(req.cursorId()).get(AsyncSqlCursor.class);
        } catch (IgniteInternalCheckedException e) {
            StringWriter sw = getWriterWithStackTrace(e);

            return CompletableFuture.completedFuture(new JdbcQuerySingleResult(Response.STATUS_FAILED,
                    "Failed to find query cursor [curId=" + req.cursorId() + "]. Error message:" + sw));
        }

        // Closing is started unconditionally, before the next-result check, so the removed cursor is closed on every path below.
        CompletableFuture<Void> cursorCloseFuture = asyncSqlCursor.closeAsync();

        if (!asyncSqlCursor.hasNextResult()) {
            // driver should check presence of next result set on client side and avoid unnecessary calls
            return CompletableFuture.completedFuture(new JdbcQuerySingleResult(Response.STATUS_FAILED,
                    "Cursor doesn't have next result"));
        }

        // The next result is requested only after the current cursor has finished closing.
        return cursorCloseFuture.thenCompose(c -> asyncSqlCursor.nextResult())
                .thenCompose(cur -> createJdbcResult(cur, req.fetchSize()))
                .exceptionally(t -> {
                    // Best-effort cleanup of the rest of the result chain; its outcome is intentionally not awaited.
                    iterateThroughResultsAndCloseThem(asyncSqlCursor);

                    String msgPrefix = "Failed to fetch query results [curId=" + req.cursorId() + "].";

                    return createErrorResult(msgPrefix, t, msgPrefix + " Error message: ");
                });
    }

    /**
     * Closes the given cursor and then every subsequent result reachable through {@code nextResult()}.
     *
     * <p>The traversal is fire-and-forget: the resulting future is discarded, so the caller is neither blocked by the cleanup nor
     * notified about its failures.
     *
     * @param cursor Cursor to start the traversal from.
     */
    private static void iterateThroughResultsAndCloseThem(AsyncSqlCursor<InternalSqlRow> cursor) {
        // An anonymous class (not a lambda) is used so that the function can pass itself to thenCompose via 'this'.
        Function<AsyncSqlCursor<InternalSqlRow>, CompletableFuture<AsyncSqlCursor<InternalSqlRow>>> traverser = new Function<>() {
            @Override
            public CompletableFuture<AsyncSqlCursor<InternalSqlRow>> apply(AsyncSqlCursor<InternalSqlRow> cur) {
                return cur.closeAsync()
                        .thenCompose(none -> {
                            if (cur.hasNextResult()) {
                                return cur.nextResult().thenCompose(this);
                            } else {
                                return CompletableFuture.completedFuture(cur);
                            }
                        });
            }
        };

        // Starting from a completed future keeps an exception thrown by the first step inside the (discarded) future.
        CompletableFuture.completedFuture(cursor).thenCompose(traverser);
    }

    /**
     * {@inheritDoc}
     *
     * <p>Resolves the cursor by {@code req.cursorId()}, removing it from the registry when {@code req.removeFromResources()} is set,
     * and closes it.
     */
    @Override
    public CompletableFuture<JdbcQueryCloseResult> closeAsync(JdbcQueryCloseRequest req) {
        // Only closeAsync() is invoked below, so the row type of the cursor is irrelevant here.
        AsyncSqlCursor<?> asyncSqlCursor;

        try {
            if (req.removeFromResources()) {
                asyncSqlCursor = resources.remove(req.cursorId()).get(AsyncSqlCursor.class);
            } else {
                asyncSqlCursor = resources.get(req.cursorId()).get(AsyncSqlCursor.class);
            }
        } catch (IgniteInternalCheckedException e) {
            StringWriter sw = getWriterWithStackTrace(e);

            return CompletableFuture.completedFuture(new JdbcQueryCloseResult(Response.STATUS_FAILED,
                    "Failed to find query cursor [curId=" + req.cursorId() + "]. Error message:" + sw));
        }

        return asyncSqlCursor.closeAsync().handle((none, t) -> {
            if (t != null) {
                StringWriter sw = getWriterWithStackTrace(t);

                return new JdbcQueryCloseResult(Response.STATUS_FAILED,
                        "Failed to close SQL query cursor [curId=" + req.cursorId() + "]. Error message: " + sw);
            }

            return new JdbcQueryCloseResult();
        });
    }

    /**
     * Serializes the stack trace of given exception for further sending to the client.
     *
     * @param t Throwable.
     * @return StringWriter filled with exception.
     */
    private static StringWriter getWriterWithStackTrace(Throwable t) {
        StringWriter sw = new StringWriter();

        // PrintWriter over a Writer is unbuffered, so the trace is fully in 'sw' once printStackTrace returns.
        PrintWriter pw = new PrintWriter(sw);

        t.printStackTrace(pw);

        return sw;
    }
}