blob: 50ae358a4d9e04c402ebece27a61a306989b77af [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.client;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
/**
 * Several Ignite servers running on localhost.
 * <p>
 * Instances are created with {@link #start(int)} or {@link #start(int, String)}, which start the requested
 * number of server nodes immediately. Individual nodes can then be stopped with {@link #failNode()} and
 * brought back with {@link #restoreNode()}. {@link #close()} stops every node that is still running.
 * <p>
 * All methods that read or change the set of running nodes are synchronized on the cluster instance.
 */
public class LocalIgniteCluster implements AutoCloseable {
    /** Host used when the caller does not specify one. */
    private static final String DFLT_HOST = "127.0.0.1";

    /** Randomizer. */
    private static final Random rnd = new Random();

    /**
     * Host set on the client connector configuration of every node of this cluster.
     * Kept per instance so that clusters started with different hosts do not affect each other.
     */
    private final String host;

    /** Servers. */
    private final List<Ignite> srvs = new ArrayList<>();

    /** Configurations of the failed servers. */
    private final List<NodeConfiguration> failedCfgs = new ArrayList<>();

    /** Initial cluster size. */
    private final int initSize;

    /**
     * Private constructor: use {@link #start(int)} or {@link #start(int, String)} to create instances of
     * {@link LocalIgniteCluster}.
     *
     * @param initSize Number of nodes to start; must be at least 1.
     * @param host Host to set on the client connector configuration of every node.
     * @throws IllegalArgumentException If {@code initSize} is less than 1.
     */
    private LocalIgniteCluster(int initSize, String host) {
        if (initSize < 1)
            throw new IllegalArgumentException("Cluster must have at least one node.");

        this.initSize = initSize;
        this.host = host;

        try {
            for (int i = 0; i < initSize; i++) {
                // The i-th node gets the default discovery and client connector ports shifted by i.
                NodeConfiguration nodeCfg = new NodeConfiguration(
                    TcpDiscoverySpi.DFLT_PORT + i,
                    ClientConnectorConfiguration.DFLT_PORT + i
                );

                srvs.add(Ignition.start(getConfiguration(nodeCfg)));
            }
        }
        catch (RuntimeException | Error e) {
            // The caller never receives this instance, so nodes that did start must be stopped here.
            RuntimeException closeErr = stopAll();

            if (closeErr != null)
                e.addSuppressed(closeErr);

            throw e;
        }
    }

    /**
     * Create and start the cluster with the default host.
     *
     * @param initSize Number of nodes to start; must be at least 1.
     * @return Started cluster.
     * @throws IllegalArgumentException If {@code initSize} is less than 1.
     */
    public static LocalIgniteCluster start(int initSize) {
        return new LocalIgniteCluster(initSize, DFLT_HOST);
    }

    /**
     * Create and start the cluster with a custom host.
     *
     * @param initSize Number of nodes to start; must be at least 1.
     * @param host Host to set on the client connector configuration of every node.
     * @return Started cluster.
     * @throws IllegalArgumentException If {@code initSize} is less than 1.
     */
    public static LocalIgniteCluster start(int initSize, String host) {
        return new LocalIgniteCluster(initSize, host);
    }

    /**
     * Stops all running nodes and forgets the failed ones. Every node is attempted even if closing
     * an earlier one fails; the first failure is rethrown with the later ones attached as suppressed.
     */
    @Override public synchronized void close() {
        RuntimeException err = stopAll();

        if (err != null)
            throw err;
    }

    /**
     * Remove one random node.
     *
     * @throws IllegalStateException If there are no running nodes.
     */
    public synchronized void failNode() {
        if (srvs.isEmpty())
            throw new IllegalStateException("Cannot remove node from empty cluster");

        int idx = rnd.nextInt(srvs.size());

        Ignite srv = srvs.get(idx);

        IgniteConfiguration cfg = srv.configuration();

        // Remember the node's ports before stopping it so that restoreNode() can reuse them.
        // The discovery port is taken from the first address registered with the node's IP finder.
        NodeConfiguration nodeCfg = new NodeConfiguration(
            ((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder().getRegisteredAddresses().iterator().next().getPort(),
            Objects.requireNonNull(cfg.getClientConnectorConfiguration()).getPort()
        );

        srv.close();

        srvs.remove(idx);

        failedCfgs.add(nodeCfg);
    }

    /**
     * Restore one of the failed nodes, chosen at random.
     *
     * @throws IllegalStateException If there are no failed nodes.
     */
    public synchronized void restoreNode() {
        if (failedCfgs.isEmpty())
            throw new IllegalStateException("Cannot restore nodes in healthy cluster");

        int idx = rnd.nextInt(failedCfgs.size());

        // The configuration is dropped from the failed list only after the node has started.
        Ignite ignite = Ignition.start(getConfiguration(failedCfgs.get(idx)));

        srvs.add(ignite);

        failedCfgs.remove(idx);
    }

    /**
     * @return Client connection string as defined by
     *      {@link ClientConfiguration#setAddresses(String...)}: one {@code host:port} entry per running node.
     *      An entry is {@code null} if the node has no client connector configuration.
     */
    public synchronized Collection<String> clientAddresses() {
        return srvs.stream()
            .map(s -> {
                ClientConnectorConfiguration cfg = s.configuration().getClientConnectorConfiguration();

                return cfg == null ? null : String.format("%s:%s", cfg.getHost(), cfg.getPort());
            })
            .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * @return Number of nodes in the cluster.
     */
    public synchronized int size() {
        return srvs.size();
    }

    /**
     * @return Initial cluster size (number of nodes in the healthy cluster).
     */
    public int getInitialSize() {
        return initSize;
    }

    /**
     * Builds the Ignite configuration for one node: the shared server configuration with a client
     * connector bound to this cluster's host and the node's client port.
     *
     * @param nodeCfg Per-node settings.
     * @return Ignite configuration for the node.
     */
    private IgniteConfiguration getConfiguration(NodeConfiguration nodeCfg) {
        IgniteConfiguration igniteCfg = Config.getServerConfiguration();

        // NOTE(review): nodeCfg.getDiscoveryPort() is not applied here; confirm whether
        // Config.getServerConfiguration() is expected to take care of discovery addressing.
        igniteCfg.setClientConnectorConfiguration(new ClientConnectorConfiguration()
            .setHost(host)
            .setPort(nodeCfg.getClientPort())
        );

        return igniteCfg;
    }

    /**
     * Closes every running node, continuing past failures, then clears both node lists.
     *
     * @return First exception thrown while closing (later ones are attached to it as suppressed),
     *      or {@code null} if every node closed cleanly.
     */
    private RuntimeException stopAll() {
        RuntimeException err = null;

        for (Ignite srv : srvs) {
            try {
                srv.close();
            }
            catch (RuntimeException e) {
                if (err == null)
                    err = e;
                else
                    err.addSuppressed(e);
            }
        }

        srvs.clear();
        failedCfgs.clear();

        return err;
    }

    /** Settings unique for each node in the cluster. */
    private static class NodeConfiguration {
        /** Discovery port. */
        private final int discoveryPort;

        /** Client port. */
        private final int clientPort;

        /**
         * @param discoveryPort Discovery port.
         * @param clientPort Client port.
         */
        NodeConfiguration(int discoveryPort, int clientPort) {
            this.discoveryPort = discoveryPort;
            this.clientPort = clientPort;
        }

        /**
         * @return Discovery port.
         */
        int getDiscoveryPort() {
            return discoveryPort;
        }

        /**
         * @return Client port.
         */
        int getClientPort() {
            return clientPort;
        }
    }
}