| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.ListIterator; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicIntegerArray; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.IgniteSnapshot; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.cache.store.CacheStoreSessionListener; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.configuration.TransactionConfiguration; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.managers.communication.GridIoManager; |
| import org.apache.ignite.internal.managers.deployment.GridDeploymentManager; |
| import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; |
| import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; |
| import org.apache.ignite.internal.managers.systemview.ScanQuerySystemView; |
| import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; |
| import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionsEvictManager; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; |
| import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter; |
| import org.apache.ignite.internal.processors.cache.mvcc.DeadlockDetectionManager; |
| import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager; |
| import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor; |
| import org.apache.ignite.internal.processors.cache.persistence.DataRegion; |
| import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; |
| import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager; |
| import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager; |
| import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; |
| import org.apache.ignite.internal.processors.cache.transactions.TransactionMetricsAdapter; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager; |
| import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; |
| import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; |
| import org.apache.ignite.internal.util.GridIntList; |
| import org.apache.ignite.internal.util.future.GridCompoundFuture; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.tostring.GridToStringExclude; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteFuture; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.marshaller.Marshaller; |
| import org.apache.ignite.plugin.PluginProvider; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY; |
| import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK; |
| |
| /** |
| * Shared context. |
| */ |
| @GridToStringExclude |
| public class GridCacheSharedContext<K, V> { |
| /** Kernal context. */ |
| private GridKernalContext kernalCtx; |
| |
| /** Managers in starting order. */ |
| private List<GridCacheSharedManager<K, V>> mgrs = new LinkedList<>(); |
| |
| /** Cache transaction manager. */ |
| private IgniteTxManager txMgr; |
| |
| /** JTA manager. */ |
| private CacheJtaManagerAdapter jtaMgr; |
| |
| /** Partition exchange manager. */ |
| private GridCachePartitionExchangeManager<K, V> exchMgr; |
| |
| /** Version manager. */ |
| private GridCacheVersionManager verMgr; |
| |
| /** Lock manager. */ |
| private GridCacheMvccManager mvccMgr; |
| |
| /** IO Manager. */ |
| private GridCacheIoManager ioMgr; |
| |
| /** Deployment manager. */ |
| private GridCacheDeploymentManager<K, V> depMgr; |
| |
| /** Write ahead log manager. {@code Null} if persistence is not enabled. */ |
| @Nullable private IgniteWriteAheadLogManager walMgr; |
| |
| /** Write ahead log state manager. */ |
| private WalStateManager walStateMgr; |
| |
| /** Database manager. */ |
| private IgniteCacheDatabaseSharedManager dbMgr; |
| |
| /** Snapshot manager. */ |
| private IgniteCacheSnapshotManager snpMgr; |
| |
| /** Page store manager. {@code Null} if persistence is not enabled. */ |
| @Nullable private IgnitePageStoreManager pageStoreMgr; |
| |
| /** Snapshot manager for persistence caches. See {@link IgniteSnapshot}. */ |
| private IgniteSnapshotManager snapshotMgr; |
| |
| /** Affinity manager. */ |
| private CacheAffinitySharedManager affMgr; |
| |
| /** Ttl cleanup manager. */ |
| private GridCacheSharedTtlCleanupManager ttlMgr; |
| |
| /** Partitons evict manager. */ |
| private PartitionsEvictManager evictMgr; |
| |
| /** Mvcc caching manager. */ |
| private MvccCachingManager mvccCachingMgr; |
| |
| /** Deadlock detection manager. */ |
| private DeadlockDetectionManager deadlockDetectionMgr; |
| |
| /** Cache contexts map. */ |
| private final ConcurrentHashMap<Integer, GridCacheContext<K, V>> ctxMap; |
| |
| /** Tx metrics. */ |
| private final TransactionMetricsAdapter txMetrics; |
| |
| /** Cache diagnostic manager. */ |
| private CacheDiagnosticManager diagnosticMgr; |
| |
| /** Store session listeners. */ |
| private Collection<CacheStoreSessionListener> storeSesLsnrs; |
| |
| /** Local store count. */ |
| private final AtomicInteger locStoreCnt; |
| |
| /** Indicating whether local store keeps primary only. */ |
| private final boolean locStorePrimaryOnly = IgniteSystemProperties.getBoolean(IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY); |
| |
| /** */ |
| private final IgniteLogger msgLog; |
| |
| /** */ |
| private final IgniteLogger atomicMsgLog; |
| |
| /** */ |
| private final IgniteLogger txPrepareMsgLog; |
| |
| /** */ |
| private final IgniteLogger txFinishMsgLog; |
| |
| /** */ |
| private final IgniteLogger txLockMsgLog; |
| |
| /** */ |
| private final IgniteLogger txRecoveryMsgLog; |
| |
| /** Concurrent DHT atomic updates counters. */ |
| private AtomicIntegerArray dhtAtomicUpdCnt; |
| |
| /** Rebalance enabled flag. */ |
| private boolean rebalanceEnabled = true; |
| |
| /** */ |
| private final List<IgniteChangeGlobalStateSupport> stateAwareMgrs; |
| |
| /** Cluster is in read-only mode. */ |
| private volatile boolean readOnlyMode; |
| |
| /** |
| * @param kernalCtx Context. |
| * @param txMgr Transaction manager. |
| * @param verMgr Version manager. |
| * @param mvccMgr MVCC manager. |
| * @param pageStoreMgr Page store manager. {@code Null} if persistence is not enabled. |
| * @param walMgr WAL manager. {@code Null} if persistence is not enabled. |
| * @param walStateMgr WAL state manager. |
| * @param depMgr Deployment manager. |
| * @param dbMgr Database manager. |
| * @param snpMgr Snapshot manager. |
| * @param exchMgr Exchange manager. |
| * @param affMgr Affinity manager. |
| * @param ioMgr IO manager. |
| * @param ttlMgr Ttl cleanup manager. |
| * @param evictMgr Partitons evict manager. |
| * @param jtaMgr JTA manager. |
| * @param storeSesLsnrs Store session listeners. |
| * @param mvccCachingMgr Mvcc caching manager. |
| * @param deadlockDetectionMgr Deadlock detection manager. |
| */ |
| public GridCacheSharedContext( |
| GridKernalContext kernalCtx, |
| IgniteTxManager txMgr, |
| GridCacheVersionManager verMgr, |
| GridCacheMvccManager mvccMgr, |
| @Nullable IgnitePageStoreManager pageStoreMgr, |
| @Nullable IgniteWriteAheadLogManager walMgr, |
| WalStateManager walStateMgr, |
| IgniteCacheDatabaseSharedManager dbMgr, |
| IgniteSnapshotManager snapshotMgr, |
| IgniteCacheSnapshotManager snpMgr, |
| GridCacheDeploymentManager<K, V> depMgr, |
| GridCachePartitionExchangeManager<K, V> exchMgr, |
| CacheAffinitySharedManager<K, V> affMgr, |
| GridCacheIoManager ioMgr, |
| GridCacheSharedTtlCleanupManager ttlMgr, |
| PartitionsEvictManager evictMgr, |
| CacheJtaManagerAdapter jtaMgr, |
| Collection<CacheStoreSessionListener> storeSesLsnrs, |
| MvccCachingManager mvccCachingMgr, |
| DeadlockDetectionManager deadlockDetectionMgr, |
| CacheDiagnosticManager diagnosticMgr |
| ) { |
| this.kernalCtx = kernalCtx; |
| |
| setManagers( |
| mgrs, |
| txMgr, |
| jtaMgr, |
| verMgr, |
| mvccMgr, |
| pageStoreMgr, |
| walMgr, |
| walStateMgr, |
| dbMgr, |
| snapshotMgr, |
| snpMgr, |
| depMgr, |
| exchMgr, |
| affMgr, |
| ioMgr, |
| ttlMgr, |
| evictMgr, |
| mvccCachingMgr, |
| deadlockDetectionMgr, |
| diagnosticMgr |
| ); |
| |
| this.storeSesLsnrs = storeSesLsnrs; |
| |
| txMetrics = new TransactionMetricsAdapter(kernalCtx); |
| |
| ctxMap = new ConcurrentHashMap<>(); |
| |
| kernalCtx.systemView().registerView(new ScanQuerySystemView<>(ctxMap.values())); |
| |
| locStoreCnt = new AtomicInteger(); |
| |
| if (dbMgr != null && CU.isPersistenceEnabled(kernalCtx.config())) |
| dhtAtomicUpdCnt = new AtomicIntegerArray(kernalCtx.config().getSystemThreadPoolSize()); |
| |
| msgLog = kernalCtx.log(CU.CACHE_MSG_LOG_CATEGORY); |
| atomicMsgLog = kernalCtx.log(CU.ATOMIC_MSG_LOG_CATEGORY); |
| txPrepareMsgLog = kernalCtx.log(CU.TX_MSG_PREPARE_LOG_CATEGORY); |
| txFinishMsgLog = kernalCtx.log(CU.TX_MSG_FINISH_LOG_CATEGORY); |
| txLockMsgLog = kernalCtx.log(CU.TX_MSG_LOCK_LOG_CATEGORY); |
| txRecoveryMsgLog = kernalCtx.log(CU.TX_MSG_RECOVERY_LOG_CATEGORY); |
| |
| stateAwareMgrs = new ArrayList<>(); |
| |
| if (pageStoreMgr != null) |
| stateAwareMgrs.add(pageStoreMgr); |
| |
| if (walMgr != null) |
| stateAwareMgrs.add(walMgr); |
| |
| stateAwareMgrs.add(dbMgr); |
| |
| stateAwareMgrs.add(snpMgr); |
| |
| stateAwareMgrs.add(snapshotMgr); |
| |
| for (PluginProvider prv : kernalCtx.plugins().allProviders()) |
| if (prv instanceof IgniteChangeGlobalStateSupport) |
| stateAwareMgrs.add(((IgniteChangeGlobalStateSupport)prv)); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void activate() throws IgniteCheckedException { |
| long time = System.currentTimeMillis(); |
| |
| for (IgniteChangeGlobalStateSupport mgr : stateAwareMgrs) |
| mgr.onActivate(kernalCtx); |
| |
| if (msgLog.isInfoEnabled()) |
| msgLog.info("Components activation performed in " + (System.currentTimeMillis() - time) + " ms."); |
| } |
| |
| /** |
| * |
| */ |
| public void deactivate() { |
| for (int i = stateAwareMgrs.size() - 1; i >= 0; i--) |
| stateAwareMgrs.get(i).onDeActivate(kernalCtx); |
| } |
| |
| /** |
| * @return Logger. |
| */ |
| public IgniteLogger messageLogger() { |
| return msgLog; |
| } |
| |
| /** |
| * @return Logger. |
| */ |
| public IgniteLogger atomicMessageLogger() { |
| return atomicMsgLog; |
| } |
| |
| /** |
| * @return Logger. |
| */ |
| public IgniteLogger txPrepareMessageLogger() { |
| return txPrepareMsgLog; |
| } |
| |
| /** |
| * @return Logger. |
| */ |
| public IgniteLogger txFinishMessageLogger() { |
| return txFinishMsgLog; |
| } |
| |
| /** |
| * @return Logger. |
| */ |
| public IgniteLogger txLockMessageLogger() { |
| return txLockMsgLog; |
| } |
| |
| /** |
| * @return Logger. |
| */ |
| public IgniteLogger txRecoveryMessageLogger() { |
| return txRecoveryMsgLog; |
| } |
| |
| /** |
| * @return rebalance enabled flag. |
| */ |
| public boolean isRebalanceEnabled() { |
| return this.rebalanceEnabled; |
| } |
| |
| /** |
| * @param rebalanceEnabled rebalance enabled flag. |
| */ |
| public void rebalanceEnabled(boolean rebalanceEnabled) { |
| this.rebalanceEnabled = rebalanceEnabled; |
| |
| if (rebalanceEnabled) |
| enableRebalance(); |
| } |
| |
| /** |
| * Gets all caches having cache proxy and starts on them force rebalance. |
| */ |
| private void enableRebalance() { |
| for (IgniteCacheProxy c : cache().publicCaches()) |
| c.rebalance(); |
| } |
| |
| /** |
| * @param reconnectFut Reconnect future. |
| * @throws IgniteCheckedException If failed. |
| */ |
| void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { |
| for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { |
| GridCacheSharedManager<?, ?> mgr = it.previous(); |
| |
| mgr.onDisconnected(reconnectFut); |
| |
| if (restartOnDisconnect(mgr)) |
| mgr.onKernalStop(true); |
| } |
| |
| for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { |
| GridCacheSharedManager<?, ?> mgr = it.previous(); |
| |
| if (restartOnDisconnect(mgr)) |
| mgr.stop(true); |
| } |
| |
| deactivate(); |
| } |
| |
| /** |
| * @param active Active flag. |
| * @throws IgniteCheckedException If failed. |
| */ |
| void onReconnected(boolean active) throws IgniteCheckedException { |
| List<GridCacheSharedManager<K, V>> mgrs = new LinkedList<>(); |
| |
| setManagers( |
| mgrs, |
| txMgr, |
| jtaMgr, |
| verMgr, |
| mvccMgr, |
| pageStoreMgr, |
| walMgr, |
| walStateMgr, |
| dbMgr, |
| snapshotMgr, |
| snpMgr, |
| new GridCacheDeploymentManager<K, V>(), |
| new GridCachePartitionExchangeManager<K, V>(), |
| affMgr, |
| ioMgr, |
| ttlMgr, |
| evictMgr, |
| mvccCachingMgr, |
| deadlockDetectionMgr, |
| diagnosticMgr |
| ); |
| |
| this.mgrs = mgrs; |
| |
| for (GridCacheSharedManager<K, V> mgr : mgrs) { |
| if (restartOnDisconnect(mgr)) |
| mgr.start(this); |
| |
| mgr.onReconnected(active); |
| } |
| |
| kernalCtx.query().onCacheReconnect(); |
| |
| if (!active) |
| affinity().clearGroupHoldersAndRegistry(); |
| |
| exchMgr.onKernalStart(active, true); |
| } |
| |
| /** |
| * @param mgr Manager. |
| * @return {@code True} if manager is restarted cn reconnect. |
| */ |
| private boolean restartOnDisconnect(GridCacheSharedManager<?, ?> mgr) { |
| return mgr instanceof GridCacheDeploymentManager || mgr instanceof GridCachePartitionExchangeManager; |
| } |
| |
| /** */ |
| @SuppressWarnings("unchecked") |
| private void setManagers( |
| List<GridCacheSharedManager<K, V>> mgrs, |
| IgniteTxManager txMgr, |
| CacheJtaManagerAdapter jtaMgr, |
| GridCacheVersionManager verMgr, |
| GridCacheMvccManager mvccMgr, |
| @Nullable IgnitePageStoreManager pageStoreMgr, |
| IgniteWriteAheadLogManager walMgr, |
| WalStateManager walStateMgr, |
| IgniteCacheDatabaseSharedManager dbMgr, |
| IgniteSnapshotManager snapshotMgr, |
| IgniteCacheSnapshotManager snpMgr, |
| GridCacheDeploymentManager<K, V> depMgr, |
| GridCachePartitionExchangeManager<K, V> exchMgr, |
| CacheAffinitySharedManager affMgr, |
| GridCacheIoManager ioMgr, |
| GridCacheSharedTtlCleanupManager ttlMgr, |
| PartitionsEvictManager evictMgr, |
| MvccCachingManager mvccCachingMgr, |
| DeadlockDetectionManager deadlockDetectionMgr, |
| CacheDiagnosticManager diagnosticMgr |
| ) { |
| this.diagnosticMgr = add(mgrs, diagnosticMgr); |
| this.mvccMgr = add(mgrs, mvccMgr); |
| this.verMgr = add(mgrs, verMgr); |
| this.txMgr = add(mgrs, txMgr); |
| this.pageStoreMgr = add(mgrs, pageStoreMgr); |
| this.walMgr = add(mgrs, walMgr); |
| this.walStateMgr = add(mgrs, walStateMgr); |
| this.dbMgr = add(mgrs, dbMgr); |
| this.snapshotMgr = add(mgrs, snapshotMgr); |
| this.snpMgr = add(mgrs, snpMgr); |
| this.jtaMgr = add(mgrs, jtaMgr); |
| this.depMgr = add(mgrs, depMgr); |
| this.exchMgr = add(mgrs, exchMgr); |
| this.affMgr = add(mgrs, affMgr); |
| this.ioMgr = add(mgrs, ioMgr); |
| this.ttlMgr = add(mgrs, ttlMgr); |
| this.evictMgr = add(mgrs, evictMgr); |
| this.mvccCachingMgr = add(mgrs, mvccCachingMgr); |
| this.deadlockDetectionMgr = add(mgrs, deadlockDetectionMgr); |
| } |
| |
| /** |
| * Gets all cache contexts for local node. |
| * |
| * @return Collection of all cache contexts. |
| */ |
| public Collection<GridCacheContext> cacheContexts() { |
| return (Collection)ctxMap.values(); |
| } |
| |
| /** |
| * @param c Cache context closure. |
| */ |
| void forAllCaches(final IgniteInClosure<GridCacheContext> c) { |
| for (Integer cacheId : ctxMap.keySet()) { |
| ctxMap.computeIfPresent(cacheId, |
| (cacheId1, ctx) -> { |
| c.apply(ctx); |
| |
| return ctx; |
| } |
| ); |
| } |
| } |
| |
| /** |
| * @return Cache processor. |
| */ |
| public GridCacheProcessor cache() { |
| return kernalCtx.cache(); |
| } |
| |
| /** |
| * Adds cache context to shared cache context. |
| * |
| * @param cacheCtx Cache context to add. |
| * @throws IgniteCheckedException If cache ID conflict detected. |
| */ |
| @SuppressWarnings("unchecked") |
| public void addCacheContext(GridCacheContext cacheCtx) throws IgniteCheckedException { |
| if (ctxMap.containsKey(cacheCtx.cacheId())) { |
| GridCacheContext<K, V> existing = ctxMap.get(cacheCtx.cacheId()); |
| |
| throw new IgniteCheckedException("Failed to start cache due to conflicting cache ID " + |
| "(change cache name and restart grid) [cacheName=" + cacheCtx.name() + |
| ", conflictingCacheName=" + existing.name() + ']'); |
| } |
| |
| CacheStoreManager mgr = cacheCtx.store(); |
| |
| if (mgr.configured() && mgr.isLocal()) |
| locStoreCnt.incrementAndGet(); |
| |
| ctxMap.put(cacheCtx.cacheId(), cacheCtx); |
| } |
| |
| /** |
| * @param cacheCtx Cache context to remove. |
| */ |
| void removeCacheContext(GridCacheContext cacheCtx) { |
| int cacheId = cacheCtx.cacheId(); |
| |
| ctxMap.remove(cacheId, cacheCtx); |
| |
| CacheStoreManager mgr = cacheCtx.store(); |
| |
| if (mgr.configured() && mgr.isLocal()) |
| locStoreCnt.decrementAndGet(); |
| |
| // Safely clean up the message listeners. |
| ioMgr.removeCacheHandlers(cacheId); |
| } |
| |
| /** |
| * Checks if cache context is closed. |
| * |
| * @param ctx Cache context to check. |
| * @return {@code True} if cache context is closed. |
| */ |
| public boolean closed(GridCacheContext ctx) { |
| return !ctxMap.containsKey(ctx.cacheId()); |
| } |
| |
| /** |
| * @return List of shared context managers in starting order. |
| */ |
| public List<GridCacheSharedManager<K, V>> managers() { |
| return mgrs; |
| } |
| |
| /** |
| * Gets cache context by cache ID. |
| * |
| * @param cacheId Cache ID. |
| * @return Cache context. |
| */ |
| public GridCacheContext<K, V> cacheContext(int cacheId) { |
| return ctxMap.get(cacheId); |
| } |
| |
| /** |
| * Returns cache object context if created or creates new and caches it until cache started. |
| * |
| * @param cacheId Cache id. |
| */ |
| @Nullable public CacheObjectContext cacheObjectContext(int cacheId) throws IgniteCheckedException { |
| GridCacheContext<K, V> ctx = ctxMap.get(cacheId); |
| |
| if (ctx != null) |
| return ctx.cacheObjectContext(); |
| |
| DynamicCacheDescriptor desc = cache().cacheDescriptor(cacheId); |
| |
| return desc != null ? desc.cacheObjectContext(kernalContext().cacheObjects()) : null; |
| } |
| |
| /** |
| * @return Ignite instance name. |
| */ |
| public String igniteInstanceName() { |
| return kernalCtx.igniteInstanceName(); |
| } |
| |
| /** |
| * Gets transactions configuration. |
| * |
| * @return Transactions configuration. |
| */ |
| public TransactionConfiguration txConfig() { |
| return kernalCtx.config().getTransactionConfiguration(); |
| } |
| |
| /** |
| * @return Timeout for initial map exchange before preloading. We make it {@code 4} times |
| * bigger than network timeout by default. |
| */ |
| public long preloadExchangeTimeout() { |
| long t1 = gridConfig().getNetworkTimeout() * 4; |
| long t2 = gridConfig().getNetworkTimeout() * gridConfig().getCacheConfiguration().length * 2; |
| |
| long timeout = Math.max(t1, t2); |
| |
| return timeout < 0 ? Long.MAX_VALUE : timeout; |
| } |
| |
| /** |
| * @return Deployment enabled flag. |
| */ |
| public boolean deploymentEnabled() { |
| return kernalContext().deploy().enabled(); |
| } |
| |
| /** |
| * @return Data center ID. |
| */ |
| public byte dataCenterId() { |
| // Data center ID is same for all caches, so grab the first one. |
| GridCacheContext<?, ?> cacheCtx = F.first(cacheContexts()); |
| |
| return cacheCtx.dataCenterId(); |
| } |
| |
| /** |
| * @return Transactional metrics adapter. |
| */ |
| public TransactionMetricsAdapter txMetrics() { |
| return txMetrics; |
| } |
| |
| /** |
| * Resets tx metrics. |
| */ |
| public void resetTxMetrics() { |
| txMetrics.reset(); |
| } |
| |
| /** |
| * @return Cache transaction manager. |
| */ |
| public IgniteTxManager tm() { |
| return txMgr; |
| } |
| |
| /** |
| * @return JTA manager. |
| */ |
| public CacheJtaManagerAdapter jta() { |
| return jtaMgr; |
| } |
| |
| /** |
| * @return Exchange manager. |
| */ |
| public GridCachePartitionExchangeManager<K, V> exchange() { |
| return exchMgr; |
| } |
| |
| /** |
| * @return Affinity manager. |
| */ |
| public CacheAffinitySharedManager<K, V> affinity() { |
| return affMgr; |
| } |
| |
| /** |
| * @return Lock order manager. |
| */ |
| public GridCacheVersionManager versions() { |
| return verMgr; |
| } |
| |
| /** |
| * @return Lock manager. |
| */ |
| public GridCacheMvccManager mvcc() { |
| return mvccMgr; |
| } |
| |
| /** |
| * @return Database manager. |
| */ |
| public IgniteCacheDatabaseSharedManager database() { |
| return dbMgr; |
| } |
| |
| /** |
| * @return Snapshot manager. |
| */ |
| public IgniteCacheSnapshotManager snapshot() { |
| return snpMgr; |
| } |
| |
| /** |
| * @return Page store manager. {@code Null} if persistence is not enabled. |
| */ |
| @Nullable public IgnitePageStoreManager pageStore() { |
| return pageStoreMgr; |
| } |
| |
| /** |
| * @return Page storage snapshot manager. |
| */ |
| public IgniteSnapshotManager snapshotMgr() { |
| return snapshotMgr; |
| } |
| |
| /** |
| * @return Write ahead log manager. |
| */ |
| @Nullable public IgniteWriteAheadLogManager wal() { |
| return walMgr; |
| } |
| |
| /** |
| * @return WAL state manager. |
| */ |
| public WalStateManager walState() { |
| return walStateMgr; |
| } |
| |
| /** |
| * @return IO manager. |
| */ |
| public GridCacheIoManager io() { |
| return ioMgr; |
| } |
| |
| /** |
| * @return Ttl cleanup manager. |
| */ |
| public GridCacheSharedTtlCleanupManager ttl() { |
| return ttlMgr; |
| } |
| |
| /** |
| * @return Cache deployment manager. |
| */ |
| public GridCacheDeploymentManager<K, V> deploy() { |
| return depMgr; |
| } |
| |
| /** |
| * @return Marshaller. |
| */ |
| public Marshaller marshaller() { |
| return kernalCtx.config().getMarshaller(); |
| } |
| |
| /** |
| * @return Grid configuration. |
| */ |
| public IgniteConfiguration gridConfig() { |
| return kernalCtx.config(); |
| } |
| |
| /** |
| * @return Kernal context. |
| */ |
| public GridKernalContext kernalContext() { |
| return kernalCtx; |
| } |
| |
| /** |
| * @return Grid IO manager. |
| */ |
| public GridIoManager gridIO() { |
| return kernalCtx.io(); |
| } |
| |
| /** |
| * @return Grid deployment manager. |
| */ |
| public GridDeploymentManager gridDeploy() { |
| return kernalCtx.deploy(); |
| } |
| |
| /** |
| * @return Grid event storage manager. |
| */ |
| public GridEventStorageManager gridEvents() { |
| return kernalCtx.event(); |
| } |
| |
| /** |
| * @return Discovery manager. |
| */ |
| public GridDiscoveryManager discovery() { |
| return kernalCtx.discovery(); |
| } |
| |
| /** |
| * @return Timeout processor. |
| */ |
| public GridTimeoutProcessor time() { |
| return kernalCtx.timeout(); |
| } |
| |
| /** |
| * @return Cache mvcc coordinator manager. |
| */ |
| public MvccProcessor coordinators() { |
| return kernalCtx.coordinators(); |
| } |
| |
| /** |
| * @return Partition evict manager. |
| */ |
| public PartitionsEvictManager evict() { |
| return evictMgr; |
| } |
| |
| /** |
| * @return Mvcc transaction enlist caching manager. |
| */ |
| public MvccCachingManager mvccCaching() { |
| return mvccCachingMgr; |
| } |
| |
| /** |
| * @return Diagnostic manager. |
| */ |
| public CacheDiagnosticManager diagnostic() { |
| return diagnosticMgr; |
| } |
| |
| /** |
| * @return Deadlock detection manager. |
| */ |
| public DeadlockDetectionManager deadlockDetectionMgr() { |
| return deadlockDetectionMgr; |
| } |
| |
| /** |
| * @return Node ID. |
| */ |
| public UUID localNodeId() { |
| return kernalCtx.localNodeId(); |
| } |
| |
| /** |
| * @return Local node. |
| */ |
| public ClusterNode localNode() { |
| return kernalCtx.discovery().localNode(); |
| } |
| |
| /** |
| * @return Count of caches with configured local stores. |
| */ |
| public int getLocalStoreCount() { |
| return locStoreCnt.get(); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @return Node or {@code null}. |
| */ |
| @Nullable public ClusterNode node(UUID nodeId) { |
| return kernalCtx.discovery().node(nodeId); |
| } |
| |
| /** Indicating whether local store keeps primary only. */ |
| public boolean localStorePrimaryOnly() { |
| return locStorePrimaryOnly; |
| } |
| |
| /** |
| * Gets grid logger for given class. |
| * |
| * @param cls Class to get logger for. |
| * @return IgniteLogger instance. |
| */ |
| public IgniteLogger logger(Class<?> cls) { |
| return kernalCtx.log(cls); |
| } |
| |
| /** |
| * @param category Category. |
| * @return Logger. |
| */ |
| public IgniteLogger logger(String category) { |
| return kernalCtx.log(category); |
| } |
| |
| /** |
| * Captures all ongoing operations that we need to wait before we can proceed to the next topology version. |
| * This method must be called only after |
| * {@link GridDhtPartitionTopology#updateTopologyVersion} |
| * method is called so that all new updates will wait to switch to the new version. |
| * This method will capture: |
| * <ul> |
| * <li>All non-released cache locks</li> |
| * <li>All non-committed transactions (local and remote)</li> |
| * <li>All pending atomic updates</li> |
| * <li>All pending DataStreamer updates</li> |
| * </ul> |
| * |
| * Captured updates are wrapped in a future that will be completed once pending objects are released. |
| * |
| * @param topVer Topology version. |
| * @return {@code true} if waiting was successful. |
| */ |
| @SuppressWarnings({"unchecked"}) |
| public IgniteInternalFuture<?> partitionReleaseFuture(AffinityTopologyVersion topVer) { |
| GridCompoundFuture f = new CacheObjectsReleaseFuture("Partition", topVer); |
| |
| f.add(mvcc().finishExplicitLocks(topVer)); |
| f.add(mvcc().finishAtomicUpdates(topVer)); |
| f.add(mvcc().finishDataStreamerUpdates(topVer)); |
| |
| IgniteInternalFuture<?> finishLocalTxsFuture = tm().finishLocalTxs(topVer); |
| // To properly track progress of finishing local tx updates we explicitly add this future to compound set. |
| f.add(finishLocalTxsFuture); |
| f.add(tm().finishAllTxs(finishLocalTxsFuture, topVer)); |
| |
| f.markInitialized(); |
| |
| return f; |
| } |
| |
| /** |
| * Captures all prepared operations that we need to wait before we able to perform PME-free switch. |
| * This method must be called only after {@link GridDhtPartitionTopology#updateTopologyVersion} |
| * method is called so that all new updates will wait to switch to the new version. |
| * |
| * Captured updates are wrapped in a future that will be completed once pending objects are released. |
| * |
| * @param topVer Topology version. |
| * @param node Failed node. |
| * @return {@code true} if waiting was successful. |
| */ |
| public IgniteInternalFuture<?> partitionRecoveryFuture(AffinityTopologyVersion topVer, ClusterNode node) { |
| return tm().recoverLocalTxs(topVer, node); |
| } |
| |
| /** |
| * Gets ready future for the next affinity topology version (used in cases when a node leaves grid). |
| * |
| * @param curVer Current topology version (before a node left grid). |
| * @return Ready future. |
| */ |
| public IgniteInternalFuture<?> nextAffinityReadyFuture(AffinityTopologyVersion curVer) { |
| if (curVer == null) |
| return null; |
| |
| AffinityTopologyVersion nextVer = new AffinityTopologyVersion(curVer.topologyVersion() + 1); |
| |
| IgniteInternalFuture<?> fut = exchMgr.affinityReadyFuture(nextVer); |
| |
| return fut == null ? new GridFinishedFuture<>() : fut; |
| } |
| |
| /** |
| * @param tx Transaction to check. |
| * @param activeCacheIds Active cache IDs. |
| * @param cacheCtx Cache context. |
| * @return Error message if transactions are incompatible. |
| */ |
| @Nullable public String verifyTxCompatibility(IgniteInternalTx tx, GridIntList activeCacheIds, |
| GridCacheContext<K, V> cacheCtx) { |
| if (cacheCtx.systemTx() && !tx.system()) |
| return "system cache can be enlisted only in system transaction"; |
| |
| if (!cacheCtx.systemTx() && tx.system()) |
| return "non-system cache can't be enlisted in system transaction"; |
| |
| for (int i = 0; i < activeCacheIds.size(); i++) { |
| int cacheId = activeCacheIds.get(i); |
| |
| GridCacheContext<K, V> activeCacheCtx = cacheContext(cacheId); |
| |
| if (cacheCtx.systemTx()) { |
| if (activeCacheCtx.cacheId() != cacheCtx.cacheId()) |
| return "system transaction can include only one cache"; |
| } |
| |
| CacheStoreManager store = cacheCtx.store(); |
| CacheStoreManager activeStore = activeCacheCtx.store(); |
| |
| if (store.isLocal() != activeStore.isLocal()) |
| return "caches with local and non-local stores can't be enlisted in one transaction"; |
| |
| if (store.isWriteBehind() != activeStore.isWriteBehind()) |
| return "caches with different write-behind setting can't be enlisted in one transaction"; |
| |
| if (activeCacheCtx.deploymentEnabled() != cacheCtx.deploymentEnabled()) |
| return "caches with enabled and disabled deployment modes can't be enlisted in one transaction"; |
| |
| // If local and write-behind validations passed, this must be true. |
| assert store.isWriteToStoreFromDht() == activeStore.isWriteToStoreFromDht(); |
| } |
| |
| return null; |
| } |
| |
| /** |
| * @param ignore Transaction to ignore. |
| * @return Not null topology version if current thread holds lock preventing topology change. |
| */ |
| @Nullable public AffinityTopologyVersion lockedTopologyVersion(IgniteInternalTx ignore) { |
| long threadId = Thread.currentThread().getId(); |
| |
| AffinityTopologyVersion topVer = txMgr.lockedTopologyVersion(threadId, ignore); |
| |
| if (topVer == null) |
| topVer = mvccMgr.lastExplicitLockTopologyVersion(threadId); |
| |
| return topVer; |
| } |
| |
| /** |
| * Nulling references to potentially leak-prone objects. |
| */ |
| public void cleanup() { |
| mvccMgr = null; |
| |
| mgrs.clear(); |
| } |
| |
| /** |
| * @param tx Transaction to close. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void endTx(GridNearTxLocal tx) throws IgniteCheckedException { |
| boolean clearThreadMap = txMgr.threadLocalTx(null) == tx; |
| |
| if (clearThreadMap) |
| tx.txState().awaitLastFuture(this); |
| else |
| tx.state(MARKED_ROLLBACK); |
| |
| tx.close(clearThreadMap); |
| } |
| |
| /** |
| * @param tx Transaction to commit. |
| * @return Commit future. |
| */ |
| @SuppressWarnings("unchecked") |
| public IgniteInternalFuture<IgniteInternalTx> commitTxAsync(GridNearTxLocal tx) { |
| GridCacheContext ctx = tx.txState().singleCacheContext(this); |
| |
| if (ctx == null) { |
| tx.txState().awaitLastFuture(this); |
| |
| return tx.commitNearTxLocalAsync(); |
| } |
| else |
| return ctx.cache().commitTxAsync(tx); |
| } |
| |
| /** |
| * @param tx Transaction to rollback. |
| * @return Rollback future. |
| */ |
| public IgniteInternalFuture rollbackTxAsync(GridNearTxLocal tx) { |
| boolean clearThreadMap = txMgr.threadLocalTx(null) == tx; |
| |
| if (clearThreadMap) |
| tx.txState().awaitLastFuture(this); |
| else |
| tx.state(MARKED_ROLLBACK); |
| |
| return tx.rollbackNearTxLocalAsync(clearThreadMap, false); |
| } |
| |
| /** |
| * Suspends transaction. It could be resume later. Supported only for optimistic transactions. |
| * |
| * @param tx Transaction to suspend. |
| * @throws IgniteCheckedException If suspension failed. |
| */ |
| public void suspendTx(GridNearTxLocal tx) throws IgniteCheckedException { |
| tx.txState().awaitLastFuture(this); |
| |
| tx.suspend(); |
| } |
| |
| /** |
| * Resume transaction if it was previously suspended. |
| * |
| * @param tx Transaction to resume. |
| * @throws IgniteCheckedException If resume failed. |
| */ |
| public void resumeTx(GridNearTxLocal tx) throws IgniteCheckedException { |
| tx.txState().awaitLastFuture(this); |
| |
| tx.resume(); |
| } |
| |
| /** |
| * @return Store session listeners. |
| */ |
| @Nullable public Collection<CacheStoreSessionListener> storeSessionListeners() { |
| return storeSesLsnrs; |
| } |
| |
| /** |
| * @param mgrs Managers list. |
| * @param mgr Manager to add. |
| * @return Added manager. |
| */ |
| @Nullable private <T extends GridCacheSharedManager<K, V>> T add(List<GridCacheSharedManager<K, V>> mgrs, |
| @Nullable T mgr) { |
| if (mgr != null) |
| mgrs.add(mgr); |
| |
| return mgr; |
| } |
| |
| /** |
| * Reset thread-local context for transactional cache. |
| */ |
| public void txContextReset() { |
| mvccMgr.contextReset(); |
| } |
| |
| /** |
| * @param ver DHT atomic update future version. |
| * @return Amount of active DHT atomic updates. |
| */ |
| public int startDhtAtomicUpdate(GridCacheVersion ver) { |
| assert dhtAtomicUpdCnt != null; |
| |
| return dhtAtomicUpdCnt.incrementAndGet(dhtAtomicUpdateIndex(ver)); |
| } |
| |
| /** |
| * @param ver DHT atomic update future version. |
| */ |
| public void finishDhtAtomicUpdate(GridCacheVersion ver) { |
| assert dhtAtomicUpdCnt != null; |
| |
| dhtAtomicUpdCnt.decrementAndGet(dhtAtomicUpdateIndex(ver)); |
| } |
| |
| /** |
| * @param ver Version. |
| * @return Index. |
| */ |
| private int dhtAtomicUpdateIndex(GridCacheVersion ver) { |
| return U.safeAbs(ver.hashCode()) % dhtAtomicUpdCnt.length(); |
| } |
| |
| /** |
| * @return {@code true} if cluster is in read-only mode. |
| */ |
| public boolean readOnlyMode() { |
| return readOnlyMode; |
| } |
| |
| /** |
| * @param readOnlyMode Read-only flag. |
| */ |
| public void readOnlyMode(boolean readOnlyMode) { |
| this.readOnlyMode = readOnlyMode; |
| } |
| |
| /** |
| * For test purposes. |
| * @param txMgr Tx manager. |
| */ |
| public void setTxManager(IgniteTxManager txMgr) { |
| this.txMgr = txMgr; |
| } |
| |
| /** |
| * @return {@code True} if lazy memory allocation enabled. {@code False} otherwise. |
| */ |
| public boolean isLazyMemoryAllocation(@Nullable DataRegion region) { |
| return gridConfig().isClientMode() || region == null || region.config().isLazyMemoryAllocation(); |
| } |
| } |