blob: be18616fe994b64483c1f68d511feae9e2b0bd5f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.client.handler;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.client.proto.ClientDataType;
import org.apache.ignite.client.proto.ClientErrorCode;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.client.proto.ClientOp;
import org.apache.ignite.client.proto.ProtocolVersion;
import org.apache.ignite.client.proto.ServerMessageType;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypeSpec;
import org.apache.ignite.internal.schema.SchemaAware;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.TupleBuilderImpl;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.msgpack.core.MessageFormat;
import org.msgpack.core.buffer.ByteBufferInput;
import org.slf4j.Logger;
/**
* Handles messages from thin clients.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
/** Logger. */
private final Logger log;
/** API entry point. */
private final Ignite ignite;
/** Context. */
private ClientContext clientContext;
/**
* Constructor.
*
* @param ignite Ignite API entry point.
* @param log Logger.
*/
public ClientInboundMessageHandler(Ignite ignite, Logger log) {
assert ignite != null;
assert log != null;
this.ignite = ignite;
this.log = log;
}
/** {@inheritDoc} */
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
var buf = (ByteBuffer) msg;
var unpacker = getUnpacker(buf);
var packer = getPacker();
if (clientContext == null)
handshake(ctx, unpacker, packer);
else
processOperation(ctx, unpacker, packer);
}
private void handshake(ChannelHandlerContext ctx, ClientMessageUnpacker unpacker, ClientMessagePacker packer)
throws IOException {
try {
var clientVer = ProtocolVersion.unpack(unpacker);
if (!clientVer.equals(ProtocolVersion.LATEST_VER))
throw new IgniteException("Unsupported version: " +
clientVer.major() + "." + clientVer.minor() + "." + clientVer.patch());
var clientCode = unpacker.unpackInt();
var featuresLen = unpacker.unpackBinaryHeader();
var features = BitSet.valueOf(unpacker.readPayload(featuresLen));
clientContext = new ClientContext(clientVer, clientCode, features);
log.debug("Handshake: " + clientContext);
var extensionsLen = unpacker.unpackMapHeader();
unpacker.skipValue(extensionsLen);
// Response.
ProtocolVersion.LATEST_VER.pack(packer);
packer.packInt(ClientErrorCode.SUCCESS)
.packBinaryHeader(0) // Features.
.packMapHeader(0); // Extensions.
write(packer, ctx);
}
catch (Throwable t) {
packer = getPacker();
ProtocolVersion.LATEST_VER.pack(packer);
packer.packInt(ClientErrorCode.FAILED).packString(t.getMessage());
write(packer, ctx);
}
}
private void write(ClientMessagePacker packer, ChannelHandlerContext ctx) {
var buf = packer.toMessageBuffer().sliceAsByteBuffer();
ctx.writeAndFlush(buf);
}
private void writeError(int requestId, Throwable err, ChannelHandlerContext ctx) {
try {
assert err != null;
ClientMessagePacker packer = getPacker();
packer.packInt(ServerMessageType.RESPONSE);
packer.packInt(requestId);
packer.packInt(ClientErrorCode.FAILED);
String msg = err.getMessage();
if (msg == null)
msg = err.getClass().getName();
packer.packString(msg);
write(packer, ctx);
}
catch (Throwable t) {
exceptionCaught(ctx, t);
}
}
private ClientMessagePacker getPacker() {
return new ClientMessagePacker();
}
private ClientMessageUnpacker getUnpacker(ByteBuffer buf) {
return new ClientMessageUnpacker(new ByteBufferInput(buf));
}
private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker unpacker, ClientMessagePacker packer) throws IOException {
var opCode = unpacker.unpackInt();
var requestId = unpacker.unpackInt();
packer.packInt(ServerMessageType.RESPONSE)
.packInt(requestId)
.packInt(ClientErrorCode.SUCCESS);
try {
var fut = processOperation(unpacker, packer, opCode);
if (fut == null) {
// Operation completed synchronously.
write(packer, ctx);
}
else {
fut.whenComplete((Object res, Object err) -> {
if (err != null)
writeError(requestId, (Throwable) err, ctx);
else
write(packer, ctx);
});
}
}
catch (Throwable t) {
writeError(requestId, t, ctx);
}
}
private CompletableFuture processOperation(ClientMessageUnpacker unpacker, ClientMessagePacker packer, int opCode)
throws IOException {
// TODO: Handle all operations asynchronously (add async table API).
switch (opCode) {
case ClientOp.TABLE_DROP: {
var tableName = unpacker.unpackString();
ignite.tables().dropTable(tableName);
break;
}
case ClientOp.TABLES_GET: {
List<Table> tables = ignite.tables().tables();
packer.packMapHeader(tables.size());
for (var table : tables) {
var tableImpl = (TableImpl) table;
packer.packUuid(tableImpl.tableId());
packer.packString(table.tableName());
}
break;
}
case ClientOp.SCHEMAS_GET: {
var table = readTable(unpacker);
if (unpacker.getNextFormat() == MessageFormat.NIL) {
// Return the latest schema.
packer.packMapHeader(1);
var schema = table.schemaView().schema();
if (schema == null)
throw new IgniteException("Schema registry is not initialized.");
writeSchema(packer, schema.version(), schema);
}
else {
var cnt = unpacker.unpackArrayHeader();
packer.packMapHeader(cnt);
for (var i = 0; i < cnt; i++) {
var schemaVer = unpacker.unpackInt();
var schema = table.schemaView().schema(schemaVer);
writeSchema(packer, schemaVer, schema);
}
}
break;
}
case ClientOp.TABLE_GET: {
String tableName = unpacker.unpackString();
Table table = ignite.tables().table(tableName);
if (table == null)
packer.packNil();
else
packer.packUuid(((TableImpl) table).tableId());
break;
}
case ClientOp.TUPLE_UPSERT: {
var table = readTable(unpacker);
var tuple = readTuple(unpacker, table, false);
return table.upsertAsync(tuple);
}
case ClientOp.TUPLE_UPSERT_SCHEMALESS: {
var table = readTable(unpacker);
var tuple = readTupleSchemaless(unpacker, table);
return table.upsertAsync(tuple);
}
case ClientOp.TUPLE_GET: {
var table = readTable(unpacker);
var keyTuple = readTuple(unpacker, table, true);
return table.getAsync(keyTuple).thenAccept(t -> writeTuple(packer, t));
}
default:
throw new IgniteException("Unexpected operation code: " + opCode);
}
return null;
}
private void writeSchema(ClientMessagePacker packer, int schemaVer, SchemaDescriptor schema) throws IOException {
packer.packInt(schemaVer);
if (schema == null) {
packer.packNil();
return;
}
var colCnt = schema.columnNames().size();
packer.packArrayHeader(colCnt);
for (var colIdx = 0; colIdx < colCnt; colIdx++) {
var col = schema.column(colIdx);
packer.packArrayHeader(4);
packer.packString(col.name());
packer.packInt(getClientDataType(col.type().spec()));
packer.packBoolean(schema.isKeyColumn(colIdx));
packer.packBoolean(col.nullable());
}
}
private void writeTuple(ClientMessagePacker packer, Tuple tuple) {
try {
if (tuple == null) {
packer.packNil();
return;
}
var schema = ((SchemaAware) tuple).schema();
packer.packInt(schema.version());
for (var col : schema.keyColumns().columns())
writeColumnValue(packer, tuple, col);
for (var col : schema.valueColumns().columns())
writeColumnValue(packer, tuple, col);
}
catch (Throwable t) {
throw new IgniteException("Failed to serialize tuple", t);
}
}
private Tuple readTuple(ClientMessageUnpacker unpacker, TableImpl table, boolean keyOnly) throws IOException {
var schemaId = unpacker.unpackInt();
var schema = table.schemaView().schema(schemaId);
var builder = (TupleBuilderImpl) table.tupleBuilder();
var cnt = keyOnly ? schema.keyColumns().length() : schema.length();
for (int i = 0; i < cnt; i++) {
if (unpacker.getNextFormat() == MessageFormat.NIL) {
unpacker.skipValue();
continue;
}
readAndSetColumnValue(unpacker, builder, schema.column(i));
}
return builder.build();
}
private Tuple readTupleSchemaless(ClientMessageUnpacker unpacker, TableImpl table) throws IOException {
var cnt = unpacker.unpackMapHeader();
var builder = table.tupleBuilder();
for (int i = 0; i < cnt; i++) {
var colName = unpacker.unpackString();
builder.set(colName, unpacker.unpackValue());
}
return builder.build();
}
private TableImpl readTable(ClientMessageUnpacker unpacker) throws IOException {
var tableId = unpacker.unpackUuid();
return ((IgniteTablesInternal)ignite.tables()).table(tableId);
}
private void readAndSetColumnValue(ClientMessageUnpacker unpacker, TupleBuilderImpl builder, Column col)
throws IOException {
builder.set(col.name(), unpacker.unpackObject(getClientDataType(col.type().spec())));
}
private static int getClientDataType(NativeTypeSpec spec) {
switch (spec) {
case INT8:
return ClientDataType.INT8;
case INT16:
return ClientDataType.INT16;
case INT32:
return ClientDataType.INT32;
case INT64:
return ClientDataType.INT64;
case FLOAT:
return ClientDataType.FLOAT;
case DOUBLE:
return ClientDataType.DOUBLE;
case DECIMAL:
return ClientDataType.DECIMAL;
case UUID:
return ClientDataType.UUID;
case STRING:
return ClientDataType.STRING;
case BYTES:
return ClientDataType.BYTES;
case BITMASK:
return ClientDataType.BITMASK;
}
throw new IgniteException("Unsupported native type: " + spec);
}
private void writeColumnValue(ClientMessagePacker packer, Tuple tuple, Column col) throws IOException {
var val = tuple.value(col.name());
if (val == null) {
packer.packNil();
return;
}
switch (col.type().spec()) {
case INT8:
packer.packByte((byte) val);
break;
case INT16:
packer.packShort((short) val);
break;
case INT32:
packer.packInt((int) val);
break;
case INT64:
packer.packLong((long) val);
break;
case FLOAT:
packer.packFloat((float) val);
break;
case DOUBLE:
packer.packDouble((double) val);
break;
case DECIMAL:
packer.packDecimal((BigDecimal) val);
break;
case UUID:
packer.packUuid((UUID) val);
break;
case STRING:
packer.packString((String) val);
break;
case BYTES:
byte[] bytes = (byte[]) val;
packer.packBinaryHeader(bytes.length);
packer.writePayload(bytes);
break;
case BITMASK:
packer.packBitSet((BitSet)val);
break;
default:
throw new IgniteException("Data type not supported: " + col.type());
}
}
/** {@inheritDoc} */
@Override public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
/** {@inheritDoc} */
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error(cause.getMessage(), cause);
ctx.close();
}
}