blob: 47173c8014120cde0d8dbc753e3e6103760f90ba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.client.handler;
import java.util.BitSet;
import java.util.concurrent.CompletableFuture;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.ignite.client.handler.requests.sql.ClientSqlCloseRequest;
import org.apache.ignite.client.handler.requests.sql.ClientSqlColumnMetadataRequest;
import org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteBatchRequest;
import org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteRequest;
import org.apache.ignite.client.handler.requests.sql.ClientSqlFetchRequest;
import org.apache.ignite.client.handler.requests.sql.ClientSqlPrimaryKeyMetadataRequest;
import org.apache.ignite.client.handler.requests.sql.ClientSqlSchemasMetadataRequest;
import org.apache.ignite.client.handler.requests.sql.ClientSqlTableMetadataRequest;
import org.apache.ignite.client.handler.requests.sql.JdbcMetadataCatalog;
import org.apache.ignite.client.handler.requests.table.ClientSchemasGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientTableDropRequest;
import org.apache.ignite.client.handler.requests.table.ClientTableGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientTablesGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleContainsKeyRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleDeleteAllExactRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleDeleteAllRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleDeleteExactRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleDeleteRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleGetAllRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleGetAndDeleteRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleGetAndReplaceRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleGetAndReplaceSchemalessRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleGetAndUpsertRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleGetAndUpsertSchemalessRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleInsertAllRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleInsertAllSchemalessRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleInsertRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleInsertSchemalessRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleReplaceExactRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleReplaceExactSchemalessRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleReplaceRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleReplaceSchemalessRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleUpsertAllRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleUpsertAllSchemalessRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleUpsertRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleUpsertSchemalessRequest;
import org.apache.ignite.client.proto.query.JdbcQueryEventHandler;
import org.apache.ignite.internal.client.proto.ClientErrorCode;
import org.apache.ignite.internal.client.proto.ClientMessageCommon;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.proto.ProtocolVersion;
import org.apache.ignite.internal.client.proto.ServerMessageType;
import org.apache.ignite.internal.processors.query.calcite.QueryProcessor;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.table.manager.IgniteTables;
/**
* Handles messages from thin clients.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
/** The logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(ClientInboundMessageHandler.class);
/** Ignite tables API. */
private final IgniteTables igniteTables;
/** Context. */
private ClientContext clientContext;
/** Handler. */
private final JdbcQueryEventHandler handler;
/**
* Constructor.
*
* @param igniteTables Ignite tables API entry point.
* @param processor Sql query processor.
*/
public ClientInboundMessageHandler(IgniteTables igniteTables,
QueryProcessor processor) {
assert igniteTables != null;
this.igniteTables = igniteTables;
this.handler = new JdbcQueryEventHandlerImpl(processor, new JdbcMetadataCatalog(igniteTables));
}
/** {@inheritDoc} */
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) {
// Each inbound handler in a pipeline has to release the received messages.
try (var unpacker = getUnpacker((ByteBuf) msg)) {
// Packer buffer is released by Netty on send, or by inner exception handlers below.
var packer = getPacker(ctx.alloc());
if (clientContext == null)
handshake(ctx, unpacker, packer);
else
processOperation(ctx, unpacker, packer);
}
}
private void handshake(ChannelHandlerContext ctx, ClientMessageUnpacker unpacker, ClientMessagePacker packer) {
try {
writeMagic(ctx);
var clientVer = ProtocolVersion.unpack(unpacker);
if (!clientVer.equals(ProtocolVersion.LATEST_VER))
throw new IgniteException("Unsupported version: " +
clientVer.major() + "." + clientVer.minor() + "." + clientVer.patch());
var clientCode = unpacker.unpackInt();
var featuresLen = unpacker.unpackBinaryHeader();
var features = BitSet.valueOf(unpacker.readPayload(featuresLen));
clientContext = new ClientContext(clientVer, clientCode, features);
LOG.debug("Handshake: " + clientContext);
var extensionsLen = unpacker.unpackMapHeader();
unpacker.skipValue(extensionsLen);
// Response.
ProtocolVersion.LATEST_VER.pack(packer);
packer.packInt(ClientErrorCode.SUCCESS)
.packBinaryHeader(0) // Features.
.packMapHeader(0); // Extensions.
write(packer, ctx);
}
catch (Throwable t) {
packer.close();
var errPacker = getPacker(ctx.alloc());
try {
ProtocolVersion.LATEST_VER.pack(errPacker);
String message = t.getMessage();
if (message == null)
message = t.getClass().getName();
errPacker.packInt(ClientErrorCode.FAILED).packString(message);
write(errPacker, ctx);
}
catch (Throwable t2) {
errPacker.close();
exceptionCaught(ctx, t2);
}
}
}
private void writeMagic(ChannelHandlerContext ctx) {
ctx.write(Unpooled.wrappedBuffer(ClientMessageCommon.MAGIC_BYTES));
}
private void write(ClientMessagePacker packer, ChannelHandlerContext ctx) {
var buf = packer.getBuffer();
// writeAndFlush releases pooled buffer.
ctx.writeAndFlush(buf);
}
private void writeError(long requestId, Throwable err, ChannelHandlerContext ctx) {
var packer = getPacker(ctx.alloc());
try {
assert err != null;
packer.packInt(ServerMessageType.RESPONSE);
packer.packLong(requestId);
packer.packInt(ClientErrorCode.FAILED);
String msg = err.getMessage();
if (msg == null)
msg = err.getClass().getName();
packer.packString(msg);
write(packer, ctx);
}
catch (Throwable t) {
packer.close();
exceptionCaught(ctx, t);
}
}
private ClientMessagePacker getPacker(ByteBufAllocator alloc) {
// Outgoing messages are released on write.
return new ClientMessagePacker(alloc.buffer());
}
private ClientMessageUnpacker getUnpacker(ByteBuf buf) {
return new ClientMessageUnpacker(buf);
}
private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker in, ClientMessagePacker out) {
long requestId = -1;
try {
var opCode = in.unpackInt();
requestId = in.unpackLong();
out.packInt(ServerMessageType.RESPONSE)
.packLong(requestId)
.packInt(ClientErrorCode.SUCCESS);
var fut = processOperation(in, out, opCode);
if (fut == null) {
// Operation completed synchronously.
write(out, ctx);
}
else {
var reqId = requestId;
fut.whenComplete((Object res, Object err) -> {
if (err != null) {
out.close();
writeError(reqId, (Throwable) err, ctx);
} else
write(out, ctx);
});
}
}
catch (Throwable t) {
out.close();
writeError(requestId, t, ctx);
}
}
private CompletableFuture processOperation(
ClientMessageUnpacker in,
ClientMessagePacker out,
int opCode
) {
switch (opCode) {
case ClientOp.TABLE_DROP:
return ClientTableDropRequest.process(in, igniteTables);
case ClientOp.TABLES_GET:
return ClientTablesGetRequest.process(out, igniteTables);
case ClientOp.SCHEMAS_GET:
return ClientSchemasGetRequest.process(in, out, igniteTables);
case ClientOp.TABLE_GET:
return ClientTableGetRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_UPSERT:
return ClientTupleUpsertRequest.process(in, igniteTables);
case ClientOp.TUPLE_UPSERT_SCHEMALESS:
return ClientTupleUpsertSchemalessRequest.process(in, igniteTables);
case ClientOp.TUPLE_GET:
return ClientTupleGetRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_UPSERT_ALL:
return ClientTupleUpsertAllRequest.process(in, igniteTables);
case ClientOp.TUPLE_UPSERT_ALL_SCHEMALESS:
return ClientTupleUpsertAllSchemalessRequest.process(in, igniteTables);
case ClientOp.TUPLE_GET_ALL:
return ClientTupleGetAllRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_GET_AND_UPSERT:
return ClientTupleGetAndUpsertRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_GET_AND_UPSERT_SCHEMALESS:
return ClientTupleGetAndUpsertSchemalessRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_INSERT:
return ClientTupleInsertRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_INSERT_SCHEMALESS:
return ClientTupleInsertSchemalessRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_INSERT_ALL:
return ClientTupleInsertAllRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_INSERT_ALL_SCHEMALESS:
return ClientTupleInsertAllSchemalessRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_REPLACE:
return ClientTupleReplaceRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_REPLACE_SCHEMALESS:
return ClientTupleReplaceSchemalessRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_REPLACE_EXACT:
return ClientTupleReplaceExactRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_REPLACE_EXACT_SCHEMALESS:
return ClientTupleReplaceExactSchemalessRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_GET_AND_REPLACE:
return ClientTupleGetAndReplaceRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_GET_AND_REPLACE_SCHEMALESS:
return ClientTupleGetAndReplaceSchemalessRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_DELETE:
return ClientTupleDeleteRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_DELETE_ALL:
return ClientTupleDeleteAllRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_DELETE_EXACT:
return ClientTupleDeleteExactRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_DELETE_ALL_EXACT:
return ClientTupleDeleteAllExactRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_GET_AND_DELETE:
return ClientTupleGetAndDeleteRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_CONTAINS_KEY:
return ClientTupleContainsKeyRequest.process(in, out, igniteTables);
case ClientOp.SQL_EXEC:
return ClientSqlExecuteRequest.execute(in, out, handler);
case ClientOp.SQL_EXEC_BATCH:
return ClientSqlExecuteBatchRequest.process(in, out, handler);
case ClientOp.SQL_NEXT:
return ClientSqlFetchRequest.process(in, out, handler);
case ClientOp.SQL_CURSOR_CLOSE:
return ClientSqlCloseRequest.process(in, out, handler);
case ClientOp.SQL_TABLE_META:
return ClientSqlTableMetadataRequest.process(in, out, handler);
case ClientOp.SQL_COLUMN_META:
return ClientSqlColumnMetadataRequest.process(in, out, handler);
case ClientOp.SQL_SCHEMAS_META:
return ClientSqlSchemasMetadataRequest.process(in, out, handler);
case ClientOp.SQL_PK_META:
return ClientSqlPrimaryKeyMetadataRequest.process(in, out, handler);
default:
throw new IgniteException("Unexpected operation code: " + opCode);
}
}
/** {@inheritDoc} */
@Override public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
/** {@inheritDoc} */
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOG.error(cause.getMessage(), cause);
ctx.close();
}
}