| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.cluster; |
| |
| import java.io.Externalizable; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.ObjectInput; |
| import java.io.ObjectOutput; |
| import java.io.ObjectStreamException; |
| import java.net.InetAddress; |
| import java.net.UnknownHostException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteCluster; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.cluster.BaselineNode; |
| import org.apache.ignite.cluster.ClusterGroup; |
| import org.apache.ignite.cluster.ClusterGroupEmptyException; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.cluster.ClusterStartNodeResult; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.IgniteComponentType; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.managers.discovery.DiscoCache; |
| import org.apache.ignite.internal.processors.cluster.BaselineTopology; |
| import org.apache.ignite.internal.util.future.GridCompoundFuture; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.future.IgniteFutureImpl; |
| import org.apache.ignite.internal.util.nodestart.IgniteRemoteStartSpecification; |
| import org.apache.ignite.internal.util.nodestart.IgniteSshHelper; |
| import org.apache.ignite.internal.util.nodestart.StartNodeCallable; |
| import org.apache.ignite.internal.util.tostring.GridToStringExclude; |
| import org.apache.ignite.internal.util.typedef.CI1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.A; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.SB; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgniteFuture; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.lang.IgniteProductVersion; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS; |
| import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS; |
| import static org.apache.ignite.internal.util.nodestart.IgniteNodeStartUtils.parseFile; |
| import static org.apache.ignite.internal.util.nodestart.IgniteNodeStartUtils.specifications; |
| |
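| /** |
| * {@link IgniteClusterEx} implementation bound to a kernal context. Exposes topology queries, |
| * remote node start/stop, cluster activation, baseline topology, statistics and WAL mode management. |
| */ |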
| public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClusterEx, Externalizable { |
| /** Serialization version UID. */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Node configuration, taken from the kernal context in the constructor. */ |
| private IgniteConfiguration cfg; |
| |
| /** Node local store. */ |
| @GridToStringExclude |
| private ConcurrentMap nodeLoc; |
| |
| /** Client reconnect future. */ |
| private IgniteFuture<?> reconnecFut; |
| |
| /** Minimal {@link IgniteProductVersion} supporting BaselineTopology. */ |
| private static final IgniteProductVersion MIN_BLT_SUPPORTING_VER = IgniteProductVersion.fromString("2.4.0"); |
| |
| /** |
| * Required by {@link Externalizable}. Does not initialize {@code cfg} or {@code nodeLoc}; |
| * the kernal context is restored afterwards by {@link #readExternal(ObjectInput)}. |
| */ |
| public IgniteClusterImpl() { |
| // No-op. |
| } |
| |
| /** |
| * Creates cluster instance on top of the given kernal context, without any node filter. |
| * |
| * @param ctx Kernal context. |
| */ |
| public IgniteClusterImpl(GridKernalContext ctx) { |
| super(ctx, null, (IgnitePredicate<ClusterNode>)null); |
| |
| this.nodeLoc = new ClusterNodeLocalMapImpl(ctx); |
| this.cfg = ctx.config(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public ClusterGroup forLocal() { |
| guard(); |
| |
| try { |
| // The group is limited to the single node ID taken from the local configuration. |
| Set<UUID> locNodeId = Collections.singleton(cfg.getNodeId()); |
| |
| return new ClusterGroupAdapter(ctx, null, locNodeId); |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public ClusterNode localNode() { |
| guard(); |
| |
| try { |
| ClusterNode locNode = ctx.discovery().localNode(); |
| |
| // Discovery is expected to always know the local node. |
| assert locNode != null; |
| |
| return locNode; |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Returns the node local store created in the constructor. The field is a raw map handed out |
| * under whatever key/value types the caller asks for, hence the unchecked suppression. |
| */ |
| @SuppressWarnings("unchecked") |
| @Override public <K, V> ConcurrentMap<K, V> nodeLocalMap() { |
| return nodeLoc; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean pingNode(UUID nodeId) { |
| A.notNull(nodeId, "nodeId"); |
| |
| guard(); |
| |
| try { |
| boolean res = ctx.discovery().pingNode(nodeId); |
| |
| return res; |
| } |
| catch (IgniteCheckedException err) { |
| // Public API exposes unchecked exceptions only. |
| throw U.convertException(err); |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long topologyVersion() { |
| guard(); |
| |
| try { |
| long curTopVer = ctx.discovery().topologyVersion(); |
| |
| return curTopVer; |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Collection<ClusterNode> topology(long topVer) throws UnsupportedOperationException { |
| guard(); |
| |
| try { |
| Collection<ClusterNode> topNodes = ctx.discovery().topology(topVer); |
| |
| return topNodes; |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Collection<ClusterStartNodeResult> startNodes(File file, |
| boolean restart, |
| int timeout, |
| int maxConn) |
| throws IgniteException |
| { |
| try { |
| IgniteInternalFuture<Collection<ClusterStartNodeResult>> startFut = |
| startNodesAsync0(file, restart, timeout, maxConn); |
| |
| // Synchronous variant: block until all start attempts are finished. |
| return startFut.get(); |
| } |
| catch (IgniteCheckedException err) { |
| throw U.convertException(err); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync(File file, boolean restart, |
| int timeout, int maxConn) throws IgniteException { |
| IgniteInternalFuture<Collection<ClusterStartNodeResult>> internalFut = |
| startNodesAsync0(file, restart, timeout, maxConn); |
| |
| // Wrap the internal future into the public future type. |
| return new IgniteFutureImpl<>(internalFut); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Collection<ClusterStartNodeResult> startNodes(Collection<Map<String, Object>> hosts, |
| @Nullable Map<String, Object> dflts, |
| boolean restart, |
| int timeout, |
| int maxConn) |
| throws IgniteException |
| { |
| try { |
| IgniteInternalFuture<Collection<ClusterStartNodeResult>> startFut = |
| startNodesAsync0(hosts, dflts, restart, timeout, maxConn); |
| |
| // Synchronous variant: block until all start attempts are finished. |
| return startFut.get(); |
| } |
| catch (IgniteCheckedException err) { |
| throw U.convertException(err); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync( |
| Collection<Map<String, Object>> hosts, @Nullable Map<String, Object> dflts, |
| boolean restart, int timeout, int maxConn) throws IgniteException { |
| IgniteInternalFuture<Collection<ClusterStartNodeResult>> internalFut = |
| startNodesAsync0(hosts, dflts, restart, timeout, maxConn); |
| |
| // Wrap the internal future into the public future type. |
| return new IgniteFutureImpl<>(internalFut); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void stopNodes() throws IgniteException { |
| guard(); |
| |
| try { |
| // Kill task argument: false here, true in restartNodes(). |
| boolean restart = false; |
| |
| compute().execute(IgniteKillTask.class, restart); |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void stopNodes(Collection<UUID> ids) throws IgniteException { |
| guard(); |
| |
| try { |
| // Run the kill task only on the nodes with the given IDs. |
| ClusterGroup targets = forNodeIds(ids); |
| |
| ctx.grid().compute(targets).execute(IgniteKillTask.class, false); |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void restartNodes() throws IgniteException { |
| guard(); |
| |
| try { |
| // Kill task argument: true here, false in stopNodes(). |
| boolean restart = true; |
| |
| compute().execute(IgniteKillTask.class, restart); |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void restartNodes(Collection<UUID> ids) throws IgniteException { |
| guard(); |
| |
| try { |
| // Run the kill task only on the nodes with the given IDs. |
| ClusterGroup targets = forNodeIds(ids); |
| |
| ctx.grid().compute(targets).execute(IgniteKillTask.class, true); |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Resets, in this order, the metrics held by {@code ctx.jobMetric()}, {@code ctx.io()} and {@code ctx.task()}. |
| */ |
| @Override public void resetMetrics() { |
| guard(); |
| |
| try { |
| ctx.jobMetric().reset(); |
| ctx.io().resetMetrics(); |
| ctx.task().resetMetrics(); |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean active() { |
| guard(); |
| |
| try { |
| boolean activeState = ctx.state().publicApiActiveState(true); |
| |
| return activeState; |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void active(boolean active) { |
| guard(); |
| |
| try { |
| // Current server nodes are passed along with the state change request. |
| Collection<BaselineNode> srvNodes = baselineNodes(); |
| |
| ctx.state().changeGlobalState(active, srvNodes, false).get(); |
| } |
| catch (IgniteCheckedException err) { |
| throw U.convertException(err); |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** |
| * Collects the server nodes currently present in the cluster as baseline nodes. |
| * |
| * @return New mutable list holding every current server node. |
| */ |
| private Collection<BaselineNode> baselineNodes() { |
| Collection<ClusterNode> srvNodes = ctx.cluster().get().forServers().nodes(); |
| |
| // Every ClusterNode is a BaselineNode, so a typed copy is sufficient. |
| return new ArrayList<BaselineNode>(srvNodes); |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public Collection<BaselineNode> currentBaselineTopology() { |
| guard(); |
| |
| try { |
| BaselineTopology curBlt = ctx.state().clusterState().baselineTopology(); |
| |
| // No baseline topology in the cluster state. |
| if (curBlt == null) |
| return null; |
| |
| return curBlt.currentBaseline(); |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void setBaselineTopology(Collection<? extends BaselineNode> baselineTop) { |
| guard(); |
| |
| try { |
| // The request is silently ignored when persistence is disabled. |
| if (!isInMemoryMode()) { |
| validateBeforeBaselineChange(baselineTop); |
| |
| ctx.state().changeGlobalState(true, baselineTop, true).get(); |
| } |
| } |
| catch (IgniteCheckedException err) { |
| throw U.convertException(err); |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** |
| * @return {@code True} if persistence is not enabled in the node configuration. |
| */ |
| private boolean isInMemoryMode() { |
| boolean persistenceEnabled = CU.isPersistenceEnabled(cfg); |
| |
| return !persistenceEnabled; |
| } |
| |
| /** |
| * Verifies that all server nodes of the given topology support the BaselineTopology feature. |
| * |
| * @param discoCache Discovery cache describing the topology to check. |
| * @throws IgniteException If the minimum server node version is below {@link #MIN_BLT_SUPPORTING_VER}; |
| * the message lists consistent ID and version of every server node older than that version. |
| */ |
| private void verifyBaselineTopologySupport(DiscoCache discoCache) { |
| boolean supported = discoCache.minimumServerNodeVersion().compareTo(MIN_BLT_SUPPORTING_VER) >= 0; |
| |
| if (supported) |
| return; |
| |
| SB msg = new SB("Cluster contains nodes that don't support BaselineTopology: ["); |
| |
| for (ClusterNode srv : discoCache.serverNodes()) { |
| // Only outdated nodes are reported. |
| if (srv.version().compareTo(MIN_BLT_SUPPORTING_VER) >= 0) |
| continue; |
| |
| msg.a("[").a(srv.consistentId()).a(":").a(srv.version()).a("], "); |
| } |
| |
| // Drop the trailing ", " separator. |
| int len = msg.length(); |
| |
| msg.d(len - 2, len); |
| |
| throw new IgniteException(msg.a("]").toString()); |
| } |
| |
| /** |
| * Executes validation checks of cluster state and BaselineTopology before changing BaselineTopology to new one. |
| * |
| * @param baselineTop Requested baseline topology; when {@code null}, only version support and |
| * cluster activity are checked. |
| * @throws IgniteException If some server node does not support BaselineTopology, the cluster is inactive, |
| * the requested topology is empty or it excludes baseline nodes that are currently online. |
| */ |
| private void validateBeforeBaselineChange(Collection<? extends BaselineNode> baselineTop) { |
| verifyBaselineTopologySupport(ctx.discovery().discoCache()); |
| |
| boolean clusterActive = ctx.state().clusterState().active(); |
| |
| if (!clusterActive) |
| throw new IgniteException("Changing BaselineTopology on inactive cluster is not allowed."); |
| |
| if (baselineTop == null) |
| return; |
| |
| if (baselineTop.isEmpty()) |
| throw new IgniteException("BaselineTopology must contain at least one node."); |
| |
| Collection<Object> stillOnline = onlineBaselineNodesRequestedForRemoval(baselineTop); |
| |
| if (stillOnline != null && !stillOnline.isEmpty()) |
| throw new IgniteException("Removing online nodes from BaselineTopology is not supported: " + stillOnline); |
| } |
| |
| /** */ |
| @Nullable private Collection<Object> onlineBaselineNodesRequestedForRemoval(Collection<? extends BaselineNode> newBlt) { |
| BaselineTopology blt = ctx.state().clusterState().baselineTopology(); |
| Set<Object> bltConsIds; |
| |
| if (blt == null) |
| return null; |
| else |
| bltConsIds = blt.consistentIds(); |
| |
| ArrayList<Object> onlineNodesRequestedForRemoval = new ArrayList<>(); |
| |
| Collection<Object> aliveNodesConsIds = getConsistentIds(ctx.discovery().aliveServerNodes()); |
| |
| Collection<Object> newBltConsIds = getConsistentIds(newBlt); |
| |
| for (Object oldBltConsId : bltConsIds) { |
| if (aliveNodesConsIds.contains(oldBltConsId)) { |
| if (!newBltConsIds.contains(oldBltConsId)) |
| onlineNodesRequestedForRemoval.add(oldBltConsId); |
| } |
| } |
| |
| return onlineNodesRequestedForRemoval; |
| } |
| |
| /** |
| * @param nodes Nodes to take consistent IDs from. |
| * @return Consistent IDs of the given nodes, in iteration order of {@code nodes}. |
| */ |
| private Collection<Object> getConsistentIds(Collection<? extends BaselineNode> nodes) { |
| Collection<Object> consIds = new ArrayList<>(nodes.size()); |
| |
| for (BaselineNode node : nodes) { |
| Object consId = node.consistentId(); |
| |
| consIds.add(consId); |
| } |
| |
| return consIds; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void setBaselineTopology(long topVer) { |
| guard(); |
| |
| try { |
| // The request is silently ignored when persistence is disabled. |
| if (isInMemoryMode()) |
| return; |
| |
| Collection<ClusterNode> topNodes = topology(topVer); |
| |
| if (topNodes == null) |
| throw new IgniteException("Topology version does not exist: " + topVer); |
| |
| Collection<BaselineNode> srvNodes = new ArrayList<>(topNodes.size()); |
| |
| for (ClusterNode topNode : topNodes) { |
| // Client nodes are not included into the baseline. |
| if (topNode.isClient()) |
| continue; |
| |
| srvNodes.add(topNode); |
| } |
| |
| validateBeforeBaselineChange(srvNodes); |
| |
| ctx.state().changeGlobalState(true, srvNodes, true).get(); |
| } |
| catch (IgniteCheckedException err) { |
| throw U.convertException(err); |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void enableStatistics(Collection<String> caches, boolean enabled) { |
| guard(); |
| |
| try { |
| ctx.cache().enableStatistics(caches, enabled); |
| } |
| catch (IgniteCheckedException err) { |
| // Public API exposes unchecked exceptions only. |
| throw U.convertException(err); |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void clearStatistics(Collection<String> caches) { |
| guard(); |
| |
| try { |
| ctx.cache().clearStatistics(caches); |
| } |
| catch (IgniteCheckedException err) { |
| // Public API exposes unchecked exceptions only. |
| throw U.convertException(err); |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void setTxTimeoutOnPartitionMapExchange(long timeout) { |
| guard(); |
| |
| try { |
| ctx.cache().setTxTimeoutOnPartitionMapExchange(timeout); |
| } |
| catch (IgniteCheckedException err) { |
| // Public API exposes unchecked exceptions only. |
| throw U.convertException(err); |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteCluster withAsync() { |
| return new IgniteClusterAsyncImpl(this); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Delegates to {@link #changeWalMode(String, boolean)} with the enabled flag set to {@code true}. |
| */ |
| @Override public boolean enableWal(String cacheName) throws IgniteException { |
| return changeWalMode(cacheName, true); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Delegates to {@link #changeWalMode(String, boolean)} with the enabled flag set to {@code false}. |
| */ |
| @Override public boolean disableWal(String cacheName) throws IgniteException { |
| return changeWalMode(cacheName, false); |
| } |
| |
| /** |
| * Changes WAL mode of a single cache and waits for the operation to finish. |
| * |
| * @param cacheName Cache name, must not be {@code null}. |
| * @param enabled Enabled flag. |
| * @return {@code True} if WAL mode was changed as a result of this call. |
| */ |
| private boolean changeWalMode(String cacheName, boolean enabled) { |
| A.notNull(cacheName, "cacheName"); |
| |
| guard(); |
| |
| try { |
| // The cache processor works with a collection of names, so wrap the single one. |
| Set<String> cacheNames = Collections.singleton(cacheName); |
| |
| return ctx.cache().changeWalMode(cacheNames, enabled).get(); |
| } |
| catch (IgniteCheckedException err) { |
| throw U.convertException(err); |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isWalEnabled(String cacheName) { |
| guard(); |
| |
| try { |
| boolean walEnabled = ctx.cache().walEnabled(cacheName); |
| |
| return walEnabled; |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Always {@code false} for this class; {@link #withAsync()} returns a separate {@code IgniteClusterAsyncImpl}. |
| */ |
| @Override public boolean isAsync() { |
| return false; |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Always fails for this class, which reports itself as not asynchronous. |
| * |
| * @throws IllegalStateException Always. |
| */ |
| @Override public <R> IgniteFuture<R> future() { |
| throw new IllegalStateException("Asynchronous mode is not enabled."); |
| } |
| |
| /** |
| * Parses the given configuration file and starts nodes described in it. |
| * |
| * @param file Configuration file; must exist and be a regular file. |
| * @param restart Whether to stop existing nodes. |
| * @param timeout Connection timeout. |
| * @param maxConn Number of parallel SSH connections to one host. |
| * @return Future with results; already failed if the file cannot be parsed. |
| * @see IgniteCluster#startNodes(java.io.File, boolean, int, int) |
| */ |
| IgniteInternalFuture<Collection<ClusterStartNodeResult>> startNodesAsync0(File file, |
| boolean restart, |
| int timeout, |
| int maxConn) |
| { |
| A.notNull(file, "file"); |
| A.ensure(file.exists(), "file doesn't exist."); |
| A.ensure(file.isFile(), "file is a directory."); |
| |
| // First element: per-host startup parameters, second element: default values. |
| IgniteBiTuple<Collection<Map<String, Object>>, Map<String, Object>> parsed; |
| |
| try { |
| parsed = parseFile(file); |
| } |
| catch (IgniteCheckedException err) { |
| return new GridFinishedFuture<>(err); |
| } |
| |
| return startNodesAsync0(parsed.get1(), parsed.get2(), restart, timeout, maxConn); |
| } |
| |
| /** |
| * Builds node start tasks for every requested host and launches them, keeping at most {@code maxConn} |
| * tasks per host running at the same time. |
| * |
| * @param hosts Startup parameters. |
| * @param dflts Default values. |
| * @param restart Whether to stop existing nodes. |
| * @param timeout Connection timeout in milliseconds. |
| * @param maxConn Number of parallel SSH connections to one host. Must be positive if there is anything to start. |
| * @return Future with results. The future is already finished with an empty result if there is nothing |
| * to start, and already failed if a host name cannot be resolved or {@code maxConn} is not positive. |
| * @see IgniteCluster#startNodes(java.util.Collection, java.util.Map, boolean, int, int) |
| */ |
| IgniteInternalFuture<Collection<ClusterStartNodeResult>> startNodesAsync0( |
| Collection<Map<String, Object>> hosts, |
| @Nullable Map<String, Object> dflts, |
| boolean restart, |
| int timeout, |
| int maxConn) |
| { |
| A.notNull(hosts, "hosts"); |
| |
| guard(); |
| |
| try { |
| IgniteSshHelper sshHelper = IgniteComponentType.SSH.create(false); |
| |
| Map<String, Collection<IgniteRemoteStartSpecification>> specsMap = specifications(hosts, dflts); |
| |
| // Per-host queues of start tasks. |
| Map<String, ConcurrentLinkedQueue<StartNodeCallable>> runMap = new HashMap<>(); |
| |
| int nodeCallCnt = 0; |
| |
| for (Map.Entry<String, Collection<IgniteRemoteStartSpecification>> entry : specsMap.entrySet()) { |
| String host = entry.getKey(); |
| |
| InetAddress addr; |
| |
| try { |
| addr = InetAddress.getByName(host); |
| } |
| catch (UnknownHostException e) { |
| throw new IgniteCheckedException("Invalid host name: " + host, e); |
| } |
| |
| // Nodes of this cluster that are already running on the target host, if any were found. |
| Collection<? extends ClusterNode> neighbors = null; |
| |
| if (addr.isLoopbackAddress()) |
| neighbors = neighbors(); |
| else { |
| for (Collection<ClusterNode> p : U.neighborhood(nodes()).values()) { |
| ClusterNode node = F.first(p); |
| |
| // NOTE(review): this is a substring test on the ATTR_IPS string, so e.g. 10.0.0.1 |
| // also matches a node advertising 10.0.0.10 - confirm the attribute format. |
| if (node.<String>attribute(ATTR_IPS).contains(addr.getHostAddress())) { |
| neighbors = p; |
| |
| break; |
| } |
| } |
| } |
| |
| int startIdx = 1; |
| |
| if (neighbors != null) { |
| if (restart && !neighbors.isEmpty()) { |
| try { |
| ctx.grid().compute(forNodes(neighbors)).execute(IgniteKillTask.class, false); |
| } |
| catch (ClusterGroupEmptyException ignored) { |
| // No-op, nothing to restart. |
| } |
| } |
| else |
| // Without restart, already running neighbors count towards the requested node number. |
| startIdx = neighbors.size() + 1; |
| } |
| |
| ConcurrentLinkedQueue<StartNodeCallable> nodeRuns = new ConcurrentLinkedQueue<>(); |
| |
| runMap.put(host, nodeRuns); |
| |
| for (IgniteRemoteStartSpecification spec : entry.getValue()) { |
| assert spec.host().equals(host); |
| |
| for (int i = startIdx; i <= spec.nodes(); i++) { |
| nodeRuns.add(sshHelper.nodeStartCallable(spec, timeout)); |
| |
| nodeCallCnt++; |
| } |
| } |
| } |
| |
| // If there is nothing to start, return finished future with empty result. |
| if (nodeCallCnt == 0) |
| return new GridFinishedFuture<Collection<ClusterStartNodeResult>>( |
| Collections.<ClusterStartNodeResult>emptyList()); |
| |
| // With a non-positive limit no task would ever be launched and the compound future below |
| // would never be marked initialized, i.e. the caller would wait forever. Fail instead. |
| if (maxConn <= 0) |
| throw new IgniteCheckedException("Number of parallel connections must be positive: " + maxConn); |
| |
| // Exceeding max line width for readability. |
| GridCompoundFuture<ClusterStartNodeResult, Collection<ClusterStartNodeResult>> fut = |
| new GridCompoundFuture<>(CU.<ClusterStartNodeResult>objectsReducer()); |
| |
| // Number of callables not yet added to the compound future. |
| AtomicInteger cnt = new AtomicInteger(nodeCallCnt); |
| |
| // Limit maximum simultaneous connection number per host. |
| for (ConcurrentLinkedQueue<StartNodeCallable> queue : runMap.values()) { |
| for (int i = 0; i < maxConn; i++) { |
| if (!runNextNodeCallable(queue, fut, cnt)) |
| break; |
| } |
| } |
| |
| return fut; |
| } |
| catch (IgniteCheckedException e) { |
| return new GridFinishedFuture<>(e); |
| } |
| finally { |
| unguard(); |
| } |
| } |
| |
| /** |
| * Gets the all grid nodes that reside on the same physical computer as local grid node. |
| * Local grid node is excluded. |
| * <p> |
| * Detection of the same physical computer is based on comparing set of network interface MACs. |
| * If two nodes have the same set of MACs, Ignite considers these nodes running on the same |
| * physical computer. |
| * |
| * @return Grid nodes that reside on the same physical computer as local grid node. |
| */ |
| private Collection<ClusterNode> neighbors() { |
| // Resolve the local node once: every localNode() call goes through guard()/unguard(). |
| ClusterNode locNode = localNode(); |
| |
| String macs = locNode.attribute(ATTR_MACS); |
| |
| assert macs != null; |
| |
| Collection<ClusterNode> neighbors = new ArrayList<>(1); |
| |
| for (ClusterNode n : forOthers(locNode).nodes()) { |
| if (macs.equals(n.attribute(ATTR_MACS))) |
| neighbors.add(n); |
| } |
| |
| return neighbors; |
| } |
| |
| /** |
| * Runs next callable from host node start queue. |
| * <p> |
| * The started task gets a listener that calls this method again for the same queue once the task |
| * finishes, so each initial invocation for a host keeps one task of that host in flight until |
| * the queue is drained. |
| * |
| * @param queue Queue of tasks to poll from. |
| * @param comp Compound future that comprise all started node tasks. |
| * @param cnt Atomic counter to check if all futures are added to compound future. |
| * @return {@code True} if task was started, {@code false} if queue was empty. |
| */ |
| private boolean runNextNodeCallable(final ConcurrentLinkedQueue<StartNodeCallable> queue, |
| final GridCompoundFuture<ClusterStartNodeResult, Collection<ClusterStartNodeResult>> |
| comp, |
| final AtomicInteger cnt) |
| { |
| StartNodeCallable call = queue.poll(); |
| |
| // Nothing left to start for this host. |
| if (call == null) |
| return false; |
| |
| IgniteInternalFuture<ClusterStartNodeResult> fut = ctx.closure().callLocalSafe(call, true); |
| |
| // The task future is registered before the counter is decremented, so the compound future is |
| // marked initialized only after the last pending callable has been added to it. |
| comp.add(fut); |
| |
| if (cnt.decrementAndGet() == 0) |
| comp.markInitialized(); |
| |
| // Chain the next task of the same host to the completion of this one. |
| fut.listen(new CI1<IgniteInternalFuture<ClusterStartNodeResult>>() { |
| @Override public void apply(IgniteInternalFuture<ClusterStartNodeResult> f) { |
| runNextNodeCallable(queue, comp, cnt); |
| } |
| }); |
| |
| return true; |
| } |
| |
| /** |
| * Clears node local map, i.e. removes all entries from the map returned by {@link #nodeLocalMap()}. |
| */ |
| public void clearNodeMap() { |
| nodeLoc.clear(); |
| } |
| |
| /** |
| * Stores the client reconnect future returned by {@link #clientReconnectFuture()}. |
| * NOTE(review): the backing field is neither volatile nor guarded - confirm how callers publish it across threads. |
| * |
| * @param reconnecFut Reconnect future. |
| */ |
| public void clientReconnectFuture(IgniteFuture<?> reconnecFut) { |
| this.reconnecFut = reconnecFut; |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Returns the last value passed to {@link #clientReconnectFuture(IgniteFuture)}, or {@code null} if none was set. |
| */ |
| @Nullable @Override public IgniteFuture<?> clientReconnectFuture() { |
| return reconnecFut; |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Reads only the kernal context; {@code cfg} and {@code nodeLoc} are not restored here. |
| */ |
| @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { |
| ctx = (GridKernalContext)in.readObject(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Writes only the kernal context. |
| */ |
| @Override public void writeExternal(ObjectOutput out) throws IOException { |
| out.writeObject(ctx); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Replaces the deserialized instance with the cluster object of the grid that owns the restored context. |
| */ |
| @Override protected Object readResolve() throws ObjectStreamException { |
| return ctx.grid().cluster(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Only the Ignite instance name taken from the kernal context is printed. |
| */ |
| @Override public String toString() { |
| return "IgniteCluster [igniteInstanceName=" + ctx.igniteInstanceName() + ']'; |
| } |
| } |