blob: 5669f4d285e37cc9c216f6e558c2903c0955a38c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.client.handler;
import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.Client.HANDSHAKE_HEADER_ERR;
import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_COMPATIBILITY_ERR;
import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.DecoderException;
import java.util.BitSet;
import java.util.EnumMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import javax.net.ssl.SSLException;
import org.apache.ignite.client.handler.configuration.ClientConnectorView;
import org.apache.ignite.client.handler.requests.cluster.ClientClusterGetNodesRequest;
import org.apache.ignite.client.handler.requests.compute.ClientComputeCancelRequest;
import org.apache.ignite.client.handler.requests.compute.ClientComputeChangePriorityRequest;
import org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteColocatedRequest;
import org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest;
import org.apache.ignite.client.handler.requests.compute.ClientComputeGetStatusRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcCloseRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcColumnMetadataRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcConnectRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcExecuteBatchRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcExecuteRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcFetchRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcFinishTxRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcHasMoreRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcPreparedStmntBatchRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcPrimaryKeyMetadataRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcQueryMetadataRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcSchemasMetadataRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcTableMetadataRequest;
import org.apache.ignite.client.handler.requests.jdbc.JdbcMetadataCatalog;
import org.apache.ignite.client.handler.requests.sql.ClientSqlCursorCloseRequest;
import org.apache.ignite.client.handler.requests.sql.ClientSqlCursorNextPageRequest;
import org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteBatchRequest;
import org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteRequest;
import org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteScriptRequest;
import org.apache.ignite.client.handler.requests.sql.ClientSqlQueryMetadataRequest;
import org.apache.ignite.client.handler.requests.table.ClientSchemasGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientStreamerBatchSendRequest;
import org.apache.ignite.client.handler.requests.table.ClientTableGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientTablePartitionPrimaryReplicasGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientTablesGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleContainsKeyRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleDeleteAllExactRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleDeleteAllRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleDeleteExactRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleDeleteRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleGetAllRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleGetAndDeleteRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleGetAndReplaceRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleGetAndUpsertRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleInsertAllRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleInsertRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleReplaceExactRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleReplaceRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleUpsertAllRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleUpsertRequest;
import org.apache.ignite.client.handler.requests.tx.ClientTransactionBeginRequest;
import org.apache.ignite.client.handler.requests.tx.ClientTransactionCommitRequest;
import org.apache.ignite.client.handler.requests.tx.ClientTransactionRollbackRequest;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.client.proto.ClientMessageCommon;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.proto.ErrorExtensions;
import org.apache.ignite.internal.client.proto.HandshakeExtension;
import org.apache.ignite.internal.client.proto.ProtocolVersion;
import org.apache.ignite.internal.client.proto.ResponseFlags;
import org.apache.ignite.internal.cluster.management.ClusterTag;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler;
import org.apache.ignite.internal.jdbc.proto.JdbcQueryEventHandler;
import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.apache.ignite.internal.schema.SchemaVersionMismatchException;
import org.apache.ignite.internal.security.authentication.AnonymousRequest;
import org.apache.ignite.internal.security.authentication.AuthenticationManager;
import org.apache.ignite.internal.security.authentication.AuthenticationRequest;
import org.apache.ignite.internal.security.authentication.UserDetails;
import org.apache.ignite.internal.security.authentication.UsernamePasswordRequest;
import org.apache.ignite.internal.security.authentication.event.AuthenticationEvent;
import org.apache.ignite.internal.security.authentication.event.AuthenticationEventParameters;
import org.apache.ignite.internal.security.authentication.event.AuthenticationProviderEventParameters;
import org.apache.ignite.internal.security.authentication.event.UserEventParameters;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.TraceableException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.security.AuthenticationType;
import org.apache.ignite.security.exception.UnsupportedAuthenticationTypeException;
import org.jetbrains.annotations.Nullable;
/**
* Handles messages from thin clients.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter implements EventListener<AuthenticationEventParameters> {
/** The logger. */
private static final IgniteLogger LOG = Loggers.forClass(ClientInboundMessageHandler.class);
/** Ignite tables API. */
private final IgniteTablesInternal igniteTables;
/** Ignite transactions API. */
private final IgniteTransactionsImpl igniteTransactions;
/** JDBC Handler. */
private final JdbcQueryEventHandler jdbcQueryEventHandler;
/** Connection resources. */
private final ClientResourceRegistry resources = new ClientResourceRegistry();
/** Configuration. */
private final ClientConnectorView configuration;
/** Compute. */
private final IgniteComputeInternal compute;
/** Cluster. */
private final ClusterService clusterService;
/** Query processor. */
private final QueryProcessor queryProcessor;
/** SQL query cursor handler. */
private final JdbcQueryCursorHandler jdbcQueryCursorHandler;
/** Cluster ID. */
private final CompletableFuture<ClusterTag> clusterTag;
/** Metrics. */
private final ClientHandlerMetricSource metrics;
private final ClockService clockService;
/** Context. */
private ClientContext clientContext;
/** Read-write lock. Protects {@link #clientContext}. */
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
/** Chanel handler context. */
private volatile ChannelHandlerContext channelHandlerContext;
/** Primary replicas update counter. */
private final AtomicLong primaryReplicaMaxStartTime;
private final ClientPrimaryReplicaTracker primaryReplicaTracker;
/** Authentication manager. */
private final AuthenticationManager authenticationManager;
private final SchemaVersions schemaVersions;
private final long connectionId;
/**
* Constructor.
*
* @param igniteTables Ignite tables API entry point.
* @param igniteTransactions Ignite transactions API.
* @param processor Sql query processor.
* @param configuration Configuration.
* @param compute Compute.
* @param clusterService Cluster.
* @param clusterTag Cluster tag.
* @param metrics Metrics.
* @param authenticationManager Authentication manager.
* @param clockService Clock service.
*/
public ClientInboundMessageHandler(
IgniteTablesInternal igniteTables,
IgniteTransactionsImpl igniteTransactions,
QueryProcessor processor,
ClientConnectorView configuration,
IgniteComputeInternal compute,
ClusterService clusterService,
CompletableFuture<ClusterTag> clusterTag,
ClientHandlerMetricSource metrics,
AuthenticationManager authenticationManager,
ClockService clockService,
SchemaSyncService schemaSyncService,
CatalogService catalogService,
long connectionId,
ClientPrimaryReplicaTracker primaryReplicaTracker
) {
assert igniteTables != null;
assert igniteTransactions != null;
assert processor != null;
assert configuration != null;
assert compute != null;
assert clusterService != null;
assert clusterTag != null;
assert metrics != null;
assert authenticationManager != null;
assert clockService != null;
assert schemaSyncService != null;
assert catalogService != null;
assert primaryReplicaTracker != null;
this.igniteTables = igniteTables;
this.igniteTransactions = igniteTransactions;
this.configuration = configuration;
this.compute = compute;
this.clusterService = clusterService;
this.queryProcessor = processor;
this.clusterTag = clusterTag;
this.metrics = metrics;
this.authenticationManager = authenticationManager;
this.clockService = clockService;
this.primaryReplicaTracker = primaryReplicaTracker;
jdbcQueryCursorHandler = new JdbcQueryCursorHandlerImpl(resources);
jdbcQueryEventHandler = new JdbcQueryEventHandlerImpl(
processor,
new JdbcMetadataCatalog(clockService, schemaSyncService, catalogService),
resources,
igniteTransactions
);
schemaVersions = new SchemaVersionsImpl(schemaSyncService, catalogService, clockService);
this.connectionId = connectionId;
this.primaryReplicaMaxStartTime = new AtomicLong(HybridTimestamp.MIN_VALUE.longValue());
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
authenticationEventsToSubscribe().forEach(event -> {
authenticationManager.listen(event, this);
});
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
authenticationEventsToSubscribe().forEach(event -> {
authenticationManager.removeListener(event, this);
});
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
channelHandlerContext = ctx;
super.channelRegistered(ctx);
if (LOG.isDebugEnabled()) {
LOG.debug("Connection registered [connectionId=" + connectionId + ", remoteAddress=" + ctx.channel().remoteAddress() + "]");
}
}
/** {@inheritDoc} */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
// Each inbound handler in a pipeline has to release the received messages.
var unpacker = getUnpacker(byteBuf);
metrics.bytesReceivedAdd(byteBuf.readableBytes() + ClientMessageCommon.HEADER_SIZE);
// Packer buffer is released by Netty on send, or by inner exception handlers below.
var packer = getPacker(ctx.alloc());
if (clientContext == null) {
metrics.bytesReceivedAdd(ClientMessageCommon.MAGIC_BYTES.length);
handshake(ctx, unpacker, packer);
} else {
processOperation(ctx, unpacker, packer);
}
}
/** {@inheritDoc} */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
resources.close();
super.channelInactive(ctx);
if (LOG.isDebugEnabled()) {
LOG.debug("Connection closed [connectionId=" + connectionId + ", remoteAddress=" + ctx.channel().remoteAddress() + "]");
}
}
private void handshake(ChannelHandlerContext ctx, ClientMessageUnpacker unpacker, ClientMessagePacker packer) {
try {
writeMagic(ctx);
var clientVer = ProtocolVersion.unpack(unpacker);
if (!clientVer.equals(ProtocolVersion.LATEST_VER)) {
throw new IgniteException(PROTOCOL_COMPATIBILITY_ERR, "Unsupported version: "
+ clientVer.major() + "." + clientVer.minor() + "." + clientVer.patch());
}
var clientCode = unpacker.unpackInt();
var featuresLen = unpacker.unpackBinaryHeader();
var features = BitSet.valueOf(unpacker.readPayload(featuresLen));
Map<HandshakeExtension, Object> extensions = extractExtensions(unpacker);
// It's necessary to perform authentication and update the client context while holding a write lock.
// This prevents a race condition where authentication succeeds but the context isn't updated in time.
// In such a scenario, we might receive an authentication event and attempt to close the connection,
// but fail because the context is still null.
readWriteLock.writeLock().lock();
try {
AuthenticationRequest<?, ?> authenticationRequest = createAuthenticationRequest(extensions);
UserDetails userDetails = authenticationManager.authenticate(authenticationRequest);
clientContext = new ClientContext(clientVer, clientCode, features, userDetails);
} finally {
readWriteLock.writeLock().unlock();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Handshake [connectionId=" + connectionId + ", remoteAddress=" + ctx.channel().remoteAddress() + "]: "
+ clientContext);
}
// Response.
ProtocolVersion.LATEST_VER.pack(packer);
packer.packNil(); // No error.
packer.packLong(configuration.idleTimeout());
ClusterNode localMember = clusterService.topologyService().localMember();
packer.packString(localMember.id());
packer.packString(localMember.name());
ClusterTag tag = clusterTag.join();
packer.packUuid(tag.clusterId());
packer.packString(tag.clusterName());
packer.packLong(observableTimestamp(null));
// Pack current version
packer.packByte(IgniteProductVersion.CURRENT_VERSION.major());
packer.packByte(IgniteProductVersion.CURRENT_VERSION.minor());
packer.packByte(IgniteProductVersion.CURRENT_VERSION.maintenance());
packer.packByteNullable(IgniteProductVersion.CURRENT_VERSION.patch());
packer.packStringNullable(IgniteProductVersion.CURRENT_VERSION.preRelease());
packer.packBinaryHeader(0); // Features.
packer.packInt(0); // Extensions.
write(packer, ctx);
metrics.sessionsAcceptedIncrement();
metrics.sessionsActiveIncrement();
ctx.channel().closeFuture().addListener(f -> metrics.sessionsActiveDecrement());
} catch (Throwable t) {
LOG.warn("Handshake failed [connectionId=" + connectionId + ", remoteAddress=" + ctx.channel().remoteAddress() + "]: "
+ t.getMessage(), t);
packer.close();
var errPacker = getPacker(ctx.alloc());
try {
ProtocolVersion.LATEST_VER.pack(errPacker);
writeErrorCore(t, errPacker);
write(errPacker, ctx);
} catch (Throwable t2) {
LOG.warn("Handshake failed [connectionId=" + connectionId + ", remoteAddress=" + ctx.channel().remoteAddress() + "]: "
+ t2.getMessage(), t2);
errPacker.close();
exceptionCaught(ctx, t2);
}
metrics.sessionsRejectedIncrement();
} finally {
unpacker.close();
}
}
private static AuthenticationRequest<?, ?> createAuthenticationRequest(Map<HandshakeExtension, Object> extensions) {
Object authnType = extensions.get(HandshakeExtension.AUTHENTICATION_TYPE);
if (authnType == null) {
return new AnonymousRequest();
}
if (authnType instanceof String && AuthenticationType.BASIC.name().equalsIgnoreCase((String) authnType)) {
return new UsernamePasswordRequest(
(String) extensions.get(HandshakeExtension.AUTHENTICATION_IDENTITY),
(String) extensions.get(HandshakeExtension.AUTHENTICATION_SECRET));
}
throw new UnsupportedAuthenticationTypeException("Unsupported authentication type: " + authnType);
}
private void writeMagic(ChannelHandlerContext ctx) {
ctx.write(Unpooled.wrappedBuffer(ClientMessageCommon.MAGIC_BYTES));
metrics.bytesSentAdd(ClientMessageCommon.MAGIC_BYTES.length);
}
private void write(ClientMessagePacker packer, ChannelHandlerContext ctx) {
var buf = packer.getBuffer();
int bytes = buf.readableBytes();
try {
// writeAndFlush releases pooled buffer.
ctx.writeAndFlush(buf);
} catch (Throwable t) {
buf.release();
throw t;
}
metrics.bytesSentAdd(bytes);
}
private void writeResponseHeader(
ClientMessagePacker packer, long requestId, ChannelHandlerContext ctx, boolean isNotification, boolean isError) {
packer.packLong(requestId);
writeFlags(packer, ctx, isNotification, isError);
// Include server timestamp in error and notification responses as well:
// an operation can modify data and then throw an exception (e.g. Compute task),
// so we still need to update client-side timestamp to preserve causality guarantees.
packer.packLong(observableTimestamp(null));
}
private void writeError(long requestId, int opCode, Throwable err, ChannelHandlerContext ctx, boolean isNotification) {
if (isNotification) {
LOG.warn("Error processing client notification [connectionId=" + connectionId + ", id=" + requestId
+ ", remoteAddress=" + ctx.channel().remoteAddress() + "]:" + err.getMessage(), err);
} else {
LOG.warn("Error processing client request [connectionId=" + connectionId + ", id=" + requestId + ", op=" + opCode
+ ", remoteAddress=" + ctx.channel().remoteAddress() + "]:" + err.getMessage(), err);
}
ClientMessagePacker packer = getPacker(ctx.alloc());
try {
assert err != null;
writeResponseHeader(packer, requestId, ctx, isNotification, true);
writeErrorCore(err, packer);
write(packer, ctx);
} catch (Throwable t) {
packer.close();
exceptionCaught(ctx, t);
}
}
private void writeErrorCore(Throwable err, ClientMessagePacker packer) {
SchemaVersionMismatchException schemaVersionMismatchException = schemaVersionMismatchException(err);
err = schemaVersionMismatchException == null ? ExceptionUtils.unwrapCause(err) : schemaVersionMismatchException;
// Trace ID and error code.
if (err instanceof TraceableException) {
TraceableException iex = (TraceableException) err;
packer.packUuid(iex.traceId());
packer.packInt(iex.code());
} else {
packer.packUuid(UUID.randomUUID());
packer.packInt(INTERNAL_ERR);
}
// No need to send internal errors to client.
Throwable pubErr = IgniteExceptionMapperUtil.mapToPublicException(ExceptionUtils.unwrapCause(err));
// Class name and message.
packer.packString(pubErr.getClass().getName());
packer.packString(pubErr.getMessage());
// Stack trace.
if (configuration.sendServerExceptionStackTraceToClient()) {
packer.packString(ExceptionUtils.getFullStackTrace(pubErr));
} else {
packer.packNil();
}
// Extensions.
if (schemaVersionMismatchException != null) {
packer.packInt(1);
packer.packString(ErrorExtensions.EXPECTED_SCHEMA_VERSION);
packer.packInt(schemaVersionMismatchException.expectedVersion());
} else {
packer.packNil(); // No extensions.
}
}
private static ClientMessagePacker getPacker(ByteBufAllocator alloc) {
// Outgoing messages are released on write.
return new ClientMessagePacker(alloc.buffer());
}
private static ClientMessageUnpacker getUnpacker(ByteBuf buf) {
return new ClientMessageUnpacker(buf);
}
private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker in, ClientMessagePacker out) {
long requestId = -1;
int opCode = -1;
metrics.requestsActiveIncrement();
try {
opCode = in.unpackInt();
requestId = in.unpackLong();
if (LOG.isTraceEnabled()) {
LOG.trace("Client request started [id=" + requestId + ", op=" + opCode
+ ", remoteAddress=" + ctx.channel().remoteAddress() + "]");
}
out.packLong(requestId);
writeFlags(out, ctx, false, false);
// Observable timestamp should be calculated after the operation is processed; reserve space, write later.
int observableTimestampIdx = out.reserveLong();
CompletableFuture fut = processOperation(in, out, opCode, requestId);
if (fut == null) {
// Operation completed synchronously.
in.close();
out.setLong(observableTimestampIdx, observableTimestamp(out));
write(out, ctx);
if (LOG.isTraceEnabled()) {
LOG.trace("Client request processed synchronously [id=" + requestId + ", op=" + opCode
+ ", remoteAddress=" + ctx.channel().remoteAddress() + "]");
}
metrics.requestsProcessedIncrement();
metrics.requestsActiveDecrement();
} else {
var reqId = requestId;
var op = opCode;
fut.whenComplete((Object res, Object err) -> {
in.close();
metrics.requestsActiveDecrement();
if (err != null) {
out.close();
writeError(reqId, op, (Throwable) err, ctx, false);
metrics.requestsFailedIncrement();
} else {
out.setLong(observableTimestampIdx, observableTimestamp(out));
write(out, ctx);
metrics.requestsProcessedIncrement();
if (LOG.isTraceEnabled()) {
LOG.trace("Client request processed [id=" + reqId + ", op=" + op
+ ", remoteAddress=" + ctx.channel().remoteAddress() + "]");
}
}
});
}
} catch (Throwable t) {
in.close();
out.close();
writeError(requestId, opCode, t, ctx, false);
metrics.requestsFailedIncrement();
}
}
private @Nullable CompletableFuture processOperation(
ClientMessageUnpacker in,
ClientMessagePacker out,
int opCode,
long requestId
) throws IgniteInternalCheckedException {
switch (opCode) {
case ClientOp.HEARTBEAT:
return null;
case ClientOp.TABLES_GET:
return ClientTablesGetRequest.process(out, igniteTables);
case ClientOp.SCHEMAS_GET:
return ClientSchemasGetRequest.process(in, out, igniteTables, schemaVersions);
case ClientOp.TABLE_GET:
return ClientTableGetRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_UPSERT:
return ClientTupleUpsertRequest.process(in, out, igniteTables, resources);
case ClientOp.TUPLE_GET:
return ClientTupleGetRequest.process(in, out, igniteTables, resources);
case ClientOp.TUPLE_UPSERT_ALL:
return ClientTupleUpsertAllRequest.process(in, out, igniteTables, resources);
case ClientOp.TUPLE_GET_ALL:
return ClientTupleGetAllRequest.process(in, out, igniteTables, resources);
case ClientOp.TUPLE_GET_AND_UPSERT:
return ClientTupleGetAndUpsertRequest.process(in, out, igniteTables, resources);
case ClientOp.TUPLE_INSERT:
return ClientTupleInsertRequest.process(in, out, igniteTables, resources);
case ClientOp.TUPLE_INSERT_ALL:
return ClientTupleInsertAllRequest.process(in, out, igniteTables, resources);
case ClientOp.TUPLE_REPLACE:
return ClientTupleReplaceRequest.process(in, out, igniteTables, resources);
case ClientOp.TUPLE_REPLACE_EXACT:
return ClientTupleReplaceExactRequest.process(in, out, igniteTables, resources);
case ClientOp.TUPLE_GET_AND_REPLACE:
return ClientTupleGetAndReplaceRequest.process(in, out, igniteTables, resources);
case ClientOp.TUPLE_DELETE:
return ClientTupleDeleteRequest.process(in, out, igniteTables, resources);
case ClientOp.TUPLE_DELETE_ALL:
return ClientTupleDeleteAllRequest.process(in, out, igniteTables, resources);
case ClientOp.TUPLE_DELETE_EXACT:
return ClientTupleDeleteExactRequest.process(in, out, igniteTables, resources);
case ClientOp.TUPLE_DELETE_ALL_EXACT:
return ClientTupleDeleteAllExactRequest.process(in, out, igniteTables, resources);
case ClientOp.TUPLE_GET_AND_DELETE:
return ClientTupleGetAndDeleteRequest.process(in, out, igniteTables, resources);
case ClientOp.TUPLE_CONTAINS_KEY:
return ClientTupleContainsKeyRequest.process(in, out, igniteTables, resources);
case ClientOp.JDBC_CONNECT:
return ClientJdbcConnectRequest.execute(in, out, jdbcQueryEventHandler);
case ClientOp.JDBC_EXEC:
return ClientJdbcExecuteRequest.execute(in, out, jdbcQueryEventHandler);
case ClientOp.JDBC_EXEC_BATCH:
return ClientJdbcExecuteBatchRequest.process(in, out, jdbcQueryEventHandler);
case ClientOp.JDBC_SQL_EXEC_PS_BATCH:
return ClientJdbcPreparedStmntBatchRequest.process(in, out, jdbcQueryEventHandler);
case ClientOp.JDBC_NEXT:
return ClientJdbcFetchRequest.process(in, out, jdbcQueryCursorHandler);
case ClientOp.JDBC_MORE_RESULTS:
return ClientJdbcHasMoreRequest.process(in, out, jdbcQueryCursorHandler);
case ClientOp.JDBC_CURSOR_CLOSE:
return ClientJdbcCloseRequest.process(in, out, jdbcQueryCursorHandler);
case ClientOp.JDBC_TABLE_META:
return ClientJdbcTableMetadataRequest.process(in, out, jdbcQueryEventHandler);
case ClientOp.JDBC_COLUMN_META:
return ClientJdbcColumnMetadataRequest.process(in, out, jdbcQueryEventHandler);
case ClientOp.JDBC_SCHEMAS_META:
return ClientJdbcSchemasMetadataRequest.process(in, out, jdbcQueryEventHandler);
case ClientOp.JDBC_PK_META:
return ClientJdbcPrimaryKeyMetadataRequest.process(in, out, jdbcQueryEventHandler);
case ClientOp.JDBC_QUERY_META:
return ClientJdbcQueryMetadataRequest.process(in, out, jdbcQueryCursorHandler);
case ClientOp.TX_BEGIN:
return ClientTransactionBeginRequest.process(in, out, igniteTransactions, resources, metrics);
case ClientOp.TX_COMMIT:
return ClientTransactionCommitRequest.process(in, resources, metrics);
case ClientOp.TX_ROLLBACK:
return ClientTransactionRollbackRequest.process(in, resources, metrics);
case ClientOp.COMPUTE_EXECUTE:
return ClientComputeExecuteRequest.process(in, out, compute, clusterService, notificationSender(requestId));
case ClientOp.COMPUTE_EXECUTE_COLOCATED:
return ClientComputeExecuteColocatedRequest.process(
in,
out,
compute,
igniteTables,
clusterService,
notificationSender(requestId)
);
case ClientOp.COMPUTE_GET_STATUS:
return ClientComputeGetStatusRequest.process(in, out, compute);
case ClientOp.COMPUTE_CANCEL:
return ClientComputeCancelRequest.process(in, out, compute);
case ClientOp.COMPUTE_CHANGE_PRIORITY:
return ClientComputeChangePriorityRequest.process(in, out, compute);
case ClientOp.CLUSTER_GET_NODES:
return ClientClusterGetNodesRequest.process(out, clusterService);
case ClientOp.SQL_EXEC:
return ClientSqlExecuteRequest.process(in, out, queryProcessor, resources, metrics, igniteTransactions);
case ClientOp.SQL_CURSOR_NEXT_PAGE:
return ClientSqlCursorNextPageRequest.process(in, out, resources, igniteTransactions);
case ClientOp.SQL_CURSOR_CLOSE:
return ClientSqlCursorCloseRequest.process(in, out, resources, igniteTransactions);
case ClientOp.PARTITION_ASSIGNMENT_GET:
return ClientTablePartitionPrimaryReplicasGetRequest.process(in, out, primaryReplicaTracker);
case ClientOp.JDBC_TX_FINISH:
return ClientJdbcFinishTxRequest.process(in, out, jdbcQueryEventHandler);
case ClientOp.SQL_EXEC_SCRIPT:
return ClientSqlExecuteScriptRequest.process(in, queryProcessor, igniteTransactions);
case ClientOp.SQL_QUERY_META:
return ClientSqlQueryMetadataRequest.process(in, out, queryProcessor, resources);
case ClientOp.SQL_EXEC_BATCH:
return ClientSqlExecuteBatchRequest.process(in, out, queryProcessor, resources, igniteTransactions);
case ClientOp.STREAMER_BATCH_SEND:
return ClientStreamerBatchSendRequest.process(in, out, igniteTables);
default:
throw new IgniteException(PROTOCOL_ERR, "Unexpected operation code: " + opCode);
}
}
private void writeFlags(ClientMessagePacker out, ChannelHandlerContext ctx, boolean isNotification, boolean isError) {
// Notify the client about primary replica change that happened for ANY table since the last request.
// We can't assume that the client only uses uses a particular table (e.g. the one present in the replica tracker), because
// the client can be connected to multiple nodes.
long lastSentMaxStartTime = primaryReplicaMaxStartTime.get();
long currentMaxStartTime = primaryReplicaTracker.maxStartTime();
boolean primaryReplicasUpdated = currentMaxStartTime > lastSentMaxStartTime
&& primaryReplicaMaxStartTime.compareAndSet(lastSentMaxStartTime, currentMaxStartTime);
if (primaryReplicasUpdated && LOG.isInfoEnabled()) {
LOG.info("Partition primary replica changed, notifying client [connectionId=" + connectionId + ", remoteAddress="
+ ctx.channel().remoteAddress() + ']');
}
int flags = ResponseFlags.getFlags(primaryReplicasUpdated, isNotification, isError);
out.packInt(flags);
if (primaryReplicasUpdated) {
out.packLong(currentMaxStartTime);
}
}
/** {@inheritDoc} */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
/** {@inheritDoc} */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (cause instanceof SSLException || cause.getCause() instanceof SSLException) {
metrics.sessionsRejectedTlsIncrement();
}
if (cause instanceof DecoderException && cause.getCause() instanceof IgniteException) {
var err = (IgniteException) cause.getCause();
if (err.code() == HANDSHAKE_HEADER_ERR) {
metrics.sessionsRejectedIncrement();
}
}
LOG.warn("Exception in client connector pipeline [connectionId=" + connectionId + ", remoteAddress="
+ ctx.channel().remoteAddress() + "]: " + cause.getMessage(), cause);
ctx.close();
}
private static Map<HandshakeExtension, Object> extractExtensions(ClientMessageUnpacker unpacker) {
EnumMap<HandshakeExtension, Object> extensions = new EnumMap<>(HandshakeExtension.class);
int mapSize = unpacker.unpackInt();
for (int i = 0; i < mapSize; i++) {
HandshakeExtension handshakeExtension = HandshakeExtension.fromKey(unpacker.unpackString());
if (handshakeExtension != null) {
extensions.put(handshakeExtension, unpackExtensionValue(handshakeExtension, unpacker));
}
}
return extensions;
}
private static Object unpackExtensionValue(HandshakeExtension handshakeExtension, ClientMessageUnpacker unpacker) {
Class<?> type = handshakeExtension.valueType();
if (type == String.class) {
return unpacker.unpackString();
} else {
throw new IllegalArgumentException("Unsupported extension type: " + type.getName());
}
}
private static @Nullable SchemaVersionMismatchException schemaVersionMismatchException(Throwable e) {
while (e != null) {
if (e instanceof SchemaVersionMismatchException) {
return (SchemaVersionMismatchException) e;
}
e = e.getCause();
}
return null;
}
private long observableTimestamp(@Nullable ClientMessagePacker out) {
// Certain operations can override the timestamp and provide it in the meta object.
if (out != null) {
Object meta = out.meta();
if (meta instanceof HybridTimestamp) {
return ((HybridTimestamp) meta).longValue();
}
}
return clockService.nowLong();
}
private void sendNotification(long requestId, @Nullable Consumer<ClientMessagePacker> writer, @Nullable Throwable err) {
if (err != null) {
writeError(requestId, -1, err, channelHandlerContext, true);
return;
}
var packer = getPacker(channelHandlerContext.alloc());
try {
writeResponseHeader(packer, requestId, channelHandlerContext, true, false);
if (writer != null) {
writer.accept(packer);
}
write(packer, channelHandlerContext);
} catch (Throwable t) {
packer.close();
exceptionCaught(channelHandlerContext, t);
}
}
private NotificationSender notificationSender(long requestId) {
// Notification can be sent before the response to the current request.
// This is fine, because the client registers a listener before sending the request.
return (writer, err) -> sendNotification(requestId, writer, err);
}
@Override
public CompletableFuture<Boolean> notify(AuthenticationEventParameters parameters) {
if (shouldCloseConnection(parameters)) {
LOG.warn("Closing connection due to authentication event [connectionId=" + connectionId + ", remoteAddress="
+ channelHandlerContext.channel().remoteAddress() + ", event=" + parameters.type() + ']');
closeConnection();
}
return falseCompletedFuture();
}
private boolean shouldCloseConnection(AuthenticationEventParameters parameters) {
switch (parameters.type()) {
case AUTHENTICATION_ENABLED:
return true;
case AUTHENTICATION_PROVIDER_REMOVED:
case AUTHENTICATION_PROVIDER_UPDATED:
return currentUserAffected((AuthenticationProviderEventParameters) parameters);
case USER_REMOVED:
case USER_UPDATED:
return currentUserAffected((UserEventParameters) parameters);
default:
return false;
}
}
private boolean currentUserAffected(AuthenticationProviderEventParameters parameters) {
readWriteLock.readLock().lock();
try {
return clientContext != null && clientContext.userDetails().providerName().equals(parameters.name());
} finally {
readWriteLock.readLock().unlock();
}
}
private boolean currentUserAffected(UserEventParameters parameters) {
return clientContext != null
&& clientContext.userDetails().providerName().equals(parameters.providerName())
&& clientContext.userDetails().username().equals(parameters.username());
}
private void closeConnection() {
if (channelHandlerContext != null) {
channelHandlerContext.close();
}
}
private static Set<AuthenticationEvent> authenticationEventsToSubscribe() {
return Set.of(
AuthenticationEvent.AUTHENTICATION_ENABLED,
AuthenticationEvent.AUTHENTICATION_PROVIDER_UPDATED,
AuthenticationEvent.AUTHENTICATION_PROVIDER_REMOVED,
AuthenticationEvent.USER_UPDATED,
AuthenticationEvent.USER_REMOVED
);
}
}