blob: 56b6636e296f0f0a542bdea70d7392017039dd85 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.platform.client;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteIllegalStateException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.odbc.ClientAsyncResponse;
import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler;
import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
import org.apache.ignite.internal.processors.odbc.SqlListenerUtils;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheQueryNextPageRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheSqlFieldsQueryRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheSqlQueryRequest;
import org.apache.ignite.internal.processors.platform.client.tx.ClientTxAwareRequest;
import org.apache.ignite.internal.processors.platform.client.tx.ClientTxContext;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.security.SecurityException;
import static org.apache.ignite.internal.processors.platform.client.ClientProtocolVersionFeature.BITMAP_FEATURES;
import static org.apache.ignite.internal.processors.platform.client.ClientProtocolVersionFeature.PARTITION_AWARENESS;
/**
* Thin client request handler.
*/
public class ClientRequestHandler implements ClientListenerRequestHandler {
/** Timeout to wait for async requests completion, to handle them as regular sync requests. */
private static final long ASYNC_REQUEST_WAIT_TIMEOUT_MILLIS = 10L;
/** Client context. */
private final ClientConnectionContext ctx;
/** Protocol context. */
private final ClientProtocolContext protocolCtx;
/** Logger. */
private final IgniteLogger log;
/**
* Constructor.
*
* @param ctx Kernal context.
* @param protocolCtx Protocol context.
*/
ClientRequestHandler(ClientConnectionContext ctx, ClientProtocolContext protocolCtx) {
assert ctx != null;
this.ctx = ctx;
this.protocolCtx = protocolCtx;
log = ctx.kernalContext().log(getClass());
}
/** {@inheritDoc} */
@Override public ClientListenerResponse handle(ClientListenerRequest req) {
try {
if (req instanceof ClientTxAwareRequest) {
ClientTxAwareRequest req0 = (ClientTxAwareRequest)req;
if (req0.isTransactional()) {
int txId = req0.txId();
ClientTxContext txCtx = ctx.txContext(txId);
if (txCtx != null) {
try {
txCtx.acquire(true);
return handle0(req);
}
catch (IgniteCheckedException e) {
throw new IgniteClientException(ClientStatus.FAILED, e.getMessage(), e);
}
finally {
try {
txCtx.release(true);
}
catch (Exception e) {
log.warning("Failed to release client transaction context", e);
}
}
}
}
}
return handle0(req);
}
catch (SecurityException ex) {
throw new IgniteClientException(
ClientStatus.SECURITY_VIOLATION,
"Client is not authorized to perform this operation",
ex
);
}
}
/** */
private ClientListenerResponse handle0(ClientListenerRequest req) {
ClientRequest req0 = (ClientRequest)req;
if (req0.isAsync(ctx)) {
IgniteInternalFuture<ClientResponse> fut = req0.processAsync(ctx);
try {
// Give request a chance to be executed and response processed by the current thread,
// so we can avoid any performance drops caused by async requests execution.
return fut.get(ASYNC_REQUEST_WAIT_TIMEOUT_MILLIS);
}
catch (IgniteFutureTimeoutCheckedException ignored) {
return new ClientAsyncResponse(req0.requestId(), fut);
}
catch (IgniteCheckedException e) {
throw new IgniteClientException(ClientStatus.FAILED, e.getMessage(), e);
}
}
else
return req0.process(ctx);
}
/** {@inheritDoc} */
@Override public ClientListenerResponse handleException(Throwable e, ClientListenerRequest req) {
assert req != null;
assert e != null;
int status = getStatus(e);
String msg = e.getMessage();
if (req instanceof ClientCacheSqlQueryRequest ||
req instanceof ClientCacheSqlFieldsQueryRequest ||
req instanceof ClientCacheQueryNextPageRequest) {
String sqlState = IgniteQueryErrorCode.codeToSqlState(SqlListenerUtils.exceptionToSqlErrorCode(e));
msg = sqlState + ": " + msg;
}
if (ctx.kernalContext().clientListener().sendServerExceptionStackTraceToClient())
msg = msg + U.nl() + X.getFullStackTrace(e);
return new ClientResponse(req.requestId(), status, msg);
}
/** {@inheritDoc} */
@Override public void writeHandshake(BinaryWriterExImpl writer) {
writer.writeBoolean(true);
if (protocolCtx.isFeatureSupported(BITMAP_FEATURES))
writer.writeByteArray(protocolCtx.featureBytes());
if (protocolCtx.isFeatureSupported(PARTITION_AWARENESS))
writer.writeUuid(ctx.kernalContext().localNodeId());
}
/** {@inheritDoc} */
@Override public boolean isCancellationCommand(int cmdId) {
return false;
}
/** {@inheritDoc} */
@Override public boolean isCancellationSupported() {
return false;
}
/** {@inheritDoc} */
@Override public void registerRequest(long reqId, int cmdType) {
// No-op.
}
/** {@inheritDoc} */
@Override public void unregisterRequest(long reqId) {
// No-op.
}
/** {@inheritDoc} */
@Override public ClientListenerProtocolVersion protocolVersion() {
return protocolCtx.version();
}
/**
* Gets the status based on the provided exception.
*
* @param e Exception.
* @return Status code.
*/
private int getStatus(Throwable e) {
if (e instanceof IgniteClientException)
return ((IgniteClientException)e).statusCode();
if (e instanceof IgniteIllegalStateException) {
IgniteIllegalStateException ex = (IgniteIllegalStateException)e;
if (ex.getMessage().startsWith("Grid is in invalid state"))
return ClientStatus.INVALID_NODE_STATE;
}
if (e instanceof IllegalStateException) {
IllegalStateException ex = (IllegalStateException)e;
if (ex.getMessage().contains("grid is stopping"))
return ClientStatus.INVALID_NODE_STATE;
}
return ClientStatus.FAILED;
}
}