blob: 44eef02ef913352deb590173ebcc6fe76f414e7a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.discovery;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
/**
* Discovery data exchange test.
*/
public class DiscoverySpiDataExchangeTest extends GridCommonAbstractTest {
/** Closure to collect discovery data bags when node is joined. */
private BiConsumer<ClusterNode, DiscoveryDataBag> dataExchangeCollectClosure;
/** {@inheritDoc} */
@Override protected IgniteConfiguration optimize(IgniteConfiguration cfg) throws IgniteCheckedException {
IgniteConfiguration cfg0 = super.optimize(cfg);
cfg0.setDiscoverySpi(new DelegatedDiscoverySpi((IgniteDiscoverySpi)cfg.getDiscoverySpi()));
return cfg0;
}
/**
* Ensures that {@link DiscoveryDataBag#isJoiningNodeClient()} returns a valid value when joining a node.
*
* @throws Exception In case of error.
*/
@Test
public void testJoiningNodeClientFlag() throws Exception {
for (int nodeIdx = 0; nodeIdx < 4; nodeIdx++) {
Collection<T2<ClusterNode, DiscoveryDataBag>> dataBags = new ConcurrentLinkedQueue<>();
dataExchangeCollectClosure = (locNode, dataBag) -> dataBags.add(new T2<>(locNode, dataBag));
IgniteEx node = nodeIdx % 2 == 0 ? startGrid(nodeIdx) : startClientGrid(nodeIdx);
assertFalse(dataBags.isEmpty());
assertTrue(dataBags.toString(),
dataBags.stream().allMatch(pair -> node.context().clientNode() == pair.get2().isJoiningNodeClient()));
}
}
/** Delegated discovery. */
@IgniteSpiMultipleInstancesSupport(true)
@DiscoverySpiHistorySupport(true)
private class DelegatedDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscoverySpi {
/** Discovery delegate. */
private final IgniteDiscoverySpi delegate;
/**
* @param delegate Discovery delegate.
*/
DelegatedDiscoverySpi(IgniteDiscoverySpi delegate) {
this.delegate = delegate;
}
/** {@inheritDoc} */
@Override public @Nullable Serializable consistentId() throws IgniteSpiException {
return delegate.consistentId();
}
/** {@inheritDoc} */
@Override public Collection<ClusterNode> getRemoteNodes() {
return delegate.getRemoteNodes();
}
/** {@inheritDoc} */
@Override public ClusterNode getLocalNode() {
return delegate.getLocalNode();
}
/** {@inheritDoc} */
@Override public @Nullable ClusterNode getNode(UUID nodeId) {
return delegate.getNode(nodeId);
}
/** {@inheritDoc} */
@Override public boolean pingNode(UUID nodeId) {
return delegate.pingNode(nodeId);
}
/** {@inheritDoc} */
@Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
delegate.setNodeAttributes(attrs, ver);
}
/** {@inheritDoc} */
@Deprecated
@Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
delegate.setListener(lsnr);
}
/** {@inheritDoc} */
@Override public void setDataExchange(DiscoverySpiDataExchange exchange) {
delegate.setDataExchange(new DelegatedDiscoverySpiDataExchange(exchange));
}
/** {@inheritDoc} */
@Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) {
delegate.setMetricsProvider(metricsProvider);
}
/** {@inheritDoc} */
@Override public void disconnect() throws IgniteSpiException {
delegate.disconnect();
}
/** {@inheritDoc} */
@Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) {
delegate.setAuthenticator(auth);
}
/** {@inheritDoc} */
@Override public long getGridStartTime() {
return delegate.getGridStartTime();
}
/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
delegate.sendCustomEvent(msg);
}
/** {@inheritDoc} */
@Override public void failNode(UUID nodeId, @Nullable String warning) {
delegate.failNode(nodeId, warning);
}
/** {@inheritDoc} */
@Override public boolean isClientMode() throws IllegalStateException {
return delegate.isClientMode();
}
/** {@inheritDoc} */
@Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException {
delegate.spiStart(igniteInstanceName);
}
/** {@inheritDoc} */
@Override public void spiStop() throws IgniteSpiException {
delegate.spiStop();
}
/** {@inheritDoc} */
@Override protected void onContextInitialized0(final IgniteSpiContext spiCtx) throws IgniteSpiException {
delegate.onContextInitialized(spiCtx);
}
/** {@inheritDoc} */
@Override public void injectResources(Ignite ignite) {
super.injectResources(ignite);
if (ignite != null && delegate instanceof IgniteSpiAdapter) {
try {
((IgniteEx)ignite).context().resource().inject(delegate);
}
catch (IgniteCheckedException e) {
log.error("Unable to inject resources", e);
}
}
}
/** {@inheritDoc} */
@Override public boolean knownNode(UUID nodeId) {
return delegate.knownNode(nodeId);
}
/** {@inheritDoc} */
@Override public boolean clientReconnectSupported() {
return delegate.clientReconnectSupported();
}
/** {@inheritDoc} */
@Override public void clientReconnect() {
delegate.clientReconnect();
}
/** {@inheritDoc} */
@Override public boolean allNodesSupport(IgniteFeatures feature) {
return delegate.allNodesSupport(feature);
}
/** {@inheritDoc} */
@Override public void simulateNodeFailure() {
delegate.simulateNodeFailure();
}
/** {@inheritDoc} */
@Override public void setInternalListener(IgniteDiscoverySpiInternalListener lsnr) {
delegate.setInternalListener(lsnr);
}
/** {@inheritDoc} */
@Override public boolean supportsCommunicationFailureResolve() {
return delegate.supportsCommunicationFailureResolve();
}
/** {@inheritDoc} */
@Override public void resolveCommunicationFailure(ClusterNode node, Exception err) {
delegate.resolveCommunicationFailure(node, err);
}
/** Delegated discovery data exchange. */
private class DelegatedDiscoverySpiDataExchange implements DiscoverySpiDataExchange {
/** Discovery data exchange delegate. */
private final DiscoverySpiDataExchange delegate;
/**
* @param delegate Discovery data exchange delegate.
*/
public DelegatedDiscoverySpiDataExchange(DiscoverySpiDataExchange delegate) {
this.delegate = delegate;
}
/** {@inheritDoc} */
@Override public DiscoveryDataBag collect(DiscoveryDataBag dataBag) {
if (dataExchangeCollectClosure != null)
dataExchangeCollectClosure.accept(getLocalNode(), dataBag);
return delegate.collect(dataBag);
}
/** {@inheritDoc} */
@Override public void onExchange(DiscoveryDataBag dataBag) {
delegate.onExchange(dataBag);
}
}
}
}