| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.testframework.junits.common; |
| |
| import java.lang.reflect.Method; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.BitSet; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.UUID; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| import javax.cache.Cache; |
| import javax.cache.CacheException; |
| import javax.cache.integration.CompletionListener; |
| import javax.management.MBeanServer; |
| import javax.management.ObjectInstance; |
| import javax.net.ssl.HostnameVerifier; |
| import javax.net.ssl.HttpsURLConnection; |
| import javax.net.ssl.SSLSession; |
| import junit.framework.AssertionFailedError; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteCompute; |
| import org.apache.ignite.IgniteDataStreamer; |
| import org.apache.ignite.IgniteEvents; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.IgniteMessaging; |
| import org.apache.ignite.Ignition; |
| import org.apache.ignite.cache.CacheAtomicityMode; |
| import org.apache.ignite.cache.CachePeekMode; |
| import org.apache.ignite.cache.affinity.Affinity; |
| import org.apache.ignite.cache.affinity.AffinityFunction; |
| import org.apache.ignite.cache.affinity.AffinityFunctionContext; |
| import org.apache.ignite.cache.query.SqlFieldsQuery; |
| import org.apache.ignite.cluster.ClusterGroup; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.cluster.ClusterTopologyException; |
| import org.apache.ignite.compute.ComputeTask; |
| import org.apache.ignite.compute.ComputeTaskFuture; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.DataStorageConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.events.Event; |
| import org.apache.ignite.events.EventType; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.IgniteKernal; |
| import org.apache.ignite.internal.pagemem.wal.record.DataRecord; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; |
| import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; |
| import org.apache.ignite.internal.processors.cache.CacheMetricsImpl; |
| import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; |
| import org.apache.ignite.internal.processors.cache.GridCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheExplicitLockSpan; |
| import org.apache.ignite.internal.processors.cache.GridCacheFuture; |
| import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; |
| import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; |
| import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; |
| import org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl; |
| import org.apache.ignite.internal.processors.cache.IgniteInternalCache; |
| import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter; |
| import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.local.GridLocalCache; |
| import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; |
| import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; |
| import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2; |
| import org.apache.ignite.internal.processors.service.IgniteServiceProcessor; |
| import org.apache.ignite.internal.util.future.GridCompoundFuture; |
| import org.apache.ignite.internal.util.lang.GridAbsPredicate; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.G; |
| import org.apache.ignite.internal.util.typedef.PA; |
| import org.apache.ignite.internal.util.typedef.T3; |
| import org.apache.ignite.internal.util.typedef.internal.LT; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.internal.visor.VisorTaskArgument; |
| import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskArg; |
| import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskV2; |
| import org.apache.ignite.lang.IgniteBiInClosure; |
| import org.apache.ignite.lang.IgniteFuture; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.lang.IgniteRunnable; |
| import org.apache.ignite.mxbean.MXBeanDescription; |
| import org.apache.ignite.resources.IgniteInstanceResource; |
| import org.apache.ignite.resources.LoggerResource; |
| import org.apache.ignite.testframework.GridTestNode; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.apache.ignite.testframework.junits.GridAbstractTest; |
| import org.apache.ignite.transactions.Transaction; |
| import org.apache.ignite.transactions.TransactionConcurrency; |
| import org.apache.ignite.transactions.TransactionIsolation; |
| import org.apache.ignite.transactions.TransactionRollbackException; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static java.util.Collections.emptyList; |
| import static java.util.stream.Collectors.toList; |
| import static java.util.stream.Collectors.toSet; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED; |
| import static org.apache.ignite.IgniteSystemProperties.getBoolean; |
| import static org.apache.ignite.cache.CacheMode.LOCAL; |
| import static org.apache.ignite.cache.CacheRebalanceMode.NONE; |
| import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_DIRECTORY; |
| import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; |
| import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR; |
| import static org.apache.ignite.testframework.GridTestUtils.setFieldValue; |
| import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; |
| import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; |
| |
| /** |
| * Super class for all common tests. |
| */ |
| public abstract class GridCommonAbstractTest extends GridAbstractTest { |
| /** Cache peek modes array that consist of only ONHEAP mode. */ |
| protected static final CachePeekMode[] ONHEAP_PEEK_MODES = new CachePeekMode[] {CachePeekMode.ONHEAP}; |
| |
| /** Service deployment wait timeout. */ |
| protected static final int SERVICE_DEPLOYMENT_WAIT_TIMEOUT = 10_000; |
| |
| /** |
| * @param startGrid If {@code true}, then grid node will be auto-started. |
| */ |
| protected GridCommonAbstractTest(boolean startGrid) { |
| super(startGrid); |
| } |
| |
| /** */ |
| protected GridCommonAbstractTest() { |
| super(false); |
| } |
| |
| /** |
| * @param idx Grid index. |
| * @return Cache. |
| */ |
| protected <K, V> IgniteCache<K, V> jcache(int idx) { |
| return grid(idx).cache(DEFAULT_CACHE_NAME).withAllowAtomicOpsInTx(); |
| } |
| |
| /** |
| * Get or create instance of cache specified by K,V types; |
| * new instance of cache is created for each pair of types key and value |
| * |
| * @param clsK Key class. |
| * @param clsV Value class. |
| * @return cache instance |
| */ |
| protected <K, V> IgniteCache<K, V> jcache(CacheConfiguration ccfg, Class<K> clsK, Class<V> clsV) { |
| return jcache(grid(), ccfg, clsK, clsV); |
| } |
| |
| /** |
| * Get or create instance of cache specified by K,V types; |
| * new instance of cache is created for each pair of types key and value |
| * |
| * @param ig Ignite. |
| * @param ccfg Cache configuration. |
| * @param clsK Key class. |
| * @param clsV Value class. |
| * @return cache instance |
| */ |
| protected <K, V> IgniteCache<K, V> jcache(Ignite ig, CacheConfiguration ccfg, Class<K> clsK, Class<V> clsV) { |
| return jcache(ig, ccfg, clsK.getSimpleName() + "-" + clsV.getSimpleName(), clsK, clsV); |
| } |
| |
| /** |
| * Get or create instance of cache specified by K,V types; |
| * new instance of cache is created for each pair of types key and value |
| * |
| * @param ig Ignite. |
| * @param ccfg Cache configuration. |
| * @param name Cache name. |
| * @param clsK Key class. |
| * @param clsV Value class. |
| * @return cache instance |
| */ |
| @SuppressWarnings("unchecked") |
| protected <K, V> IgniteCache<K, V> jcache(Ignite ig, |
| CacheConfiguration ccfg, |
| @NotNull String name, |
| Class<K> clsK, |
| Class<V> clsV) { |
| CacheConfiguration<K, V> cc = new CacheConfiguration<>(ccfg); |
| cc.setName(name); |
| cc.setIndexedTypes(clsK, clsV); |
| |
| return ig.getOrCreateCache(cc); |
| } |
| |
| /** |
| * Get or create instance of cache specified by K,V types; |
| * new instance of cache is created |
| * |
| * @param ig Ignite. |
| * @param ccfg Cache configuration. |
| * @param name Cache name. |
| * @return cache instance |
| */ |
| @SuppressWarnings("unchecked") |
| protected <K, V> IgniteCache<K, V> jcache(Ignite ig, |
| CacheConfiguration ccfg, |
| @NotNull String name) { |
| CacheConfiguration<K, V> cc = new CacheConfiguration<>(ccfg); |
| cc.setName(name); |
| |
| return ig.getOrCreateCache(cc); |
| } |
| |
| /** |
| * @param idx Grid index. |
| * @param name Cache name. |
| * @return Cache. |
| */ |
| protected <K, V> IgniteCache<K, V> jcache(int idx, String name) { |
| return grid(idx).cache(name); |
| } |
| |
| /** |
| * @param idx Grid index. |
| * @return Cache. |
| */ |
| protected <K, V> GridCacheAdapter<K, V> internalCache(int idx) { |
| return ((IgniteKernal)grid(idx)).internalCache(DEFAULT_CACHE_NAME); |
| } |
| |
| /** |
| * @param idx Grid index. |
| * @param name Cache name. |
| * @return Cache. |
| */ |
| protected <K, V> GridCacheAdapter<K, V> internalCache(int idx, String name) { |
| return ((IgniteKernal)grid(idx)).internalCache(name); |
| } |
| |
| /** |
| * @param ignite Grid. |
| * @param name Cache name. |
| * @return Cache. |
| */ |
| protected <K, V> GridCacheAdapter<K, V> internalCache(Ignite ignite, String name) { |
| return ((IgniteKernal)ignite).internalCache(name); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @return Cache. |
| */ |
| protected static <K, V> GridCacheAdapter<K, V> internalCache0(IgniteCache<K, V> cache) { |
| if (isMultiJvmObject(cache)) |
| throw new UnsupportedOperationException("Operation can't be supported automatically for multi jvm " + |
| "(send closure instead)."); |
| |
| return ((IgniteKernal)cache.unwrap(Ignite.class)).internalCache(cache.getName()); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @return Cache. |
| */ |
| protected <K, V> GridCacheAdapter<K, V> internalCache(IgniteCache<K, V> cache) { |
| return internalCache0(cache); |
| } |
| |
| /** |
| * @return Cache. |
| */ |
| protected <K, V> IgniteCache<K, V> jcache() { |
| return grid().cache(DEFAULT_CACHE_NAME).withAllowAtomicOpsInTx(); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @return Cache. |
| */ |
| @SuppressWarnings("TypeMayBeWeakened") |
| protected <K> Set<K> keySet(IgniteCache<K, ?> cache) { |
| Set<K> res = new HashSet<>(); |
| |
| for (Cache.Entry<K, ?> entry : cache) |
| res.add(entry.getKey()); |
| |
| return res; |
| } |
| |
| /** |
| * @return Cache. |
| */ |
| protected <K, V> GridLocalCache<K, V> local() { |
| return (GridLocalCache<K, V>)((IgniteKernal)grid()).<K, V>internalCache(DEFAULT_CACHE_NAME); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @return DHT cache. |
| */ |
| protected static <K, V> GridDhtCacheAdapter<K, V> dht(IgniteCache<K, V> cache) { |
| return nearEnabled(cache) ? near(cache).dht() : |
| ((IgniteKernal)cache.unwrap(Ignite.class)).<K, V>internalCache(cache.getName()).context().dht(); |
| } |
| |
| /** |
| * @return DHT cache. |
| */ |
| protected <K, V> GridDhtCacheAdapter<K, V> dht() { |
| GridCacheAdapter<K, V> internalCache = ((IgniteKernal)grid()).internalCache(DEFAULT_CACHE_NAME); |
| |
| return internalCache.isNear() ? internalCache.context().near().dht() : internalCache.context().dht(); |
| } |
| |
| /** |
| * @param idx Grid index. |
| * @return DHT cache. |
| */ |
| protected <K, V> GridDhtCacheAdapter<K, V> dht(int idx) { |
| GridCacheAdapter<K, V> internalCache = ((IgniteKernal)grid(idx)).internalCache(DEFAULT_CACHE_NAME); |
| |
| return internalCache.isNear() ? internalCache.context().near().dht() : internalCache.context().dht(); |
| } |
| |
| /** |
| * @param idx Grid index. |
| * @param cache Cache name. |
| * @return DHT cache. |
| */ |
| protected <K, V> GridDhtCacheAdapter<K, V> dht(int idx, String cache) { |
| GridCacheAdapter<K, V> internalCache = ((IgniteKernal)grid(idx)).internalCache(cache); |
| |
| return internalCache.isNear() ? internalCache.context().near().dht() : internalCache.context().dht(); |
| } |
| |
| /** |
| * @param idx Grid index. |
| * @param cache Cache name. |
| * @return Colocated cache. |
| */ |
| protected <K, V> GridDhtColocatedCache<K, V> colocated(int idx, String cache) { |
| return (GridDhtColocatedCache<K, V>)((IgniteKernal)grid(idx)).internalCache(cache); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @return {@code True} if near cache is enabled. |
| */ |
| private static <K, V> boolean nearEnabled(GridCacheAdapter<K, V> cache) { |
| return isNearEnabled(cache.configuration()); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @return {@code True} if near cache is enabled. |
| */ |
| protected static <K, V> boolean nearEnabled(final IgniteCache<K, V> cache) { |
| CacheConfiguration cfg = GridAbstractTest.executeOnLocalOrRemoteJvm(cache, |
| new TestCacheCallable<K, V, CacheConfiguration>() { |
| private static final long serialVersionUID = 0L; |
| |
| @Override public CacheConfiguration call(Ignite ignite, IgniteCache<K, V> cache) throws Exception { |
| return ((IgniteKernal)ignite).<K, V>internalCache(cache.getName()).context().config(); |
| } |
| }); |
| |
| return isNearEnabled(cfg); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @return Near cache. |
| */ |
| private static <K, V> GridNearCacheAdapter<K, V> near(GridCacheAdapter<K, V> cache) { |
| return cache.context().near(); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @return Near cache. |
| */ |
| protected static <K, V> GridNearCacheAdapter<K, V> near(IgniteCache<K, V> cache) { |
| return ((IgniteKernal)cache.unwrap(Ignite.class)).<K, V>internalCache(cache.getName()).context().near(); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @return Colocated cache. |
| */ |
| protected static <K, V> GridDhtColocatedCache<K, V> colocated(IgniteCache<K, V> cache) { |
| return ((IgniteKernal)cache.unwrap(Ignite.class)).<K, V>internalCache(cache.getName()).context().colocated(); |
| } |
| |
| /** |
| * @param cache Ignite cache. |
| * @return CacheAtomicityMode for given cache. |
| */ |
| public static CacheAtomicityMode atomicityMode(IgniteCache cache) { |
| return ((CacheConfiguration)cache.getConfiguration(CacheConfiguration.class)).getAtomicityMode(); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param keys Keys. |
| * @param replaceExistingValues Replace existing values. |
| * @throws Exception If failed. |
| */ |
| protected static <K> void loadAll(Cache<K, ?> cache, final Set<K> keys, final boolean replaceExistingValues) |
| throws Exception { |
| IgniteCache<K, Object> cacheCp = (IgniteCache<K, Object>)cache; |
| |
| GridAbstractTest.executeOnLocalOrRemoteJvm(cacheCp, new TestCacheRunnable<K, Object>() { |
| private static final long serialVersionUID = -3030833765012500545L; |
| |
| @Override public void run(Ignite ignite, IgniteCache<K, Object> cache) throws Exception { |
| final AtomicReference<Exception> ex = new AtomicReference<>(); |
| |
| final CountDownLatch latch = new CountDownLatch(1); |
| |
| cache.loadAll(keys, replaceExistingValues, new CompletionListener() { |
| @Override public void onCompletion() { |
| latch.countDown(); |
| } |
| |
| @Override public void onException(Exception e) { |
| ex.set(e); |
| |
| latch.countDown(); |
| } |
| }); |
| |
| latch.await(); |
| |
| if (ex.get() != null) |
| throw ex.get(); |
| } |
| }); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param key Keys. |
| * @param replaceExistingValues Replace existing values. |
| * @throws Exception If failed. |
| */ |
| protected static <K> void load(Cache<K, ?> cache, K key, boolean replaceExistingValues) throws Exception { |
| loadAll(cache, Collections.singleton(key), replaceExistingValues); |
| } |
| |
| /** |
| * @return Near cache. |
| */ |
| protected <K, V> GridNearCacheAdapter<K, V> near() { |
| return ((IgniteKernal)grid()).<K, V>internalCache(DEFAULT_CACHE_NAME).context().near(); |
| } |
| |
| /** |
| * @param idx Grid index. |
| * @return Near cache. |
| */ |
| protected <K, V> GridNearCacheAdapter<K, V> near(int idx) { |
| return ((IgniteKernal)grid(idx)).<K, V>internalCache(DEFAULT_CACHE_NAME).context().near(); |
| } |
| |
| /** |
| * @param idx Grid index. |
| * @return Colocated cache. |
| */ |
| protected <K, V> GridDhtColocatedCache<K, V> colocated(int idx) { |
| return (GridDhtColocatedCache<K, V>)((IgniteKernal)grid(idx)).<K, V>internalCache(DEFAULT_CACHE_NAME); |
| } |
| |
| /** |
| * @param idx Grid index. |
| * @param cache Cache name. |
| * @return Near cache. |
| */ |
| protected <K, V> GridNearCacheAdapter<K, V> near(int idx, String cache) { |
| return ((IgniteKernal)grid(idx)).<K, V>internalCache(cache).context().near(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected final boolean isJunitFrameworkClass() { |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| // Disable SSL hostname verifier. |
| HttpsURLConnection.setDefaultHostnameVerifier(new HostnameVerifier() { |
| @Override public boolean verify(String s, SSLSession sslSes) { |
| return true; |
| } |
| }); |
| |
| super.beforeTest(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected final Ignite startGridsMultiThreaded(int cnt) throws Exception { |
| return startGridsMultiThreaded(cnt, true); |
| } |
| |
| /** |
| * @param cnt Count. |
| * @param awaitPartMapExchange If we need to await partition map exchange. |
| * @return Ignite. |
| * @throws Exception If failed. |
| */ |
| protected final Ignite startGridsMultiThreaded(int cnt, boolean awaitPartMapExchange) throws Exception { |
| Ignite g = super.startGridsMultiThreaded(cnt); |
| |
| if (awaitPartMapExchange) { |
| if (!g.active()) |
| g.active(true); |
| |
| awaitPartitionMapExchange(); |
| } |
| |
| return g; |
| } |
| |
| /** |
| * @throws InterruptedException If interrupted. |
| */ |
| protected void awaitPartitionMapExchange() throws InterruptedException { |
| awaitPartitionMapExchange(false, false, null); |
| } |
| |
| /** |
| * @param waitEvicts If {@code true} will wait for evictions finished. |
| * @param waitNode2PartUpdate If {@code true} will wait for nodes node2part info update finished. |
| * @param nodes Optional nodes. If {@code null} method will wait for all nodes, for non null collection nodes will |
| * be filtered |
| * @throws InterruptedException If interrupted. |
| */ |
| protected void awaitPartitionMapExchange( |
| boolean waitEvicts, |
| boolean waitNode2PartUpdate, |
| @Nullable Collection<ClusterNode> nodes |
| ) throws InterruptedException { |
| awaitPartitionMapExchange(waitEvicts, waitNode2PartUpdate, nodes, false, null); |
| } |
| |
| /** |
| * @return Maximum time of awaiting PartitionMapExchange operation (in milliseconds) |
| */ |
| protected long getPartitionMapExchangeTimeout() { |
| return 30_000; |
| } |
| |
| /** |
| * @param waitEvicts If {@code true} will wait for evictions finished. |
| * @param waitNode2PartUpdate If {@code true} will wait for nodes node2part info update finished. |
| * @param nodes Optional nodes. If {@code null} method will wait for all nodes, for non null collection nodes will |
| * be filtered |
| * @param printPartState If {@code true} will print partition state if evictions not happened. |
| * @throws InterruptedException If interrupted. |
| */ |
| @SuppressWarnings("BusyWait") |
| protected void awaitPartitionMapExchange( |
| boolean waitEvicts, |
| boolean waitNode2PartUpdate, |
| @Nullable Collection<ClusterNode> nodes, |
| boolean printPartState |
| ) throws InterruptedException { |
| awaitPartitionMapExchange(waitEvicts, waitNode2PartUpdate, nodes, printPartState, null); |
| } |
| |
| /** |
| * @param waitEvicts If {@code true} will wait for evictions finished. |
| * @param waitNode2PartUpdate If {@code true} will wait for nodes node2part info update finished. |
| * @param nodes Optional nodes. If {@code null} method will wait for all nodes, for non null collection nodes will |
| * be filtered |
| * @param printPartState If {@code true} will print partition state if evictions not happened. |
| * @param cacheNames Wait for specific caches. |
| * @throws InterruptedException If interrupted. |
| */ |
| @SuppressWarnings("BusyWait") |
| protected void awaitPartitionMapExchange( |
| boolean waitEvicts, |
| boolean waitNode2PartUpdate, |
| @Nullable Collection<ClusterNode> nodes, |
| boolean printPartState, |
| @Nullable Set<String> cacheNames |
| ) throws InterruptedException { |
| long timeout = getPartitionMapExchangeTimeout(); |
| |
| long startTime = -1; |
| |
| Set<String> names = new HashSet<>(); |
| |
| Ignite crd = null; |
| |
| for (Ignite g : G.allGrids()) { |
| ClusterNode node = g.cluster().localNode(); |
| |
| if (crd == null || node.order() < crd.cluster().localNode().order()) { |
| crd = g; |
| |
| if (node.order() == 1) |
| break; |
| } |
| } |
| |
| if (crd == null) |
| return; |
| |
| AffinityTopologyVersion waitTopVer = ((IgniteKernal)crd).context().discovery().topologyVersionEx(); |
| |
| if (waitTopVer.topologyVersion() <= 0) |
| waitTopVer = new AffinityTopologyVersion(1, 0); |
| |
| for (Ignite g : G.allGrids()) { |
| if (nodes != null && !nodes.contains(g.cluster().localNode())) |
| continue; |
| |
| IgniteKernal g0 = (IgniteKernal)g; |
| |
| names.add(g0.configuration().getIgniteInstanceName()); |
| |
| if (startTime != -1) { |
| if (startTime != g0.context().discovery().gridStartTime()) |
| fail("Found nodes from different clusters, probable some test does not stop nodes " + |
| "[allNodes=" + names + ']'); |
| } |
| else |
| startTime = g0.context().discovery().gridStartTime(); |
| |
| if (g.cluster().localNode().isDaemon()) |
| continue; |
| |
| IgniteInternalFuture<?> exchFut = |
| g0.context().cache().context().exchange().affinityReadyFuture(waitTopVer); |
| |
| if (exchFut != null && !exchFut.isDone()) { |
| try { |
| exchFut.get(timeout); |
| } |
| catch (IgniteCheckedException e) { |
| log.error("Failed to wait for exchange [topVer=" + waitTopVer + |
| ", node=" + g0.name() + ']', e); |
| } |
| } |
| |
| for (IgniteCacheProxy<?, ?> c : g0.context().cache().jcaches()) { |
| CacheConfiguration cfg = c.context().config(); |
| |
| if (cfg == null || cacheNames != null && !cacheNames.contains(cfg.getName())) |
| continue; |
| |
| if (cfg.getCacheMode() != LOCAL && |
| cfg.getRebalanceMode() != NONE && |
| g.cluster().nodes().size() > 1) { |
| AffinityFunction aff = cfg.getAffinity(); |
| |
| GridDhtCacheAdapter<?, ?> dht = dht(c); |
| |
| GridDhtPartitionTopology top = dht.topology(); |
| |
| for (int p = 0; p < aff.partitions(); p++) { |
| long start = 0; |
| |
| for (int i = 0; ; i++) { |
| boolean match = false; |
| |
| GridCachePartitionExchangeManager<?, ?> exchMgr = dht.context().shared().exchange(); |
| |
| AffinityTopologyVersion readyVer = exchMgr.readyAffinityVersion(); |
| |
| // Wait for the exchange completion (there is a race between ready affinity version |
| // initialization and partition topology version update). |
| // Otherwise, there may be an assertion when printing top.readyTopologyVersion(). |
| try { |
| IgniteInternalFuture<?> fut = exchMgr.affinityReadyFuture(readyVer); |
| |
| if (fut != null) |
| fut.get(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| |
| if (readyVer.topologyVersion() > 0 && c.context().started()) { |
| // Must map on updated version of topology. |
| List<ClusterNode> affNodes = |
| dht.context().affinity().assignment(readyVer).idealAssignment().get(p); |
| |
| int affNodesCnt = affNodes.size(); |
| |
| GridDhtTopologyFuture topFut = top.topologyVersionFuture(); |
| |
| Collection<ClusterNode> owners = (topFut != null && topFut.isDone()) ? |
| top.owners(p, AffinityTopologyVersion.NONE) : Collections.<ClusterNode>emptyList(); |
| |
| int ownerNodesCnt = owners.size(); |
| |
| GridDhtLocalPartition loc = top.localPartition(p, readyVer, false); |
| |
| boolean notPrimary = !affNodes.isEmpty() && |
| !affNodes.get(0).equals(dht.context().affinity().primaryByPartition(p, readyVer)); |
| |
| if (affNodesCnt != ownerNodesCnt || !affNodes.containsAll(owners) || |
| (waitEvicts && loc != null && loc.state() != OWNING) || |
| notPrimary) { |
| if (i % 50 == 0) |
| LT.warn(log(), "Waiting for topology map update [" + |
| "igniteInstanceName=" + g.name() + |
| ", cache=" + cfg.getName() + |
| ", cacheId=" + dht.context().cacheId() + |
| ", topVer=" + top.readyTopologyVersion() + |
| ", p=" + p + |
| ", affNodesCnt=" + affNodesCnt + |
| ", ownersCnt=" + ownerNodesCnt + |
| ", affNodes=" + F.nodeIds(affNodes) + |
| ", owners=" + F.nodeIds(owners) + |
| ", topFut=" + topFut + |
| ", locNode=" + g.cluster().localNode() + ']'); |
| } |
| else |
| match = true; |
| } |
| else { |
| if (i % 50 == 0) |
| LT.warn(log(), "Waiting for topology map update [" + |
| "igniteInstanceName=" + g.name() + |
| ", cache=" + cfg.getName() + |
| ", cacheId=" + dht.context().cacheId() + |
| ", topVer=" + top.readyTopologyVersion() + |
| ", started=" + dht.context().started() + |
| ", p=" + p + |
| ", readVer=" + readyVer + |
| ", locNode=" + g.cluster().localNode() + ']'); |
| } |
| |
| if (!match) { |
| if (i == 0) |
| start = System.currentTimeMillis(); |
| |
| if (System.currentTimeMillis() - start > timeout) { |
| U.dumpThreads(log); |
| |
| if (printPartState) |
| printPartitionState(c); |
| |
| throw new IgniteException("Timeout of waiting for topology map update [" + |
| "igniteInstanceName=" + g.name() + |
| ", cache=" + cfg.getName() + |
| ", cacheId=" + dht.context().cacheId() + |
| ", topVer=" + top.readyTopologyVersion() + |
| ", p=" + p + |
| ", readVer=" + readyVer + |
| ", locNode=" + g.cluster().localNode() + ']'); |
| } |
| |
| Thread.sleep(20); // Busy wait. |
| |
| continue; |
| } |
| |
| if (i > 0) |
| log().warning("Finished waiting for topology map update [igniteInstanceName=" + |
| g.name() + ", p=" + p + ", duration=" + (System.currentTimeMillis() - start) + |
| "ms]"); |
| |
| break; |
| } |
| } |
| |
| if (waitNode2PartUpdate) { |
| long start = System.currentTimeMillis(); |
| |
| boolean failed = true; |
| |
| while (failed) { |
| failed = false; |
| |
| for (GridDhtPartitionMap pMap : top.partitionMap(true).values()) { |
| if (failed) |
| break; |
| |
| for (Map.Entry entry : pMap.entrySet()) { |
| if (System.currentTimeMillis() - start > timeout) { |
| U.dumpThreads(log); |
| |
| throw new IgniteException("Timeout of waiting for partition state update [" + |
| "igniteInstanceName=" + g.name() + |
| ", cache=" + cfg.getName() + |
| ", cacheId=" + dht.context().cacheId() + |
| ", topVer=" + top.readyTopologyVersion() + |
| ", locNode=" + g.cluster().localNode() + ']'); |
| } |
| |
| if (entry.getValue() != OWNING) { |
| LT.warn(log(), |
| "Waiting for correct partition state part=" + entry.getKey() |
| + ", should be OWNING [state=" + entry.getValue() + "], node=" + |
| g.name() + ", cache=" + c.getName()); |
| |
| Thread.sleep(200); // Busy wait. |
| |
| failed = true; |
| |
| break; |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| log.info("awaitPartitionMapExchange finished"); |
| } |
| |
| /** |
| * @param top Topology. |
| * @param topVer Version to wait for. |
| * @throws Exception If failed. |
| */ |
| protected final void waitForReadyTopology(final GridDhtPartitionTopology top, final AffinityTopologyVersion topVer) |
| throws Exception { |
| GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return topVer.compareTo(top.readyTopologyVersion()) <= 0; |
| } |
| }, 5000); |
| } |
| |
| /** |
| * @param c Cache proxy. |
| */ |
| protected static void printPartitionState(IgniteCache<?, ?> c) { |
| printPartitionState(c.getConfiguration(CacheConfiguration.class).getName(), 0); |
| } |
| |
| /** |
| * @param cacheName Cache name. |
| * @param firstParts Count partition for print (will be print first count partition). |
| * |
| * Print partitionState for cache. |
| */ |
| protected static void printPartitionState(String cacheName, int firstParts) { |
| printPartitionState(cacheName, firstParts, G.allGrids()); |
| } |
| |
| /** |
| * @param cacheName Cache name. |
| * @param firstParts Count partition for print (will be print first count partition). |
| * @param nodes Grid nodes. |
| * |
| * Print partitionState for cache. |
| */ |
| protected static void printPartitionState(String cacheName, int firstParts, List<? extends Ignite> nodes) { |
| StringBuilder sb = new StringBuilder(); |
| |
| sb.append("----preload sync futures----\n"); |
| |
| for (Ignite ig : nodes) { |
| IgniteKernal k = ((IgniteKernal)ig); |
| |
| GridCacheAdapter<Object, Object> adapter = k.internalCache(cacheName); |
| |
| if (adapter == null) |
| continue; |
| |
| IgniteInternalFuture<?> syncFut = adapter |
| .preloader() |
| .syncFuture(); |
| |
| sb.append("nodeId=") |
| .append(k.context().localNodeId()) |
| .append(" consistentId=") |
| .append(k.localNode().consistentId()) |
| .append(" isDone=") |
| .append(syncFut.isDone()) |
| .append("\n"); |
| } |
| |
| sb.append("----rebalance futures----\n"); |
| |
| for (Ignite ig : nodes) { |
| IgniteKernal k = ((IgniteKernal)ig); |
| |
| GridCacheAdapter<Object, Object> adapter = k.internalCache(cacheName); |
| |
| if (adapter == null) |
| continue; |
| |
| IgniteInternalFuture<?> f = adapter |
| .preloader() |
| .rebalanceFuture(); |
| |
| try { |
| sb.append("nodeId=").append(k.context().localNodeId()) |
| .append(" isDone=").append(f.isDone()) |
| .append(" res=").append(f.isDone() ? f.get() : "N/A") |
| .append(" topVer=") |
| .append((U.hasField(f, "topVer") ? |
| String.valueOf(U.<Object>field(f, "topVer")) : "N/A")) |
| .append("\n"); |
| |
| Map<UUID, IgniteDhtDemandedPartitionsMap> remaining = U.field(f, "remaining"); |
| |
| sb.append("remaining: "); |
| sb.append(remaining.toString()); |
| sb.append("\n"); |
| } |
| catch (Throwable e) { |
| log.error(e.getMessage()); |
| } |
| } |
| |
| sb.append("----partition state----\n"); |
| |
| for (Ignite g : nodes) { |
| IgniteKernal g0 = (IgniteKernal)g; |
| |
| sb.append("localNodeId=").append(g0.localNode().id()) |
| .append(" grid=").append(g0.name()) |
| .append("\n"); |
| |
| IgniteCacheProxy<?, ?> cache = null; |
| try { |
| cache = g0.context().cache().jcache(cacheName); |
| } catch (IllegalArgumentException e) { |
| continue; // Client topology. |
| } |
| |
| GridDhtCacheAdapter<?, ?> dht = dht(cache); |
| |
| GridDhtPartitionTopology top = dht.topology(); |
| |
| int parts = firstParts == 0 ? cache.context() |
| .config() |
| .getAffinity() |
| .partitions() : firstParts; |
| |
| for (int p = 0; p < parts; p++) { |
| AffinityTopologyVersion readyVer = dht.context().shared().exchange().readyAffinityVersion(); |
| |
| Collection<UUID> affNodes = F.nodeIds(dht.context() |
| .affinity() |
| .assignment(readyVer) |
| .idealAssignment() |
| .get(p)); |
| |
| GridDhtLocalPartition part = top.localPartition(p, AffinityTopologyVersion.NONE, false); |
| |
| sb.append("local part="); |
| |
| if (part != null) { |
| sb.append(p).append(" counters=") |
| .append(part.dataStore().partUpdateCounter()) |
| .append(" fullSize=") |
| .append(part.fullSize()) |
| .append(" state=").append(part.state()) |
| .append(" reservations=").append(part.reservations()); |
| } |
| else |
| sb.append(p).append(" is null"); |
| |
| sb.append(" isAffNode=") |
| .append(affNodes.contains(g0.localNode().id())) |
| .append("\n"); |
| |
| for (UUID nodeId : F.nodeIds(g0.context().discovery().allNodes())) { |
| if (!nodeId.equals(g0.localNode().id())) { |
| top.readLock(); |
| |
| // Peek to remote state directly to distinguish if a partition is EVICTED or yet not initialized. |
| GridDhtPartitionFullMap map = U.field(top, "node2part"); |
| |
| try { |
| final GridDhtPartitionMap nodeMap = map.get(nodeId); |
| |
| if (nodeMap == null) |
| continue; // Skip client node. |
| |
| final GridDhtPartitionState rmtState = nodeMap.get(p); |
| |
| if (rmtState != null) { |
| sb.append(" nodeId=") |
| .append(nodeId) |
| .append(" part=") |
| .append(p) |
| .append(" state=") |
| .append(rmtState) |
| .append(" isAffNode=") |
| .append(affNodes.contains(nodeId)) |
| .append("\n"); |
| } |
| else { |
| sb.append(" nodeId=") |
| .append(nodeId) |
| .append(" part=") |
| .append(p) |
| .append(" is null") |
| .append("\n"); |
| } |
| } |
| finally { |
| top.readUnlock(); |
| } |
| } |
| } |
| } |
| |
| sb.append("\n"); |
| } |
| |
| log.info("dump partitions state for <" + cacheName + ">:\n" + sb.toString()); |
| } |
| |
| /** |
| * Use method for manual rebalancing cache on all nodes. Note that using |
| * <pre name="code" class="java"> |
| * for (int i = 0; i < G.allGrids(); i++) |
| * grid(i).cache(CACHE_NAME).rebalance().get(); |
| * </pre> |
| * for rebalancing cache will lead to flaky test cases. |
| * |
| * @param ignite Ignite server instance for getting {@code compute} facade over all cluster nodes. |
| * @param cacheName Cache name for manual rebalancing on cluster. Usually used when used when |
| * {@link CacheConfiguration#getRebalanceDelay()} configuration parameter set to {@code -1} value. |
| * @throws IgniteCheckedException If fails. |
| */ |
| protected void manualCacheRebalancing(Ignite ignite, |
| final String cacheName) throws IgniteCheckedException { |
| if (ignite.configuration().isClientMode()) |
| return; |
| |
| IgniteFuture<Void> fut = |
| ignite.compute().withTimeout(5_000).broadcastAsync(new ManualRebalancer(cacheName)); |
| |
| assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return fut.isDone(); |
| } |
| }, 5_000)); |
| } |
| |
| /** |
| * |
| */ |
| private static class ManualRebalancer implements IgniteRunnable { |
| /** */ |
| @LoggerResource |
| IgniteLogger log; |
| |
| /** */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** */ |
| private final String cacheName; |
| |
| public ManualRebalancer(String cacheName) { |
| this.cacheName = cacheName; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| IgniteCache<?, ?> cache = ignite.cache(cacheName); |
| |
| assertNotNull(cache); |
| |
| while (!cache.rebalance().get()) { |
| try { |
| U.sleep(100); |
| } |
| catch (IgniteInterruptedCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| if (log.isInfoEnabled()) |
| log.info("Manual rebalance finished [node=" + ignite.name() + ", cache=" + cacheName + "]"); |
| } |
| } |
| |
| /** |
| * @param ignite Node. |
| */ |
| public void dumpCacheDebugInfo(Ignite ignite) { |
| GridKernalContext ctx = ((IgniteKernal)ignite).context(); |
| |
| log.error("Cache information update [node=" + ignite.name() + |
| ", client=" + ignite.configuration().isClientMode() + ']'); |
| |
| GridCacheSharedContext cctx = ctx.cache().context(); |
| |
| log.error("Pending transactions:"); |
| |
| for (IgniteInternalTx tx : cctx.tm().activeTransactions()) |
| log.error(">>> " + tx); |
| |
| log.error("Pending explicit locks:"); |
| |
| for (GridCacheExplicitLockSpan lockSpan : cctx.mvcc().activeExplicitLocks()) |
| log.error(">>> " + lockSpan); |
| |
| log.error("Pending cache futures:"); |
| |
| for (GridCacheFuture<?> fut : cctx.mvcc().activeFutures()) |
| log.error(">>> " + fut); |
| |
| log.error("Pending atomic cache futures:"); |
| |
| for (GridCacheFuture<?> fut : cctx.mvcc().atomicFutures()) |
| log.error(">>> " + fut); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @return Affinity. |
| */ |
| public static <K> Affinity<K> affinity(IgniteCache<K, ?> cache) { |
| return cache.unwrap(Ignite.class).affinity(cache.getName()); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @return Local node. |
| */ |
| public static ClusterNode localNode(IgniteCache<?, ?> cache) { |
| return cache.unwrap(Ignite.class).cluster().localNode(); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param cnt Keys count. |
| * @param startFrom Start value for keys search. |
| * @return Collection of keys for which given cache is primary. |
| */ |
| protected List<Integer> primaryKeys(IgniteCache<?, ?> cache, final int cnt, final int startFrom) { |
| return findKeys(cache, cnt, startFrom, 0); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param cnt Keys count. |
| * @param startFrom Start value for keys search. |
| * @return Collection of keys for which given cache is primary. |
| */ |
| protected List<Integer> findKeys(IgniteCache<?, ?> cache, final int cnt, final int startFrom, final int type) { |
| return findKeys(null, cache, cnt, startFrom, type); |
| } |
| |
| /** |
| * @param node Node. |
| * @param cache Cache. |
| * @param cnt Keys count. |
| * @param startFrom Start value for keys search. |
| * @return Collection of keys for which given cache is primary. |
| */ |
| protected List<Integer> findKeys(@Nullable ClusterNode node, IgniteCache<?, ?> cache, |
| final int cnt, final int startFrom, final int type) { |
| assert cnt > 0 : cnt; |
| |
| final List<Integer> found = new ArrayList<>(cnt); |
| |
| final ClusterNode node0 = node != null ? node : localNode(cache); |
| |
| final Affinity<Integer> aff = (Affinity<Integer>)affinity(cache); |
| |
| try { |
| GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| for (int i = startFrom; i < startFrom + 100_000; i++) { |
| Integer key = i; |
| |
| boolean ok; |
| |
| if (type == 0) |
| ok = aff.isPrimary(node0, key); |
| else if (type == 1) |
| ok = aff.isBackup(node0, key); |
| else if (type == 2) |
| ok = !aff.isPrimaryOrBackup(node0, key); |
| else { |
| fail(); |
| |
| return false; |
| } |
| |
| if (ok) { |
| if (!found.contains(key)) |
| found.add(key); |
| |
| if (found.size() == cnt) |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| }, 5000); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| |
| if (found.size() != cnt) |
| throw new IgniteException("Unable to find " + cnt + " requied keys."); |
| |
| return found; |
| } |
| |
| /** |
| * @param iterable Iterator |
| * @return Set |
| */ |
| protected <K, V> Set<Cache.Entry<K, V>> entrySet(Iterable<Cache.Entry<K, V>> iterable) { |
| Set<Cache.Entry<K, V>> set = new HashSet<>(); |
| |
| for (Cache.Entry<K, V> entry : iterable) |
| set.add(entry); |
| |
| return set; |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param part Partition. |
| * @param cnt Count. |
| * @param skipCnt Skip keys from start. |
| * @return List of keys for partition. |
| */ |
| protected List<Integer> partitionKeys(IgniteCache<?, ?> cache, int part, int cnt, int skipCnt) { |
| IgniteCacheProxyImpl proxy = cache.unwrap(IgniteCacheProxyImpl.class); |
| |
| GridCacheContext<?, ?> cctx = proxy.context(); |
| |
| int k = 0, c = 0, skip0 = 0; |
| |
| List<Integer> keys = new ArrayList<>(cnt); |
| |
| while (c < cnt) { |
| if (cctx.affinity().partition(k) == part) { |
| if (skip0 < skipCnt) { |
| k++; |
| skip0++; |
| |
| continue; |
| } |
| |
| c++; |
| |
| keys.add(k); |
| } |
| |
| k++; |
| } |
| |
| return keys; |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param part Partition. |
| * @return Unbounded iterator for partition keys. |
| */ |
| protected Iterator<Integer> partitionKeysIterator(IgniteCache<?, ?> cache, int part) { |
| IgniteCacheProxyImpl proxy = cache.unwrap(IgniteCacheProxyImpl.class); |
| |
| GridCacheContext<?, ?> cctx = proxy.context(); |
| |
| return new Iterator<Integer>() { |
| int cur, next = 0; |
| |
| { |
| advance(); |
| } |
| |
| private void advance() { |
| while (cctx.affinity().partition(cur = next++) != part); |
| } |
| |
| @Override public boolean hasNext() { |
| return true; |
| } |
| |
| @Override public Integer next() { |
| int tmp = cur; |
| |
| advance(); |
| |
| return tmp; |
| } |
| }; |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param cnt Keys count. |
| * @return Collection of keys for which given cache is primary. |
| */ |
| protected List<Integer> primaryKeys(IgniteCache<?, ?> cache, int cnt) { |
| return primaryKeys(cache, cnt, 1); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param cnt Keys count. |
| * @param startFrom Start value for keys search. |
| * @return Collection of keys for which given cache is backup. |
| */ |
| protected List<Integer> backupKeys(IgniteCache<?, ?> cache, int cnt, int startFrom) { |
| return findKeys(cache, cnt, startFrom, 1); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param cnt Keys count. |
| * @param startFrom Start value for keys search. |
| * @return Collection of keys for which given cache is neither primary nor backup. |
| */ |
| protected List<Integer> nearKeys(IgniteCache<?, ?> cache, int cnt, int startFrom) { |
| return findKeys(cache, cnt, startFrom, 2); |
| } |
| |
| /** |
| * Return list of keys that are primary for given node on current topology, |
| * but primary node will change after new node will be added. |
| * |
| * @param ign Ignite. |
| * @param cacheName Cache name. |
| * @param size Number of keys. |
| * @return List of keys. |
| */ |
| protected final List<Integer> movingKeysAfterJoin(Ignite ign, String cacheName, int size) { |
| return movingKeysAfterJoin(ign, cacheName, size, null, null); |
| } |
| |
| /** |
| * Return list of keys that are primary for given node on current topology, |
| * but primary node will change after new node will be added. |
| * |
| * @param ign Ignite. |
| * @param cacheName Cache name. |
| * @param size Number of keys. |
| * @param nodeInitializer Node initializer closure. |
| * @param joiningNodeConsistentId Joining node consistent id. |
| * @return List of keys. |
| */ |
| protected final List<Integer> movingKeysAfterJoin(Ignite ign, String cacheName, int size, |
| @Nullable IgniteInClosure<ClusterNode> nodeInitializer, @Nullable String joiningNodeConsistentId) { |
| if (joiningNodeConsistentId == null) |
| assertEquals("Expected consistentId is set to node name", ign.name(), ign.cluster().localNode().consistentId()); |
| |
| ArrayList<ClusterNode> nodes = new ArrayList<>(ign.cluster().nodes()); |
| |
| List<List<ClusterNode>> calcAff = calcAffinity(ign.cache(cacheName), nodes); |
| |
| GridTestNode fakeNode = new GridTestNode(UUID.randomUUID(), null); |
| |
| if (nodeInitializer != null) |
| nodeInitializer.apply(fakeNode); |
| |
| fakeNode.consistentId(joiningNodeConsistentId == null ? getTestIgniteInstanceName(nodes.size()) : |
| joiningNodeConsistentId); |
| |
| nodes.add(fakeNode); |
| |
| List<List<ClusterNode>> calcAff2 = calcAffinity(ign.cache(cacheName), nodes); |
| |
| Set<Integer> movedParts = new HashSet<>(); |
| |
| UUID locId = ign.cluster().localNode().id(); |
| |
| for (int i = 0; i < calcAff.size(); i++) { |
| if (calcAff.get(i).get(0).id().equals(locId) && !calcAff2.get(i).get(0).id().equals(locId)) |
| movedParts.add(i); |
| } |
| |
| List<Integer> keys = new ArrayList<>(); |
| |
| Affinity<Integer> aff = ign.affinity(cacheName); |
| |
| for (int i = 0; i < 10_000; i++) { |
| int keyPart = aff.partition(i); |
| |
| if (movedParts.contains(keyPart)) { |
| keys.add(i); |
| |
| if (keys.size() == size) |
| break; |
| } |
| } |
| |
| assertEquals("Failed to find moving keys [movedPats=" + movedParts + ", keys=" + keys + ']', size, keys.size()); |
| |
| return keys; |
| } |
| |
| /** |
| * Returns list of partitions what will be evicted after new node's join. |
| * |
| * @param ign Node to find evicting partitions. |
| * @param cache Cach. |
| * @param size Size. |
| * @return List of moving partition |
| */ |
| protected List<Integer> evictingPartitionsAfterJoin(Ignite ign, IgniteCache<?, ?> cache, int size) { |
| ArrayList<ClusterNode> nodes = new ArrayList<>(ign.cluster().nodes()); |
| |
| List<List<ClusterNode>> ideal1 = calcAffinity(cache, nodes); |
| |
| GridTestNode fakeNode = new GridTestNode(UUID.randomUUID(), null); |
| |
| fakeNode.consistentId(getTestIgniteInstanceName(nodes.size())); |
| |
| nodes.add(fakeNode); |
| |
| List<List<ClusterNode>> ideal2 = calcAffinity(cache, nodes); |
| |
| Map<ClusterNode, BitSet> m1 = U.newHashMap(nodes.size()); |
| Map<ClusterNode, BitSet> m2 = U.newHashMap(nodes.size()); |
| |
| int parts = cache.getConfiguration(CacheConfiguration.class).getAffinity().partitions(); |
| |
| for (int p = 0; p < parts; p++) { |
| List<ClusterNode> assign1 = new ArrayList<>(ideal1.get(p)); |
| List<ClusterNode> assign2 = new ArrayList<>(ideal2.get(p)); |
| |
| final int finalP = p; |
| |
| IgniteBiInClosure<Map<ClusterNode, BitSet>, ClusterNode> updater = (map, node) -> { |
| BitSet set = map.get(node); |
| |
| if (set == null) |
| map.put(node, (set = new BitSet(parts))); |
| |
| set.set(finalP); |
| }; |
| |
| for (ClusterNode node : assign1) |
| updater.apply(m1, node); |
| |
| for (ClusterNode node : assign2) |
| updater.apply(m2, node); |
| } |
| |
| List<Integer> partsToRet = new ArrayList<>(size); |
| |
| BitSet before = m1.get(ign.cluster().localNode()); |
| BitSet after = m2.get(ign.cluster().localNode()); |
| |
| for (int p = before.nextSetBit(0); p >= 0; p = before.nextSetBit(p + 1)) { |
| if (!after.get(p)) { |
| partsToRet.add(p); |
| |
| if (partsToRet.size() == size) |
| break; |
| } |
| } |
| |
| return partsToRet; |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param nodes Nodes. |
| */ |
| private List<List<ClusterNode>> calcAffinity(IgniteCache<?, ?> cache, List<ClusterNode> nodes) { |
| IgniteCacheProxyImpl proxy = cache.unwrap(IgniteCacheProxyImpl.class); |
| |
| GridCacheContext<?, ?> cctx = proxy.context(); |
| |
| AffinityFunction func = cctx.config().getAffinity(); |
| |
| AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl( |
| nodes, |
| null, |
| null, |
| AffinityTopologyVersion.NONE, |
| cctx.config().getBackups()); |
| |
| return func.assignPartitions(ctx); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @return Collection of keys for which given cache is primary. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected Integer primaryKey(IgniteCache<?, ?> cache) |
| throws IgniteCheckedException { |
| return primaryKeys(cache, 1, 1).get(0); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @return Keys for which given cache is backup. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected Integer backupKey(IgniteCache<?, ?> cache) |
| throws IgniteCheckedException { |
| return backupKeys(cache, 1, 1).get(0); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @return Key for which given cache is neither primary nor backup. |
| */ |
| protected Integer nearKey(IgniteCache<?, ?> cache) { |
| return nearKeys(cache, 1, 1).get(0); |
| } |
| |
| /** |
| * @param key Key. |
| */ |
| protected <K, V> V dhtPeek(K key) throws IgniteCheckedException { |
| return localPeek(this.<K, V>dht(), key); |
| } |
| |
| /** |
| * @param idx Index. |
| * @param key Key. |
| */ |
| protected <K, V> V dhtPeek(int idx, K key) throws IgniteCheckedException { |
| return localPeek(this.<K, V>dht(idx), key); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param key Key. |
| */ |
| protected <K, V> V nearPeek(IgniteCache<K, V> cache, K key) throws IgniteCheckedException { |
| return localPeek(near(cache), key); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param key Key. |
| */ |
| protected static <K, V> V dhtPeek(IgniteCache<K, V> cache, K key) throws IgniteCheckedException { |
| return localPeek(dht(cache), key); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param key Key. |
| */ |
| protected static <K, V> V localPeek(GridCacheAdapter<K, V> cache, K key) throws IgniteCheckedException { |
| return cache.localPeek(key, null); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param key Key. |
| */ |
| protected static <K, V> V localPeekOnHeap(GridCacheAdapter<K, V> cache, K key) throws IgniteCheckedException { |
| return cache.localPeek(key, new CachePeekMode[] {CachePeekMode.ONHEAP}); |
| } |
| |
| /** |
| * @param comp Compute. |
| * @param task Task. |
| * @param arg Task argument. |
| * @return Task future. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected <R> ComputeTaskFuture<R> executeAsync(IgniteCompute comp, ComputeTask task, @Nullable Object arg) |
| throws IgniteCheckedException { |
| ComputeTaskFuture<R> fut = comp.executeAsync(task, arg); |
| |
| assertNotNull(fut); |
| |
| return fut; |
| } |
| |
| /** |
| * @param comp Compute. |
| * @param taskName Task name. |
| * @param arg Task argument. |
| * @return Task future. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected <R> ComputeTaskFuture<R> executeAsync(IgniteCompute comp, String taskName, @Nullable Object arg) |
| throws IgniteCheckedException { |
| ComputeTaskFuture<R> fut = comp.executeAsync(taskName, arg); |
| |
| assertNotNull(fut); |
| |
| return fut; |
| } |
| |
| /** |
| * @param comp Compute. |
| * @param taskCls Task class. |
| * @param arg Task argument. |
| * @return Task future. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @SuppressWarnings("unchecked") |
| protected <R> ComputeTaskFuture<R> executeAsync(IgniteCompute comp, Class taskCls, @Nullable Object arg) |
| throws IgniteCheckedException { |
| ComputeTaskFuture<R> fut = comp.executeAsync(taskCls, arg); |
| |
| assertNotNull(fut); |
| |
| return fut; |
| } |
| |
| /** |
| * @param evts Events. |
| * @param filter Filter. |
| * @param types Events types. |
| * @return Future. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected <T extends Event> IgniteFuture<T> waitForLocalEvent(IgniteEvents evts, |
| @Nullable IgnitePredicate<T> filter, @Nullable int... types) throws IgniteCheckedException { |
| IgniteFuture<T> fut = evts.waitForLocalAsync(filter, types); |
| |
| assertNotNull(fut); |
| |
| return fut; |
| } |
| |
| /** |
| * @param e Exception. |
| * @param exCls Ex class. |
| */ |
| protected <T extends IgniteException> void assertCacheExceptionWithCause(RuntimeException e, Class<T> exCls) { |
| if (exCls.isAssignableFrom(e.getClass())) |
| return; |
| |
| if (e.getClass() != CacheException.class |
| || e.getCause() == null || !exCls.isAssignableFrom(e.getCause().getClass())) |
| throw e; |
| } |
| |
| /** |
| * @param cache Cache. |
| */ |
| protected <K, V> GridCacheAdapter<K, V> cacheFromCtx(IgniteCache<K, V> cache) { |
| return ((IgniteKernal)cache.unwrap(Ignite.class)).<K, V>internalCache(cache.getName()).context().cache(); |
| } |
| |
| /** |
| * @param ignite Grid. |
| * @return {@link org.apache.ignite.IgniteCompute} for given grid's local node. |
| */ |
| protected IgniteCompute forLocal(Ignite ignite) { |
| return ignite.compute(ignite.cluster().forLocal()); |
| } |
| |
| /** |
| * @param prj Projection. |
| * @return {@link org.apache.ignite.IgniteCompute} for given projection. |
| */ |
| protected IgniteCompute compute(ClusterGroup prj) { |
| return prj.ignite().compute(prj); |
| } |
| |
| /** |
| * @param prj Projection. |
| * @return {@link org.apache.ignite.IgniteMessaging} for given projection. |
| */ |
| protected IgniteMessaging message(ClusterGroup prj) { |
| return prj.ignite().message(prj); |
| } |
| |
| /** |
| * @param prj Projection. |
| * @return {@link org.apache.ignite.IgniteMessaging} for given projection. |
| */ |
| protected IgniteEvents events(ClusterGroup prj) { |
| return prj.ignite().events(prj); |
| } |
| |
| /** |
| * @param cfg Configuration. |
| * @param cacheName Cache name. |
| * @return Cache configuration. |
| */ |
| protected CacheConfiguration cacheConfiguration(IgniteConfiguration cfg, String cacheName) { |
| for (CacheConfiguration ccfg : cfg.getCacheConfiguration()) { |
| if (F.eq(cacheName, ccfg.getName())) |
| return ccfg; |
| } |
| |
| fail("Failed to find cache configuration for cache: " + cacheName); |
| |
| return null; |
| } |
| |
| /** |
| * @param key Key. |
| * @return Near cache for key. |
| */ |
| protected IgniteCache<Integer, Integer> nearCache(Integer key) { |
| List<Ignite> allGrids = Ignition.allGrids(); |
| |
| assertFalse("There are no alive nodes.", F.isEmpty(allGrids)); |
| |
| Affinity<Integer> aff = allGrids.get(0).affinity(DEFAULT_CACHE_NAME); |
| |
| Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(key); |
| |
| for (Ignite ignite : allGrids) { |
| if (!nodes.contains(ignite.cluster().localNode())) |
| return ignite.cache(DEFAULT_CACHE_NAME); |
| } |
| |
| fail(); |
| |
| return null; |
| } |
| |
| /** |
| * @param key Key. |
| * @param cacheName Cache name. |
| * @return Near cache for key. |
| */ |
| protected <K, V> IgniteCache<K, V> primaryCache(Object key, String cacheName) { |
| return primaryNode(key, cacheName).cache(cacheName); |
| } |
| |
| /** |
| * @param key Key. |
| * @param cacheName Cache name. |
| * @return Near cache for key. |
| */ |
| protected IgniteCache<Integer, Integer> backupCache(Integer key, String cacheName) { |
| return backupNode(key, cacheName).cache(cacheName); |
| } |
| |
| /** |
| * @param key Key. |
| * @param cacheName Cache name. |
| * @return Ignite instance which has primary cache for given key. |
| */ |
| protected Ignite primaryNode(Object key, String cacheName) { |
| List<Ignite> allGrids = Ignition.allGrids(); |
| |
| assertFalse("There are no alive nodes.", F.isEmpty(allGrids)); |
| |
| Ignite ignite = allGrids.get(0); |
| |
| Affinity<Object> aff = ignite.affinity(cacheName); |
| |
| ClusterNode node = aff.mapKeyToNode(key); |
| |
| assertNotNull("There are no cache affinity nodes", node); |
| |
| return grid(node); |
| } |
| |
| /** |
| * @param key Key. |
| * @param cacheName Cache name. |
| * @return Ignite instance which has backup cache for given key. |
| */ |
| protected Ignite backupNode(Object key, String cacheName) { |
| List<Ignite> allGrids = Ignition.allGrids(); |
| |
| assertFalse("There are no alive nodes.", F.isEmpty(allGrids)); |
| |
| Ignite ignite = allGrids.get(0); |
| |
| Affinity<Object> aff = ignite.affinity(cacheName); |
| |
| Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(key); |
| |
| assertTrue("Expected more than one node for key [key=" + key + ", nodes=" + nodes + ']', nodes.size() > 1); |
| |
| Iterator<ClusterNode> it = nodes.iterator(); |
| |
| it.next(); // Skip primary. |
| |
| return grid(it.next()); |
| } |
| |
| /** |
| * @param key Key. |
| * @param cacheName Cache name. |
| * @return Ignite instances which has backup cache for given key. |
| */ |
| protected List<Ignite> backupNodes(Object key, String cacheName) { |
| List<Ignite> allGrids = Ignition.allGrids(); |
| |
| assertFalse("There are no alive nodes.", F.isEmpty(allGrids)); |
| |
| Ignite ignite = allGrids.get(0); |
| |
| Affinity<Object> aff = ignite.affinity(cacheName); |
| |
| Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(key); |
| |
| assertTrue("Expected more than one node for key [key=" + key + ", nodes=" + nodes + ']', nodes.size() > 1); |
| |
| Iterator<ClusterNode> it = nodes.iterator(); |
| |
| it.next(); // Skip primary. |
| |
| List<Ignite> backups = new ArrayList<>(nodes.size() - 1); |
| |
| while (it.hasNext()) |
| backups.add(grid(it.next())); |
| |
| return backups; |
| } |
| |
| /** |
| * @param exp Expected. |
| * @param act Actual. |
| */ |
| protected void assertEqualsCollections(Collection<?> exp, Collection<?> act) { |
| if (exp.size() != act.size()) |
| fail("Collections are not equal:\nExpected:\t" + exp + "\nActual:\t" + act); |
| |
| Iterator<?> it1 = exp.iterator(); |
| Iterator<?> it2 = act.iterator(); |
| |
| int idx = 0; |
| |
| while (it1.hasNext()) { |
| Object item1 = it1.next(); |
| Object item2 = it2.next(); |
| |
| if (!F.eq(item1, item2)) |
| fail("Collections are not equal (position " + idx + "):\nExpected: " + exp + "\nActual: " + act); |
| |
| idx++; |
| } |
| } |
| |
| /** |
| * @param ignite Ignite instance. |
| * @param clo Closure. |
| * @return Result of closure execution. |
| * @throws Exception If failed. |
| */ |
| protected <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception { |
| return doInTransaction(ignite, PESSIMISTIC, REPEATABLE_READ, clo); |
| } |
| |
| /** |
| * @param ignite Ignite instance. |
| * @param concurrency Transaction concurrency. |
| * @param isolation Transaction isolation. |
| * @param clo Closure. |
| * @return Result of closure execution. |
| * @throws Exception If failed. |
| */ |
| protected static <T> T doInTransaction(Ignite ignite, |
| TransactionConcurrency concurrency, |
| TransactionIsolation isolation, |
| Callable<T> clo) throws Exception { |
| while (true) { |
| try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) { |
| T res = clo.call(); |
| |
| tx.commit(); |
| |
| return res; |
| } |
| catch (CacheException e) { |
| if (e.getCause() instanceof ClusterTopologyException) { |
| ClusterTopologyException topEx = (ClusterTopologyException)e.getCause(); |
| |
| topEx.retryReadyFuture().get(); |
| } |
| else |
| throw e; |
| } |
| catch (ClusterTopologyException e) { |
| IgniteFuture<?> fut = e.retryReadyFuture(); |
| |
| fut.get(); |
| } |
| catch (TransactionRollbackException ignore) { |
| // Safe to retry right away. |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| protected void cleanPersistenceDir() throws Exception { |
| assertTrue("Grids are not stopped", F.isEmpty(G.allGrids())); |
| |
| U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), "cp", false)); |
| U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false)); |
| U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), DataStorageConfiguration.DFLT_MARSHALLER_PATH, false)); |
| U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), DataStorageConfiguration.DFLT_BINARY_METADATA_PATH, |
| false)); |
| U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_SNAPSHOT_DIRECTORY, false)); |
| } |
| |
| /** |
| * @param name Instance name. |
| */ |
| protected void cleanPersistenceDir(String name) throws Exception { |
| String dn2DirName = name.replace(".", "_"); |
| |
| U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR + "/" + dn2DirName, true)); |
| U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR + "/wal/" + dn2DirName, true)); |
| U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR + "/wal/archive/" + dn2DirName, true)); |
| } |
| |
| /** |
| * @param aff Affinity. |
| * @param key Counter. |
| * @param node Target node. |
| * @return Key. |
| */ |
| protected final Integer keyForNode(Affinity<Object> aff, AtomicInteger key, ClusterNode node) { |
| for (int i = 0; i < 100_000; i++) { |
| Integer next = key.getAndIncrement(); |
| |
| if (aff.mapKeyToNode(next).equals(node)) |
| return next; |
| } |
| |
| fail("Failed to find key for node: " + node); |
| |
| return null; |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param qry Query. |
| * @return Query plan. |
| */ |
| protected final String queryPlan(IgniteCache<?, ?> cache, SqlFieldsQuery qry) { |
| return (String)cache.query(new SqlFieldsQuery("explain " + qry.getSql()) |
| .setArgs(qry.getArgs()) |
| .setLocal(qry.isLocal()) |
| .setCollocated(qry.isCollocated()) |
| .setPageSize(qry.getPageSize()) |
| .setDistributedJoins(qry.isDistributedJoins()) |
| .setEnforceJoinOrder(qry.isEnforceJoinOrder())) |
| .getAll().get(0).get(0); |
| } |
| |
| /** |
| * @param expData Expected cache data. |
| * @param cacheName Cache name. |
| */ |
| protected final void checkCacheData(Map<?, ?> expData, String cacheName) { |
| assert !expData.isEmpty(); |
| |
| List<Ignite> nodes = G.allGrids(); |
| |
| assertFalse(nodes.isEmpty()); |
| |
| for (Ignite node : nodes) { |
| IgniteCache<Object, Object> cache = node.cache(cacheName); |
| |
| for (Map.Entry<?, ?> e : expData.entrySet()) { |
| assertEquals("Invalid value [key=" + e.getKey() + ", node=" + node.name() + ']', |
| e.getValue(), |
| cache.get(e.getKey())); |
| } |
| } |
| } |
| |
| /** |
| * @param nodesCnt Expected nodes number or {@code -1} to use all nodes. |
| * @throws Exception If failed. |
| */ |
| protected final void checkOnePhaseCommitReturnValuesCleaned(final int nodesCnt) throws Exception { |
| final List<Ignite> nodes; |
| |
| if (nodesCnt == -1) { |
| nodes = G.allGrids(); |
| |
| assertTrue(!nodes.isEmpty()); |
| } |
| else { |
| nodes = new ArrayList<>(nodesCnt); |
| |
| for (int i = 0; i < nodesCnt; i++) |
| nodes.add(grid(i)); |
| } |
| |
| GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| for (Ignite node : nodes) { |
| Map completedVersHashMap = completedTxsMap(node); |
| |
| for (Object o : completedVersHashMap.values()) { |
| if (!(o instanceof Boolean)) |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| }, 5000); |
| |
| for (Ignite node : nodes) { |
| Map completedVersHashMap = completedTxsMap(node); |
| |
| for (Object o : completedVersHashMap.values()) { |
| assertTrue("completedVersHashMap contains " + o.getClass().getName() + " instead of boolean. " + |
| "These values should be replaced by boolean after onePhaseCommit finished. " + |
| "[node=" + node.name() + "]", o instanceof Boolean); |
| } |
| } |
| } |
| |
| /** |
| * Sets baseline topology. |
| */ |
| public void resetBaselineTopology() { |
| Ignite node = G.allGrids().get(0); |
| |
| node.cluster().setBaselineTopology(node.cluster().topologyVersion()); |
| } |
| |
| /** |
| * @param ignite Node. |
| * @return Completed txs map. |
| */ |
| private Map completedTxsMap(Ignite ignite) { |
| IgniteTxManager tm = ((IgniteKernal)ignite).context().cache().context().tm(); |
| |
| return U.field(tm, "completedVersHashMap"); |
| } |
| |
| /** |
| * |
| */ |
| protected final void checkCacheDiscoveryDataConsistent() { |
| Map<Integer, CacheGroupDescriptor> cacheGrps = null; |
| Map<String, DynamicCacheDescriptor> caches = null; |
| |
| for (Ignite node : G.allGrids()) { |
| Map<Integer, CacheGroupDescriptor> cacheGrps0 = |
| ((IgniteKernal)node).context().cache().cacheGroupDescriptors(); |
| Map<String, DynamicCacheDescriptor> caches0 = |
| ((IgniteKernal)node).context().cache().cacheDescriptors(); |
| |
| assertNotNull(cacheGrps0); |
| assertNotNull(caches0); |
| |
| if (cacheGrps == null) { |
| cacheGrps = cacheGrps0; |
| caches = caches0; |
| } |
| else { |
| assertEquals(cacheGrps.size(), cacheGrps0.size()); |
| |
| for (Map.Entry<Integer, CacheGroupDescriptor> e : cacheGrps.entrySet()) { |
| CacheGroupDescriptor desc = e.getValue(); |
| CacheGroupDescriptor desc0 = cacheGrps0.get(e.getKey()); |
| |
| assertNotNull(desc0); |
| checkGroupDescriptorsData(desc, desc0); |
| } |
| |
| for (Map.Entry<String, DynamicCacheDescriptor> e : caches.entrySet()) { |
| DynamicCacheDescriptor desc = e.getValue(); |
| DynamicCacheDescriptor desc0 = caches.get(e.getKey()); |
| |
| assertNotNull(desc0); |
| assertEquals(desc.deploymentId(), desc0.deploymentId()); |
| assertEquals(desc.receivedFrom(), desc0.receivedFrom()); |
| assertEquals(desc.startTopologyVersion(), desc0.startTopologyVersion()); |
| assertEquals(desc.cacheConfiguration().getName(), desc0.cacheConfiguration().getName()); |
| assertEquals(desc.cacheConfiguration().getGroupName(), desc0.cacheConfiguration().getGroupName()); |
| checkGroupDescriptorsData(desc.groupDescriptor(), desc0.groupDescriptor()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param desc First descriptor. |
| * @param desc0 Second descriptor. |
| */ |
| private void checkGroupDescriptorsData(CacheGroupDescriptor desc, CacheGroupDescriptor desc0) { |
| assertEquals(desc.groupName(), desc0.groupName()); |
| assertEquals(desc.sharedGroup(), desc0.sharedGroup()); |
| assertEquals(desc.deploymentId(), desc0.deploymentId()); |
| assertEquals(desc.receivedFrom(), desc0.receivedFrom()); |
| assertEquals(desc.startTopologyVersion(), desc0.startTopologyVersion()); |
| assertEquals(desc.config().getName(), desc0.config().getName()); |
| assertEquals(desc.config().getGroupName(), desc0.config().getGroupName()); |
| assertEquals(desc.caches(), desc0.caches()); |
| } |
| |
| /** |
| * Forces checkpoint on all available nodes. |
| * |
| * @throws IgniteCheckedException If checkpoint was failed. |
| */ |
| protected void forceCheckpoint() throws IgniteCheckedException { |
| forceCheckpoint(G.allGrids()); |
| } |
| |
| /** |
| * Forces checkpoint on specified node. |
| * |
| * @param node Node to force checkpoint on it. |
| * @throws IgniteCheckedException If checkpoint was failed. |
| */ |
| protected void forceCheckpoint(Ignite node) throws IgniteCheckedException { |
| forceCheckpoint(Collections.singletonList(node)); |
| } |
| |
| /** |
| * Forces checkpoint on all specified nodes. |
| * |
| * @param nodes Nodes to force checkpoint on them. |
| * @throws IgniteCheckedException If checkpoint was failed. |
| */ |
| protected void forceCheckpoint(Collection<Ignite> nodes) throws IgniteCheckedException { |
| for (Ignite ignite : nodes) { |
| if (ignite.cluster().localNode().isClient()) |
| continue; |
| |
| GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context() |
| .cache().context().database(); |
| |
| dbMgr.waitForCheckpoint("test"); |
| } |
| } |
| |
| /** |
| * Compares checksums between primary and backup partitions of specified caches. |
| * Works properly only on idle cluster - there may be false positive conflict reports if data in cluster is being |
| * concurrently updated. |
| * |
| * @param ig Ignite instance. |
| * @param caches Cache names (if null, all user caches will be verified). |
| * @return Conflicts result. |
| * @throws IgniteException If none caches or node found. |
| */ |
| protected IdleVerifyResultV2 idleVerify(Ignite ig, @Nullable String... caches) { |
| log.info("Starting idleVerify ..."); |
| |
| IgniteEx ig0 = (IgniteEx)ig; |
| |
| Set<String> cacheNames = new HashSet<>(); |
| |
| if (F.isEmpty(caches)) |
| cacheNames.addAll(ig0.cacheNames()); |
| else |
| Collections.addAll(cacheNames, caches); |
| |
| if (cacheNames.isEmpty()) |
| throw new IgniteException("None cache for checking."); |
| |
| ClusterNode node = !ig0.localNode().isClient() ? ig0.localNode() : ig0.cluster().forServers().forRandom().node(); |
| |
| if (node == null) |
| throw new IgniteException("None server node for verification."); |
| |
| VisorIdleVerifyTaskArg taskArg = new VisorIdleVerifyTaskArg(cacheNames); |
| |
| return ig.compute().execute( |
| VisorIdleVerifyTaskV2.class.getName(), |
| new VisorTaskArgument<>(node.id(), taskArg, false) |
| ); |
| } |
| |
| /** |
| * Checks if all txs and mvcc futures are finished. |
| */ |
| protected void checkFutures() { |
| for (Ignite ignite : G.allGrids()) { |
| IgniteEx ig = (IgniteEx)ignite; |
| |
| final Collection<GridCacheFuture<?>> futs = ig.context().cache().context().mvcc().activeFutures(); |
| |
| boolean hasFutures = false; |
| |
| for (GridCacheFuture<?> fut : futs) { |
| if (!fut.isDone()) { |
| log.error("Expecting no active future [node=" + ig.localNode().id() + ", fut=" + fut + ']'); |
| |
| hasFutures = true; |
| } |
| } |
| |
| if (hasFutures) |
| fail("Some cache futures are not finished"); |
| |
| Collection<IgniteInternalTx> txs = ig.context().cache().context().tm().activeTransactions(); |
| |
| for (IgniteInternalTx tx : txs) |
| log.error("Expecting no active transaction [node=" + ig.localNode().id() + ", tx=" + tx + ']'); |
| |
| if (!txs.isEmpty()) |
| fail("Some transaction are not finished"); |
| } |
| } |
| |
| /** |
| * Load data into a single partition. |
| * |
| * @param p Partition. |
| * @param cacheName Cache name. |
| * @param total Total keys count. |
| * @param skip Skip keys from the beginning. |
| * @param putType Optional put type (default putAll). Possible values 0, 1, 2, 3. |
| * |
| * @return List of last keys. |
| */ |
| protected List<Integer> loadDataToPartition(int p, String gridName, String cacheName, int total, int skip, int... putType) { |
| IgniteCache<Integer, Integer> cache = grid(gridName).cache(cacheName); |
| |
| List<Integer> keys = partitionKeys(cache, p, total, skip); |
| |
| int mode = putType != null && putType.length > 0 ? putType[0] : 0; |
| |
| Map<Integer, Integer> map = keys.stream().collect(Collectors.toMap(k -> k, k -> k, (a, b) -> a, TreeMap::new)); |
| |
| switch (mode) { |
| case 0: |
| cache.putAll(map); |
| |
| break; |
| |
| case 1: |
| for (Map.Entry<Integer, Integer> entry : map.entrySet()) |
| cache.put(entry.getKey(), entry.getValue()); |
| |
| break; |
| |
| default: |
| try (IgniteDataStreamer<Integer, Integer> ds = grid(gridName).dataStreamer(cacheName)) { |
| ds.allowOverwrite(mode == 2); |
| |
| ds.addData(map); |
| } |
| |
| break; |
| } |
| |
| return keys; |
| } |
| |
| /** |
| * @param ignite Ignite instance. |
| * @param topVer Topology version to wait. |
| * @throws IgniteInterruptedCheckedException If interrupted. |
| */ |
| protected void waitForServicesReadyTopology(final IgniteEx ignite, |
| final AffinityTopologyVersion topVer) throws IgniteInterruptedCheckedException { |
| if (!(ignite.context().service() instanceof IgniteServiceProcessor)) |
| return; |
| |
| final IgniteServiceProcessor srvcProc = (IgniteServiceProcessor)ignite.context().service(); |
| |
| GridTestUtils.waitForCondition(() -> { |
| AffinityTopologyVersion readyTopVer = srvcProc.deployment().readyTopologyVersion(); |
| |
| return topVer.compareTo(readyTopVer) <= 0; |
| }, SERVICE_DEPLOYMENT_WAIT_TIMEOUT); |
| } |
| |
| /** |
| * @return {@code false} if value of a system property "IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED" is "false", |
| * otherwise {@code true}. |
| */ |
| protected static boolean isEventDrivenServiceProcessorEnabled() { |
| return getBoolean(IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED, true); |
| } |
| |
| /** |
| * Wait for {@link EventType#EVT_NODE_METRICS_UPDATED} event will be receieved. |
| * |
| * @param countUpdate Number of events. |
| */ |
| protected void awaitMetricsUpdate(int countUpdate) throws InterruptedException { |
| awaitMetricsUpdate(countUpdate, G.allGrids()); |
| } |
| |
| /** |
| * Wait for {@link EventType#EVT_NODE_METRICS_UPDATED} event will be receieved on specific nodes. |
| * |
| * @param countUpdate Number of events. |
| * @param grids Collection of Ignite instances we are listening for metrics update |
| * @throws InterruptedException |
| */ |
| protected void awaitMetricsUpdate(int countUpdate, Collection<Ignite> grids) throws InterruptedException { |
| if (countUpdate > 0) { |
| |
| final CountDownLatch latch = new CountDownLatch(G.allGrids().size() * grids.size() * countUpdate); |
| |
| final IgnitePredicate<Event> lsnr = new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| assert evt.type() == EventType.EVT_NODE_METRICS_UPDATED; |
| |
| latch.countDown(); |
| |
| return true; |
| } |
| }; |
| |
| for (Ignite g : grids) |
| g.events().localListen(lsnr, EventType.EVT_NODE_METRICS_UPDATED); |
| |
| // Wait for metrics update. |
| assert latch.await(10, TimeUnit.SECONDS); |
| |
| for (Ignite g : grids) |
| g.events().stopLocalListen(lsnr); |
| } |
| } |
| |
| /** |
| * @param partId Partition id. |
| * @param gridName Grid name. |
| * |
| * @return Partition update counter or {@code null} if node is not an owner. |
| */ |
| protected @Nullable PartitionUpdateCounter counter(int partId, String gridName) { |
| @Nullable GridDhtLocalPartition locPart = |
| internalCache(grid(gridName).cache(DEFAULT_CACHE_NAME)).context().topology().localPartition(partId); |
| |
| return locPart == null || locPart.state() != OWNING ? null : locPart.dataStore().partUpdateCounter(); |
| } |
| |
| /** |
| * @param partId Partition id. |
| * @param cacheName Cache name. |
| * @param gridName Grid name. |
| * |
| * @return Partition update counter or {@code null} if node is not an owner. |
| */ |
| protected @Nullable PartitionUpdateCounter counter(int partId, String cacheName, String gridName) { |
| @Nullable GridDhtLocalPartition locPart = |
| internalCache(grid(gridName).cache(cacheName)).context().topology().localPartition(partId); |
| |
| return locPart == null || locPart.state() != OWNING ? null : locPart.dataStore().partUpdateCounter(); |
| } |
| |
| /** |
| * @param res Response. |
| */ |
| protected static void assertPartitionsSame(IdleVerifyResultV2 res) throws AssertionFailedError { |
| if (res.hasConflicts()) { |
| StringBuilder b = new StringBuilder(); |
| |
| res.print(b::append, true); |
| |
| fail(b.toString()); |
| } |
| } |
| |
| /** |
| * @param partId Partition. |
| * @param withReserveCntr {@code True} to compare reserve counters. Because reserve counters are synced during |
| * @param cacheName Cache name. |
| * PME invoking with {@code true} makes sense only after PME was finished. |
| */ |
| protected void assertCountersSame(int partId, boolean withReserveCntr, String cacheName) throws AssertionFailedError { |
| PartitionUpdateCounter cntr0 = null; |
| |
| List<T3<String, @Nullable PartitionUpdateCounter, Boolean>> cntrMap = G.allGrids().stream().filter(ignite -> |
| !ignite.configuration().isClientMode()).map(ignite -> |
| new T3<>(ignite.name(), counter(partId, cacheName, ignite.name()), |
| ignite.affinity(cacheName).isPrimary(ignite.cluster().localNode(), partId))).collect(toList()); |
| |
| for (T3<String, PartitionUpdateCounter, Boolean> cntr : cntrMap) { |
| if (cntr.get2() == null) |
| continue; |
| |
| if (cntr0 != null) { |
| assertEquals("Expecting same counters [partId=" + partId + |
| ", cntrs=" + cntrMap + ']', cntr0, cntr.get2()); |
| |
| if (withReserveCntr) |
| assertEquals("Expecting same reservation counters [partId=" + partId + |
| ", cntrs=" + cntrMap + ']', |
| cntr0.reserved(), cntr.get2().reserved()); |
| } |
| |
| cntr0 = cntr.get2(); |
| } |
| } |
| |
| /** |
| * @param partId Partition. |
| * @param withReserveCntr {@code True} to compare reserve counters. Because reserve counters are synced during |
| * @param cacheName Cache name. |
| * @param cnt Counter. |
| * @param reserved Reserved counter. |
| * PME invoking with {@code true} makes sense only after PME was finished. |
| */ |
| protected void assertCountersAsExpected(int partId, boolean withReserveCntr, String cacheName, long cnt, |
| long reserved) throws AssertionFailedError { |
| List<T3<String, @Nullable PartitionUpdateCounter, Boolean>> cntrMap = G.allGrids().stream().filter(ignite -> |
| !ignite.configuration().isClientMode()).map(ignite -> |
| new T3<>(ignite.name(), counter(partId, cacheName, ignite.name()), |
| ignite.affinity(cacheName).isPrimary(ignite.cluster().localNode(), partId))).collect(toList()); |
| |
| for (T3<String, PartitionUpdateCounter, Boolean> cntr : cntrMap) { |
| if (cntr.get2() == null) |
| continue; |
| |
| assertEquals("Expecting same counters [partId=" + partId + |
| ", cntrs=" + cntrMap + ']', cnt, cntr.get2().get()); |
| |
| if (withReserveCntr) { |
| assertEquals("Expecting same reservation counters [partId=" + partId + |
| ", cntrs=" + cntrMap + ']', |
| reserved, cntr.get2().reserved()); |
| } |
| } |
| } |
| |
| /** |
| * Checks that return types of all registered ignite metrics methods are correct. |
| * Also checks that all classes from {@code namesToCheck} are registered as mbeans. |
| * |
| * @param ignite Ignite instance to collect metrics from. |
| * @param namesToCheck Mbean classes names that must be registered in {@code MBeanServer}. |
| * @throws Exception If failed to obtain mbeans. |
| */ |
| protected void validateMbeans(Ignite ignite, String... namesToCheck) throws Exception { |
| logMbeansValidation(getNotRegisteredMbeans(ignite, namesToCheck), "Not registered mbeans"); |
| logMbeansValidation(getInvalidMbeansMethods(ignite), "Invalid metrics methods"); |
| } |
| |
| /** |
| * @param ignite Ignite instance to collect metrics from. |
| * @param namesToCheck Mbean classes names that must be registered in {@code MBeanServer}. |
| * @return {@code Set} of class names that are contained in {@code namesToCheck} |
| * but not registered in {@code MBeanServer}. |
| */ |
| protected Set<String> getNotRegisteredMbeans(Ignite ignite, String... namesToCheck) { |
| MBeanServer srv = ignite.configuration().getMBeanServer(); |
| |
| Set<String> beancClsNames = srv.queryMBeans(null, null).stream() |
| .map(ObjectInstance::getClassName) |
| .collect(toSet()); |
| |
| return Arrays.stream(namesToCheck) |
| .filter(nameToCheck -> beancClsNames.stream().noneMatch(clsName -> clsName.contains(nameToCheck))) |
| .collect(toSet()); |
| } |
| |
| /** |
| * @param ignite Ignite instance to collect metrics from. |
| * @return {@code Set} of metrics methods that have forbidden return types. |
| * @throws Exception If failed to obtain metrics. |
| */ |
| protected Set<String> getInvalidMbeansMethods(Ignite ignite) throws Exception { |
| Set<String> sysMetricsPackages = new HashSet<>(); |
| sysMetricsPackages.add("sun.management"); |
| sysMetricsPackages.add("javax.management"); |
| |
| MBeanServer srv = ignite.configuration().getMBeanServer(); |
| |
| Set<String> invalidMethods = new HashSet<>(); |
| |
| final Set<ObjectInstance> instances = srv.queryMBeans(null, null); |
| |
| for (ObjectInstance instance: instances) { |
| final String clsName = instance.getClassName(); |
| |
| if (sysMetricsPackages.stream().anyMatch(clsName::startsWith)) |
| continue; |
| |
| Class c; |
| |
| try { |
| c = Class.forName(clsName); |
| } |
| catch (ClassNotFoundException e) { |
| log.warning("Failed to load class: " + clsName); |
| |
| continue; |
| } |
| |
| for (Class interf : c.getInterfaces()) { |
| for (Method m : interf.getMethods()) { |
| if (!m.isAnnotationPresent(MXBeanDescription.class)) |
| continue; |
| |
| if (!validateMetricsMethod(m)) |
| invalidMethods.add(m.toString()); |
| } |
| } |
| } |
| |
| return invalidMethods; |
| } |
| |
| /** */ |
| private void logMbeansValidation(Set<String> invalidSet, String errorMsgPrefix) { |
| if (!invalidSet.isEmpty()) { |
| log.info("****************************************"); |
| log.info(errorMsgPrefix + ":"); |
| |
| invalidSet.stream() |
| .sorted() |
| .forEach(log::info); |
| |
| log.info("****************************************"); |
| |
| fail(errorMsgPrefix + " detected^"); |
| } |
| } |
| |
| /** |
| * Validates return type for metrics method. |
| * Validity rules are not carved in stone and can be changed in future. |
| * See https://issues.apache.org/jira/browse/IGNITE-12629. |
| * |
| * @param m Metric method to check. |
| * @return {@code True} if method return type is allowed. |
| */ |
| private boolean validateMetricsMethod(Method m) { |
| Set<String> primitives = new HashSet<>(); |
| primitives.add("char"); |
| primitives.add("short"); |
| primitives.add("int"); |
| primitives.add("long"); |
| primitives.add("double"); |
| primitives.add("float"); |
| primitives.add("byte"); |
| primitives.add("boolean"); |
| primitives.add("void"); |
| |
| Set<String> allowedPackages = new HashSet<>(); |
| allowedPackages.add("java.lang"); |
| allowedPackages.add("java.util"); |
| |
| final String returnTypeName = m.getGenericReturnType().getTypeName(); |
| |
| if (primitives.stream().anyMatch(type -> type.equals(returnTypeName) || (type + "[]").equals(returnTypeName))) |
| return true; |
| |
| String[] parts = returnTypeName.split("[<>,]"); |
| |
| for (String part: parts) { |
| if (allowedPackages.stream().noneMatch(pack -> part.trim().startsWith(pack))) |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Enable checkpoints on a specific nodes. |
| * |
| * @param nodes Ignite nodes. |
| * @param enable {@code True} For checkpoint enabling. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected void enableCheckpoints(Collection<Ignite> nodes, boolean enable) throws IgniteCheckedException { |
| GridCompoundFuture<Void, Void> fut = new GridCompoundFuture<>(); |
| |
| for (Ignite node : nodes) { |
| assert !node.cluster().localNode().isClient(); |
| |
| IgniteCacheDatabaseSharedManager dbMgr = (((IgniteEx)node).context() |
| .cache().context().database()); |
| |
| assert dbMgr instanceof GridCacheDatabaseSharedManager; |
| |
| GridCacheDatabaseSharedManager dbMgr0 = (GridCacheDatabaseSharedManager) dbMgr; |
| |
| fut.add(dbMgr0.enableCheckpoints(enable)); |
| } |
| |
| fut.markInitialized(); |
| |
| fut.get(); |
| } |
| |
| /** |
| * Enable checkpoints on a specific nodes. |
| * |
| * @param node Ignite node. |
| * @param enable {@code True} For checkpoint enabling. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected void enableCheckpoints(Ignite node, boolean enable) throws IgniteCheckedException { |
| enableCheckpoints(Collections.singletonList(node), enable); |
| } |
| |
| /** |
| * Loads data into a cache using stream data source. |
| * |
| * @param ignite Ignite. |
| * @param cache Cache. |
| * @param stream Key-value source stream. |
| */ |
| protected void load(IgniteEx ignite, String cache, Stream<Integer> stream) { |
| try (IgniteDataStreamer<Object, Object> s = ignite.dataStreamer(cache)) { |
| final Iterator<Integer> it = stream.iterator(); |
| |
| while (it.hasNext()) { |
| Integer key = it.next(); |
| |
| s.addData(key, key); |
| } |
| } |
| } |
| |
| /** |
| * Getting WAL manager. |
| * |
| * @param n Node. |
| * @return WAL manger. |
| */ |
| protected static FileWriteAheadLogManager walMgr(IgniteEx n) { |
| return (FileWriteAheadLogManager)n.context().cache().context().wal(); |
| } |
| |
| /** |
| * Disable/enable VAL. |
| * |
| * @param n Node. |
| * @param disable Disable flag. |
| * @throws Exception If failed. |
| */ |
| protected static void disableWal(IgniteEx n, boolean disable) throws Exception { |
| WALDisableContext walDisableCtx = n.context().cache().context().walState().walDisableContext(); |
| |
| setFieldValue(walDisableCtx, "disableWal", disable); |
| |
| assertEquals(disable, walDisableCtx.check()); |
| |
| WALPointer logPtr = walMgr(n).log(new DataRecord(emptyList())); |
| |
| if (disable) |
| assertNull(logPtr); |
| else |
| assertNotNull(logPtr); |
| } |
| |
| /** |
| * Initiate an asynchronous forced index rebuild for caches. |
| * |
| * @param n Node. |
| * @param cacheCtxs Cache contexts. |
| * @return Cache contexts for which index rebuilding is not initiated by |
| * this call because they are already in the process of rebuilding. |
| */ |
| protected Collection<GridCacheContext> forceRebuildIndexes(IgniteEx n, GridCacheContext... cacheCtxs) { |
| return n.context().cache().context().database().forceRebuildIndexes(F.asList(cacheCtxs)); |
| } |
| |
| /** |
| * Getting rebuild index future for the cache. |
| * |
| * @param n Node. |
| * @param cacheId Cache id. |
| * @return Rebuild index future. |
| */ |
| @Nullable protected IgniteInternalFuture<?> indexRebuildFuture(IgniteEx n, int cacheId) { |
| return n.context().query().indexRebuildFuture(cacheId); |
| } |
| |
| /** |
| * Getting cache metrics. |
| * |
| * @param n Node. |
| * @param cacheName Cache name. |
| * @return Cache metrics. |
| */ |
| @Nullable protected CacheMetricsImpl cacheMetrics0(IgniteEx n, String cacheName) { |
| return Optional.ofNullable(n.cachex(cacheName)).map(IgniteInternalCache::context) |
| .map(GridCacheContext::cache).map(GridCacheAdapter::metrics0).orElse(null); |
| } |
| |
| /** |
| * Getting database manager. |
| * |
| * @param n Node. |
| * @return Database manager. |
| */ |
| protected GridCacheDatabaseSharedManager dbMgr(IgniteEx n) { |
| return (GridCacheDatabaseSharedManager)n.context().cache().context().database(); |
| } |
| } |