| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.Deque; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.IdentityHashMap; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.ListIterator; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CountDownLatch; |
| import javax.cache.configuration.Factory; |
| import javax.cache.integration.CacheLoader; |
| import javax.cache.integration.CacheWriter; |
| import javax.management.JMException; |
| import javax.management.MBeanServer; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.cache.CacheAtomicWriteOrderMode; |
| import org.apache.ignite.cache.CacheExistsException; |
| import org.apache.ignite.cache.CacheMemoryMode; |
| import org.apache.ignite.cache.CacheMode; |
| import org.apache.ignite.cache.CacheRebalanceMode; |
| import org.apache.ignite.cache.affinity.AffinityFunction; |
| import org.apache.ignite.cache.affinity.AffinityFunctionContext; |
| import org.apache.ignite.cache.affinity.AffinityNodeAddressHashResolver; |
| import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; |
| import org.apache.ignite.cache.store.CacheStore; |
| import org.apache.ignite.cache.store.CacheStoreSessionListener; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.DeploymentMode; |
| import org.apache.ignite.configuration.FileSystemConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.configuration.NearCacheConfiguration; |
| import org.apache.ignite.configuration.TransactionConfiguration; |
| import org.apache.ignite.events.EventType; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.GridPerformanceSuggestions; |
| import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; |
| import org.apache.ignite.internal.IgniteComponentType; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.IgniteNodeAttributes; |
| import org.apache.ignite.internal.IgniteTransactionsEx; |
| import org.apache.ignite.internal.managers.discovery.CustomEventListener; |
| import org.apache.ignite.internal.binary.BinaryMarshaller; |
| import org.apache.ignite.internal.processors.GridProcessorAdapter; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCache; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridNoStorageCacheMap; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache; |
| import org.apache.ignite.internal.processors.cache.dr.GridCacheDrManager; |
| import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter; |
| import org.apache.ignite.internal.processors.cache.local.GridLocalCache; |
| import org.apache.ignite.internal.processors.cache.local.atomic.GridLocalAtomicCache; |
| import org.apache.ignite.internal.processors.cache.query.GridCacheDistributedQueryManager; |
| import org.apache.ignite.internal.processors.cache.query.GridCacheLocalQueryManager; |
| import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; |
| import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager; |
| import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteTransactionsImpl; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager; |
| import org.apache.ignite.internal.processors.plugin.CachePluginManager; |
| import org.apache.ignite.internal.processors.query.GridQueryProcessor; |
| import org.apache.ignite.internal.util.F0; |
| import org.apache.ignite.internal.util.future.GridCompoundFuture; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.tostring.GridToStringInclude; |
| import org.apache.ignite.internal.util.typedef.CIX1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteClosure; |
| import org.apache.ignite.lang.IgniteFuture; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.lifecycle.LifecycleAware; |
| import org.apache.ignite.marshaller.Marshaller; |
| import org.apache.ignite.marshaller.jdk.JdkMarshaller; |
| import org.apache.ignite.spi.IgniteNodeValidationResult; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK; |
| import static org.apache.ignite.IgniteSystemProperties.getBoolean; |
| import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; |
| import static org.apache.ignite.cache.CacheMode.LOCAL; |
| import static org.apache.ignite.cache.CacheMode.PARTITIONED; |
| import static org.apache.ignite.cache.CacheMode.REPLICATED; |
| import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; |
| import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; |
| import static org.apache.ignite.configuration.CacheConfiguration.DFLT_CACHE_MODE; |
| import static org.apache.ignite.configuration.CacheConfiguration.DFLT_MEMORY_MODE; |
| import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS; |
| import static org.apache.ignite.configuration.DeploymentMode.ISOLATED; |
| import static org.apache.ignite.configuration.DeploymentMode.PRIVATE; |
| import static org.apache.ignite.configuration.DeploymentMode.SHARED; |
| import static org.apache.ignite.internal.IgniteComponentType.JTA; |
| import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED; |
| import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_CONFIG; |
| import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; |
| |
| /** |
| * Cache processor. |
| */ |
| @SuppressWarnings("unchecked") |
| public class GridCacheProcessor extends GridProcessorAdapter { |
| /** Null cache name. */ |
| private static final String NULL_NAME = U.id8(UUID.randomUUID()); |
| |
| /** Shared cache context. */ |
| private GridCacheSharedContext<?, ?> sharedCtx; |
| |
| /** */ |
| private final Map<String, GridCacheAdapter<?, ?>> caches; |
| |
| /** Caches stopped from onKernalStop callback. */ |
| private final Map<String, GridCacheAdapter> stoppedCaches = new ConcurrentHashMap<>(); |
| |
| /** Map of proxies. */ |
| private final Map<String, IgniteCacheProxy<?, ?>> jCacheProxies; |
| |
| /** Caches stop sequence. */ |
| private final Deque<String> stopSeq; |
| |
| /** Transaction interface implementation. */ |
| private IgniteTransactionsImpl transactions; |
| |
| /** Pending cache starts. */ |
| private ConcurrentMap<String, IgniteInternalFuture> pendingFuts = new ConcurrentHashMap<>(); |
| |
| /** Template configuration add futures. */ |
| private ConcurrentMap<String, IgniteInternalFuture> pendingTemplateFuts = new ConcurrentHashMap<>(); |
| |
| /** Dynamic caches. */ |
| private ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>(); |
| |
| /** Cache templates. */ |
| private ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>(); |
| |
| /** */ |
| private IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>(); |
| |
| /** Must use JDK marshaller since it is used by discovery to fire custom events. */ |
| private Marshaller marshaller = new JdkMarshaller(); |
| |
| /** Count down latch for caches. */ |
| private final CountDownLatch cacheStartedLatch = new CountDownLatch(1); |
| |
| /** */ |
| private Map<String, DynamicCacheDescriptor> cachesOnDisconnect; |
| |
| /** */ |
| private Map<UUID, DynamicCacheChangeBatch> clientReconnectReqs; |
| |
| /** |
| * @param ctx Kernal context. |
| */ |
| public GridCacheProcessor(GridKernalContext ctx) { |
| super(ctx); |
| |
| caches = new ConcurrentHashMap<>(); |
| jCacheProxies = new ConcurrentHashMap<>(); |
| stopSeq = new LinkedList<>(); |
| } |
| |
| /** |
| * @param internalCache Internal cache flag. |
| * @param cfg Initializes cache configuration with proper defaults. |
| * @param cacheObjCtx Cache object context. |
| * @throws IgniteCheckedException If configuration is not valid. |
| */ |
| private void initialize(boolean internalCache, CacheConfiguration cfg, CacheObjectContext cacheObjCtx) |
| throws IgniteCheckedException { |
| if (cfg.getCacheMode() == null) |
| cfg.setCacheMode(DFLT_CACHE_MODE); |
| |
| if (cfg.getMemoryMode() == null) |
| cfg.setMemoryMode(DFLT_MEMORY_MODE); |
| |
| if (cfg.getNodeFilter() == null) |
| cfg.setNodeFilter(CacheConfiguration.ALL_NODES); |
| |
| if (cfg.getAffinity() == null) { |
| if (cfg.getCacheMode() == PARTITIONED) { |
| RendezvousAffinityFunction aff = new RendezvousAffinityFunction(); |
| |
| aff.setHashIdResolver(new AffinityNodeAddressHashResolver()); |
| |
| cfg.setAffinity(aff); |
| } |
| else if (cfg.getCacheMode() == REPLICATED) { |
| RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, 512); |
| |
| aff.setHashIdResolver(new AffinityNodeAddressHashResolver()); |
| |
| cfg.setAffinity(aff); |
| |
| cfg.setBackups(Integer.MAX_VALUE); |
| } |
| else |
| cfg.setAffinity(new LocalAffinityFunction()); |
| } |
| else { |
| if (cfg.getCacheMode() != LOCAL) { |
| if (cfg.getAffinity() instanceof RendezvousAffinityFunction) { |
| RendezvousAffinityFunction aff = (RendezvousAffinityFunction)cfg.getAffinity(); |
| |
| if (aff.getHashIdResolver() == null) |
| aff.setHashIdResolver(new AffinityNodeAddressHashResolver()); |
| } |
| } |
| } |
| |
| if (cfg.getCacheMode() == REPLICATED) |
| cfg.setBackups(Integer.MAX_VALUE); |
| |
| if (cfg.getAffinityMapper() == null) |
| cfg.setAffinityMapper(cacheObjCtx.defaultAffMapper()); |
| |
| ctx.igfsHelper().preProcessCacheConfiguration(cfg); |
| |
| if (cfg.getRebalanceMode() == null) |
| cfg.setRebalanceMode(ASYNC); |
| |
| if (cfg.getAtomicityMode() == null) |
| cfg.setAtomicityMode(CacheConfiguration.DFLT_CACHE_ATOMICITY_MODE); |
| |
| if (cfg.getWriteSynchronizationMode() == null) |
| cfg.setWriteSynchronizationMode(PRIMARY_SYNC); |
| |
| assert cfg.getWriteSynchronizationMode() != null; |
| |
| if (cfg.getAtomicityMode() == ATOMIC) { |
| if (cfg.getAtomicWriteOrderMode() == null) { |
| cfg.setAtomicWriteOrderMode(cfg.getWriteSynchronizationMode() == FULL_SYNC ? |
| CacheAtomicWriteOrderMode.CLOCK : |
| CacheAtomicWriteOrderMode.PRIMARY); |
| } |
| else if (cfg.getWriteSynchronizationMode() != FULL_SYNC && |
| cfg.getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.CLOCK) { |
| cfg.setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.PRIMARY); |
| |
| U.warn(log, "Automatically set write order mode to PRIMARY for better performance " + |
| "[writeSynchronizationMode=" + cfg.getWriteSynchronizationMode() + ", " + |
| "cacheName=" + U.maskName(cfg.getName()) + ']'); |
| } |
| } |
| |
| if (cfg.getCacheStoreFactory() == null) { |
| Factory<CacheLoader> ldrFactory = cfg.getCacheLoaderFactory(); |
| Factory<CacheWriter> writerFactory = cfg.isWriteThrough() ? cfg.getCacheWriterFactory() : null; |
| |
| if (ldrFactory != null || writerFactory != null) |
| cfg.setCacheStoreFactory(new GridCacheLoaderWriterStoreFactory(ldrFactory, writerFactory)); |
| } |
| else { |
| if (cfg.getCacheLoaderFactory() != null) |
| throw new IgniteCheckedException("Cannot set both cache loaded factory and cache store factory " + |
| "for cache: " + U.maskName(cfg.getName())); |
| |
| if (cfg.getCacheWriterFactory() != null) |
| throw new IgniteCheckedException("Cannot set both cache writer factory and cache store factory " + |
| "for cache: " + U.maskName(cfg.getName())); |
| } |
| } |
| |
| /** |
| * @param cfg Configuration to check for possible performance issues. |
| * @param hasStore {@code True} if store is configured. |
| */ |
| private void suggestOptimizations(CacheConfiguration cfg, boolean hasStore) { |
| GridPerformanceSuggestions perf = ctx.performance(); |
| |
| String msg = "Disable eviction policy (remove from configuration)"; |
| |
| if (cfg.getEvictionPolicy() != null) { |
| perf.add(msg, false); |
| |
| perf.add("Disable synchronized evictions (set 'evictSynchronized' to false)", !cfg.isEvictSynchronized()); |
| } |
| else |
| perf.add(msg, true); |
| |
| if (cfg.getCacheMode() == PARTITIONED) { |
| perf.add("Disable near cache (set 'nearConfiguration' to null)", cfg.getNearConfiguration() == null); |
| |
| if (cfg.getAffinity() != null) |
| perf.add("Decrease number of backups (set 'keyBackups' to 0)", cfg.getBackups() == 0); |
| } |
| |
| // Suppress warning if at least one ATOMIC cache found. |
| perf.add("Enable ATOMIC mode if not using transactions (set 'atomicityMode' to ATOMIC)", |
| cfg.getAtomicityMode() == ATOMIC); |
| |
| // Suppress warning if at least one non-FULL_SYNC mode found. |
| perf.add("Disable fully synchronous writes (set 'writeSynchronizationMode' to PRIMARY_SYNC or FULL_ASYNC)", |
| cfg.getWriteSynchronizationMode() != FULL_SYNC); |
| |
| // Suppress warning if at least one swap is disabled. |
| perf.add("Disable swap store (set 'swapEnabled' to false)", !cfg.isSwapEnabled()); |
| |
| if (hasStore && cfg.isWriteThrough()) |
| perf.add("Enable write-behind to persistent store (set 'writeBehindEnabled' to true)", |
| cfg.isWriteBehindEnabled()); |
| } |
| |
| /** |
| * @param c Ignite configuration. |
| * @param cc Configuration to validate. |
| * @param cacheType Cache type. |
| * @param cfgStore Cache store. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void validate(IgniteConfiguration c, |
| CacheConfiguration cc, |
| CacheType cacheType, |
| @Nullable CacheStore cfgStore) throws IgniteCheckedException { |
| if (cc.getCacheMode() == REPLICATED) { |
| if (cc.getNearConfiguration() != null && |
| ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName())) { |
| U.warn(log, "Near cache cannot be used with REPLICATED cache, " + |
| "will be ignored [cacheName=" + U.maskName(cc.getName()) + ']'); |
| |
| cc.setNearConfiguration(null); |
| } |
| } |
| |
| if (cc.getCacheMode() == LOCAL && !cc.getAffinity().getClass().equals(LocalAffinityFunction.class)) |
| U.warn(log, "AffinityFunction configuration parameter will be ignored for local cache [cacheName=" + |
| U.maskName(cc.getName()) + ']'); |
| |
| if (cc.getRebalanceMode() != CacheRebalanceMode.NONE) |
| assertParameter(cc.getRebalanceBatchSize() > 0, "rebalanceBatchSize > 0"); |
| |
| if (cc.getCacheMode() == PARTITIONED || cc.getCacheMode() == REPLICATED) { |
| if (cc.getAtomicityMode() == ATOMIC && cc.getWriteSynchronizationMode() == FULL_ASYNC) |
| U.warn(log, "Cache write synchronization mode is set to FULL_ASYNC. All single-key 'put' and " + |
| "'remove' operations will return 'null', all 'putx' and 'removex' operations will return" + |
| " 'true' [cacheName=" + U.maskName(cc.getName()) + ']'); |
| } |
| |
| DeploymentMode depMode = c.getDeploymentMode(); |
| |
| if (c.isPeerClassLoadingEnabled() && (depMode == PRIVATE || depMode == ISOLATED) && |
| !CU.isSystemCache(cc.getName())) |
| throw new IgniteCheckedException("Cannot start cache in PRIVATE or ISOLATED deployment mode: " + |
| ctx.config().getDeploymentMode()); |
| |
| if (cc.isWriteBehindEnabled()) { |
| if (cfgStore == null) |
| throw new IgniteCheckedException("Cannot enable write-behind (writer or store is not provided) " + |
| "for cache: " + U.maskName(cc.getName())); |
| |
| assertParameter(cc.getWriteBehindBatchSize() > 0, "writeBehindBatchSize > 0"); |
| assertParameter(cc.getWriteBehindFlushSize() >= 0, "writeBehindFlushSize >= 0"); |
| assertParameter(cc.getWriteBehindFlushFrequency() >= 0, "writeBehindFlushFrequency >= 0"); |
| assertParameter(cc.getWriteBehindFlushThreadCount() > 0, "writeBehindFlushThreadCount > 0"); |
| |
| if (cc.getWriteBehindFlushSize() == 0 && cc.getWriteBehindFlushFrequency() == 0) |
| throw new IgniteCheckedException("Cannot set both 'writeBehindFlushFrequency' and " + |
| "'writeBehindFlushSize' parameters to 0 for cache: " + U.maskName(cc.getName())); |
| } |
| |
| if (cc.isReadThrough() && cfgStore == null) |
| throw new IgniteCheckedException("Cannot enable read-through (loader or store is not provided) " + |
| "for cache: " + U.maskName(cc.getName())); |
| |
| if (cc.isWriteThrough() && cfgStore == null) |
| throw new IgniteCheckedException("Cannot enable write-through (writer or store is not provided) " + |
| "for cache: " + U.maskName(cc.getName())); |
| |
| long delay = cc.getRebalanceDelay(); |
| |
| if (delay != 0) { |
| if (cc.getCacheMode() != PARTITIONED) |
| U.warn(log, "Rebalance delay is supported only for partitioned caches (will ignore): " + (cc.getName()), |
| "Will ignore rebalance delay for cache: " + U.maskName(cc.getName())); |
| else if (cc.getRebalanceMode() == SYNC) { |
| if (delay < 0) { |
| U.warn(log, "Ignoring SYNC rebalance mode with manual rebalance start (node will not wait for " + |
| "rebalancing to be finished): " + U.maskName(cc.getName()), |
| "Node will not wait for rebalance in SYNC mode: " + U.maskName(cc.getName())); |
| } |
| else { |
| U.warn(log, |
| "Using SYNC rebalance mode with rebalance delay (node will wait until rebalancing is " + |
| "initiated for " + delay + "ms) for cache: " + U.maskName(cc.getName()), |
| "Node will wait until rebalancing is initiated for " + delay + "ms for cache: " + U.maskName(cc.getName())); |
| } |
| } |
| } |
| |
| ctx.igfsHelper().validateCacheConfiguration(cc); |
| |
| switch (cc.getMemoryMode()) { |
| case OFFHEAP_VALUES: { |
| if (cc.getOffHeapMaxMemory() < 0) |
| cc.setOffHeapMaxMemory(0); // Set to unlimited. |
| |
| break; |
| } |
| |
| case OFFHEAP_TIERED: { |
| if (cc.getOffHeapMaxMemory() < 0) |
| cc.setOffHeapMaxMemory(0); // Set to unlimited. |
| |
| break; |
| } |
| |
| case ONHEAP_TIERED: |
| if (cacheType.userCache() && cc.getEvictionPolicy() == null && cc.getOffHeapMaxMemory() >= 0) |
| U.quietAndWarn(log, "Eviction policy not enabled with ONHEAP_TIERED mode for cache " + |
| "(entries will not be moved to off-heap store): " + U.maskName(cc.getName())); |
| |
| break; |
| |
| default: |
| throw new IllegalStateException("Unknown memory mode: " + cc.getMemoryMode()); |
| } |
| |
| if (cc.getMemoryMode() == CacheMemoryMode.OFFHEAP_VALUES) { |
| if (GridQueryProcessor.isEnabled(cc)) |
| throw new IgniteCheckedException("Cannot have query indexing enabled while values are stored off-heap. " + |
| "You must either disable query indexing or disable off-heap values only flag for cache: " + |
| U.maskName(cc.getName())); |
| } |
| |
| if (cc.getAtomicityMode() == ATOMIC) |
| assertParameter(cc.getTransactionManagerLookupClassName() == null, |
| "transaction manager can not be used with ATOMIC cache"); |
| } |
| |
| /** |
| * @param ctx Context. |
| * @return DHT managers. |
| */ |
| private List<GridCacheManager> dhtManagers(GridCacheContext ctx) { |
| return F.asList(ctx.store(), ctx.events(), ctx.swap(), ctx.evicts(), ctx.queries(), ctx.continuousQueries(), |
| ctx.dr()); |
| } |
| |
| /** |
| * @param ctx Context. |
| * @return Managers present in both, DHT and Near caches. |
| */ |
| @SuppressWarnings("IfMayBeConditional") |
| private Collection<GridCacheManager> dhtExcludes(GridCacheContext ctx) { |
| if (ctx.config().getCacheMode() == LOCAL || !isNearEnabled(ctx)) |
| return Collections.emptyList(); |
| else |
| return F.asList(ctx.queries(), ctx.continuousQueries(), ctx.store()); |
| } |
| |
| /** |
| * @param cfg Configuration. |
| * @param objs Extra components. |
| * @throws IgniteCheckedException If failed to inject. |
| */ |
| private void prepare(CacheConfiguration cfg, Collection<Object> objs) throws IgniteCheckedException { |
| prepare(cfg, cfg.getEvictionPolicy(), false); |
| prepare(cfg, cfg.getAffinity(), false); |
| prepare(cfg, cfg.getAffinityMapper(), false); |
| prepare(cfg, cfg.getEvictionFilter(), false); |
| prepare(cfg, cfg.getInterceptor(), false); |
| |
| NearCacheConfiguration nearCfg = cfg.getNearConfiguration(); |
| |
| if (nearCfg != null) |
| prepare(cfg, nearCfg.getNearEvictionPolicy(), true); |
| |
| for (Object obj : objs) |
| prepare(cfg, obj, false); |
| } |
| |
| /** |
| * @param cfg Cache configuration. |
| * @param rsrc Resource. |
| * @param near Near flag. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void prepare(CacheConfiguration cfg, @Nullable Object rsrc, boolean near) throws IgniteCheckedException { |
| if (rsrc != null) { |
| ctx.resource().injectGeneric(rsrc); |
| |
| ctx.resource().injectCacheName(rsrc, cfg.getName()); |
| |
| registerMbean(rsrc, cfg.getName(), near); |
| } |
| } |
| |
| /** |
| * @param cctx Cache context. |
| */ |
| private void cleanup(GridCacheContext cctx) { |
| CacheConfiguration cfg = cctx.config(); |
| |
| cleanup(cfg, cfg.getEvictionPolicy(), false); |
| cleanup(cfg, cfg.getAffinity(), false); |
| cleanup(cfg, cfg.getAffinityMapper(), false); |
| cleanup(cfg, cctx.store().configuredStore(), false); |
| |
| if (!CU.isUtilityCache(cfg.getName()) && !CU.isSystemCache(cfg.getName())) |
| unregisterMbean(cctx.cache().mxBean(), cfg.getName(), false); |
| |
| NearCacheConfiguration nearCfg = cfg.getNearConfiguration(); |
| |
| if (nearCfg != null) |
| cleanup(cfg, nearCfg.getNearEvictionPolicy(), true); |
| |
| cctx.cleanup(); |
| } |
| |
| /** |
| * @param cfg Cache configuration. |
| * @param rsrc Resource. |
| * @param near Near flag. |
| */ |
| private void cleanup(CacheConfiguration cfg, @Nullable Object rsrc, boolean near) { |
| if (rsrc != null) { |
| unregisterMbean(rsrc, cfg.getName(), near); |
| |
| try { |
| ctx.resource().cleanupGeneric(rsrc); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to cleanup resource: " + rsrc, e); |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings( {"unchecked"}) |
| @Override public void start() throws IgniteCheckedException { |
| DeploymentMode depMode = ctx.config().getDeploymentMode(); |
| |
| if (!F.isEmpty(ctx.config().getCacheConfiguration())) { |
| if (depMode != CONTINUOUS && depMode != SHARED) |
| U.warn(log, "Deployment mode for cache is not CONTINUOUS or SHARED " + |
| "(it is recommended that you change deployment mode and restart): " + depMode, |
| "Deployment mode for cache is not CONTINUOUS or SHARED."); |
| } |
| |
| ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class, |
| new CustomEventListener<DynamicCacheChangeBatch>() { |
| @Override public void onCustomEvent(ClusterNode snd, |
| DynamicCacheChangeBatch msg, |
| AffinityTopologyVersion topVer) { |
| onCacheChangeRequested(msg, topVer); |
| } |
| }); |
| |
| Set<String> internalCaches = internalCachesNames(); |
| |
| CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration(); |
| |
| sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(ctx, |
| ctx.config().getCacheStoreSessionListenerFactories())); |
| |
| for (int i = 0; i < cfgs.length; i++) { |
| if (ctx.config().isDaemon() && !CU.isMarshallerCache(cfgs[i].getName())) |
| continue; |
| |
| cloneCheckSerializable(cfgs[i]); |
| |
| CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]); |
| |
| CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg); |
| |
| // Initialize defaults. |
| initialize(internalCaches.contains(maskNull(cfg.getName())), cfg, cacheObjCtx); |
| |
| cfgs[i] = cfg; // Replace original configuration value. |
| |
| String masked = maskNull(cfg.getName()); |
| |
| if (registeredCaches.containsKey(masked)) { |
| String cacheName = cfg.getName(); |
| |
| if (cacheName != null) |
| throw new IgniteCheckedException("Duplicate cache name found (check configuration and " + |
| "assign unique name to each cache): " + U.maskName(cacheName)); |
| else |
| throw new IgniteCheckedException("Default cache has already been configured (check configuration and " + |
| "assign unique name to each cache)."); |
| } |
| |
| CacheType cacheType; |
| |
| if (CU.isUtilityCache(cfg.getName())) |
| cacheType = CacheType.UTILITY; |
| else if (CU.isMarshallerCache(cfg.getName())) |
| cacheType = CacheType.MARSHALLER; |
| else if (internalCaches.contains(maskNull(cfg.getName()))) |
| cacheType = CacheType.INTERNAL; |
| else |
| cacheType = CacheType.USER; |
| |
| boolean template = cfg.getName() != null && cfg.getName().endsWith("*"); |
| |
| DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, cfg, cacheType, template, |
| IgniteUuid.randomUuid()); |
| |
| desc.locallyConfigured(true); |
| desc.staticallyConfigured(true); |
| |
| if (!template) { |
| registeredCaches.put(masked, desc); |
| |
| ctx.discovery().setCacheFilter( |
| cfg.getName(), |
| cfg.getNodeFilter(), |
| cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED, |
| cfg.getCacheMode()); |
| |
| ctx.discovery().addClientNode(cfg.getName(), |
| ctx.localNodeId(), |
| cfg.getNearConfiguration() != null); |
| |
| if (!cacheType.userCache()) |
| stopSeq.addLast(cfg.getName()); |
| else |
| stopSeq.addFirst(cfg.getName()); |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Use cache configuration as template: " + cfg); |
| |
| registeredTemplates.put(masked, desc); |
| } |
| |
| if (cfg.getName() == null) { // Use cache configuration with null name as template. |
| DynamicCacheDescriptor desc0 = |
| new DynamicCacheDescriptor(ctx, cfg, cacheType, true, IgniteUuid.randomUuid()); |
| |
| desc0.locallyConfigured(true); |
| desc0.staticallyConfigured(true); |
| |
| registeredTemplates.put(masked, desc0); |
| } |
| } |
| |
| // Start shared managers. |
| for (GridCacheSharedManager mgr : sharedCtx.managers()) |
| mgr.start(sharedCtx); |
| |
| transactions = new IgniteTransactionsImpl(sharedCtx); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Started cache processor."); |
| } |
| |
| /** |
| * @return Internal caches names. |
| */ |
| private Set<String> internalCachesNames() { |
| // Internal caches which should not be returned to user. |
| Set<String> internalCaches = new HashSet<>(); |
| |
| FileSystemConfiguration[] igfsCfgs = ctx.grid().configuration().getFileSystemConfiguration(); |
| |
| if (igfsCfgs != null) { |
| for (FileSystemConfiguration igfsCfg : igfsCfgs) { |
| internalCaches.add(maskNull(igfsCfg.getMetaCacheName())); |
| internalCaches.add(maskNull(igfsCfg.getDataCacheName())); |
| } |
| } |
| |
| if (IgniteComponentType.HADOOP.inClassPath()) |
| internalCaches.add(CU.SYS_CACHE_HADOOP_MR); |
| |
| internalCaches.add(CU.ATOMICS_CACHE_NAME); |
| |
| return internalCaches; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public void onKernalStart() throws IgniteCheckedException { |
| try { |
| ClusterNode locNode = ctx.discovery().localNode(); |
| |
| if (!ctx.config().isDaemon() && !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { |
| for (ClusterNode n : ctx.discovery().remoteNodes()) { |
| if (n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED)) |
| continue; |
| |
| checkTransactionConfiguration(n); |
| |
| DeploymentMode locDepMode = ctx.config().getDeploymentMode(); |
| DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE); |
| |
| CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode", |
| locDepMode, rmtDepMode, true); |
| |
| for (DynamicCacheDescriptor desc : registeredCaches.values()) { |
| CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id()); |
| |
| if (rmtCfg != null) { |
| CacheConfiguration locCfg = desc.cacheConfiguration(); |
| |
| checkCache(locCfg, rmtCfg, n, desc); |
| |
| // Check plugin cache configurations. |
| CachePluginManager pluginMgr = desc.pluginManager(); |
| |
| pluginMgr.validateRemotes(rmtCfg, n); |
| } |
| } |
| } |
| } |
| |
| // Start dynamic caches received from collect discovery data. |
| for (DynamicCacheDescriptor desc : registeredCaches.values()) { |
| if (ctx.config().isDaemon() && !CU.isMarshallerCache(desc.cacheConfiguration().getName())) |
| continue; |
| |
| boolean started = desc.onStart(); |
| |
| assert started : "Failed to change started flag for locally configured cache: " + desc; |
| |
| desc.clearRemoteConfigurations(); |
| |
| CacheConfiguration ccfg = desc.cacheConfiguration(); |
| |
| IgnitePredicate filter = ccfg.getNodeFilter(); |
| |
| boolean loc = desc.locallyConfigured(); |
| |
| if (loc || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) { |
| CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); |
| |
| CachePluginManager pluginMgr = desc.pluginManager(); |
| |
| GridCacheContext ctx = createCache( |
| ccfg, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed()); |
| |
| ctx.dynamicDeploymentId(desc.deploymentId()); |
| |
| sharedCtx.addCacheContext(ctx); |
| |
| GridCacheAdapter cache = ctx.cache(); |
| |
| String name = ccfg.getName(); |
| |
| caches.put(maskNull(name), cache); |
| |
| startCache(cache); |
| |
| jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false)); |
| } |
| } |
| } |
| finally { |
| cacheStartedLatch.countDown(); |
| } |
| |
| if (!ctx.config().isDaemon()) |
| ctx.marshallerContext().onMarshallerCacheStarted(ctx); |
| |
| marshallerCache().context().preloader().initialRebalanceFuture().listen(new CIX1<IgniteInternalFuture<?>>() { |
| @Override public void applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException { |
| ctx.marshallerContext().onMarshallerCachePreloaded(ctx); |
| } |
| }); |
| |
| // Must call onKernalStart on shared managers after creation of fetched caches. |
| for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) |
| mgr.onKernalStart(false); |
| |
| for (GridCacheAdapter<?, ?> cache : caches.values()) |
| onKernalStart(cache); |
| |
| // Wait for caches in SYNC preload mode. |
| for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) { |
| GridCacheAdapter cache = caches.get(maskNull(cfg.getName())); |
| |
| if (cache != null) { |
| if (cfg.getRebalanceMode() == SYNC) { |
| if (cfg.getCacheMode() == REPLICATED || |
| (cfg.getCacheMode() == PARTITIONED && cfg.getRebalanceDelay() >= 0)) { |
| boolean utilityCache = CU.isUtilityCache(cache.name()); |
| |
| if (utilityCache || CU.isMarshallerCache(cache.name())) { |
| cache.preloader().initialRebalanceFuture().get(); |
| |
| if (utilityCache) |
| ctx.cacheObjects().onUtilityCacheStarted(); |
| } |
| else |
| cache.preloader().syncFuture().get(); |
| } |
| } |
| } |
| } |
| |
| assert caches.containsKey(CU.MARSH_CACHE_NAME) : "Marshaller cache should be started"; |
| assert ctx.config().isDaemon() || caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started"; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public void stop(boolean cancel) throws IgniteCheckedException { |
| for (String cacheName : stopSeq) { |
| GridCacheAdapter<?, ?> cache = stoppedCaches.remove(maskNull(cacheName)); |
| |
| if (cache != null) |
| stopCache(cache, cancel); |
| } |
| |
| for (GridCacheAdapter<?, ?> cache : stoppedCaches.values()) { |
| if (cache == stoppedCaches.remove(maskNull(cache.name()))) |
| stopCache(cache, cancel); |
| } |
| |
| List<? extends GridCacheSharedManager<?, ?>> mgrs = sharedCtx.managers(); |
| |
| for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { |
| GridCacheSharedManager<?, ?> mgr = it.previous(); |
| |
| mgr.stop(cancel); |
| } |
| |
| CU.stopStoreSessionListeners(ctx, sharedCtx.storeSessionListeners()); |
| |
| sharedCtx.cleanup(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Stopped cache processor."); |
| } |
| |
| /** |
| * Blocks all available gateways |
| */ |
| public void blockGateways() { |
| for (IgniteCacheProxy<?, ?> proxy : jCacheProxies.values()) |
| proxy.gate().onStopped(); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public void onKernalStop(boolean cancel) { |
| cacheStartedLatch.countDown(); |
| |
| GridCachePartitionExchangeManager<Object, Object> exch = context().exchange(); |
| |
| // Stop exchange manager first so that we call onKernalStop on all caches. |
| // No new caches should be added after this point. |
| exch.onKernalStop(cancel); |
| |
| for (String cacheName : stopSeq) { |
| GridCacheAdapter<?, ?> cache = caches.remove(maskNull(cacheName)); |
| |
| if (cache != null) { |
| stoppedCaches.put(maskNull(cacheName), cache); |
| |
| onKernalStop(cache, cancel); |
| } |
| } |
| |
| for (Map.Entry<String, GridCacheAdapter<?, ?>> entry : caches.entrySet()) { |
| GridCacheAdapter<?, ?> cache = entry.getValue(); |
| |
| if (cache == caches.remove(entry.getKey())) { |
| stoppedCaches.put(entry.getKey(), cache); |
| |
| onKernalStop(entry.getValue(), cancel); |
| } |
| } |
| |
| cancelFutures(); |
| |
| List<? extends GridCacheSharedManager<?, ?>> sharedMgrs = sharedCtx.managers(); |
| |
| for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = sharedMgrs.listIterator(sharedMgrs.size()); |
| it.hasPrevious();) { |
| GridCacheSharedManager<?, ?> mgr = it.previous(); |
| |
| if (mgr != exch) |
| mgr.onKernalStop(cancel); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { |
| cachesOnDisconnect = new HashMap<>(registeredCaches); |
| |
| IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException( |
| ctx.cluster().clientReconnectFuture(), |
| "Failed to execute dynamic cache change request, client node disconnected."); |
| |
| for (IgniteInternalFuture fut : pendingFuts.values()) |
| ((GridFutureAdapter)fut).onDone(err); |
| |
| for (IgniteInternalFuture fut : pendingTemplateFuts.values()) |
| ((GridFutureAdapter)fut).onDone(err); |
| |
| for (GridCacheAdapter cache : caches.values()) { |
| GridCacheContext cctx = cache.context(); |
| |
| cctx.gate().onDisconnected(reconnectFut); |
| |
| List<GridCacheManager> mgrs = cache.context().managers(); |
| |
| for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { |
| GridCacheManager mgr = it.previous(); |
| |
| mgr.onDisconnected(reconnectFut); |
| } |
| } |
| |
| sharedCtx.onDisconnected(reconnectFut); |
| |
| registeredCaches.clear(); |
| |
| registeredTemplates.clear(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException { |
| List<GridCacheAdapter> reconnected = new ArrayList<>(caches.size()); |
| |
| GridCompoundFuture<?, ?> stopFut = null; |
| |
| for (final GridCacheAdapter cache : caches.values()) { |
| String name = cache.name(); |
| |
| boolean stopped; |
| |
| boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name); |
| |
| if (!sysCache) { |
| DynamicCacheDescriptor oldDesc = cachesOnDisconnect.get(maskNull(name)); |
| |
| assert oldDesc != null : "No descriptor for cache: " + name; |
| |
| DynamicCacheDescriptor newDesc = registeredCaches.get(maskNull(name)); |
| |
| stopped = newDesc == null || !oldDesc.deploymentId().equals(newDesc.deploymentId()); |
| } |
| else |
| stopped = false; |
| |
| if (stopped) { |
| cache.context().gate().reconnected(true); |
| |
| sharedCtx.removeCacheContext(cache.ctx); |
| |
| caches.remove(maskNull(cache.name())); |
| jCacheProxies.remove(maskNull(cache.name())); |
| |
| IgniteInternalFuture<?> fut = ctx.closure().runLocalSafe(new Runnable() { |
| @Override public void run() { |
| onKernalStop(cache, true); |
| stopCache(cache, true); |
| } |
| }); |
| |
| if (stopFut == null) |
| stopFut = new GridCompoundFuture<>(); |
| |
| stopFut.add((IgniteInternalFuture)fut); |
| } |
| else { |
| cache.onReconnected(); |
| |
| reconnected.add(cache); |
| } |
| } |
| |
| if (clientReconnectReqs != null) { |
| for (Map.Entry<UUID, DynamicCacheChangeBatch> e : clientReconnectReqs.entrySet()) |
| processClientReconnectData(e.getKey(), e.getValue()); |
| |
| clientReconnectReqs = null; |
| } |
| |
| sharedCtx.onReconnected(); |
| |
| for (GridCacheAdapter cache : reconnected) |
| cache.context().gate().reconnected(false); |
| |
| cachesOnDisconnect = null; |
| |
| if (stopFut != null) |
| stopFut.markInitialized(); |
| |
| return stopFut; |
| } |
| |
| /** |
| * @param cache Cache to start. |
| * @throws IgniteCheckedException If failed to start cache. |
| */ |
| @SuppressWarnings({"TypeMayBeWeakened", "unchecked"}) |
| private void startCache(GridCacheAdapter<?, ?> cache) throws IgniteCheckedException { |
| GridCacheContext<?, ?> cacheCtx = cache.context(); |
| |
| ctx.query().onCacheStart(cacheCtx); |
| ctx.continuous().onCacheStart(cacheCtx); |
| |
| CacheConfiguration cfg = cacheCtx.config(); |
| |
| // Intentionally compare Boolean references using '!=' below to check if the flag has been explicitly set. |
| if (cfg.isStoreKeepBinary() && cfg.isStoreKeepBinary() != CacheConfiguration.DFLT_STORE_KEEP_BINARY |
| && !(ctx.config().getMarshaller() instanceof BinaryMarshaller)) |
| U.warn(log, "CacheConfiguration.isStoreKeepBinary() configuration property will be ignored because " + |
| "BinaryMarshaller is not used"); |
| |
| // Start managers. |
| for (GridCacheManager mgr : F.view(cacheCtx.managers(), F.notContains(dhtExcludes(cacheCtx)))) |
| mgr.start(cacheCtx); |
| |
| cacheCtx.initConflictResolver(); |
| |
| if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) { |
| GridCacheContext<?, ?> dhtCtx = cacheCtx.near().dht().context(); |
| |
| // Start DHT managers. |
| for (GridCacheManager mgr : dhtManagers(dhtCtx)) |
| mgr.start(dhtCtx); |
| |
| dhtCtx.initConflictResolver(); |
| |
| // Start DHT cache. |
| dhtCtx.cache().start(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Started DHT cache: " + dhtCtx.cache().name()); |
| } |
| |
| cacheCtx.cache().start(); |
| |
| cacheCtx.onStarted(); |
| |
| if (log.isInfoEnabled()) |
| log.info("Started cache [name=" + U.maskName(cfg.getName()) + ", mode=" + cfg.getCacheMode() + ']'); |
| } |
| |
| /** |
| * @param cache Cache to stop. |
| * @param cancel Cancel flag. |
| */ |
| @SuppressWarnings({"TypeMayBeWeakened", "unchecked"}) |
| private void stopCache(GridCacheAdapter<?, ?> cache, boolean cancel) { |
| GridCacheContext ctx = cache.context(); |
| |
| sharedCtx.removeCacheContext(ctx); |
| |
| cache.stop(); |
| |
| if (isNearEnabled(ctx)) { |
| GridDhtCacheAdapter dht = ctx.near().dht(); |
| |
| // Check whether dht cache has been started. |
| if (dht != null) { |
| dht.stop(); |
| |
| GridCacheContext<?, ?> dhtCtx = dht.context(); |
| |
| List<GridCacheManager> dhtMgrs = dhtManagers(dhtCtx); |
| |
| for (ListIterator<GridCacheManager> it = dhtMgrs.listIterator(dhtMgrs.size()); it.hasPrevious();) { |
| GridCacheManager mgr = it.previous(); |
| |
| mgr.stop(cancel); |
| } |
| } |
| } |
| |
| List<GridCacheManager> mgrs = ctx.managers(); |
| |
| Collection<GridCacheManager> excludes = dhtExcludes(ctx); |
| |
| // Reverse order. |
| for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { |
| GridCacheManager mgr = it.previous(); |
| |
| if (!excludes.contains(mgr)) |
| mgr.stop(cancel); |
| } |
| |
| ctx.kernalContext().query().onCacheStop(ctx); |
| ctx.kernalContext().continuous().onCacheStop(ctx); |
| |
| U.stopLifecycleAware(log, lifecycleAwares(cache.configuration(), ctx.store().configuredStore())); |
| |
| if (log.isInfoEnabled()) |
| log.info("Stopped cache: " + cache.name()); |
| |
| cleanup(ctx); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed to wait. |
| */ |
| public void awaitStarted() throws IgniteCheckedException { |
| U.await(cacheStartedLatch); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @SuppressWarnings("unchecked") |
| private void onKernalStart(GridCacheAdapter<?, ?> cache) throws IgniteCheckedException { |
| GridCacheContext<?, ?> ctx = cache.context(); |
| |
| // Start DHT cache as well. |
| if (isNearEnabled(ctx)) { |
| GridDhtCacheAdapter dht = ctx.near().dht(); |
| |
| GridCacheContext<?, ?> dhtCtx = dht.context(); |
| |
| for (GridCacheManager mgr : dhtManagers(dhtCtx)) |
| mgr.onKernalStart(); |
| |
| dht.onKernalStart(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Executed onKernalStart() callback for DHT cache: " + dht.name()); |
| } |
| |
| for (GridCacheManager mgr : F.view(ctx.managers(), F0.notContains(dhtExcludes(ctx)))) |
| mgr.onKernalStart(); |
| |
| cache.onKernalStart(); |
| |
| if (ctx.events().isRecordable(EventType.EVT_CACHE_STARTED)) |
| ctx.events().addEvent(EventType.EVT_CACHE_STARTED); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Executed onKernalStart() callback for cache [name=" + cache.name() + ", mode=" + |
| cache.configuration().getCacheMode() + ']'); |
| } |
| |
| /** |
| * @param cache Cache to stop. |
| * @param cancel Cancel flag. |
| */ |
| @SuppressWarnings("unchecked") |
| private void onKernalStop(GridCacheAdapter<?, ?> cache, boolean cancel) { |
| GridCacheContext ctx = cache.context(); |
| |
| if (isNearEnabled(ctx)) { |
| GridDhtCacheAdapter dht = ctx.near().dht(); |
| |
| if (dht != null) { |
| GridCacheContext<?, ?> dhtCtx = dht.context(); |
| |
| for (GridCacheManager mgr : dhtManagers(dhtCtx)) |
| mgr.onKernalStop(cancel); |
| |
| dht.onKernalStop(); |
| } |
| } |
| |
| List<GridCacheManager> mgrs = ctx.managers(); |
| |
| Collection<GridCacheManager> excludes = dhtExcludes(ctx); |
| |
| // Reverse order. |
| for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious(); ) { |
| GridCacheManager mgr = it.previous(); |
| |
| if (!excludes.contains(mgr)) |
| mgr.onKernalStop(cancel); |
| } |
| |
| cache.onKernalStop(); |
| |
| if (ctx.events().isRecordable(EventType.EVT_CACHE_STOPPED)) |
| ctx.events().addEvent(EventType.EVT_CACHE_STOPPED); |
| } |
| |
| /** |
| * @param cfg Cache configuration to use to create cache. |
| * @param pluginMgr Cache plugin manager. |
| * @param cacheType Cache type. |
| * @param cacheObjCtx Cache object context. |
| * @param updatesAllowed Updates allowed flag. |
| * @return Cache context. |
| * @throws IgniteCheckedException If failed to create cache. |
| */ |
| private GridCacheContext createCache(CacheConfiguration<?, ?> cfg, |
| @Nullable CachePluginManager pluginMgr, |
| CacheType cacheType, |
| CacheObjectContext cacheObjCtx, |
| boolean updatesAllowed) |
| throws IgniteCheckedException |
| { |
| assert cfg != null; |
| |
| if (cfg.getCacheStoreFactory() instanceof GridCacheLoaderWriterStoreFactory) { |
| GridCacheLoaderWriterStoreFactory factory = (GridCacheLoaderWriterStoreFactory)cfg.getCacheStoreFactory(); |
| |
| prepare(cfg, factory.loaderFactory(), false); |
| prepare(cfg, factory.writerFactory(), false); |
| } |
| else |
| prepare(cfg, cfg.getCacheStoreFactory(), false); |
| |
| |
| CacheStore cfgStore = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create() : null; |
| |
| validate(ctx.config(), cfg, cacheType, cfgStore); |
| |
| if (pluginMgr == null) |
| pluginMgr = new CachePluginManager(ctx, cfg); |
| |
| pluginMgr.validate(); |
| |
| sharedCtx.jta().registerCache(cfg); |
| |
| // Skip suggestions for internal caches. |
| if (cacheType.userCache()) |
| suggestOptimizations(cfg, cfgStore != null); |
| |
| Collection<Object> toPrepare = new ArrayList<>(); |
| |
| if (cfgStore instanceof GridCacheLoaderWriterStore) { |
| toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).loader()); |
| toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).writer()); |
| } |
| else |
| toPrepare.add(cfgStore); |
| |
| prepare(cfg, toPrepare); |
| |
| U.startLifecycleAware(lifecycleAwares(cfg, cfgStore)); |
| |
| GridCacheAffinityManager affMgr = new GridCacheAffinityManager(); |
| GridCacheEventManager evtMgr = new GridCacheEventManager(); |
| GridCacheSwapManager swapMgr = new GridCacheSwapManager(cfg.getCacheMode() == LOCAL || |
| !GridCacheUtils.isNearEnabled(cfg)); |
| GridCacheEvictionManager evictMgr = new GridCacheEvictionManager(); |
| GridCacheQueryManager qryMgr = queryManager(cfg); |
| CacheContinuousQueryManager contQryMgr = new CacheContinuousQueryManager(); |
| CacheDataStructuresManager dataStructuresMgr = new CacheDataStructuresManager(); |
| GridCacheTtlManager ttlMgr = new GridCacheTtlManager(); |
| |
| CacheConflictResolutionManager rslvrMgr = pluginMgr.createComponent(CacheConflictResolutionManager.class); |
| GridCacheDrManager drMgr = pluginMgr.createComponent(GridCacheDrManager.class); |
| CacheStoreManager storeMgr = pluginMgr.createComponent(CacheStoreManager.class); |
| |
| storeMgr.initialize(cfgStore, sesHolders); |
| |
| GridCacheContext<?, ?> cacheCtx = new GridCacheContext( |
| ctx, |
| sharedCtx, |
| cfg, |
| cacheType, |
| ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cfg.getName()), |
| updatesAllowed, |
| |
| /* |
| * Managers in starting order! |
| * =========================== |
| */ |
| evtMgr, |
| swapMgr, |
| storeMgr, |
| evictMgr, |
| qryMgr, |
| contQryMgr, |
| dataStructuresMgr, |
| ttlMgr, |
| drMgr, |
| rslvrMgr, |
| pluginMgr, |
| affMgr |
| ); |
| |
| cacheCtx.cacheObjectContext(cacheObjCtx); |
| |
| GridCacheAdapter cache = null; |
| |
| switch (cfg.getCacheMode()) { |
| case LOCAL: { |
| switch (cfg.getAtomicityMode()) { |
| case TRANSACTIONAL: { |
| cache = new GridLocalCache(cacheCtx); |
| |
| break; |
| } |
| case ATOMIC: { |
| cache = new GridLocalAtomicCache(cacheCtx); |
| |
| break; |
| } |
| |
| default: { |
| assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); |
| } |
| } |
| |
| break; |
| } |
| case PARTITIONED: |
| case REPLICATED: { |
| if (GridCacheUtils.isNearEnabled(cfg)) { |
| switch (cfg.getAtomicityMode()) { |
| case TRANSACTIONAL: { |
| cache = new GridNearTransactionalCache(cacheCtx); |
| |
| break; |
| } |
| case ATOMIC: { |
| cache = new GridNearAtomicCache(cacheCtx); |
| |
| break; |
| } |
| |
| default: { |
| assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); |
| } |
| } |
| } |
| else { |
| switch (cfg.getAtomicityMode()) { |
| case TRANSACTIONAL: { |
| cache = cacheCtx.affinityNode() ? |
| new GridDhtColocatedCache(cacheCtx) : |
| new GridDhtColocatedCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); |
| |
| break; |
| } |
| case ATOMIC: { |
| cache = cacheCtx.affinityNode() ? |
| new GridDhtAtomicCache(cacheCtx) : |
| new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); |
| |
| break; |
| } |
| |
| default: { |
| assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); |
| } |
| } |
| } |
| |
| break; |
| } |
| |
| default: { |
| assert false : "Invalid cache mode: " + cfg.getCacheMode(); |
| } |
| } |
| |
| cacheCtx.cache(cache); |
| |
| GridCacheContext<?, ?> ret = cacheCtx; |
| |
| /* |
| * Create DHT cache. |
| * ================ |
| */ |
| if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) { |
| /* |
| * Specifically don't create the following managers |
| * here and reuse the one from Near cache: |
| * 1. GridCacheVersionManager |
| * 2. GridCacheIoManager |
| * 3. GridCacheDeploymentManager |
| * 4. GridCacheQueryManager (note, that we start it for DHT cache though). |
| * 5. CacheContinuousQueryManager (note, that we start it for DHT cache though). |
| * 6. GridCacheDgcManager |
| * 7. GridCacheTtlManager. |
| * =============================================== |
| */ |
| swapMgr = new GridCacheSwapManager(true); |
| evictMgr = new GridCacheEvictionManager(); |
| evtMgr = new GridCacheEventManager(); |
| pluginMgr = new CachePluginManager(ctx, cfg); |
| drMgr = pluginMgr.createComponent(GridCacheDrManager.class); |
| |
| cacheCtx = new GridCacheContext( |
| ctx, |
| sharedCtx, |
| cfg, |
| cacheType, |
| ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cfg.getName()), |
| true, |
| |
| /* |
| * Managers in starting order! |
| * =========================== |
| */ |
| evtMgr, |
| swapMgr, |
| storeMgr, |
| evictMgr, |
| qryMgr, |
| contQryMgr, |
| dataStructuresMgr, |
| ttlMgr, |
| drMgr, |
| rslvrMgr, |
| pluginMgr, |
| affMgr |
| ); |
| |
| cacheCtx.cacheObjectContext(cacheObjCtx); |
| |
| GridDhtCacheAdapter dht = null; |
| |
| switch (cfg.getAtomicityMode()) { |
| case TRANSACTIONAL: { |
| assert cache instanceof GridNearTransactionalCache; |
| |
| GridNearTransactionalCache near = (GridNearTransactionalCache)cache; |
| |
| GridDhtCache dhtCache = cacheCtx.affinityNode() ? |
| new GridDhtCache(cacheCtx) : |
| new GridDhtCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); |
| |
| dhtCache.near(near); |
| |
| near.dht(dhtCache); |
| |
| dht = dhtCache; |
| |
| break; |
| } |
| case ATOMIC: { |
| assert cache instanceof GridNearAtomicCache; |
| |
| GridNearAtomicCache near = (GridNearAtomicCache)cache; |
| |
| GridDhtAtomicCache dhtCache = cacheCtx.affinityNode() ? |
| new GridDhtAtomicCache(cacheCtx) : |
| new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); |
| |
| dhtCache.near(near); |
| |
| near.dht(dhtCache); |
| |
| dht = dhtCache; |
| |
| break; |
| } |
| |
| default: { |
| assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); |
| } |
| } |
| |
| cacheCtx.cache(dht); |
| } |
| |
| if (!CU.isUtilityCache(cache.name()) && !CU.isSystemCache(cache.name())) |
| registerMbean(cache.mxBean(), cache.name(), false); |
| |
| return ret; |
| } |
| |
| /** |
| * Gets a collection of currently started caches. |
| * |
| * @return Collection of started cache names. |
| */ |
| public Collection<String> cacheNames() { |
| return F.viewReadOnly(registeredCaches.values(), |
| new IgniteClosure<DynamicCacheDescriptor, String>() { |
| @Override public String apply(DynamicCacheDescriptor desc) { |
| return desc.cacheConfiguration().getName(); |
| } |
| }, |
| new IgnitePredicate<DynamicCacheDescriptor>() { |
| @Override public boolean apply(DynamicCacheDescriptor desc) { |
| return desc.started(); |
| } |
| }); |
| } |
| |
| /** |
| * Gets a collection of currently started public cache names. |
| * |
| * @return Collection of currently started public cache names |
| */ |
| public Collection<String> publicCacheNames() { |
| return F.viewReadOnly(registeredCaches.values(), |
| new IgniteClosure<DynamicCacheDescriptor, String>() { |
| @Override public String apply(DynamicCacheDescriptor desc) { |
| return desc.cacheConfiguration().getName(); |
| } |
| }, |
| new IgnitePredicate<DynamicCacheDescriptor>() { |
| @Override public boolean apply(DynamicCacheDescriptor desc) { |
| return desc.started() && desc.cacheType().userCache(); |
| } |
| } |
| ); |
| } |
| /** |
| * Gets cache mode. |
| * |
| * @param cacheName Cache name to check. |
| * @return Cache mode. |
| */ |
| public CacheMode cacheMode(String cacheName) { |
| DynamicCacheDescriptor desc = registeredCaches.get(maskNull(cacheName)); |
| |
| return desc != null ? desc.cacheConfiguration().getCacheMode() : null; |
| } |
| |
| /** |
| * @param reqs Requests to start. |
| * @param topVer Topology version. |
| * @throws IgniteCheckedException If failed to start cache. |
| */ |
| @SuppressWarnings("TypeMayBeWeakened") |
| public void prepareCachesStart( |
| Collection<DynamicCacheChangeRequest> reqs, |
| AffinityTopologyVersion topVer |
| ) throws IgniteCheckedException { |
| if (ctx.isDaemon()) |
| return; |
| |
| for (DynamicCacheChangeRequest req : reqs) { |
| assert req.start() : req; |
| assert req.cacheType() != null : req; |
| |
| prepareCacheStart( |
| req.startCacheConfiguration(), |
| req.nearCacheConfiguration(), |
| req.cacheType(), |
| req.clientStartOnly(), |
| req.initiatingNodeId(), |
| req.deploymentId(), |
| topVer |
| ); |
| |
| DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); |
| |
| if (desc != null) |
| desc.onStart(); |
| } |
| |
| // Start statically configured caches received from remote nodes during exchange. |
| for (DynamicCacheDescriptor desc : registeredCaches.values()) { |
| if (desc.staticallyConfigured() && !desc.locallyConfigured()) { |
| if (desc.onStart()) { |
| prepareCacheStart( |
| desc.cacheConfiguration(), |
| null, |
| desc.cacheType(), |
| false, |
| null, |
| desc.deploymentId(), |
| topVer |
| ); |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param cfg Start configuration. |
| * @param nearCfg Near configuration. |
| * @param cacheType Cache type. |
| * @param clientStartOnly Client only start request. |
| * @param initiatingNodeId Initiating node ID. |
| * @param deploymentId Deployment ID. |
| * @param topVer Topology version. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void prepareCacheStart( |
| CacheConfiguration cfg, |
| NearCacheConfiguration nearCfg, |
| CacheType cacheType, |
| boolean clientStartOnly, |
| UUID initiatingNodeId, |
| IgniteUuid deploymentId, |
| AffinityTopologyVersion topVer |
| ) throws IgniteCheckedException { |
| CacheConfiguration ccfg = new CacheConfiguration(cfg); |
| |
| IgnitePredicate nodeFilter = ccfg.getNodeFilter(); |
| |
| ClusterNode locNode = ctx.discovery().localNode(); |
| |
| boolean affNodeStart = !clientStartOnly && CU.affinityNode(locNode, nodeFilter); |
| boolean clientNodeStart = locNode.id().equals(initiatingNodeId); |
| |
| if (sharedCtx.cacheContext(CU.cacheId(cfg.getName())) != null) |
| return; |
| |
| if (affNodeStart || clientNodeStart) { |
| if (clientNodeStart && !affNodeStart) { |
| if (nearCfg != null) |
| ccfg.setNearConfiguration(nearCfg); |
| else |
| ccfg.setNearConfiguration(null); |
| } |
| |
| CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); |
| |
| GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx, true); |
| |
| cacheCtx.startTopologyVersion(topVer); |
| |
| cacheCtx.dynamicDeploymentId(deploymentId); |
| |
| sharedCtx.addCacheContext(cacheCtx); |
| |
| caches.put(maskNull(cacheCtx.name()), cacheCtx.cache()); |
| |
| startCache(cacheCtx.cache()); |
| onKernalStart(cacheCtx.cache()); |
| } |
| } |
| |
| /** |
| * @param req Stop request. |
| */ |
| public void blockGateway(DynamicCacheChangeRequest req) { |
| assert req.stop() || req.close(); |
| |
| if (req.stop() || (req.close() && req.initiatingNodeId().equals(ctx.localNodeId()))) { |
| // Break the proxy before exchange future is done. |
| IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName())); |
| |
| if (proxy != null) { |
| if (req.stop()) |
| proxy.gate().stopped(); |
| else |
| proxy.closeProxy(); |
| } |
| } |
| } |
| |
| /** |
| * @param req Request. |
| */ |
| private void stopGateway(DynamicCacheChangeRequest req) { |
| assert req.stop() : req; |
| |
| // Break the proxy before exchange future is done. |
| IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(maskNull(req.cacheName())); |
| |
| if (proxy != null) |
| proxy.gate().onStopped(); |
| } |
| |
| /** |
| * @param req Stop request. |
| */ |
| public void prepareCacheStop(DynamicCacheChangeRequest req) { |
| assert req.stop() || req.close() : req; |
| |
| GridCacheAdapter<?, ?> cache = caches.remove(maskNull(req.cacheName())); |
| |
| if (cache != null) { |
| GridCacheContext<?, ?> ctx = cache.context(); |
| |
| sharedCtx.removeCacheContext(ctx); |
| |
| assert req.deploymentId().equals(ctx.dynamicDeploymentId()) : "Different deployment IDs [req=" + req + |
| ", ctxDepId=" + ctx.dynamicDeploymentId() + ']'; |
| |
| onKernalStop(cache, true); |
| stopCache(cache, true); |
| } |
| } |
| |
| /** |
| * Callback invoked when first exchange future for dynamic cache is completed. |
| * |
| * @param topVer Completed topology version. |
| * @param reqs Change requests. |
| * @param err Error. |
| */ |
| @SuppressWarnings("unchecked") |
| public void onExchangeDone( |
| AffinityTopologyVersion topVer, |
| Collection<DynamicCacheChangeRequest> reqs, |
| Throwable err |
| ) { |
| for (GridCacheAdapter<?, ?> cache : caches.values()) { |
| GridCacheContext<?, ?> cacheCtx = cache.context(); |
| |
| if (F.eq(cacheCtx.startTopologyVersion(), topVer)) { |
| cacheCtx.preloader().onInitialExchangeComplete(err); |
| |
| String masked = maskNull(cacheCtx.name()); |
| |
| jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false)); |
| } |
| } |
| |
| if (!F.isEmpty(reqs) && err == null) { |
| for (DynamicCacheChangeRequest req : reqs) { |
| String masked = maskNull(req.cacheName()); |
| |
| if (req.stop()) { |
| stopGateway(req); |
| |
| prepareCacheStop(req); |
| } |
| else if (req.close() && req.initiatingNodeId().equals(ctx.localNodeId())) { |
| IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(masked); |
| |
| if (proxy != null) { |
| if (proxy.context().affinityNode()) { |
| GridCacheAdapter<?, ?> cache = caches.get(masked); |
| |
| if (cache != null) |
| jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false)); |
| } |
| else { |
| proxy.context().gate().onStopped(); |
| |
| prepareCacheStop(req); |
| } |
| } |
| } |
| |
| completeStartFuture(req); |
| } |
| } |
| } |
| |
| /** |
| * @param req Request to complete future for. |
| */ |
| public void completeStartFuture(DynamicCacheChangeRequest req) { |
| DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName())); |
| |
| assert req.deploymentId() != null; |
| assert fut == null || fut.deploymentId != null; |
| |
| if (fut != null && fut.deploymentId().equals(req.deploymentId()) && |
| F.eq(req.initiatingNodeId(), ctx.localNodeId())) |
| fut.onDone(); |
| } |
| |
| /** |
| * Creates shared context. |
| * |
| * @param kernalCtx Kernal context. |
| * @param storeSesLsnrs Store session listeners. |
| * @return Shared context. |
| */ |
| @SuppressWarnings("unchecked") |
| private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx, |
| Collection<CacheStoreSessionListener> storeSesLsnrs) throws IgniteCheckedException { |
| IgniteTxManager tm = new IgniteTxManager(); |
| GridCacheMvccManager mvccMgr = new GridCacheMvccManager(); |
| GridCacheVersionManager verMgr = new GridCacheVersionManager(); |
| GridCacheDeploymentManager depMgr = new GridCacheDeploymentManager(); |
| GridCachePartitionExchangeManager exchMgr = new GridCachePartitionExchangeManager(); |
| GridCacheIoManager ioMgr = new GridCacheIoManager(); |
| |
| CacheJtaManagerAdapter jta = JTA.createOptional(); |
| |
| return new GridCacheSharedContext( |
| kernalCtx, |
| tm, |
| verMgr, |
| mvccMgr, |
| depMgr, |
| exchMgr, |
| ioMgr, |
| jta, |
| storeSesLsnrs |
| ); |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() { |
| return DiscoveryDataExchangeType.CACHE_PROC; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) { |
| boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null; |
| |
| // Collect dynamically started caches to a single object. |
| Collection<DynamicCacheChangeRequest> reqs; |
| |
| Map<String, Map<UUID, Boolean>> clientNodesMap; |
| |
| if (reconnect) { |
| reqs = new ArrayList<>(caches.size()); |
| |
| clientNodesMap = U.newHashMap(caches.size()); |
| |
| for (GridCacheAdapter<?, ?> cache : caches.values()) { |
| DynamicCacheDescriptor desc = cachesOnDisconnect.get(maskNull(cache.name())); |
| |
| if (desc == null) |
| continue; |
| |
| DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cache.name(), null); |
| |
| req.startCacheConfiguration(desc.cacheConfiguration()); |
| |
| req.cacheType(desc.cacheType()); |
| |
| req.deploymentId(desc.deploymentId()); |
| |
| reqs.add(req); |
| |
| Boolean nearEnabled = cache.isNear(); |
| |
| Map<UUID, Boolean> map = U.newHashMap(1); |
| |
| map.put(nodeId, nearEnabled); |
| |
| clientNodesMap.put(cache.name(), map); |
| } |
| } |
| else { |
| reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size()); |
| |
| for (DynamicCacheDescriptor desc : registeredCaches.values()) { |
| DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null); |
| |
| req.startCacheConfiguration(desc.cacheConfiguration()); |
| |
| req.cacheType(desc.cacheType()); |
| |
| req.deploymentId(desc.deploymentId()); |
| |
| reqs.add(req); |
| } |
| |
| for (DynamicCacheDescriptor desc : registeredTemplates.values()) { |
| DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null); |
| |
| req.startCacheConfiguration(desc.cacheConfiguration()); |
| |
| req.template(true); |
| |
| req.deploymentId(desc.deploymentId()); |
| |
| reqs.add(req); |
| } |
| |
| clientNodesMap = ctx.discovery().clientNodesMap(); |
| } |
| |
| DynamicCacheChangeBatch batch = new DynamicCacheChangeBatch(reqs); |
| |
| batch.clientNodes(clientNodesMap); |
| |
| batch.clientReconnect(reconnect); |
| |
| return batch; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) { |
| if (data instanceof DynamicCacheChangeBatch) { |
| DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)data; |
| |
| if (batch.clientReconnect()) { |
| if (ctx.clientDisconnected()) { |
| if (clientReconnectReqs == null) |
| clientReconnectReqs = new LinkedHashMap<>(); |
| |
| clientReconnectReqs.put(joiningNodeId, batch); |
| |
| return; |
| } |
| |
| processClientReconnectData(joiningNodeId, batch); |
| } |
| else { |
| for (DynamicCacheChangeRequest req : batch.requests()) { |
| if (req.template()) { |
| CacheConfiguration ccfg = req.startCacheConfiguration(); |
| |
| assert ccfg != null : req; |
| |
| DynamicCacheDescriptor existing = registeredTemplates.get(maskNull(req.cacheName())); |
| |
| if (existing == null) { |
| DynamicCacheDescriptor desc = new DynamicCacheDescriptor( |
| ctx, |
| ccfg, |
| req.cacheType(), |
| true, |
| req.deploymentId()); |
| |
| registeredTemplates.put(maskNull(req.cacheName()), desc); |
| } |
| |
| continue; |
| } |
| |
| DynamicCacheDescriptor existing = registeredCaches.get(maskNull(req.cacheName())); |
| |
| if (req.start() && !req.clientStartOnly()) { |
| CacheConfiguration ccfg = req.startCacheConfiguration(); |
| |
| if (existing != null) { |
| if (existing.locallyConfigured()) { |
| existing.deploymentId(req.deploymentId()); |
| |
| existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration()); |
| |
| ctx.discovery().setCacheFilter( |
| req.cacheName(), |
| ccfg.getNodeFilter(), |
| ccfg.getNearConfiguration() != null, |
| ccfg.getCacheMode()); |
| } |
| } |
| else { |
| assert req.cacheType() != null : req; |
| |
| DynamicCacheDescriptor desc = new DynamicCacheDescriptor( |
| ctx, |
| ccfg, |
| req.cacheType(), |
| false, |
| req.deploymentId()); |
| |
| // Received statically configured cache. |
| if (req.initiatingNodeId() == null) |
| desc.staticallyConfigured(true); |
| |
| desc.receivedOnDiscovery(true); |
| |
| DynamicCacheDescriptor old = registeredCaches.put(maskNull(req.cacheName()), desc); |
| |
| assert old == null : old; |
| |
| ctx.discovery().setCacheFilter( |
| req.cacheName(), |
| ccfg.getNodeFilter(), |
| ccfg.getNearConfiguration() != null, |
| ccfg.getCacheMode()); |
| } |
| } |
| } |
| |
| if (!F.isEmpty(batch.clientNodes())) { |
| for (Map.Entry<String, Map<UUID, Boolean>> entry : batch.clientNodes().entrySet()) { |
| String cacheName = entry.getKey(); |
| |
| for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet()) |
| ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue()); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param clientNodeId Client node ID. |
| * @param batch Cache change batch. |
| */ |
| private void processClientReconnectData(UUID clientNodeId, DynamicCacheChangeBatch batch) { |
| assert batch.clientReconnect() : batch; |
| |
| for (DynamicCacheChangeRequest req : batch.requests()) { |
| assert !req.template() : req; |
| |
| String name = req.cacheName(); |
| |
| boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name); |
| |
| if (!sysCache) { |
| DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); |
| |
| if (desc != null && desc.deploymentId().equals(req.deploymentId())) { |
| Map<UUID, Boolean> nodes = batch.clientNodes().get(name); |
| |
| assert nodes != null : req; |
| assert nodes.containsKey(clientNodeId) : nodes; |
| |
| ctx.discovery().addClientNode(req.cacheName(), clientNodeId, nodes.get(clientNodeId)); |
| } |
| } |
| else |
| ctx.discovery().addClientNode(req.cacheName(), clientNodeId, false); |
| } |
| } |
| |
| /** |
| * Dynamically starts cache using template configuration. |
| * |
| * @param cacheName Cache name. |
| * @return Future that will be completed when cache is deployed. |
| */ |
| public IgniteInternalFuture<?> createFromTemplate(String cacheName) { |
| try { |
| CacheConfiguration cfg = createConfigFromTemplate(cacheName); |
| |
| return dynamicStartCache(cfg, cacheName, null, true, true); |
| } |
| catch (IgniteCheckedException e) { |
| throw U.convertException(e); |
| } |
| } |
| |
| /** |
| * Dynamically starts cache using template configuration. |
| * |
| * @param cacheName Cache name. |
| * @return Future that will be completed when cache is deployed. |
| */ |
| public IgniteInternalFuture<?> getOrCreateFromTemplate(String cacheName) { |
| try { |
| if (publicJCache(cacheName, false) != null) // Cache with given name already started. |
| return new GridFinishedFuture<>(); |
| |
| CacheConfiguration cfg = createConfigFromTemplate(cacheName); |
| |
| return dynamicStartCache(cfg, cacheName, null, false, true); |
| } |
| catch (IgniteCheckedException e) { |
| return new GridFinishedFuture<>(e); |
| } |
| } |
| |
| /** |
| * @param cacheName Cache name. |
| * @return Cache configuration. |
| */ |
| private CacheConfiguration createConfigFromTemplate(String cacheName) throws IgniteCheckedException { |
| CacheConfiguration cfgTemplate = null; |
| |
| CacheConfiguration dfltCacheCfg = null; |
| |
| List<CacheConfiguration> wildcardNameCfgs = null; |
| |
| for (DynamicCacheDescriptor desc : registeredTemplates.values()) { |
| assert desc.template(); |
| |
| CacheConfiguration cfg = desc.cacheConfiguration(); |
| |
| assert cfg != null; |
| |
| if (F.eq(cacheName, cfg.getName())) { |
| cfgTemplate = cfg; |
| |
| break; |
| } |
| |
| if (cfg.getName() != null ) { |
| if (cfg.getName().endsWith("*")) { |
| if (cfg.getName().length() > 1) { |
| if (wildcardNameCfgs == null) |
| wildcardNameCfgs = new ArrayList<>(); |
| |
| wildcardNameCfgs.add(cfg); |
| } |
| else |
| dfltCacheCfg = cfg; // Template with name '*'. |
| } |
| } |
| else if (dfltCacheCfg == null) |
| dfltCacheCfg = cfg; |
| } |
| |
| if (cfgTemplate == null && cacheName != null && wildcardNameCfgs != null) { |
| Collections.sort(wildcardNameCfgs, new Comparator<CacheConfiguration>() { |
| @Override public int compare(CacheConfiguration cfg1, CacheConfiguration cfg2) { |
| Integer len1 = cfg1.getName() != null ? cfg1.getName().length() : 0; |
| Integer len2 = cfg2.getName() != null ? cfg2.getName().length() : 0; |
| |
| return len2.compareTo(len1); |
| } |
| }); |
| |
| for (CacheConfiguration cfg : wildcardNameCfgs) { |
| if (cacheName.startsWith(cfg.getName().substring(0, cfg.getName().length() - 1))) { |
| cfgTemplate = cfg; |
| |
| break; |
| } |
| } |
| } |
| |
| if (cfgTemplate == null) |
| cfgTemplate = dfltCacheCfg; |
| |
| if (cfgTemplate == null) |
| cfgTemplate = new CacheConfiguration(); |
| else |
| cfgTemplate = cloneCheckSerializable(cfgTemplate); |
| |
| CacheConfiguration cfg = new CacheConfiguration(cfgTemplate); |
| |
| cfg.setName(cacheName); |
| |
| return cfg; |
| } |
| |
| /** |
| * Dynamically starts cache. |
| * |
| * @param ccfg Cache configuration. |
| * @param cacheName Cache name. |
| * @param nearCfg Near cache configuration. |
| * @param failIfExists Fail if exists flag. |
| * @return Future that will be completed when cache is deployed. |
| */ |
| @SuppressWarnings("IfMayBeConditional") |
| public IgniteInternalFuture<?> dynamicStartCache( |
| @Nullable CacheConfiguration ccfg, |
| String cacheName, |
| @Nullable NearCacheConfiguration nearCfg, |
| boolean failIfExists, |
| boolean failIfNotStarted |
| ) { |
| return dynamicStartCache(ccfg, cacheName, nearCfg, CacheType.USER, failIfExists, failIfNotStarted); |
| } |
| |
| /** |
| * Dynamically starts cache. |
| * |
| * @param ccfg Cache configuration. |
| * @param cacheName Cache name. |
| * @param nearCfg Near cache configuration. |
| * @param failIfExists Fail if exists flag. |
| * @return Future that will be completed when cache is deployed. |
| */ |
| @SuppressWarnings("IfMayBeConditional") |
| public IgniteInternalFuture<?> dynamicStartCache( |
| @Nullable CacheConfiguration ccfg, |
| String cacheName, |
| @Nullable NearCacheConfiguration nearCfg, |
| CacheType cacheType, |
| boolean failIfExists, |
| boolean failIfNotStarted |
| ) { |
| checkEmptyTransactions(); |
| |
| DynamicCacheDescriptor desc = registeredCaches.get(maskNull(cacheName)); |
| |
| DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId()); |
| |
| req.failIfExists(failIfExists); |
| |
| if (ccfg != null) { |
| try { |
| cloneCheckSerializable(ccfg); |
| } |
| catch (IgniteCheckedException e) { |
| return new GridFinishedFuture<>(e); |
| } |
| |
| if (desc != null) { |
| if (failIfExists) { |
| return new GridFinishedFuture<>(new CacheExistsException("Failed to start cache " + |
| "(a cache with the same name is already started): " + cacheName)); |
| } |
| else { |
| CacheConfiguration descCfg = desc.cacheConfiguration(); |
| |
| // Check if we were asked to start a near cache. |
| if (nearCfg != null) { |
| if (CU.affinityNode(ctx.discovery().localNode(), descCfg.getNodeFilter())) { |
| // If we are on a data node and near cache was enabled, return success, else - fail. |
| if (descCfg.getNearConfiguration() != null) |
| return new GridFinishedFuture<>(); |
| else |
| return new GridFinishedFuture<>(new IgniteCheckedException("Failed to start near " + |
| "cache (local node is an affinity node for cache): " + cacheName)); |
| } |
| else |
| // If local node has near cache, return success. |
| req.clientStartOnly(true); |
| } |
| else |
| req.clientStartOnly(true); |
| |
| req.deploymentId(desc.deploymentId()); |
| |
| req.startCacheConfiguration(descCfg); |
| } |
| } |
| else { |
| req.deploymentId(IgniteUuid.randomUuid()); |
| |
| try { |
| CacheConfiguration cfg = new CacheConfiguration(ccfg); |
| |
| CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg); |
| |
| initialize(false, cfg, cacheObjCtx); |
| |
| req.startCacheConfiguration(cfg); |
| } |
| catch (IgniteCheckedException e) { |
| return new GridFinishedFuture(e); |
| } |
| } |
| } |
| else { |
| req.clientStartOnly(true); |
| |
| if (desc != null) |
| ccfg = desc.cacheConfiguration(); |
| |
| if (ccfg == null) { |
| if (failIfNotStarted) |
| return new GridFinishedFuture<>(new CacheExistsException("Failed to start client cache " + |
| "(a cache with the given name is not started): " + cacheName)); |
| else |
| return new GridFinishedFuture<>(); |
| } |
| |
| req.deploymentId(desc.deploymentId()); |
| req.startCacheConfiguration(ccfg); |
| } |
| |
| if (nearCfg != null) |
| req.nearCacheConfiguration(nearCfg); |
| |
| req.cacheType(cacheType); |
| |
| return F.first(initiateCacheChanges(F.asList(req), failIfExists)); |
| } |
| |
| /** |
| * @param cacheName Cache name to destroy. |
| * @return Future that will be completed when cache is destroyed. |
| */ |
| public IgniteInternalFuture<?> dynamicDestroyCache(String cacheName) { |
| checkEmptyTransactions(); |
| |
| DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId()); |
| |
| t.stop(true); |
| |
| return F.first(initiateCacheChanges(F.asList(t), false)); |
| } |
| |
| |
| /** |
| * @param cacheName Cache name to close. |
| * @return Future that will be completed when cache is closed. |
| */ |
| public IgniteInternalFuture<?> dynamicCloseCache(String cacheName) { |
| IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(cacheName)); |
| |
| if (proxy == null || proxy.proxyClosed()) |
| return new GridFinishedFuture<>(); // No-op. |
| |
| checkEmptyTransactions(); |
| |
| DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId()); |
| |
| t.close(true); |
| |
| return F.first(initiateCacheChanges(F.asList(t), false)); |
| } |
| |
| /** |
| * @param reqs Requests. |
| * @return Collection of futures. |
| */ |
| @SuppressWarnings("TypeMayBeWeakened") |
| private Collection<DynamicCacheStartFuture> initiateCacheChanges(Collection<DynamicCacheChangeRequest> reqs, |
| boolean failIfExists) { |
| Collection<DynamicCacheStartFuture> res = new ArrayList<>(reqs.size()); |
| |
| Collection<DynamicCacheChangeRequest> sndReqs = new ArrayList<>(reqs.size()); |
| |
| for (DynamicCacheChangeRequest req : reqs) { |
| DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.cacheName(), req.deploymentId(), req); |
| |
| try { |
| if (req.stop() || req.close()) { |
| DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); |
| |
| if (desc == null) |
| // No-op. |
| fut.onDone(); |
| else { |
| assert desc.cacheConfiguration() != null : desc; |
| |
| if (req.close() && desc.cacheConfiguration().getCacheMode() == LOCAL) { |
| req.close(false); |
| |
| req.stop(true); |
| } |
| |
| IgniteUuid dynamicDeploymentId = desc.deploymentId(); |
| |
| assert dynamicDeploymentId != null : desc; |
| |
| // Save deployment ID to avoid concurrent stops. |
| req.deploymentId(dynamicDeploymentId); |
| fut.deploymentId = dynamicDeploymentId; |
| } |
| } |
| |
| if (fut.isDone()) |
| continue; |
| |
| DynamicCacheStartFuture old = (DynamicCacheStartFuture)pendingFuts.putIfAbsent( |
| maskNull(req.cacheName()), fut); |
| |
| if (old != null) { |
| if (req.start()) { |
| if (!req.clientStartOnly()) { |
| if (failIfExists) |
| fut.onDone(new CacheExistsException("Failed to start cache " + |
| "(a cache with the same name is already being started or stopped): " + |
| req.cacheName())); |
| else { |
| fut = old; |
| |
| continue; |
| } |
| } |
| else { |
| fut = old; |
| |
| continue; |
| } |
| } |
| else { |
| fut = old; |
| |
| continue; |
| } |
| } |
| |
| if (fut.isDone()) |
| continue; |
| |
| sndReqs.add(req); |
| } |
| catch (Exception e) { |
| fut.onDone(e); |
| } |
| finally { |
| res.add(fut); |
| } |
| } |
| |
| Exception err = null; |
| |
| if (!sndReqs.isEmpty()) { |
| try { |
| ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sndReqs)); |
| |
| if (ctx.isStopping()) { |
| err = new IgniteCheckedException("Failed to execute dynamic cache change request, " + |
| "node is stopping."); |
| } |
| else if (ctx.clientDisconnected()) { |
| err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), |
| "Failed to execute dynamic cache change request, client node disconnected."); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| err = e; |
| } |
| } |
| |
| if (err != null) { |
| for (DynamicCacheStartFuture fut : res) |
| fut.onDone(err); |
| } |
| |
| return res; |
| } |
| |
| /** |
| * Callback invoked from discovery thread when cache deployment request is received. |
| * |
| * @param batch Change request batch. |
| * @param topVer Current topology version. |
| */ |
| private void onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) { |
| for (DynamicCacheChangeRequest req : batch.requests()) { |
| if (req.template()) { |
| CacheConfiguration ccfg = req.startCacheConfiguration(); |
| |
| assert ccfg != null : req; |
| |
| DynamicCacheDescriptor desc = registeredTemplates.get(maskNull(req.cacheName())); |
| |
| if (desc == null) { |
| DynamicCacheDescriptor templateDesc = |
| new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), true, req.deploymentId()); |
| |
| DynamicCacheDescriptor old = registeredTemplates.put(maskNull(ccfg.getName()), templateDesc); |
| |
| assert old == null : |
| "Dynamic cache map was concurrently modified [new=" + templateDesc + ", old=" + old + ']'; |
| } |
| |
| TemplateConfigurationFuture fut = |
| (TemplateConfigurationFuture)pendingTemplateFuts.get(maskNull(ccfg.getName())); |
| |
| if (fut != null && fut.deploymentId().equals(req.deploymentId())) |
| fut.onDone(); |
| |
| continue; |
| } |
| |
| DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); |
| |
| boolean needExchange = false; |
| |
| DynamicCacheStartFuture fut = null; |
| |
| if (ctx.localNodeId().equals(req.initiatingNodeId())) { |
| fut = (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName())); |
| |
| if (fut != null && !req.deploymentId().equals(fut.deploymentId())) |
| fut = null; |
| } |
| |
| if (req.start()) { |
| if (desc == null) { |
| if (req.clientStartOnly()) { |
| if (fut != null) |
| fut.onDone(new IgniteCheckedException("Failed to start client cache " + |
| "(a cache with the given name is not started): " + U.maskName(req.cacheName()))); |
| } |
| else { |
| CacheConfiguration ccfg = req.startCacheConfiguration(); |
| |
| assert req.cacheType() != null : req; |
| assert F.eq(ccfg.getName(), req.cacheName()) : req; |
| |
| DynamicCacheDescriptor startDesc = |
| new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId()); |
| |
| startDesc.startTopologyVersion(topVer); |
| |
| DynamicCacheDescriptor old = registeredCaches.put(maskNull(ccfg.getName()), startDesc); |
| |
| assert old == null : |
| "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']'; |
| |
| ctx.discovery().setCacheFilter( |
| ccfg.getName(), |
| ccfg.getNodeFilter(), |
| ccfg.getNearConfiguration() != null, |
| ccfg.getCacheMode()); |
| |
| ctx.discovery().addClientNode(req.cacheName(), |
| req.initiatingNodeId(), |
| req.nearCacheConfiguration() != null); |
| |
| needExchange = true; |
| } |
| } |
| else { |
| assert req.initiatingNodeId() != null : req; |
| |
| // Cache already exists, exchange is needed only if client cache should be created. |
| ClusterNode node = ctx.discovery().node(req.initiatingNodeId()); |
| |
| boolean clientReq = node != null && |
| !ctx.discovery().cacheAffinityNode(node, req.cacheName()); |
| |
| if (req.clientStartOnly()) { |
| needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(), |
| req.initiatingNodeId(), |
| req.nearCacheConfiguration() != null); |
| } |
| else { |
| if (req.failIfExists() ) { |
| if (fut != null) |
| fut.onDone(new CacheExistsException("Failed to start cache " + |
| "(a cache with the same name is already started): " + U.maskName(req.cacheName()))); |
| } |
| else { |
| needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(), |
| req.initiatingNodeId(), |
| req.nearCacheConfiguration() != null); |
| |
| if (needExchange) |
| req.clientStartOnly(true); |
| } |
| } |
| } |
| |
| if (!needExchange && desc != null) |
| req.cacheFutureTopologyVersion(desc.startTopologyVersion()); |
| } |
| else { |
| assert req.stop() ^ req.close() : req; |
| |
| if (desc != null) { |
| if (req.stop()) { |
| DynamicCacheDescriptor old = registeredCaches.remove(maskNull(req.cacheName())); |
| |
| assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']'; |
| |
| ctx.discovery().removeCacheFilter(req.cacheName()); |
| |
| needExchange = true; |
| } |
| else { |
| assert req.close() : req; |
| |
| needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId()); |
| } |
| } |
| } |
| |
| req.exchangeNeeded(needExchange); |
| } |
| } |
| |
| /** |
| * Checks that preload-order-dependant caches has SYNC or ASYNC preloading mode. |
| * |
| * @param cfgs Caches. |
| * @return Maximum detected preload order. |
| * @throws IgniteCheckedException If validation failed. |
| */ |
| private int validatePreloadOrder(CacheConfiguration[] cfgs) throws IgniteCheckedException { |
| int maxOrder = 0; |
| |
| for (CacheConfiguration cfg : cfgs) { |
| int rebalanceOrder = cfg.getRebalanceOrder(); |
| |
| if (rebalanceOrder > 0) { |
| if (cfg.getCacheMode() == LOCAL) |
| throw new IgniteCheckedException("Rebalance order set for local cache (fix configuration and restart the " + |
| "node): " + U.maskName(cfg.getName())); |
| |
| if (cfg.getRebalanceMode() == CacheRebalanceMode.NONE) |
| throw new IgniteCheckedException("Only caches with SYNC or ASYNC rebalance mode can be set as rebalance " + |
| "dependency for other caches [cacheName=" + U.maskName(cfg.getName()) + |
| ", rebalanceMode=" + cfg.getRebalanceMode() + ", rebalanceOrder=" + cfg.getRebalanceOrder() + ']'); |
| |
| maxOrder = Math.max(maxOrder, rebalanceOrder); |
| } |
| else if (rebalanceOrder < 0) |
| throw new IgniteCheckedException("Rebalance order cannot be negative for cache (fix configuration and restart " + |
| "the node) [cacheName=" + U.maskName(cfg.getName()) + ", rebalanceOrder=" + rebalanceOrder + ']'); |
| } |
| |
| return maxOrder; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node) { |
| return validateHashIdResolvers(node); |
| } |
| |
| /** |
| * @param node Joining node. |
| * @return Validation result or {@code null} in case of success. |
| */ |
| @Nullable private IgniteNodeValidationResult validateHashIdResolvers(ClusterNode node) { |
| if (!node.isClient()) { |
| for (DynamicCacheDescriptor desc : registeredCaches.values()) { |
| CacheConfiguration cfg = desc.cacheConfiguration(); |
| |
| if (cfg.getAffinity() instanceof RendezvousAffinityFunction) { |
| RendezvousAffinityFunction aff = (RendezvousAffinityFunction)cfg.getAffinity(); |
| |
| Object nodeHashObj = aff.resolveNodeHash(node); |
| |
| for (ClusterNode topNode : ctx.discovery().allNodes()) { |
| Object topNodeHashObj = aff.resolveNodeHash(topNode); |
| |
| if (nodeHashObj.hashCode() == topNodeHashObj.hashCode()) { |
| String hashIdRslvrName = ""; |
| |
| if (aff.getHashIdResolver() != null) |
| hashIdRslvrName = ", hashIdResolverClass=" + |
| aff.getHashIdResolver().getClass().getName(); |
| |
| String errMsg = "Failed to add node to topology because it has the same hash code for " + |
| "partitioned affinity as one of existing nodes [cacheName=" + |
| U.maskName(cfg.getName()) + hashIdRslvrName + ", existingNodeId=" + topNode.id() + ']'; |
| |
| String sndMsg = "Failed to add node to topology because it has the same hash code for " + |
| "partitioned affinity as one of existing nodes [cacheName=" + |
| U.maskName(cfg.getName()) + hashIdRslvrName + ", existingNodeId=" + topNode.id() + ']'; |
| |
| return new IgniteNodeValidationResult(topNode.id(), errMsg, sndMsg); |
| } |
| } |
| } |
| } |
| } |
| |
| return null; |
| } |
| |
| /** |
| * Checks that remote caches has configuration compatible with the local. |
| * |
| * @param locCfg Local configuration. |
| * @param rmtCfg Remote configuration. |
| * @param rmtNode Remote node. |
| * @param desc Cache descriptor. |
| * @throws IgniteCheckedException If check failed. |
| */ |
| private void checkCache(CacheConfiguration locCfg, CacheConfiguration rmtCfg, ClusterNode rmtNode, |
| DynamicCacheDescriptor desc) throws IgniteCheckedException { |
| ClusterNode locNode = ctx.discovery().localNode(); |
| |
| UUID rmt = rmtNode.id(); |
| |
| GridCacheAttributes rmtAttr = new GridCacheAttributes(rmtCfg); |
| GridCacheAttributes locAttr = new GridCacheAttributes(locCfg); |
| |
| boolean isLocAff = CU.affinityNode(locNode, locCfg.getNodeFilter()); |
| boolean isRmtAff = CU.affinityNode(rmtNode, rmtCfg.getNodeFilter()); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheMode", "Cache mode", |
| locAttr.cacheMode(), rmtAttr.cacheMode(), true); |
| |
| if (rmtAttr.cacheMode() != LOCAL) { |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "interceptor", "Cache Interceptor", |
| locAttr.interceptorClassName(), rmtAttr.interceptorClassName(), true); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "atomicityMode", |
| "Cache atomicity mode", locAttr.atomicityMode(), rmtAttr.atomicityMode(), true); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cachePreloadMode", |
| "Cache preload mode", locAttr.cacheRebalanceMode(), rmtAttr.cacheRebalanceMode(), true); |
| |
| boolean checkStore = isLocAff && isRmtAff; |
| |
| if (checkStore) |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "storeFactory", "Store factory", |
| locAttr.storeFactoryClassName(), rmtAttr.storeFactoryClassName(), true); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinity", "Cache affinity", |
| locAttr.cacheAffinityClassName(), rmtAttr.cacheAffinityClassName(), true); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinityMapper", |
| "Cache affinity mapper", locAttr.cacheAffinityMapperClassName(), |
| rmtAttr.cacheAffinityMapperClassName(), true); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityPartitionsCount", |
| "Affinity partitions count", locAttr.affinityPartitionsCount(), |
| rmtAttr.affinityPartitionsCount(), true); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionFilter", "Eviction filter", |
| locAttr.evictionFilterClassName(), rmtAttr.evictionFilterClassName(), true); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionPolicy", "Eviction policy", |
| locAttr.evictionPolicyClassName(), rmtAttr.evictionPolicyClassName(), true); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "transactionManagerLookup", |
| "Transaction manager lookup", locAttr.transactionManagerLookupClassName(), |
| rmtAttr.transactionManagerLookupClassName(), false); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "defaultLockTimeout", |
| "Default lock timeout", locAttr.defaultLockTimeout(), rmtAttr.defaultLockTimeout(), false); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "preloadBatchSize", |
| "Preload batch size", locAttr.rebalanceBatchSize(), rmtAttr.rebalanceBatchSize(), false); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "swapEnabled", |
| "Swap enabled", locAttr.swapEnabled(), rmtAttr.swapEnabled(), false); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeSynchronizationMode", |
| "Write synchronization mode", locAttr.writeSynchronization(), rmtAttr.writeSynchronization(), |
| true); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindBatchSize", |
| "Write behind batch size", locAttr.writeBehindBatchSize(), rmtAttr.writeBehindBatchSize(), |
| false); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindEnabled", |
| "Write behind enabled", locAttr.writeBehindEnabled(), rmtAttr.writeBehindEnabled(), false); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushFrequency", |
| "Write behind flush frequency", locAttr.writeBehindFlushFrequency(), |
| rmtAttr.writeBehindFlushFrequency(), false); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushSize", |
| "Write behind flush size", locAttr.writeBehindFlushSize(), rmtAttr.writeBehindFlushSize(), |
| false); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushThreadCount", |
| "Write behind flush thread count", locAttr.writeBehindFlushThreadCount(), |
| rmtAttr.writeBehindFlushThreadCount(), false); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictMaxOverflowRatio", |
| "Eviction max overflow ratio", locAttr.evictMaxOverflowRatio(), |
| rmtAttr.evictMaxOverflowRatio(), true); |
| |
| if (locAttr.cacheMode() == PARTITIONED) { |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictSynchronized", |
| "Eviction synchronized", locAttr.evictSynchronized(), rmtAttr.evictSynchronized(), |
| true); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "nearEvictionPolicy", |
| "Near eviction policy", locAttr.nearEvictionPolicyClassName(), |
| rmtAttr.nearEvictionPolicyClassName(), false); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityIncludeNeighbors", |
| "Affinity include neighbors", locAttr.affinityIncludeNeighbors(), |
| rmtAttr.affinityIncludeNeighbors(), true); |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityKeyBackups", |
| "Affinity key backups", locAttr.affinityKeyBackups(), |
| rmtAttr.affinityKeyBackups(), true); |
| |
| String locHashIdResolver = locAttr.affinityHashIdResolverClassName(); |
| String rmtHashIdResolver = rmtAttr.affinityHashIdResolverClassName(); |
| String defHashIdResolver = AffinityNodeAddressHashResolver.class.getName(); |
| |
| if (!((locHashIdResolver == null && rmtHashIdResolver == null) || |
| (locHashIdResolver == null && rmtHashIdResolver.equals(defHashIdResolver)) || |
| (rmtHashIdResolver == null && locHashIdResolver.equals(defHashIdResolver)))) { |
| |
| CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinity.hashIdResolver", |
| "Partitioned cache affinity hash ID resolver class", |
| locHashIdResolver, rmtHashIdResolver, true); |
| } |
| |
| if (locHashIdResolver == null && |
| (rmtHashIdResolver != null && rmtHashIdResolver.equals(defHashIdResolver))) { |
| U.warn(log, "Set " + RendezvousAffinityFunction.class + " with " + defHashIdResolver + |
| " to CacheConfiguration to start node [cacheName=" + rmtAttr.cacheName() + "]"); |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param rmt Remote node to check. |
| * @throws IgniteCheckedException If check failed. |
| */ |
| private void checkTransactionConfiguration(ClusterNode rmt) throws IgniteCheckedException { |
| TransactionConfiguration txCfg = rmt.attribute(ATTR_TX_CONFIG); |
| |
| if (txCfg != null) { |
| TransactionConfiguration locTxCfg = ctx.config().getTransactionConfiguration(); |
| |
| if (locTxCfg.isTxSerializableEnabled() != txCfg.isTxSerializableEnabled()) |
| throw new IgniteCheckedException("Serializable transactions enabled mismatch " + |
| "(fix txSerializableEnabled property or set -D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true " + |
| "system property) [rmtNodeId=" + rmt.id() + |
| ", locTxSerializableEnabled=" + locTxCfg.isTxSerializableEnabled() + |
| ", rmtTxSerializableEnabled=" + txCfg.isTxSerializableEnabled() + ']'); |
| } |
| } |
| |
| /** |
| * @param spaceName Space name. |
| * @param keyBytes Key bytes. |
| * @param valBytes Value bytes. |
| */ |
| @SuppressWarnings( {"unchecked"}) |
| public void onEvictFromSwap(String spaceName, byte[] keyBytes, byte[] valBytes) { |
| assert spaceName != null; |
| assert keyBytes != null; |
| assert valBytes != null; |
| |
| /* |
| * NOTE: this method should not have any synchronization because |
| * it is called from synchronization block within Swap SPI. |
| */ |
| |
| GridCacheAdapter cache = caches.get(maskNull(CU.cacheNameForSwapSpaceName(spaceName))); |
| |
| assert cache != null : "Failed to resolve cache name for swap space name: " + spaceName; |
| |
| GridCacheContext cctx = cache.configuration().getCacheMode() == PARTITIONED ? |
| ((GridNearCacheAdapter<?, ?>)cache).dht().context() : cache.context(); |
| |
| if (spaceName.equals(CU.swapSpaceName(cctx))) { |
| GridCacheQueryManager qryMgr = cctx.queries(); |
| |
| if (qryMgr.enabled()) { |
| try { |
| KeyCacheObject key = cctx.toCacheKeyObject(keyBytes); |
| |
| GridCacheSwapEntry swapEntry = GridCacheSwapEntryImpl.unmarshal(valBytes, true); |
| |
| CacheObject val = swapEntry.value(); |
| |
| assert val != null; |
| |
| qryMgr.remove(key, val); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to unmarshal key evicted from swap [swapSpaceName=" + spaceName + ']', e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param cfg Cache configuration. |
| * @return Query manager. |
| */ |
| private GridCacheQueryManager queryManager(CacheConfiguration cfg) { |
| return cfg.getCacheMode() == LOCAL ? new GridCacheLocalQueryManager() : new GridCacheDistributedQueryManager(); |
| } |
| |
| /** |
| * @return Last data version. |
| */ |
| public long lastDataVersion() { |
| long max = 0; |
| |
| for (GridCacheAdapter<?, ?> cache : caches.values()) { |
| GridCacheContext<?, ?> ctx = cache.context(); |
| |
| if (ctx.versions().last().order() > max) |
| max = ctx.versions().last().order(); |
| |
| if (ctx.isNear()) { |
| ctx = ctx.near().dht().context(); |
| |
| if (ctx.versions().last().order() > max) |
| max = ctx.versions().last().order(); |
| } |
| } |
| |
| return max; |
| } |
| |
| /** |
| * @param <K> type of keys. |
| * @param <V> type of values. |
| * @return Default cache. |
| */ |
| public <K, V> IgniteInternalCache<K, V> cache() { |
| return cache(null); |
| } |
| |
| /** |
| * @param name Cache name. |
| * @param <K> type of keys. |
| * @param <V> type of values. |
| * @return Cache instance for given name. |
| */ |
| @SuppressWarnings("unchecked") |
| public <K, V> IgniteInternalCache<K, V> cache(@Nullable String name) { |
| if (log.isDebugEnabled()) |
| log.debug("Getting cache for name: " + name); |
| |
| IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jCacheProxies.get(maskNull(name)); |
| |
| return jcache == null ? null : jcache.internalProxy(); |
| } |
| |
| /** |
| * @param name Cache name. |
| * @return Cache instance for given name. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @SuppressWarnings("unchecked") |
| public <K, V> IgniteInternalCache<K, V> getOrStartCache(@Nullable String name) throws IgniteCheckedException { |
| if (log.isDebugEnabled()) |
| log.debug("Getting cache for name: " + name); |
| |
| String masked = maskNull(name); |
| |
| IgniteCacheProxy<?, ?> cache = jCacheProxies.get(masked); |
| |
| if (cache == null) { |
| dynamicStartCache(null, name, null, false, true).get(); |
| |
| cache = jCacheProxies.get(masked); |
| } |
| |
| return cache == null ? null : (IgniteInternalCache<K, V>)cache.internalProxy(); |
| } |
| |
| /** |
| * @return All configured cache instances. |
| */ |
| public Collection<IgniteInternalCache<?, ?>> caches() { |
| return F.viewReadOnly(jCacheProxies.values(), new IgniteClosure<IgniteCacheProxy<?, ?>, |
| IgniteInternalCache<?, ?>>() { |
| @Override public IgniteInternalCache<?, ?> apply(IgniteCacheProxy<?, ?> entries) { |
| return entries.internalProxy(); |
| } |
| }); |
| } |
| |
| /** |
| * @return All configured cache instances. |
| */ |
| public Collection<IgniteCacheProxy<?, ?>> jcaches() { |
| return jCacheProxies.values(); |
| } |
| |
| /** |
| * @return Marshaller system cache. |
| */ |
| public GridCacheAdapter<Integer, String> marshallerCache() { |
| return internalCache(CU.MARSH_CACHE_NAME); |
| } |
| |
| /** |
| * Gets utility cache. |
| * |
| * @return Utility cache. |
| */ |
| public <K, V> IgniteInternalCache<K, V> utilityCache() { |
| return internalCacheEx(CU.UTILITY_CACHE_NAME); |
| } |
| |
| /** |
| * Gets utility cache for atomic data structures. |
| * |
| * @return Utility cache for atomic data structures. |
| */ |
| public <K, V> IgniteInternalCache<K, V> atomicsCache() { |
| return internalCacheEx(CU.ATOMICS_CACHE_NAME); |
| } |
| |
| /** |
| * @param name Cache name. |
| * @return Cache. |
| */ |
| private <K, V> IgniteInternalCache<K, V> internalCacheEx(String name) { |
| if (ctx.discovery().localNode().isClient()) { |
| IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)jCacheProxies.get(name); |
| |
| assert proxy != null; |
| |
| return proxy.internalProxy(); |
| } |
| |
| return internalCache(name); |
| } |
| |
| /** |
| * @param name Cache name. |
| * @param <K> type of keys. |
| * @param <V> type of values. |
| * @return Cache instance for given name. |
| */ |
| @SuppressWarnings("unchecked") |
| public <K, V> IgniteInternalCache<K, V> publicCache(@Nullable String name) { |
| if (log.isDebugEnabled()) |
| log.debug("Getting public cache for name: " + name); |
| |
| DynamicCacheDescriptor desc = registeredCaches.get(maskNull(name)); |
| |
| if (desc == null) |
| throw new IllegalArgumentException("Cache is not started: " + name); |
| |
| if (!desc.cacheType().userCache()) |
| throw new IllegalStateException("Failed to get cache because it is a system cache: " + name); |
| |
| IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jCacheProxies.get(maskNull(name)); |
| |
| if (jcache == null) |
| throw new IllegalArgumentException("Cache is not started: " + name); |
| |
| return jcache.internalProxy(); |
| } |
| |
| /** |
| * @param cacheName Cache name. |
| * @param <K> type of keys. |
| * @param <V> type of values. |
| * @return Cache instance for given name. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName) throws IgniteCheckedException { |
| return publicJCache(cacheName, true); |
| } |
| |
| /** |
| * @param cacheName Cache name. |
| * @param failIfNotStarted If {@code true} throws {@link IllegalArgumentException} if cache is not started, |
| * otherwise returns {@code null} in this case. |
| * @param <K> type of keys. |
| * @param <V> type of values. |
| * @return Cache instance for given name. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @SuppressWarnings({"unchecked", "ConstantConditions"}) |
| @Nullable public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName, boolean failIfNotStarted) |
| throws IgniteCheckedException |
| { |
| if (log.isDebugEnabled()) |
| log.debug("Getting public cache for name: " + cacheName); |
| |
| String masked = maskNull(cacheName); |
| |
| IgniteCacheProxy<?, ?> cache = jCacheProxies.get(masked); |
| |
| DynamicCacheDescriptor desc = registeredCaches.get(masked); |
| |
| if (desc != null && !desc.cacheType().userCache()) |
| throw new IllegalStateException("Failed to get cache because it is a system cache: " + cacheName); |
| |
| if (cache == null) { |
| dynamicStartCache(null, cacheName, null, false, failIfNotStarted).get(); |
| |
| cache = jCacheProxies.get(masked); |
| } |
| |
| return (IgniteCacheProxy<K, V>)cache; |
| } |
| |
| /** |
| * Get configuration for the given cache. |
| * |
| * @param name Cache name. |
| * @return Cache configuration. |
| */ |
| public CacheConfiguration cacheConfiguration(String name) { |
| DynamicCacheDescriptor desc = registeredCaches.get(maskNull(name)); |
| |
| if (desc == null) |
| throw new IllegalStateException("Cache doesn't exist: " + name); |
| else |
| return desc.cacheConfiguration(); |
| } |
| |
| /** |
| * @param cacheId Cache ID. |
| * @return Cache descriptor. |
| */ |
| @Nullable public DynamicCacheDescriptor cacheDescriptor(int cacheId) { |
| for (DynamicCacheDescriptor cacheDesc : registeredCaches.values()) { |
| CacheConfiguration ccfg = cacheDesc.cacheConfiguration(); |
| |
| assert ccfg != null : cacheDesc; |
| |
| if (CU.cacheId(ccfg.getName()) == cacheId) |
| return cacheDesc; |
| } |
| |
| return null; |
| } |
| |
| /** |
| * @param cacheCfg Cache configuration template. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void addCacheConfiguration(CacheConfiguration cacheCfg) throws IgniteCheckedException { |
| String masked = maskNull(cacheCfg.getName()); |
| |
| DynamicCacheDescriptor desc = registeredTemplates.get(masked); |
| |
| if (desc != null) |
| return; |
| |
| DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cacheCfg.getName(), ctx.localNodeId()); |
| |
| CacheConfiguration cfg = new CacheConfiguration(cacheCfg); |
| |
| req.template(true); |
| |
| req.startCacheConfiguration(cfg); |
| |
| req.deploymentId(IgniteUuid.randomUuid()); |
| |
| TemplateConfigurationFuture fut = new TemplateConfigurationFuture(req.cacheName(), req.deploymentId()); |
| |
| TemplateConfigurationFuture old = |
| (TemplateConfigurationFuture)pendingTemplateFuts.putIfAbsent(maskNull(cacheCfg.getName()), fut); |
| |
| if (old != null) |
| fut = old; |
| |
| Exception err = null; |
| |
| try { |
| ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(Collections.singleton(req))); |
| |
| if (ctx.isStopping()) { |
| err = new IgniteCheckedException("Failed to execute dynamic cache change request, " + |
| "node is stopping."); |
| } |
| else if (ctx.clientDisconnected()) { |
| err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), |
| "Failed to execute dynamic cache change request, client node disconnected."); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| err = e; |
| } |
| |
| if (err != null) |
| fut.onDone(err); |
| |
| fut.get(); |
| } |
| |
| /** |
| * @param name Cache name. |
| * @return Cache instance for given name. |
| */ |
| @SuppressWarnings("unchecked") |
| public <K, V> IgniteCacheProxy<K, V> jcache(@Nullable String name) { |
| IgniteCacheProxy<K, V> cache = (IgniteCacheProxy<K, V>)jCacheProxies.get(maskNull(name)); |
| |
| if (cache == null) |
| throw new IllegalArgumentException("Cache is not configured: " + name); |
| |
| return cache; |
| } |
| |
| /** |
| * @return All configured public cache instances. |
| */ |
| public Collection<IgniteCacheProxy<?, ?>> publicCaches() { |
| Collection<IgniteCacheProxy<?, ?>> res = new ArrayList<>(jCacheProxies.size()); |
| |
| for (Map.Entry<String, IgniteCacheProxy<?, ?>> entry : jCacheProxies.entrySet()) { |
| if (entry.getValue().context().userCache()) |
| res.add(entry.getValue()); |
| } |
| |
| return res; |
| } |
| |
| /** |
| * @param <K> type of keys. |
| * @param <V> type of values. |
| * @return Default cache. |
| */ |
| public <K, V> GridCacheAdapter<K, V> internalCache() { |
| return internalCache(null); |
| } |
| |
| /** |
| * @param name Cache name. |
| * @param <K> type of keys. |
| * @param <V> type of values. |
| * @return Cache instance for given name. |
| */ |
| @SuppressWarnings("unchecked") |
| public <K, V> GridCacheAdapter<K, V> internalCache(@Nullable String name) { |
| if (log.isDebugEnabled()) |
| log.debug("Getting internal cache adapter: " + name); |
| |
| return (GridCacheAdapter<K, V>)caches.get(maskNull(name)); |
| } |
| |
| /** |
| * Cancel all user operations. |
| */ |
| private void cancelFutures() { |
| sharedCtx.mvcc().onStop(); |
| |
| Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping)."); |
| |
| for (IgniteInternalFuture fut : pendingFuts.values()) |
| ((GridFutureAdapter)fut).onDone(err); |
| |
| for (IgniteInternalFuture fut : pendingTemplateFuts.values()) |
| ((GridFutureAdapter)fut).onDone(err); |
| } |
| |
| /** |
| * @return All internal cache instances. |
| */ |
| public Collection<GridCacheAdapter<?, ?>> internalCaches() { |
| return caches.values(); |
| } |
| |
| /** |
| * @param name Cache name. |
| * @return {@code True} if specified cache is system, {@code false} otherwise. |
| */ |
| public boolean systemCache(@Nullable String name) { |
| DynamicCacheDescriptor desc = registeredCaches.get(maskNull(name)); |
| |
| return desc != null && !desc.cacheType().userCache(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void printMemoryStats() { |
| X.println(">>> "); |
| |
| for (GridCacheAdapter c : caches.values()) { |
| X.println(">>> Cache memory stats [grid=" + ctx.gridName() + ", cache=" + c.name() + ']'); |
| |
| c.context().printMemoryStats(); |
| } |
| } |
| |
| /** |
| * Callback invoked by deployment manager for whenever a class loader |
| * gets undeployed. |
| * |
| * @param ldr Class loader. |
| */ |
| public void onUndeployed(ClassLoader ldr) { |
| if (!ctx.isStopping()) { |
| for (GridCacheAdapter<?, ?> cache : caches.values()) { |
| // Do not notify system caches. |
| if (cache.context().userCache()) |
| cache.onUndeploy(ldr); |
| } |
| } |
| } |
| |
| /** |
| * @return Shared context. |
| */ |
| public <K, V> GridCacheSharedContext<K, V> context() { |
| return (GridCacheSharedContext<K, V>)sharedCtx; |
| } |
| |
| /** |
| * @return Transactions interface implementation. |
| */ |
| public IgniteTransactionsEx transactions() { |
| return transactions; |
| } |
| |
| /** |
| * Registers MBean for cache components. |
| * |
| * @param o Cache component. |
| * @param cacheName Cache name. |
| * @param near Near flag. |
| * @throws IgniteCheckedException If registration failed. |
| */ |
| @SuppressWarnings("unchecked") |
| private void registerMbean(Object o, @Nullable String cacheName, boolean near) |
| throws IgniteCheckedException { |
| assert o != null; |
| |
| MBeanServer srvr = ctx.config().getMBeanServer(); |
| |
| assert srvr != null; |
| |
| cacheName = U.maskName(cacheName); |
| |
| cacheName = near ? cacheName + "-near" : cacheName; |
| |
| for (Class<?> itf : o.getClass().getInterfaces()) { |
| if (itf.getName().endsWith("MBean") || itf.getName().endsWith("MXBean")) { |
| try { |
| U.registerCacheMBean(srvr, ctx.gridName(), cacheName, o.getClass().getName(), o, |
| (Class<Object>)itf); |
| } |
| catch (JMException e) { |
| throw new IgniteCheckedException("Failed to register MBean for component: " + o, e); |
| } |
| |
| break; |
| } |
| } |
| } |
| |
| /** |
| * Unregisters MBean for cache components. |
| * |
| * @param o Cache component. |
| * @param cacheName Cache name. |
| * @param near Near flag. |
| */ |
| private void unregisterMbean(Object o, @Nullable String cacheName, boolean near) { |
| assert o != null; |
| |
| MBeanServer srvr = ctx.config().getMBeanServer(); |
| |
| assert srvr != null; |
| |
| cacheName = U.maskName(cacheName); |
| |
| cacheName = near ? cacheName + "-near" : cacheName; |
| |
| for (Class<?> itf : o.getClass().getInterfaces()) { |
| if (itf.getName().endsWith("MBean") || itf.getName().endsWith("MXBean")) { |
| try { |
| srvr.unregisterMBean(U.makeCacheMBeanName(ctx.gridName(), cacheName, o.getClass().getName())); |
| } |
| catch (JMException e) { |
| U.error(log, "Failed to unregister MBean for component: " + o, e); |
| } |
| |
| break; |
| } |
| } |
| } |
| |
| /** |
| * @param ccfg Cache configuration. |
| * @param objs Extra components. |
| * @return Components provided in cache configuration which can implement {@link LifecycleAware} interface. |
| */ |
| private Iterable<Object> lifecycleAwares(CacheConfiguration ccfg, Object...objs) { |
| Collection<Object> ret = new ArrayList<>(7 + objs.length); |
| |
| ret.add(ccfg.getAffinity()); |
| ret.add(ccfg.getAffinityMapper()); |
| ret.add(ccfg.getEvictionFilter()); |
| ret.add(ccfg.getEvictionPolicy()); |
| ret.add(ccfg.getInterceptor()); |
| |
| NearCacheConfiguration nearCfg = ccfg.getNearConfiguration(); |
| |
| if (nearCfg != null) |
| ret.add(nearCfg.getNearEvictionPolicy()); |
| |
| Collections.addAll(ret, objs); |
| |
| return ret; |
| } |
| |
| /** |
| * @throws IgniteException If transaction exist. |
| */ |
| private void checkEmptyTransactions() throws IgniteException { |
| if (transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null) |
| throw new IgniteException("Cannot start/stop cache within lock or transaction."); |
| } |
| |
| /** |
| * @param val Object to check. |
| * @throws IgniteCheckedException If validation failed. |
| * @return Configuration copy. |
| */ |
| private CacheConfiguration cloneCheckSerializable(CacheConfiguration val) throws IgniteCheckedException { |
| if (val == null) |
| return null; |
| |
| if (val.getCacheStoreFactory() != null) { |
| try { |
| marshaller.unmarshal(marshaller.marshal(val.getCacheStoreFactory()), |
| val.getCacheStoreFactory().getClass().getClassLoader()); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteCheckedException("Failed to validate cache configuration. " + |
| "Cache store factory is not serializable. Cache name: " + U.maskName(val.getName()), e); |
| } |
| } |
| |
| try { |
| return marshaller.unmarshal(marshaller.marshal(val), val.getClass().getClassLoader()); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteCheckedException("Failed to validate cache configuration " + |
| "(make sure all objects in cache configuration are serializable): " + U.maskName(val.getName()), e); |
| } |
| } |
| |
| /** |
| * @param name Name to mask. |
| * @return Masked name. |
| */ |
| private static String maskNull(String name) { |
| return name == null ? NULL_NAME : name; |
| } |
| |
| /** |
| * @param name Name to unmask. |
| * @return Unmasked name. |
| */ |
| @SuppressWarnings("StringEquality") |
| private static String unmaskNull(String name) { |
| // Intentional identity equality. |
| return name == NULL_NAME ? null : name; |
| } |
| |
| /** |
| * |
| */ |
| @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") |
| private class DynamicCacheStartFuture extends GridFutureAdapter<Object> { |
| /** Start ID. */ |
| @GridToStringInclude |
| private IgniteUuid deploymentId; |
| |
| /** Cache name. */ |
| private String cacheName; |
| |
| /** Change request. */ |
| @GridToStringInclude |
| private DynamicCacheChangeRequest req; |
| |
| /** |
| * @param cacheName Cache name. |
| * @param deploymentId Deployment ID. |
| * @param req Cache start request. |
| */ |
| private DynamicCacheStartFuture(String cacheName, IgniteUuid deploymentId, DynamicCacheChangeRequest req) { |
| this.deploymentId = deploymentId; |
| this.cacheName = cacheName; |
| this.req = req; |
| } |
| |
| /** |
| * @return Start ID. |
| */ |
| public IgniteUuid deploymentId() { |
| return deploymentId; |
| } |
| |
| /** |
| * @return Request. |
| */ |
| public DynamicCacheChangeRequest request() { |
| return req; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { |
| // Make sure to remove future before completion. |
| pendingFuts.remove(maskNull(cacheName), this); |
| |
| return super.onDone(res, err); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(DynamicCacheStartFuture.class, this); |
| } |
| } |
| |
| /** |
| * |
| */ |
| @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") |
| private class TemplateConfigurationFuture extends GridFutureAdapter<Object> { |
| /** Start ID. */ |
| @GridToStringInclude |
| private IgniteUuid deploymentId; |
| |
| /** Cache name. */ |
| private String cacheName; |
| |
| /** |
| * @param cacheName Cache name. |
| * @param deploymentId Deployment ID. |
| */ |
| private TemplateConfigurationFuture(String cacheName, IgniteUuid deploymentId) { |
| this.deploymentId = deploymentId; |
| this.cacheName = cacheName; |
| } |
| |
| /** |
| * @return Start ID. |
| */ |
| public IgniteUuid deploymentId() { |
| return deploymentId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { |
| // Make sure to remove future before completion. |
| pendingTemplateFuts.remove(maskNull(cacheName), this); |
| |
| return super.onDone(res, err); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(TemplateConfigurationFuture.class, this); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class LocalAffinityFunction implements AffinityFunction { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** {@inheritDoc} */ |
| @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) { |
| ClusterNode locNode = null; |
| |
| for (ClusterNode n : affCtx.currentTopologySnapshot()) { |
| if (n.isLocal()) { |
| locNode = n; |
| |
| break; |
| } |
| } |
| |
| if (locNode == null) |
| throw new IgniteException("Local node is not included into affinity nodes for 'LOCAL' cache"); |
| |
| List<List<ClusterNode>> res = new ArrayList<>(partitions()); |
| |
| for (int part = 0; part < partitions(); part++) |
| res.add(Collections.singletonList(locNode)); |
| |
| return Collections.unmodifiableList(res); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void reset() { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int partitions() { |
| return 1; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int partition(Object key) { |
| return 0; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void removeNode(UUID nodeId) { |
| // No-op. |
| } |
| } |
| } |