blob: 095ecfd3a1caedc61a0fddd6f3ffa8cfe79441e1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.client;
import static org.apache.ignite.configuration.annotation.ConfigurationType.LOCAL;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import io.netty.util.ResourceLeakDetector;
import java.io.IOError;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ignite.Ignite;
import org.apache.ignite.client.fakes.FakeCompute;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.client.fakes.FakeInternalTable;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientHandlerModule;
import org.apache.ignite.client.handler.DummyAuthenticationManager;
import org.apache.ignite.client.handler.FakeCatalogService;
import org.apache.ignite.client.handler.FakePlacementDriver;
import org.apache.ignite.client.handler.configuration.ClientConnectorConfiguration;
import org.apache.ignite.internal.client.ClientClusterNode;
import org.apache.ignite.internal.cluster.management.ClusterTag;
import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
import org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.security.authentication.AuthenticationManager;
import org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl;
import org.apache.ignite.internal.security.configuration.SecurityConfiguration;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.jetbrains.annotations.Nullable;
import org.mockito.Mockito;
/**
* Test server.
*/
public class TestServer implements AutoCloseable {
private final ConfigurationTreeGenerator generator;
private final ConfigurationRegistry cfg;
private final IgniteComponent module;
private final NettyBootstrapFactory bootstrapFactory;
private final String nodeName;
private final ClientHandlerMetricSource metrics;
private final AuthenticationManager authenticationManager;
private final Ignite ignite;
private final FakePlacementDriver placementDriver = new FakePlacementDriver(FakeInternalTable.PARTITIONS);
private final CmgMessagesFactory msgFactory = new CmgMessagesFactory();
/**
* Constructor.
*
* @param idleTimeout Idle timeout.
* @param ignite Ignite.
*/
public TestServer(
long idleTimeout,
Ignite ignite
) {
this(
idleTimeout,
ignite,
null,
null,
null,
UUID.randomUUID(),
null,
null,
null
);
}
/**
* Constructor.
*/
public TestServer(
long idleTimeout,
Ignite ignite,
@Nullable Function<Integer, Boolean> shouldDropConnection,
@Nullable Function<Integer, Integer> responseDelay,
@Nullable String nodeName,
UUID clusterId,
@Nullable SecurityConfiguration securityConfiguration,
@Nullable Integer port
) {
this(
idleTimeout,
ignite,
shouldDropConnection,
responseDelay,
nodeName,
clusterId,
securityConfiguration,
port,
null
);
}
/**
* Constructor.
*
* @param idleTimeout Idle timeout.
* @param ignite Ignite.
*/
public TestServer(
long idleTimeout,
Ignite ignite,
@Nullable Function<Integer, Boolean> shouldDropConnection,
@Nullable Function<Integer, Integer> responseDelay,
@Nullable String nodeName,
UUID clusterId,
@Nullable SecurityConfiguration securityConfiguration,
@Nullable Integer port,
@Nullable HybridClock clock
) {
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
generator = new ConfigurationTreeGenerator(ClientConnectorConfiguration.KEY, NetworkConfiguration.KEY);
cfg = new ConfigurationRegistry(
List.of(ClientConnectorConfiguration.KEY, NetworkConfiguration.KEY),
new TestConfigurationStorage(LOCAL),
generator,
new TestConfigurationValidator()
);
assertThat(cfg.startAsync(), willCompleteSuccessfully());
cfg.getConfiguration(ClientConnectorConfiguration.KEY).change(
local -> local.changePort(port != null ? port : getFreePort()).changeIdleTimeout(idleTimeout)
).join();
bootstrapFactory = new NettyBootstrapFactory(cfg.getConfiguration(NetworkConfiguration.KEY), "TestServer-");
assertThat(bootstrapFactory.startAsync(), willCompleteSuccessfully());
if (nodeName == null) {
nodeName = "server-1";
}
this.nodeName = nodeName;
this.ignite = ignite;
ClusterService clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS);
Mockito.when(clusterService.topologyService().localMember().id()).thenReturn(getNodeId(nodeName));
Mockito.when(clusterService.topologyService().localMember().name()).thenReturn(nodeName);
Mockito.when(clusterService.topologyService().localMember()).thenReturn(getClusterNode(nodeName));
Mockito.when(clusterService.topologyService().getByConsistentId(anyString())).thenAnswer(
i -> getClusterNode(i.getArgument(0, String.class)));
IgniteComputeInternal compute = new FakeCompute(nodeName);
metrics = new ClientHandlerMetricSource();
metrics.enable();
if (clock == null) {
clock = new HybridClockImpl();
}
if (securityConfiguration == null) {
authenticationManager = new DummyAuthenticationManager();
} else {
authenticationManager = new AuthenticationManagerImpl(securityConfiguration, ign -> {});
assertThat(authenticationManager.startAsync(), willCompleteSuccessfully());
}
ClusterTag tag = msgFactory.clusterTag()
.clusterName("Test Server")
.clusterId(clusterId)
.build();
ClientConnectorConfiguration clientConnectorConfiguration = cfg.getConfiguration(ClientConnectorConfiguration.KEY);
module = shouldDropConnection != null
? new TestClientHandlerModule(
ignite,
bootstrapFactory,
shouldDropConnection,
responseDelay,
clusterService,
compute,
tag,
metrics,
authenticationManager,
clock,
placementDriver,
clientConnectorConfiguration)
: new ClientHandlerModule(
((FakeIgnite) ignite).queryEngine(),
(IgniteTablesInternal) ignite.tables(),
(IgniteTransactionsImpl) ignite.transactions(),
compute,
clusterService,
bootstrapFactory,
() -> CompletableFuture.completedFuture(tag),
mock(MetricManager.class),
metrics,
authenticationManager,
new TestClockService(clock),
new AlwaysSyncedSchemaSyncService(),
new FakeCatalogService(FakeInternalTable.PARTITIONS),
placementDriver,
clientConnectorConfiguration,
new TestLowWatermark()
);
module.startAsync().join();
}
/**
* Gets the port where this instance is listening.
*
* @return TCP port.
*/
public int port() {
SocketAddress addr = module instanceof ClientHandlerModule
? ((ClientHandlerModule) module).localAddress()
: ((TestClientHandlerModule) module).localAddress();
return ((InetSocketAddress) Objects.requireNonNull(addr)).getPort();
}
/**
* Gets the Ignite.
*
* @return Ignite
*/
public Ignite ignite() {
return ignite;
}
/**
* Gets the node name.
*
* @return Node name.
*/
public String nodeName() {
return nodeName;
}
/**
* Gets the node name.
*
* @return Node name.
*/
public String nodeId() {
return getNodeId(nodeName);
}
/**
* Gets metrics.
*
* @return Metrics.
*/
public ClientHandlerMetricSource metrics() {
return metrics;
}
/**
* Gets the placement driver.
*
* @return Placement driver.
*/
public FakePlacementDriver placementDriver() {
return placementDriver;
}
/** {@inheritDoc} */
@Override
public void close() throws Exception {
assertThat(stopAsync(module, authenticationManager, bootstrapFactory, cfg), willCompleteSuccessfully());
generator.close();
}
private ClusterNode getClusterNode(String name) {
return new ClientClusterNode(getNodeId(name), name, new NetworkAddress("127.0.0.1", 8080));
}
private static String getNodeId(String name) {
return name + "-id";
}
private static int getFreePort() {
try (var serverSocket = new ServerSocket(0)) {
return serverSocket.getLocalPort();
} catch (IOException e) {
throw new IOError(e);
}
}
}