blob: 321c1734e01bc10a6825c336d931e3ee29feea28 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.client.handler;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static org.apache.ignite.lang.ErrorGroups.Network.ADDRESS_UNRESOLVED_ERR;
import static org.apache.ignite.lang.ErrorGroups.Network.PORT_IN_USE_ERR;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.nio.channels.UnresolvedAddressException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.ignite.client.handler.configuration.ClientConnectorConfiguration;
import org.apache.ignite.client.handler.configuration.ClientConnectorView;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.client.proto.ClientMessageDecoder;
import org.apache.ignite.internal.cluster.management.ClusterTag;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.ssl.SslContextProvider;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.security.authentication.AuthenticationManager;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.IgniteException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
* Client handler module maintains TCP endpoint for thin client connections.
*/
public class ClientHandlerModule implements IgniteComponent {
/** The logger. */
private static final IgniteLogger LOG = Loggers.forClass(ClientHandlerModule.class);
/** Connection id generator.
* The resulting connection id is local to the current node and is intended for logging, diagnostics, and management purposes. */
private static final AtomicLong CONNECTION_ID_GEN = new AtomicLong();
/** Ignite tables API. */
private final IgniteTablesInternal igniteTables;
/** Ignite transactions API. */
private final IgniteTransactionsImpl igniteTransactions;
/** Cluster ID supplier. */
private final Supplier<CompletableFuture<ClusterTag>> clusterTagSupplier;
/** Metrics. */
private final ClientHandlerMetricSource metrics;
/** Metric manager. */
private final MetricManager metricManager;
/** Netty channel. */
@Nullable
private volatile Channel channel;
/** Processor. */
private final QueryProcessor queryProcessor;
/** Compute. */
private final IgniteComputeInternal igniteCompute;
/** Cluster. */
private final ClusterService clusterService;
/** Netty bootstrap factory. */
private final NettyBootstrapFactory bootstrapFactory;
private final AuthenticationManager authenticationManager;
private final ClockService clockService;
private final SchemaSyncService schemaSyncService;
private final CatalogService catalogService;
private final ClientPrimaryReplicaTracker primaryReplicaTracker;
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
private final AtomicBoolean stopGuard = new AtomicBoolean();
private final ClientConnectorConfiguration clientConnectorConfiguration;
@TestOnly
@SuppressWarnings("unused")
private volatile ChannelHandler handler;
/**
* Constructor.
*
* @param queryProcessor Sql query processor.
* @param igniteTables Ignite.
* @param igniteTransactions Transactions.
* @param igniteCompute Compute.
* @param clusterService Cluster.
* @param bootstrapFactory Bootstrap factory.
* @param clusterTagSupplier ClusterTag supplier.
* @param metricManager Metric manager.
* @param authenticationManager Authentication manager.
* @param clockService Clock service.
* @param clientConnectorConfiguration Configuration of the connector.
* @param lowWatermark Low watermark.
*/
public ClientHandlerModule(
QueryProcessor queryProcessor,
IgniteTablesInternal igniteTables,
IgniteTransactionsImpl igniteTransactions,
IgniteComputeInternal igniteCompute,
ClusterService clusterService,
NettyBootstrapFactory bootstrapFactory,
Supplier<CompletableFuture<ClusterTag>> clusterTagSupplier,
MetricManager metricManager,
ClientHandlerMetricSource metrics,
AuthenticationManager authenticationManager,
ClockService clockService,
SchemaSyncService schemaSyncService,
CatalogService catalogService,
PlacementDriver placementDriver,
ClientConnectorConfiguration clientConnectorConfiguration,
LowWatermark lowWatermark
) {
assert igniteTables != null;
assert queryProcessor != null;
assert igniteCompute != null;
assert clusterService != null;
assert bootstrapFactory != null;
assert clusterTagSupplier != null;
assert metricManager != null;
assert metrics != null;
assert authenticationManager != null;
assert clockService != null;
assert schemaSyncService != null;
assert catalogService != null;
assert placementDriver != null;
assert clientConnectorConfiguration != null;
assert lowWatermark != null;
this.queryProcessor = queryProcessor;
this.igniteTables = igniteTables;
this.igniteTransactions = igniteTransactions;
this.igniteCompute = igniteCompute;
this.clusterService = clusterService;
this.bootstrapFactory = bootstrapFactory;
this.clusterTagSupplier = clusterTagSupplier;
this.metricManager = metricManager;
this.metrics = metrics;
this.authenticationManager = authenticationManager;
this.clockService = clockService;
this.schemaSyncService = schemaSyncService;
this.catalogService = catalogService;
this.primaryReplicaTracker = new ClientPrimaryReplicaTracker(placementDriver, catalogService, clockService, schemaSyncService,
lowWatermark);
this.clientConnectorConfiguration = clientConnectorConfiguration;
}
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> startAsync() {
if (channel != null) {
throw new IgniteInternalException(INTERNAL_ERR, "ClientHandlerModule is already started.");
}
var configuration = clientConnectorConfiguration.value();
metricManager.registerSource(metrics);
if (configuration.metricsEnabled()) {
metrics.enable();
}
primaryReplicaTracker.start();
return startEndpoint(configuration).thenAccept(ch -> channel = ch);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> stopAsync() {
if (!stopGuard.compareAndSet(false, true)) {
return nullCompletedFuture();
}
busyLock.block();
metricManager.unregisterSource(metrics);
primaryReplicaTracker.stop();
var ch = channel;
if (ch != null) {
try {
ch.close().await();
} catch (InterruptedException e) {
return failedFuture(e);
}
channel = null;
}
return nullCompletedFuture();
}
/**
* Returns the local address where this handler is bound to.
*
* @return the local address of this module.
* @throws IgniteInternalException if the module is not started.
*/
public InetSocketAddress localAddress() {
var ch = channel;
if (ch == null) {
throw new IgniteInternalException(INTERNAL_ERR, "ClientHandlerModule has not been started");
}
return (InetSocketAddress) ch.localAddress();
}
/**
* Starts the endpoint.
*
* @param configuration Configuration.
* @return Channel future.
*/
private CompletableFuture<Channel> startEndpoint(ClientConnectorView configuration) {
ServerBootstrap bootstrap = bootstrapFactory.createServerBootstrap();
CompletableFuture<ClusterTag> clusterTag = clusterTagSupplier.get();
CompletableFuture<Channel> result = new CompletableFuture<>();
// Initialize SslContext once on startup to avoid initialization on each connection, and to fail in case of incorrect config.
SslContext sslContext = configuration.ssl().enabled() ? SslContextProvider.createServerSslContext(configuration.ssl()) : null;
bootstrap.childHandler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel ch) {
if (!busyLock.enterBusy()) {
ch.close();
return;
}
try {
long connectionId = CONNECTION_ID_GEN.incrementAndGet();
if (LOG.isDebugEnabled()) {
LOG.debug("New client connection [connectionId=" + connectionId
+ ", remoteAddress=" + ch.remoteAddress() + ']');
}
if (configuration.idleTimeout() > 0) {
IdleStateHandler idleStateHandler = new IdleStateHandler(
configuration.idleTimeout(), 0, 0, TimeUnit.MILLISECONDS);
ch.pipeline().addLast(idleStateHandler);
ch.pipeline().addLast(new IdleChannelHandler(configuration.idleTimeout(), metrics, connectionId));
}
if (sslContext != null) {
ch.pipeline().addFirst("ssl", sslContext.newHandler(ch.alloc()));
}
ClientInboundMessageHandler messageHandler = createInboundMessageHandler(
configuration, clusterTag, connectionId);
//noinspection TestOnlyProblems
handler = messageHandler;
ch.pipeline().addLast(
new ClientMessageDecoder(),
messageHandler
);
metrics.connectionsInitiatedIncrement();
} finally {
busyLock.leaveBusy();
}
}
})
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.connectTimeout());
int port = configuration.port();
String address = configuration.listenAddress();
ChannelFuture channelFuture = address.isEmpty() ? bootstrap.bind(port) : bootstrap.bind(address, port);
channelFuture.addListener((ChannelFutureListener) bindFut -> {
if (bindFut.isSuccess()) {
if (LOG.isInfoEnabled()) {
LOG.info(
"Thin client connector endpoint started successfully [{}]",
(address.isEmpty() ? "" : "address=" + address + ",") + "port=" + port);
}
result.complete(bindFut.channel());
} else if (bindFut.cause() instanceof BindException) {
// TODO IGNITE-21614
result.completeExceptionally(
new IgniteException(
PORT_IN_USE_ERR,
"Cannot start thin client connector endpoint. Port " + port + " is in use.",
bindFut.cause())
);
} else if (bindFut.cause() instanceof UnresolvedAddressException) {
result.completeExceptionally(
new IgniteException(
ADDRESS_UNRESOLVED_ERR,
"Failed to start thin connector endpoint, unresolved socket address \"" + address + "\"",
bindFut.cause()
)
);
} else {
result.completeExceptionally(
new IgniteException(
INTERNAL_ERR,
"Failed to start thin client connector endpoint: " + bindFut.cause().getMessage(),
bindFut.cause()));
}
});
return result;
}
private ClientInboundMessageHandler createInboundMessageHandler(
ClientConnectorView configuration,
CompletableFuture<ClusterTag> clusterTag,
long connectionId) {
return new ClientInboundMessageHandler(
igniteTables,
igniteTransactions,
queryProcessor,
configuration,
igniteCompute,
clusterService,
clusterTag,
metrics,
authenticationManager,
clockService,
schemaSyncService,
catalogService,
connectionId,
primaryReplicaTracker
);
}
}