| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache; |
| |
| import static java.util.Objects.requireNonNull; |
| import static org.apache.geode.distributed.internal.InternalDistributedSystem.getAnyInstance; |
| |
| import java.io.BufferedReader; |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.Closeable; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.io.OutputStream; |
| import java.io.OutputStreamWriter; |
| import java.io.PrintStream; |
| import java.io.Reader; |
| import java.io.StringBufferInputStream; |
| import java.io.StringWriter; |
| import java.io.Writer; |
| import java.net.InetSocketAddress; |
| import java.net.MalformedURLException; |
| import java.net.URL; |
| import java.net.UnknownHostException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Properties; |
| import java.util.ServiceLoader; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.TreeMap; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.CopyOnWriteArraySet; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.BiPredicate; |
| import java.util.function.Supplier; |
| import java.util.regex.Pattern; |
| |
| import javax.naming.Context; |
| import javax.transaction.TransactionManager; |
| |
| import com.sun.jna.Native; |
| import com.sun.jna.Platform; |
| import io.micrometer.core.instrument.MeterRegistry; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelCriterion; |
| import org.apache.geode.CancelException; |
| import org.apache.geode.ForcedDisconnectException; |
| import org.apache.geode.GemFireCacheException; |
| import org.apache.geode.GemFireConfigException; |
| import org.apache.geode.InternalGemFireError; |
| import org.apache.geode.LogWriter; |
| import org.apache.geode.SerializationException; |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.admin.internal.SystemMemberCacheEventProcessor; |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.annotations.internal.MutableForTesting; |
| import org.apache.geode.cache.AttributesFactory; |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.CacheClosedException; |
| import org.apache.geode.cache.CacheException; |
| import org.apache.geode.cache.CacheExistsException; |
| import org.apache.geode.cache.CacheTransactionManager; |
| import org.apache.geode.cache.CacheWriterException; |
| import org.apache.geode.cache.CacheXmlException; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.Declarable; |
| import org.apache.geode.cache.DiskStore; |
| import org.apache.geode.cache.DiskStoreFactory; |
| import org.apache.geode.cache.DynamicRegionFactory; |
| import org.apache.geode.cache.EvictionAction; |
| import org.apache.geode.cache.EvictionAttributes; |
| import org.apache.geode.cache.GatewayException; |
| import org.apache.geode.cache.Operation; |
| import org.apache.geode.cache.PartitionAttributesFactory; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionAttributes; |
| import org.apache.geode.cache.RegionDestroyedException; |
| import org.apache.geode.cache.RegionExistsException; |
| import org.apache.geode.cache.RegionFactory; |
| import org.apache.geode.cache.RegionService; |
| import org.apache.geode.cache.RegionShortcut; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.cache.TimeoutException; |
| import org.apache.geode.cache.asyncqueue.AsyncEventQueue; |
| import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory; |
| import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl; |
| import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; |
| import org.apache.geode.cache.client.ClientRegionFactory; |
| import org.apache.geode.cache.client.ClientRegionShortcut; |
| import org.apache.geode.cache.client.Pool; |
| import org.apache.geode.cache.client.PoolFactory; |
| import org.apache.geode.cache.client.PoolManager; |
| import org.apache.geode.cache.client.internal.ClientMetadataService; |
| import org.apache.geode.cache.client.internal.ClientRegionFactoryImpl; |
| import org.apache.geode.cache.client.internal.InternalClientCache; |
| import org.apache.geode.cache.client.internal.PoolImpl; |
| import org.apache.geode.cache.control.ResourceManager; |
| import org.apache.geode.cache.execute.FunctionService; |
| import org.apache.geode.cache.query.QueryService; |
| import org.apache.geode.cache.query.internal.DefaultQueryService; |
| import org.apache.geode.cache.query.internal.InternalQueryService; |
| import org.apache.geode.cache.query.internal.QueryMonitor; |
| import org.apache.geode.cache.query.internal.cq.CqService; |
| import org.apache.geode.cache.query.internal.cq.CqServiceProvider; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.cache.snapshot.CacheSnapshotService; |
| import org.apache.geode.cache.util.GatewayConflictResolver; |
| import org.apache.geode.cache.wan.GatewayReceiver; |
| import org.apache.geode.cache.wan.GatewayReceiverFactory; |
| import org.apache.geode.cache.wan.GatewaySender; |
| import org.apache.geode.cache.wan.GatewaySenderFactory; |
| import org.apache.geode.distributed.ConfigurationPersistenceService; |
| import org.apache.geode.distributed.ConfigurationProperties; |
| import org.apache.geode.distributed.DistributedLockService; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.DistributedSystem; |
| import org.apache.geode.distributed.DistributedSystemDisconnectedException; |
| import org.apache.geode.distributed.Locator; |
| import org.apache.geode.distributed.internal.CacheTime; |
| import org.apache.geode.distributed.internal.ClusterDistributionManager; |
| import org.apache.geode.distributed.internal.DistributionAdvisee; |
| import org.apache.geode.distributed.internal.DistributionAdvisor; |
| import org.apache.geode.distributed.internal.DistributionAdvisor.Profile; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.distributed.internal.InternalLocator; |
| import org.apache.geode.distributed.internal.ReplyException; |
| import org.apache.geode.distributed.internal.ReplyProcessor21; |
| import org.apache.geode.distributed.internal.ResourceEvent; |
| import org.apache.geode.distributed.internal.ResourceEventsListener; |
| import org.apache.geode.distributed.internal.ServerLocation; |
| import org.apache.geode.distributed.internal.locks.DLockService; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.i18n.LogWriterI18n; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.ClassPathLoader; |
| import org.apache.geode.internal.DSCODE; |
| import org.apache.geode.internal.SystemTimer; |
| import org.apache.geode.internal.Version; |
| import org.apache.geode.internal.cache.LocalRegion.InitializationLevel; |
| import org.apache.geode.internal.cache.backup.BackupService; |
| import org.apache.geode.internal.cache.control.InternalResourceManager; |
| import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType; |
| import org.apache.geode.internal.cache.control.ResourceAdvisor; |
| import org.apache.geode.internal.cache.event.EventTrackerExpiryTask; |
| import org.apache.geode.internal.cache.eviction.HeapEvictor; |
| import org.apache.geode.internal.cache.eviction.OffHeapEvictor; |
| import org.apache.geode.internal.cache.execute.util.FindRestEnabledServersFunction; |
| import org.apache.geode.internal.cache.extension.Extensible; |
| import org.apache.geode.internal.cache.extension.ExtensionPoint; |
| import org.apache.geode.internal.cache.extension.SimpleExtensionPoint; |
| import org.apache.geode.internal.cache.ha.HARegionQueue; |
| import org.apache.geode.internal.cache.locks.TXLockService; |
| import org.apache.geode.internal.cache.partitioned.RedundancyAlreadyMetException; |
| import org.apache.geode.internal.cache.persistence.PersistentMemberID; |
| import org.apache.geode.internal.cache.persistence.PersistentMemberManager; |
| import org.apache.geode.internal.cache.snapshot.CacheSnapshotServiceImpl; |
| import org.apache.geode.internal.cache.tier.Acceptor; |
| import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl; |
| import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier; |
| import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy; |
| import org.apache.geode.internal.cache.tier.sockets.ClientHealthMonitor; |
| import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; |
| import org.apache.geode.internal.cache.tier.sockets.ServerConnection; |
| import org.apache.geode.internal.cache.wan.AbstractGatewaySender; |
| import org.apache.geode.internal.cache.wan.GatewaySenderAdvisor; |
| import org.apache.geode.internal.cache.wan.GatewaySenderQueueEntrySynchronizationListener; |
| import org.apache.geode.internal.cache.wan.WANServiceProvider; |
| import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue; |
| import org.apache.geode.internal.cache.xmlcache.CacheServerCreation; |
| import org.apache.geode.internal.cache.xmlcache.CacheXmlGenerator; |
| import org.apache.geode.internal.cache.xmlcache.CacheXmlParser; |
| import org.apache.geode.internal.cache.xmlcache.CacheXmlPropertyResolver; |
| import org.apache.geode.internal.cache.xmlcache.PropertyResolver; |
| import org.apache.geode.internal.concurrent.ConcurrentHashSet; |
| import org.apache.geode.internal.config.ClusterConfigurationNotAvailableException; |
| import org.apache.geode.internal.jndi.JNDIInvoker; |
| import org.apache.geode.internal.jta.TransactionManagerImpl; |
| import org.apache.geode.internal.lang.ThrowableUtils; |
| import org.apache.geode.internal.logging.InternalLogWriter; |
| import org.apache.geode.internal.logging.LogService; |
| import org.apache.geode.internal.logging.LoggingExecutors; |
| import org.apache.geode.internal.logging.LoggingThread; |
| import org.apache.geode.internal.monitoring.ThreadsMonitoring; |
| import org.apache.geode.internal.net.SSLConfigurationFactory; |
| import org.apache.geode.internal.net.SocketCreator; |
| import org.apache.geode.internal.offheap.MemoryAllocator; |
| import org.apache.geode.internal.security.SecurableCommunicationChannel; |
| import org.apache.geode.internal.security.SecurityService; |
| import org.apache.geode.internal.security.SecurityServiceFactory; |
| import org.apache.geode.internal.sequencelog.SequenceLoggerImpl; |
| import org.apache.geode.internal.shared.StringPrintWriter; |
| import org.apache.geode.internal.tcp.ConnectionTable; |
| import org.apache.geode.internal.util.BlobHelper; |
| import org.apache.geode.internal.util.concurrent.FutureResult; |
| import org.apache.geode.lang.Identifiable; |
| import org.apache.geode.management.internal.JmxManagerAdvisee; |
| import org.apache.geode.management.internal.JmxManagerAdvisor; |
| import org.apache.geode.management.internal.RestAgent; |
| import org.apache.geode.management.internal.beans.ManagementListener; |
| import org.apache.geode.management.internal.configuration.domain.Configuration; |
| import org.apache.geode.management.internal.configuration.messages.ConfigurationResponse; |
| import org.apache.geode.pdx.JSONFormatter; |
| import org.apache.geode.pdx.PdxInstance; |
| import org.apache.geode.pdx.PdxInstanceFactory; |
| import org.apache.geode.pdx.PdxSerializer; |
| import org.apache.geode.pdx.ReflectionBasedAutoSerializer; |
| import org.apache.geode.pdx.internal.AutoSerializableManager; |
| import org.apache.geode.pdx.internal.InternalPdxInstance; |
| import org.apache.geode.pdx.internal.PdxInstanceFactoryImpl; |
| import org.apache.geode.pdx.internal.PdxInstanceImpl; |
| import org.apache.geode.pdx.internal.TypeRegistry; |
| |
| // TODO: somebody Come up with more reasonable values for {@link #DEFAULT_LOCK_TIMEOUT}, etc. |
| /** |
| * GemFire's implementation of a distributed {@link Cache}. |
| */ |
| @SuppressWarnings("deprecation") |
| public class GemFireCacheImpl implements InternalCache, InternalClientCache, HasCachePerfStats, |
| DistributionAdvisee, CacheTime { |
  private static final Logger logger = LogService.getLogger();

  /** The default number of seconds to wait for a distributed lock */
  public static final int DEFAULT_LOCK_TIMEOUT =
      Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.defaultLockTimeout", 60);

  /**
   * The default duration (in seconds) of a lease on a distributed lock
   */
  public static final int DEFAULT_LOCK_LEASE =
      Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.defaultLockLease", 120);

  /** The default "copy on read" attribute value */
  public static final boolean DEFAULT_COPY_ON_READ = false;

  /**
   * The default amount of time to wait for a {@code netSearch} to complete
   */
  public static final int DEFAULT_SEARCH_TIMEOUT =
      Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.defaultSearchTimeout", 300);

  /**
   * The {@code CacheLifecycleListener}s that have been registered in this VM
   */
  @MakeNotStatic
  private static final Set<CacheLifecycleListener> cacheLifecycleListeners =
      new CopyOnWriteArraySet<>();

  /**
   * Name of the default pool.
   */
  public static final String DEFAULT_POOL_NAME = "DEFAULT";


  /**
   * The number of threads that the QueryMonitor will use to mark queries as cancelled
   * (see QueryMonitor class for reasons why a query might be cancelled).
   * That processing is very efficient, so we don't foresee needing to raise this above 1.
   */
  private static final int QUERY_MONITOR_THREAD_POOL_SIZE = 1;

  /**
   * If true then when a delta is applied the size of the entry value will be recalculated. If false
   * (the default) then the size of the entry value is unchanged by a delta application. Not a final
   * so that tests can change this value.
   *
   * TODO: move or static or encapsulate with interface methods
   *
   * @since GemFire 6.1.2.9
   */
  @MutableForTesting
  static boolean DELTAS_RECALCULATE_SIZE =
      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "DELTAS_RECALCULATE_SIZE");

  /**
   * Upper bound on the event dispatch queue. May be overridden with the
   * {@code Cache.EVENT_QUEUE_LIMIT} system property (prefixed with
   * {@link DistributionConfig#GEMFIRE_PREFIX}).
   */
  private static final int EVENT_QUEUE_LIMIT =
      Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.EVENT_QUEUE_LIMIT", 4096);

  /**
   * Upper bound on the number of event dispatch threads. May be overridden with the
   * {@code Cache.EVENT_THREAD_LIMIT} system property (prefixed with
   * {@link DistributionConfig#GEMFIRE_PREFIX}).
   */
  static final int EVENT_THREAD_LIMIT =
      Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.EVENT_THREAD_LIMIT", 16);

  /**
   * System property to limit the max query-execution time. By default it is turned off (-1); the
   * time is set in milliseconds.
   */
  @MutableForTesting
  public static int MAX_QUERY_EXECUTION_TIME =
      Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.MAX_QUERY_EXECUTION_TIME", -1);

  /**
   * System property to disable query monitor even if resource manager is in use
   */
  private final boolean queryMonitorDisabledForLowMem = Boolean
      .getBoolean(DistributionConfig.GEMFIRE_PREFIX + "Cache.DISABLE_QUERY_MONITOR_FOR_LOW_MEMORY");

  /**
   * Property set to true if resource manager heap percentage is set and query monitor is required
   */
  @MakeNotStatic
  private static boolean queryMonitorRequiredForResourceManager = false;

  /** time in milliseconds */
  private static final int FIVE_HOURS = 5 * 60 * 60 * 1000;

  /** Matches a single literal backslash character. */
  private static final Pattern DOUBLE_BACKSLASH = Pattern.compile("\\\\");
| |
  /**
   * Response from the cluster configuration request; assigned during construction for
   * non-client caches via {@code requestSharedConfiguration()}.
   */
  private volatile ConfigurationResponse configurationResponse;

  /** The distributed system this cache was created with. */
  private final InternalDistributedSystem system;

  /** Distribution manager obtained from {@link #system}. */
  private final DistributionManager dm;

  /** The root regions of this cache. */
  private final Map<String, InternalRegion> rootRegions;

  /**
   * True if this cache is being created by a ClientCacheFactory.
   */
  private final boolean isClient;

  /** Factory for the default pool; supplied at construction. */
  private PoolFactory poolFactory;

  /**
   * The default pool.
   * It is not final to allow cache.xml parsing to set it.
   */
  private Pool defaultPool;

  /** Regions keyed by their full path. */
  private final ConcurrentMap<String, InternalRegion> pathToRegion = new ConcurrentHashMap<>();

  private volatile boolean isInitialized;

  volatile boolean isClosing = false; // used in Stopper inner class

  /** Amount of time (in seconds) to wait for a distributed lock */
  private int lockTimeout = DEFAULT_LOCK_TIMEOUT;

  /** Amount of time a lease of a distributed lock lasts */
  private int lockLease = DEFAULT_LOCK_LEASE;

  /** Amount of time to wait for a {@code netSearch} to complete */
  private int searchTimeout = DEFAULT_SEARCH_TIMEOUT;

  private final CachePerfStats cachePerfStats;

  /** Date on which this instance was created */
  private final Date creationDate;

  /** thread pool for event dispatching */
  private final ExecutorService eventThreadPool;

  /**
   * the list of all cache servers. CopyOnWriteArrayList is used to allow concurrent add, remove and
   * retrieval operations. It is assumed that the traversal operations on cache servers list vastly
   * outnumber the mutative operations such as add, remove.
   */
  private final List<InternalCacheServer> allCacheServers = new CopyOnWriteArrayList<>();

  /**
   * Unmodifiable view of "allCacheServers".
   */
  private final List<CacheServer> unmodifiableAllCacheServers =
      Collections.unmodifiableList(allCacheServers);

  /**
   * Controls updates to the list of all gateway senders
   *
   * @see #allGatewaySenders
   */
  private final Object allGatewaySendersLock = new Object();

  /**
   * the set of all gateway senders. It may be fetched safely (for enumeration), but updates must be
   * synchronized via {@link #allGatewaySendersLock}
   */
  private volatile Set<GatewaySender> allGatewaySenders = Collections.emptySet();

  /**
   * The async event queues added to the cache (presumably only the user-visible ones — cf.
   * {@link #allAsyncEventQueues}). CopyOnWriteArraySet is used to allow concurrent add, remove
   * and retrieval operations.
   */
  private final Set<AsyncEventQueue> allVisibleAsyncEventQueues = new CopyOnWriteArraySet<>();

  /**
   * All async event queues added to the cache. CopyOnWriteArraySet is used to allow concurrent
   * add, remove and retrieval operations.
   */
  private final Set<AsyncEventQueue> allAsyncEventQueues = new CopyOnWriteArraySet<>();

  private final AtomicReference<GatewayReceiver> gatewayReceiver = new AtomicReference<>();

  private final AtomicReference<InternalCacheServer> gatewayReceiverServer =
      new AtomicReference<>();

  /**
   * PartitionedRegion instances (for required-events notification)
   */
  private final Set<PartitionedRegion> partitionedRegions = new HashSet<>();

  /**
   * Fix for 42051. This is a map of regions that are in the process of being destroyed. We could
   * potentially leave the regions in the pathToRegion map, but that would entail too many changes
   * at this point in the release. We need to know which regions are being destroyed so that a
   * profile exchange can get the persistent id of the destroying region and know not to persist
   * that ID if it receives it as part of the persistent view.
   */
  private final ConcurrentMap<String, DistributedRegion> regionsInDestroy =
      new ConcurrentHashMap<>();

  private final Object allGatewayHubsLock = new Object();

  /**
   * conflict resolver for WAN, if any
   *
   * GuardedBy {@link #allGatewayHubsLock}
   */
  private GatewayConflictResolver gatewayConflictResolver;

  /** Is this a "server" cache? */
  private boolean isServer = false;

  /** transaction manager for this cache */
  private final TXManagerImpl transactionManager;

  private RestAgent restAgent;

  private boolean isRESTServiceRunning = false;

  /** Copy on Read feature for all read operations e.g. get */
  private volatile boolean copyOnRead = DEFAULT_COPY_ON_READ;

  /** The named region attributes registered with this cache. */
  private final Map<String, RegionAttributes<?, ?>> namedRegionAttributes =
      Collections.synchronizedMap(new HashMap<>());
| |
  /**
   * if this cache was forced to close due to a forced-disconnect, we retain a
   * ForcedDisconnectException that can be used as the cause
   */
  private boolean forcedDisconnect;

  /**
   * if this cache was forced to close due to a forced-disconnect or system failure, this keeps
   * track of the reason
   */
  volatile Throwable disconnectCause; // used in Stopper inner class

  /** context where this cache was created -- for debugging, really... */
  private Exception creationStack = null;

  /**
   * a system timer task for cleaning up old bridge thread event entries
   */
  private final EventTrackerExpiryTask recordedEventSweeper;

  private final TombstoneService tombstoneService;

  /**
   * DistributedLockService for PartitionedRegions. Remains null until the first PartitionedRegion
   * is created. Destroyed by GemFireCache when closing the cache. Protected by synchronization on
   * this GemFireCache.
   *
   * GuardedBy {@link #prLockServiceLock}
   */
  private DistributedLockService prLockService;

  /**
   * lock used to access prLockService
   */
  private final Object prLockServiceLock = new Object();

  /**
   * DistributedLockService for GatewaySenders. Remains null until the first GatewaySender is
   * created. Destroyed by GemFireCache when closing the cache.
   *
   * GuardedBy {@link #gatewayLockServiceLock}
   */
  private volatile DistributedLockService gatewayLockService;

  /**
   * Lock used to access gatewayLockService
   */
  private final Object gatewayLockServiceLock = new Object();

  private final InternalResourceManager resourceManager;

  private final BackupService backupService;

  // NOTE(review): presumably guarded by heapEvictorLock — confirm at usage sites
  private HeapEvictor heapEvictor = null;

  // NOTE(review): presumably guarded by offHeapEvictorLock — confirm at usage sites
  private OffHeapEvictor offHeapEvictor = null;

  private final Object heapEvictorLock = new Object();

  private final Object offHeapEvictorLock = new Object();

  private ResourceEventsListener resourceEventsListener;

  /**
   * Enabled when CacheExistsException issues arise in debugging
   *
   * @see #creationStack
   */
  private static final boolean DEBUG_CREATION_STACK = false;

  private volatile QueryMonitor queryMonitor;

  private final Object queryMonitorLock = new Object();

  private final PersistentMemberManager persistentMemberManager;

  private final ClientMetadataService clientMetadataService;

  private final AtomicBoolean isShutDownAll = new AtomicBoolean();

  private final CountDownLatch shutDownAllFinished = new CountDownLatch(1);

  private final ResourceAdvisor resourceAdvisor;

  private final JmxManagerAdvisor jmxAdvisor;

  private final int serialNumber;

  private final TXEntryStateFactory txEntryStateFactory;

  private final CacheConfig cacheConfig;

  private final DiskStoreMonitor diskMonitor;

  /**
   * Stores the properties used to initialize declarables.
   */
  private final Map<Declarable, Properties> declarablePropertiesMap = new ConcurrentHashMap<>();

  /** {@link PropertyResolver} to resolve ${} type property strings */
  private final PropertyResolver resolver;

  /** True unless the "xml.parameterization.disabled" (GemFire-prefixed) system property is set. */
  private static final boolean XML_PARAMETERIZATION_ENABLED =
      !Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "xml.parameterization.disabled");

  /**
   * {@link ExtensionPoint} support.
   *
   * @since GemFire 8.1
   */
  private final SimpleExtensionPoint<Cache> extensionPoint = new SimpleExtensionPoint<>(this, this);

  private final CqService cqService;

  private final Set<RegionListener> regionListeners = new ConcurrentHashSet<>();

  private final Map<Class<? extends CacheService>, CacheService> services = new HashMap<>();

  private final SecurityService securityService;

  private final Set<RegionEntrySynchronizationListener> synchronizationListeners =
      new ConcurrentHashSet<>();

  private final ClusterConfigurationLoader ccLoader = new ClusterConfigurationLoader();
| |
| private Optional<HttpService> httpService = Optional.ofNullable(null); |
| |
  /** Primary meter registry supplied at construction. */
  private final MeterRegistry meterRegistry;

  /** Additional meter sub-registries supplied at construction. */
  private final Set<MeterRegistry> meterSubregistries;
| |
| static { |
| // this works around jdk bug 6427854, reported in ticket #44434 |
| String propertyName = "sun.nio.ch.bugLevel"; |
| String value = System.getProperty(propertyName); |
| if (value == null) { |
| System.setProperty(propertyName, ""); |
| } |
| } |
| |
  /**
   * Invokes mlockall(). Locks all pages mapped into the address space of the calling process. This
   * includes the pages of the code, data and stack segment, as well as shared libraries, user space
   * kernel data, shared memory, and memory-mapped files. All mapped pages are guaranteed to be
   * resident in RAM when the call returns successfully; the pages are guaranteed to stay in RAM
   * until later unlocked.
   *
   * <p>Bound to the platform C library via JNA; {@code Native.register} must have been called
   * (see {@link #lockMemory()}) before this method can be invoked.
   *
   * @param flags MCL_CURRENT 1 - Lock all pages which are currently mapped into the address space
   *        of the process.
   *
   *        MCL_FUTURE 2 - Lock all pages which will become mapped into the address space of the
   *        process in the future. These could be for instance new pages required by a growing heap
   *        and stack as well as new memory mapped files or shared memory regions.
   *
   * @return 0 if success, non-zero if error and errno set
   */
  private static native int mlockall(int flags);
| |
| public static void lockMemory() { |
| try { |
| Native.register(Platform.C_LIBRARY_NAME); |
| int result = mlockall(1); |
| if (result == 0) { |
| return; |
| } |
| } catch (Throwable t) { |
| throw new IllegalStateException("Error trying to lock memory", t); |
| } |
| |
| int lastError = Native.getLastError(); |
| String message = "mlockall failed: " + lastError; |
| if (lastError == 1 || lastError == 12) { // EPERM || ENOMEM |
| message = "Unable to lock memory due to insufficient free space or privileges. " |
| + "Please check the RLIMIT_MEMLOCK soft resource limit (ulimit -l) and " |
| + "increase the available memory if needed"; |
| } |
| throw new IllegalStateException(message); |
| } |
| |
| /** |
| * This is for debugging cache-open issues (esp. {@link CacheExistsException}) |
| */ |
| @Override |
| public String toString() { |
| final StringBuilder sb = new StringBuilder(); |
| sb.append("GemFireCache["); |
| sb.append("id = ").append(System.identityHashCode(this)); |
| sb.append("; isClosing = ").append(isClosing); |
| sb.append("; isShutDownAll = ").append(isCacheAtShutdownAll()); |
| sb.append("; created = ").append(creationDate); |
| sb.append("; server = ").append(isServer); |
| sb.append("; copyOnRead = ").append(copyOnRead); |
| sb.append("; lockLease = ").append(lockLease); |
| sb.append("; lockTimeout = ").append(lockTimeout); |
| if (creationStack != null) { |
| // TODO: eliminate anonymous inner class and maybe move this to ExceptionUtils |
| sb.append(System.lineSeparator()).append("Creation context:").append(System.lineSeparator()); |
| OutputStream os = new OutputStream() { |
| @Override |
| public void write(int i) { |
| sb.append((char) i); |
| } |
| }; |
| PrintStream ps = new PrintStream(os); |
| creationStack.printStackTrace(ps); |
| } |
| sb.append("]"); |
| return sb.toString(); |
| } |
| |
  /**
   * Map of Futures used to track Regions that are being reinitialized.
   * NOTE(review): keys appear to be region paths (cf. {@link #pathToRegion}) — confirm at usages.
   */
  private final ConcurrentMap<String, FutureResult<InternalRegion>> reinitializingRegions =
      new ConcurrentHashMap<>();
| |
| /** |
| * Returns the last created instance of GemFireCache |
| * |
| * @deprecated use DM.getCache instead |
| */ |
| @Deprecated |
| public static GemFireCacheImpl getInstance() { |
| InternalDistributedSystem system = InternalDistributedSystem.getAnyInstance(); |
| if (system == null) { |
| return null; |
| } |
| GemFireCacheImpl cache = (GemFireCacheImpl) system.getCache(); |
| if (cache == null) { |
| return null; |
| } |
| |
| if (cache.isClosing) { |
| return null; |
| } |
| |
| return cache; |
| |
| } |
| |
| /** |
| * Returns an existing instance. If a cache does not exist throws a cache closed exception. |
| * |
| * @return the existing cache |
| * @throws CacheClosedException if an existing cache can not be found. |
| * @deprecated use DM.getExistingCache instead. |
| */ |
| @Deprecated |
| public static GemFireCacheImpl getExisting() { |
| final GemFireCacheImpl result = getInstance(); |
| if (result != null && !result.isClosing) { |
| return result; |
| } |
| if (result != null) { |
| throw result.getCacheClosedException( |
| "The cache has been closed."); |
| } |
| throw new CacheClosedException( |
| "A cache has not yet been created."); |
| } |
| |
| /** |
| * Returns an existing instance. If a cache does not exist throws an exception. |
| * |
| * @param reason the reason an existing cache is being requested. |
| * @return the existing cache |
| * @throws CacheClosedException if an existing cache can not be found. |
| * @deprecated use DM.getExistingCache instead. |
| */ |
| @Deprecated |
| public static GemFireCacheImpl getExisting(String reason) { |
| GemFireCacheImpl result = getInstance(); |
| if (result == null) { |
| throw new CacheClosedException(reason); |
| } |
| return result; |
| } |
| |
| /** |
| * Pdx is allowed to obtain the cache even while it is being closed |
| * |
| * @deprecated Rather than fishing for a cache with this static method, use a cache that is passed |
| * in to your method. |
| */ |
| @Deprecated |
| public static GemFireCacheImpl getForPdx(String reason) { |
| |
| InternalDistributedSystem system = getAnyInstance(); |
| if (system == null) { |
| throw new CacheClosedException(reason); |
| } |
| GemFireCacheImpl cache = (GemFireCacheImpl) system.getCache(); |
| if (cache == null) { |
| throw new CacheClosedException(reason); |
| } |
| |
| return cache; |
| } |
| |
| /** |
| * Creates a new instance of GemFireCache and populates it according to the {@code cache.xml}, if |
| * appropriate. |
| * |
| * Currently only unit tests set the typeRegistry parameter to a non-null value |
| */ |
| GemFireCacheImpl(boolean isClient, PoolFactory poolFactory, |
| InternalDistributedSystem internalDistributedSystem, CacheConfig cacheConfig, |
| boolean useAsyncEventListeners, TypeRegistry typeRegistry, MeterRegistry meterRegistry, |
| Set<MeterRegistry> meterSubregistries) { |
| this.isClient = isClient; |
| this.poolFactory = poolFactory; |
| this.cacheConfig = cacheConfig; // do early for bug 43213 |
| pdxRegistry = typeRegistry; |
| this.meterRegistry = meterRegistry; |
| this.meterSubregistries = meterSubregistries; |
| |
| // Synchronized to prevent a new cache from being created |
| // before an old one has finished closing |
| synchronized (GemFireCacheImpl.class) { |
| |
| // start JTA transaction manager within this synchronized block |
| // to prevent race with cache close. fixes bug 43987 |
| JNDIInvoker.mapTransactions(internalDistributedSystem); |
| system = internalDistributedSystem; |
| dm = system.getDistributionManager(); |
| |
| if (!isClient) { |
| configurationResponse = requestSharedConfiguration(); |
| |
| // apply the cluster's properties configuration and initialize security using that |
| // configuration |
| ccLoader.applyClusterPropertiesConfiguration(configurationResponse, |
| system.getConfig()); |
| |
| securityService = |
| SecurityServiceFactory.create(system.getConfig().getSecurityProps(), cacheConfig); |
| system.setSecurityService(securityService); |
| } else { |
| // create a no-op security service for client |
| securityService = SecurityServiceFactory.create(); |
| } |
| |
| DistributionConfig systemConfig = internalDistributedSystem.getConfig(); |
| if (!this.isClient && PoolManager.getAll().isEmpty()) { |
| // We only support management on members of a distributed system |
| // Should do this: if (!getSystem().isLoner()) { |
| // but it causes quickstart.CqClientTest to hang |
| boolean disableJmx = systemConfig.getDisableJmx(); |
| if (disableJmx) { |
| logger.info("Running with JMX disabled."); |
| } else { |
| resourceEventsListener = new ManagementListener(system); |
| system.addResourceListener(resourceEventsListener); |
| if (system.isLoner()) { |
| system.getInternalLogWriter() |
| .info("Running in local mode since no locators were specified."); |
| } |
| } |
| |
| } else { |
| logger.info("Running in client mode"); |
| resourceEventsListener = null; |
| } |
| |
| // Don't let admin-only VMs create Cache's just yet. |
| if (dm.getDMType() == ClusterDistributionManager.ADMIN_ONLY_DM_TYPE) { |
| throw new IllegalStateException( |
| "Cannot create a Cache in an admin-only VM."); |
| } |
| |
| rootRegions = new HashMap<>(); |
| |
| cqService = CqServiceProvider.create(this); |
| |
| // Create the CacheStatistics |
| CachePerfStats.enableClockStats = system.getConfig().getEnableTimeStatistics(); |
| cachePerfStats = new CachePerfStats(internalDistributedSystem.getStatisticsManager()); |
| |
| transactionManager = new TXManagerImpl(cachePerfStats, this); |
| dm.addMembershipListener(transactionManager); |
| |
| creationDate = new Date(); |
| |
| persistentMemberManager = new PersistentMemberManager(); |
| |
| if (useAsyncEventListeners) { |
| eventThreadPool = LoggingExecutors.newThreadPoolWithFixedFeed("Message Event Thread", |
| command -> { |
| ConnectionTable.threadWantsSharedResources(); |
| command.run(); |
| }, EVENT_THREAD_LIMIT, cachePerfStats.getEventPoolHelper(), 1000, |
| getThreadMonitorObj(), |
| EVENT_QUEUE_LIMIT); |
| } else { |
| eventThreadPool = null; |
| } |
| |
| // Initialize the advisor here, but wait to exchange profiles until cache is fully built |
| resourceAdvisor = ResourceAdvisor.createResourceAdvisor(this); |
| |
| // Initialize the advisor here, but wait to exchange profiles until cache is fully built |
| jmxAdvisor = JmxManagerAdvisor |
| .createJmxManagerAdvisor(new JmxManagerAdvisee(getCacheForProcessingClientRequests())); |
| |
| resourceManager = InternalResourceManager.createResourceManager(this); |
| serialNumber = DistributionAdvisor.createSerialNumber(); |
| |
| getInternalResourceManager().addResourceListener(ResourceType.HEAP_MEMORY, getHeapEvictor()); |
| |
| /* |
| * Only bother creating an off-heap evictor if we have off-heap memory enabled. |
| */ |
| if (null != getOffHeapStore()) { |
| getInternalResourceManager().addResourceListener(ResourceType.OFFHEAP_MEMORY, |
| getOffHeapEvictor()); |
| } |
| |
| recordedEventSweeper = createEventTrackerExpiryTask(); |
| tombstoneService = TombstoneService.initialize(this); |
| |
| TypeRegistry.init(); |
| basicSetPdxSerializer(this.cacheConfig.getPdxSerializer()); |
| TypeRegistry.open(); |
| |
| if (!isClient()) { |
| // Initialize the QRM thread frequency to default (1 second )to prevent spill |
| // over from previous Cache , as the interval is stored in a static |
| // volatile field. |
| HARegionQueue.setMessageSyncInterval(HARegionQueue.DEFAULT_MESSAGE_SYNC_INTERVAL); |
| } |
| FunctionService.registerFunction(new PRContainsValueFunction()); |
| expirationScheduler = new ExpirationScheduler(system); |
| |
| // uncomment following line when debugging CacheExistsException |
| if (DEBUG_CREATION_STACK) { |
| creationStack = new Exception( |
| String.format("Created GemFireCache %s", toString())); |
| } |
| |
| txEntryStateFactory = TXEntryState.getFactory(); |
| if (XML_PARAMETERIZATION_ENABLED) { |
| // If product properties file is available replace properties from there |
| Properties userProps = system.getConfig().getUserDefinedProps(); |
| if (userProps != null && !userProps.isEmpty()) { |
| resolver = new CacheXmlPropertyResolver(false, |
| PropertyResolver.NO_SYSTEM_PROPERTIES_OVERRIDE, userProps); |
| } else { |
| resolver = new CacheXmlPropertyResolver(false, |
| PropertyResolver.NO_SYSTEM_PROPERTIES_OVERRIDE, null); |
| } |
| } else { |
| resolver = null; |
| } |
| |
| SystemFailure.signalCacheCreate(); |
| |
| diskMonitor = new DiskStoreMonitor(systemConfig.getLogFile()); |
| |
| addRegionEntrySynchronizationListener(new GatewaySenderQueueEntrySynchronizationListener()); |
| backupService = new BackupService(this); |
| if (!this.isClient) { |
| if (systemConfig.getHttpServicePort() == 0) { |
| logger.info("HttpService is disabled with http-serivce-port = 0"); |
| httpService = Optional.empty(); |
| } else { |
| try { |
| httpService = Optional.of(new HttpService(systemConfig.getHttpServiceBindAddress(), |
| systemConfig.getHttpServicePort(), SSLConfigurationFactory |
| .getSSLConfigForComponent(systemConfig, SecurableCommunicationChannel.WEB))); |
| } catch (Throwable ex) { |
| logger.warn("Could not enable HttpService: {}", ex.getMessage()); |
| } |
| } |
| } |
| } // synchronized |
| |
| clientMetadataService = new ClientMetadataService(this); |
| } |
| |
| @Override |
| public void throwCacheExistsException() { |
| throw new CacheExistsException(this, String.format("%s: An open cache already exists.", this), |
| creationStack); |
| } |
| |
  /** Returns the cache-level micrometer registry supplied at construction time. */
  @Override
  public MeterRegistry getMeterRegistry() {
    return meterRegistry;
  }
| |
| /** generate XML for the cache before shutting down due to forced disconnect */ |
| public void saveCacheXmlForReconnect() { |
| // there are two versions of this method so it can be unit-tested |
| boolean sharedConfigEnabled = |
| getDistributionManager().getConfig().getUseSharedConfiguration(); |
| |
| if (!Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "autoReconnect-useCacheXMLFile") |
| && !sharedConfigEnabled) { |
| try { |
| logger.info("generating XML to rebuild the cache after reconnect completes"); |
| StringPrintWriter pw = new StringPrintWriter(); |
| CacheXmlGenerator.generate((Cache) this, pw, false); |
| String cacheXML = pw.toString(); |
| getCacheConfig().setCacheXMLDescription(cacheXML); |
| logger.info("XML generation completed: {}", cacheXML); |
| } catch (CancelException e) { |
| logger.info("Unable to generate XML description for reconnect of cache due to exception", |
| e); |
| } |
| } else if (sharedConfigEnabled && !getCacheServers().isEmpty()) { |
| // we need to retain a cache-server description if this JVM was started by gfsh |
| List<CacheServerCreation> list = new ArrayList<>(getCacheServers().size()); |
| for (final Object o : getCacheServers()) { |
| CacheServerImpl cs = (CacheServerImpl) o; |
| if (cs.isDefaultServer()) { |
| CacheServerCreation bsc = new CacheServerCreation(this, cs); |
| list.add(bsc); |
| } |
| } |
| getCacheConfig().setCacheServerCreation(list); |
| logger.info("CacheServer configuration saved"); |
| } |
| } |
| |
  /** Returns the additional meter registries supplied at construction time. */
  @Override
  public Set<MeterRegistry> getMeterSubregistries() {
    return meterSubregistries;
  }
| |
  /**
   * Returns the embedded HTTP service. Empty on clients, when http-service-port is 0, or when the
   * service failed to start during construction.
   */
  @Override
  public Optional<HttpService> getHttpService() {
    return httpService;
  }
| |
  /**
   * Re-fetches the cluster configuration from the locators and, when a response is received,
   * re-applies its jars, properties and cache XML, then re-runs declarative cache initialization.
   *
   * @throws IOException if deploying jars from the cluster configuration fails
   * @throws ClassNotFoundException if a class from the received jars cannot be loaded
   */
  @Override
  public void reLoadClusterConfiguration() throws IOException, ClassNotFoundException {
    configurationResponse = requestSharedConfiguration();
    if (configurationResponse != null) {
      // Order matters: jars first, then properties, then XML, then declarative init.
      ccLoader.deployJarsReceivedFromClusterConfiguration(configurationResponse);
      ccLoader.applyClusterPropertiesConfiguration(configurationResponse,
          system.getConfig());
      ccLoader.applyClusterXmlConfiguration(this, configurationResponse,
          system.getConfig().getGroups());
      initializeDeclarativeCache();
    }
  }
| |
| /** |
| * Initialize the EventTracker's timer task. This is stored for tracking and shutdown purposes |
| */ |
| private EventTrackerExpiryTask createEventTrackerExpiryTask() { |
| long lifetimeInMillis = |
| Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "messageTrackingTimeout", |
| PoolFactory.DEFAULT_SUBSCRIPTION_MESSAGE_TRACKING_TIMEOUT / 3); |
| EventTrackerExpiryTask task = new EventTrackerExpiryTask(lifetimeInMillis); |
| getCCPTimer().scheduleAtFixedRate(task, lifetimeInMillis, lifetimeInMillis); |
| return task; |
| } |
| |
  /** Returns the security service created during construction (a no-op service on clients). */
  @Override
  public SecurityService getSecurityService() {
    return securityService;
  }
| |
  /** Returns whether the developer REST service has been marked as running. */
  @Override
  public boolean isRESTServiceRunning() {
    return isRESTServiceRunning;
  }
| |
  /** Records whether the developer REST service is running. */
  @Override
  public void setRESTServiceRunning(boolean isRESTServiceRunning) {
    this.isRESTServiceRunning = isRESTServiceRunning;
  }
| |
| /** |
| * Used by Hydra tests to get handle of Rest Agent |
| * |
| */ |
| @Override |
| public RestAgent getRestAgent() { |
| return restAgent; |
| } |
| |
| /** |
| * Request the shared configuration from the locator(s) which have the Cluster config service |
| * running |
| */ |
| ConfigurationResponse requestSharedConfiguration() { |
| final DistributionConfig config = system.getConfig(); |
| |
| if (!(dm instanceof ClusterDistributionManager)) { |
| return null; |
| } |
| |
| // do nothing if this vm is/has locator or this is a client |
| if (dm.getDMType() == ClusterDistributionManager.LOCATOR_DM_TYPE || isClient |
| || Locator.getLocator() != null) { |
| return null; |
| } |
| |
| // can't simply return null if server is not using shared configuration, since we need to find |
| // out if the locator is running in secure mode or not, if yes, then we need to throw an |
| // exception if server is not using cluster config. |
| |
| Map<InternalDistributedMember, Collection<String>> locatorsWithClusterConfig = |
| getDistributionManager().getAllHostedLocatorsWithSharedConfiguration(); |
| |
| // If there are no locators with Shared configuration, that means the system has been started |
| // without shared configuration then do not make requests to the locators. |
| if (locatorsWithClusterConfig.isEmpty()) { |
| logger.info("No locator(s) found with cluster configuration service"); |
| return null; |
| } |
| |
| try { |
| ConfigurationResponse response = ccLoader.requestConfigurationFromLocators( |
| system.getConfig().getGroups(), locatorsWithClusterConfig.keySet()); |
| |
| // log the configuration received from the locator |
| logger.info("Received cluster configuration from the locator"); |
| logger.info(response.describeConfig()); |
| |
| Configuration clusterConfig = |
| response.getRequestedConfiguration().get(ConfigurationPersistenceService.CLUSTER_CONFIG); |
| Properties clusterSecProperties = |
| clusterConfig == null ? new Properties() : clusterConfig.getGemfireProperties(); |
| |
| // If not using shared configuration, return null or throw an exception is locator is secured |
| if (!config.getUseSharedConfiguration()) { |
| if (clusterSecProperties.containsKey(ConfigurationProperties.SECURITY_MANAGER)) { |
| throw new GemFireConfigException( |
| "A server must use cluster configuration when joining a secured cluster."); |
| } else { |
| logger.info( |
| "The cache has been created with use-cluster-configuration=false. It will not receive any cluster configuration"); |
| return null; |
| } |
| } |
| |
| Properties serverSecProperties = config.getSecurityProps(); |
| // check for possible mis-configuration |
| if (isMisConfigured(clusterSecProperties, serverSecProperties, |
| ConfigurationProperties.SECURITY_MANAGER) |
| || isMisConfigured(clusterSecProperties, serverSecProperties, |
| ConfigurationProperties.SECURITY_POST_PROCESSOR)) { |
| throw new GemFireConfigException( |
| "A server cannot specify its own security-manager or security-post-processor when using cluster configuration"); |
| } |
| return response; |
| |
| } catch (ClusterConfigurationNotAvailableException e) { |
| throw new GemFireConfigException( |
| "cluster configuration service not available", e); |
| } catch (UnknownHostException e) { |
| throw new GemFireConfigException(e.getLocalizedMessage(), e); |
| } |
| } |
| |
| /** |
| * When called, clusterProps and serverProps and key could not be null |
| */ |
| static boolean isMisConfigured(Properties clusterProps, Properties serverProps, String key) { |
| String clusterPropValue = clusterProps.getProperty(key); |
| String serverPropValue = serverProps.getProperty(key); |
| |
| // if this server prop is not specified, this is always OK. |
| if (StringUtils.isBlank(serverPropValue)) |
| return false; |
| |
| // server props is not blank, but cluster props is blank, NOT OK. |
| if (StringUtils.isBlank(clusterPropValue)) |
| return true; |
| |
| // at this point check for equality |
| return !clusterPropValue.equals(serverPropValue); |
| } |
| |
| /** |
| * Used by unit tests to force cache creation to use a test generated cache.xml |
| */ |
| @MutableForTesting |
| public static File testCacheXml = null; |
| |
| /** |
| * @return true if cache is created using a ClientCacheFactory |
| * @see #hasPool() |
| */ |
| @Override |
| public boolean isClient() { |
| return isClient; |
| } |
| |
| /** |
| * Method to check for GemFire client. In addition to checking for ClientCacheFactory, this method |
| * checks for any defined pools. |
| * |
| * @return true if the cache has pools declared |
| */ |
| @Override |
| public boolean hasPool() { |
| return isClient || !getAllPools().isEmpty(); |
| } |
| |
| private static Collection<Pool> getAllPools() { |
| Collection<Pool> pools = PoolManagerImpl.getPMI().getMap().values(); |
| for (Iterator<Pool> itr = pools.iterator(); itr.hasNext();) { |
| PoolImpl pool = (PoolImpl) itr.next(); |
| if (pool.isUsedByGateway()) { |
| itr.remove(); |
| } |
| } |
| return pools; |
| } |
| |
| /** |
| * May return null (even on a client). |
| */ |
| @Override |
| public synchronized Pool getDefaultPool() { |
| if (defaultPool == null) { |
| determineDefaultPool(); |
| } |
| return defaultPool; |
| } |
| |
| /** |
| * Perform initialization, solve the early escaped reference problem by putting publishing |
| * references to this instance in this method (vs. the constructor). |
| */ |
| @Override |
| public void initialize() { |
| for (CacheLifecycleListener listener : cacheLifecycleListeners) { |
| listener.cacheCreated(this); |
| } |
| |
| if (isClient()) { |
| initializeClientRegionShortcuts(this); |
| } else { |
| initializeRegionShortcuts(this); |
| } |
| |
| // set ClassPathLoader and then deploy cluster config jars |
| ClassPathLoader.setLatestToDefault(system.getConfig().getDeployWorkingDir()); |
| |
| try { |
| ccLoader.deployJarsReceivedFromClusterConfiguration(configurationResponse); |
| } catch (IOException | ClassNotFoundException e) { |
| throw new GemFireConfigException( |
| "Exception while deploying the jars received as a part of cluster Configuration", |
| e); |
| } |
| |
| SystemMemberCacheEventProcessor.send(this, Operation.CACHE_CREATE); |
| resourceAdvisor.initializationGate(); |
| |
| // Register function that we need to execute to fetch available REST service endpoints in DS |
| FunctionService.registerFunction(new FindRestEnabledServersFunction()); |
| |
| // moved this after initializeDeclarativeCache because in the future |
| // distributed system creation will not happen until we have read |
| // cache.xml file. |
| // For now this needs to happen before cache.xml otherwise |
| // we will not be ready for all the events that cache.xml |
| // processing can deliver (region creation, etc.). |
| // This call may need to be moved inside initializeDeclarativeCache. |
| jmxAdvisor.initializationGate(); // Entry to GemFire Management service |
| |
| // this starts up the ManagementService, register and federate the internal beans |
| system.handleResourceEvent(ResourceEvent.CACHE_CREATE, this); |
| |
| initializeServices(); |
| |
| boolean completedCacheXml = false; |
| try { |
| if (!isClient) { |
| applyJarAndXmlFromClusterConfig(); |
| } |
| initializeDeclarativeCache(); |
| completedCacheXml = true; |
| } catch (RuntimeException e) { |
| logger.error("Cache initialization for {} failed because: {}", this, e); // fix GEODE-3038 |
| throw e; |
| } finally { |
| if (!completedCacheXml) { |
| // so initializeDeclarativeCache threw an exception |
| try { |
| close(); // fix for bug 34041 |
| } catch (Throwable ignore) { |
| // I don't want init to throw an exception that came from the close. |
| // I want it to throw the original exception that came from initializeDeclarativeCache. |
| } |
| configurationResponse = null; |
| } |
| } |
| |
| startColocatedJmxManagerLocator(); |
| |
| startRestAgentServer(this); |
| |
| isInitialized = true; |
| } |
| |
  /**
   * Applies cluster-configuration cache XML to this cache. When no configuration response was
   * received from a locator, first re-loads jars previously deployed to disk.
   */
  void applyJarAndXmlFromClusterConfig() {
    if (configurationResponse == null) {
      // Deploy all the jars from the deploy working dir.
      ClassPathLoader.getLatest().getJarDeployer().loadPreviouslyDeployedJarsFromDisk();
    }
    ccLoader.applyClusterXmlConfiguration(this, configurationResponse,
        system.getConfig().getGroups());
  }
| |
| /** |
| * Initialize any services that provided as extensions to the cache using the service loader |
| * mechanism. |
| */ |
| private void initializeServices() { |
| ServiceLoader<CacheService> loader = ServiceLoader.load(CacheService.class); |
| for (CacheService service : loader) { |
| service.init(this); |
| services.put(service.getInterface(), service); |
| system.handleResourceEvent(ResourceEvent.CACHE_SERVICE_CREATE, service); |
| logger.info("Initialized cache service {}", service.getClass().getName()); |
| } |
| } |
| |
| private boolean isServerNode() { |
| return system.getDistributedMember() |
| .getVmKind() != ClusterDistributionManager.LOCATOR_DM_TYPE |
| && system.getDistributedMember() |
| .getVmKind() != ClusterDistributionManager.ADMIN_ONLY_DM_TYPE |
| && !isClient(); |
| } |
| |
| private void startRestAgentServer(GemFireCacheImpl cache) { |
| if (system.getConfig().getStartDevRestApi() && isServerNode()) { |
| restAgent = new RestAgent(system.getConfig(), securityService); |
| restAgent.start(cache); |
| } else { |
| restAgent = null; |
| } |
| } |
| |
| |
| |
| @Override |
| public URL getCacheXmlURL() { |
| if (getMyId().getVmKind() == ClusterDistributionManager.LOCATOR_DM_TYPE) { |
| return null; |
| } |
| File xmlFile = testCacheXml; |
| if (xmlFile == null) { |
| xmlFile = system.getConfig().getCacheXmlFile(); |
| } |
| if (xmlFile.getName().isEmpty()) { |
| return null; |
| } |
| |
| URL url; |
| if (!xmlFile.exists() || !xmlFile.isFile()) { |
| // do a resource search |
| String resource = xmlFile.getPath(); |
| resource = DOUBLE_BACKSLASH.matcher(resource).replaceAll("/"); |
| if (resource.length() > 1 && resource.startsWith("/")) { |
| resource = resource.substring(1); |
| } |
| url = ClassPathLoader.getLatest().getResource(getClass(), resource); |
| } else { |
| try { |
| url = xmlFile.toURL(); |
| } catch (MalformedURLException ex) { |
| throw new CacheXmlException( |
| String.format("Could not convert XML file %s to an URL.", |
| xmlFile), |
| ex); |
| } |
| } |
| if (url == null) { |
| File defaultFile = DistributionConfig.DEFAULT_CACHE_XML_FILE; |
| if (!xmlFile.equals(defaultFile)) { |
| if (!xmlFile.exists()) { |
| throw new CacheXmlException( |
| String.format("Declarative Cache XML file/resource %s does not exist.", |
| xmlFile)); |
| } else { |
| throw new CacheXmlException( |
| String.format("Declarative XML file %s is not a file.", |
| xmlFile)); |
| } |
| } |
| } |
| |
| return url; |
| } |
| |
| /** |
| * Initializes the contents of this {@code Cache} according to the declarative caching XML file |
| * specified by the given {@code DistributedSystem}. Note that this operation cannot be performed |
| * in the constructor because creating regions in the cache, etc. uses the cache itself (which |
| * isn't initialized until the constructor returns). |
| * |
| * @throws CacheXmlException If something goes wrong while parsing the declarative caching XML |
| * file. |
| * @throws TimeoutException If a {@link Region#put(Object, Object)}times out while initializing |
| * the cache. |
| * @throws CacheWriterException If a {@code CacheWriterException} is thrown while initializing the |
| * cache. |
| * @throws RegionExistsException If the declarative caching XML file describes a region that |
| * already exists (including the root region). |
| * @throws GatewayException If a {@code GatewayException} is thrown while initializing the cache. |
| * |
| * @see #loadCacheXml |
| */ |
| private void initializeDeclarativeCache() |
| throws TimeoutException, CacheWriterException, GatewayException, RegionExistsException { |
| URL url = getCacheXmlURL(); |
| String cacheXmlDescription = cacheConfig.getCacheXMLDescription(); |
| if (url == null && cacheXmlDescription == null) { |
| initializePdxRegistry(); |
| readyDynamicRegionFactory(); |
| return; // nothing needs to be done |
| } |
| |
| InputStream stream = null; |
| try { |
| logCacheXML(url, cacheXmlDescription); |
| if (cacheXmlDescription != null) { |
| if (logger.isTraceEnabled()) { |
| logger.trace("initializing cache with generated XML: {}", cacheXmlDescription); |
| } |
| stream = new StringBufferInputStream(cacheXmlDescription); |
| } else { |
| stream = url.openStream(); |
| } |
| loadCacheXml(stream); |
| |
| } catch (IOException ex) { |
| throw new CacheXmlException( |
| String.format("While opening Cache XML %s the following error occurred %s", |
| url.toString(), ex)); |
| |
| } catch (CacheXmlException ex) { |
| throw new CacheXmlException(String.format("While reading Cache XML %s. %s", |
| url, ex.getMessage()), ex.getCause()); |
| |
| } finally { |
| closeQuietly(stream); |
| } |
| } |
| |
| private static void logCacheXML(URL url, String cacheXmlDescription) { |
| if (cacheXmlDescription == null) { |
| StringBuilder sb = new StringBuilder(); |
| BufferedReader br = null; |
| try { |
| final String lineSeparator = System.getProperty("line.separator"); |
| br = new BufferedReader(new InputStreamReader(url.openStream())); |
| String line = br.readLine(); |
| while (line != null) { |
| if (!line.isEmpty()) { |
| sb.append(lineSeparator).append(line); |
| } |
| line = br.readLine(); |
| } |
| } catch (IOException ignore) { |
| } finally { |
| closeQuietly(br); |
| } |
| logger.info("Initializing cache using {}:{}", |
| new Object[] {url.toString(), sb.toString()}); |
| } else { |
| logger.info( |
| "Initializing cache using {}:{}", |
| new Object[] {"generated description from old cache", cacheXmlDescription}); |
| } |
| } |
| |
| @Override |
| public synchronized void initializePdxRegistry() { |
| if (pdxRegistry == null) { |
| // The member with locator is initialized with a NullTypePdxRegistration |
| if (getMyId().getVmKind() == ClusterDistributionManager.LOCATOR_DM_TYPE) { |
| pdxRegistry = new TypeRegistry(this, true); |
| } else { |
| pdxRegistry = new TypeRegistry(this, false); |
| } |
| pdxRegistry.initialize(); |
| } |
| } |
| |
| /** |
| * Call to make this vm's dynamic region factory ready. Public so it can be called from |
| * CacheCreation during xml processing |
| */ |
| @Override |
| public void readyDynamicRegionFactory() { |
| try { |
| ((DynamicRegionFactoryImpl) DynamicRegionFactory.get()).internalInit(this); |
| } catch (CacheException ce) { |
| throw new GemFireCacheException( |
| "dynamic region initialization failed", |
| ce); |
| } |
| } |
| |
| /** |
| * create diskStore factory with default attributes |
| * |
| * @since GemFire prPersistSprint2 |
| */ |
| @Override |
| public DiskStoreFactory createDiskStoreFactory() { |
| return new DiskStoreFactoryImpl(this); |
| } |
| |
| /** |
| * create diskStore factory with predefined attributes |
| * |
| * @since GemFire prPersistSprint2 |
| */ |
| @Override |
| public DiskStoreFactory createDiskStoreFactory(DiskStoreAttributes attrs) { |
| return new DiskStoreFactoryImpl(this, attrs); |
| } |
| |
  /**
   * CancelCriterion for this cache: reports cancellation when the underlying distributed system
   * is cancelled, when the cache recorded a disconnect cause, or when the cache is closing.
   */
  class Stopper extends CancelCriterion {

    @Override
    public String cancelInProgress() {
      // The distributed system's cancellation takes precedence over cache-local state.
      String reason = getDistributedSystem().getCancelCriterion().cancelInProgress();
      if (reason != null) {
        return reason;
      }
      if (disconnectCause != null) {
        return disconnectCause.getMessage();
      }
      if (isClosing) {
        return "The cache is closed."; // this + ": closed";
      }
      return null;
    }

    @Override
    public RuntimeException generateCancelledException(Throwable throwable) {
      String reason = cancelInProgress();
      if (reason == null) {
        // Not cancelled: nothing to generate.
        return null;
      }
      // Prefer the distributed system's own cancelled exception if it produces one.
      RuntimeException result =
          getDistributedSystem().getCancelCriterion().generateCancelledException(throwable);
      if (result != null) {
        return result;
      }
      if (disconnectCause == null) {
        // No root cause, specify the one given and be done with it.
        return new CacheClosedException(reason, throwable);
      }

      if (throwable == null) {
        // Caller did not specify any root cause, so just use our own.
        return new CacheClosedException(reason, disconnectCause);
      }

      // Attempt to stick rootCause at tail end of the exception chain.
      try {
        ThrowableUtils.setRootCause(throwable, disconnectCause);
        return new CacheClosedException(reason, throwable);
      } catch (IllegalStateException ignore) {
        // Bug 39496 (JRockit related) Give up. The following
        // error is not entirely sane but gives the correct general picture.
        return new CacheClosedException(reason, disconnectCause);
      }
    }
  }
| |
  // The single CancelCriterion instance for this cache.
  private final Stopper stopper = new Stopper();

  /** Returns the cache's CancelCriterion (see {@link Stopper}). */
  @Override
  public CancelCriterion getCancelCriterion() {
    return stopper;
  }
| |
  /** return true if the cache was closed due to being shunned by other members */
  @Override
  public boolean forcedDisconnect() {
    // True when either this cache or its distributed system recorded a forced disconnect.
    return forcedDisconnect || system.forcedDisconnect();
  }
| |
  /** return a CacheClosedException with the given reason */
  @Override
  public CacheClosedException getCacheClosedException(String reason) {
    // Delegates to the two-arg variant; any recorded disconnect cause becomes the cause.
    return getCacheClosedException(reason, null);
  }
| |
| /** return a CacheClosedException with the given reason and cause */ |
| @Override |
| public CacheClosedException getCacheClosedException(String reason, Throwable cause) { |
| CacheClosedException result; |
| if (cause != null) { |
| result = new CacheClosedException(reason, cause); |
| } else if (disconnectCause != null) { |
| result = new CacheClosedException(reason, disconnectCause); |
| } else { |
| result = new CacheClosedException(reason); |
| } |
| return result; |
| } |
| |
  /** if the cache was forcibly closed this exception will reflect the cause */
  @Override
  public Throwable getDisconnectCause() {
    return disconnectCause;
  }
| |
| /** |
| * Set to true during a cache close if user requested durable subscriptions to be kept. |
| * |
| * @since GemFire 5.7 |
| */ |
| private boolean keepAlive; |
| |
| /** |
| * Returns true if durable subscriptions (registrations and queries) should be preserved. |
| * |
| * @since GemFire 5.7 |
| */ |
| @Override |
| public boolean keepDurableSubscriptionsAlive() { |
| return keepAlive; |
| } |
| |
| /** |
| * break any potential circularity in {@link #loadEmergencyClasses()} |
| */ |
| @MakeNotStatic |
| private static volatile boolean emergencyClassesLoaded = false; |
| |
| /** |
| * Ensure that all the necessary classes for closing the cache are loaded |
| * |
| * @see SystemFailure#loadEmergencyClasses() |
| */ |
| public static void loadEmergencyClasses() { |
| if (emergencyClassesLoaded) |
| return; |
| emergencyClassesLoaded = true; |
| InternalDistributedSystem.loadEmergencyClasses(); |
| AcceptorImpl.loadEmergencyClasses(); |
| PoolManagerImpl.loadEmergencyClasses(); |
| } |
| |
| /** |
| * Close the distributed system, cache servers, and gateways. Clears the rootRegions and |
| * partitionedRegions map. Marks the cache as closed. |
| * |
| * @see SystemFailure#emergencyClose() |
| */ |
| public static void emergencyClose() { |
| GemFireCacheImpl cache = getInstance(); |
| if (cache == null) { |
| return; |
| } |
| |
| // leave the PdxSerializer set if we have one to prevent 43412 |
| |
| // Shut down messaging first |
| InternalDistributedSystem ids = cache.system; |
| if (ids != null) { |
| ids.emergencyClose(); |
| } |
| |
| cache.disconnectCause = SystemFailure.getFailure(); |
| cache.isClosing = true; |
| |
| for (InternalCacheServer cacheServer : cache.allCacheServers) { |
| Acceptor acceptor = cacheServer.getAcceptor(); |
| if (acceptor != null) { |
| acceptor.emergencyClose(); |
| } |
| } |
| |
| InternalCacheServer receiverServer = cache.gatewayReceiverServer.get(); |
| Acceptor acceptor = receiverServer.getAcceptor(); |
| if (acceptor != null) { |
| acceptor.emergencyClose(); |
| } |
| |
| PoolManagerImpl.emergencyClose(); |
| |
| // rootRegions is intentionally *not* synchronized. The |
| // implementation of clear() does not currently allocate objects. |
| cache.rootRegions.clear(); |
| |
| // partitionedRegions is intentionally *not* synchronized, The |
| // implementation of clear() does not currently allocate objects. |
| cache.partitionedRegions.clear(); |
| } |
| |
  /** Returns true once shutDownAll has begun (the flag is never reset). */
  @Override
  public boolean isCacheAtShutdownAll() {
    return isShutDownAll.get();
  }
| |
| /** |
| * Number of threads used to close PRs in shutdownAll. By default is the number of PRs in the |
| * cache |
| */ |
| private static final int shutdownAllPoolSize = |
| Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "SHUTDOWN_ALL_POOL_SIZE", -1); |
| |
| private void shutdownSubTreeGracefully(Map<String, PartitionedRegion> prSubMap) { |
| for (final PartitionedRegion pr : prSubMap.values()) { |
| shutDownOnePRGracefully(pr); |
| } |
| } |
| |
| @Override |
| public void shutDownAll() { |
| if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { |
| try { |
| CacheObserverHolder.getInstance().beforeShutdownAll(); |
| } finally { |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| } |
| } |
| if (!isShutDownAll.compareAndSet(false, true)) { |
| // it's already doing shutdown by another thread |
| try { |
| shutDownAllFinished.await(); |
| } catch (InterruptedException ignore) { |
| logger.debug( |
| "Shutdown all interrupted while waiting for another thread to do the shutDownAll"); |
| Thread.currentThread().interrupt(); |
| } |
| return; |
| } |
| synchronized (GemFireCacheImpl.class) { |
| try { |
| boolean testIGE = Boolean.getBoolean("TestInternalGemFireError"); |
| |
| if (testIGE) { |
| throw new InternalGemFireError( |
| "unexpected exception"); |
| } |
| |
| // bug 44031 requires multithread shutDownAll should be grouped |
| // by root region. However, shutDownAllDuringRecovery.conf test revealed that |
| // we have to close colocated child regions first. |
| // Now check all the PR, if anyone has colocate-with attribute, sort all the |
| // PRs by colocation relationship and close them sequentially, otherwise still |
| // group them by root region. |
| SortedMap<String, Map<String, PartitionedRegion>> prTrees = getPRTrees(); |
| if (prTrees.size() > 1 && shutdownAllPoolSize != 1) { |
| ExecutorService es = getShutdownAllExecutorService(prTrees.size()); |
| for (final Map<String, PartitionedRegion> prSubMap : prTrees.values()) { |
| es.execute(() -> { |
| ConnectionTable.threadWantsSharedResources(); |
| shutdownSubTreeGracefully(prSubMap); |
| }); |
| } // for each root |
| es.shutdown(); |
| try { |
| es.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); |
| } catch (InterruptedException ignore) { |
| logger |
| .debug("Shutdown all interrupted while waiting for PRs to be shutdown gracefully."); |
| } |
| |
| } else { |
| for (final Map<String, PartitionedRegion> prSubMap : prTrees.values()) { |
| shutdownSubTreeGracefully(prSubMap); |
| } |
| } |
| |
| close("Shut down all members", null, false, true); |
| } finally { |
| shutDownAllFinished.countDown(); |
| } |
| } |
| } |
| |
| private ExecutorService getShutdownAllExecutorService(int size) { |
| return LoggingExecutors |
| .newFixedThreadPool("ShutdownAll-", true, |
| shutdownAllPoolSize == -1 ? size : shutdownAllPoolSize); |
| } |
| |
  /**
   * Gracefully closes a single partitioned region during shutDownAll.
   *
   * <p>For a persistent data store the method runs a phased protocol, waiting for every peer to
   * acknowledge each phase via profile status before moving on: (1) lock all local primary
   * buckets (PRIMARY_BUCKETS_LOCKED), (2) flush the disk store if writes are asynchronous
   * (DISK_STORE_FLUSHED), (3) persist remote members as offline-and-equal for each bucket
   * (OFFLINE_EQUAL_PERSISTED). Finally the region itself is destroyed locally. Statement order
   * is significant throughout; cancellation/destroy races are logged and ignored.
   */
  private void shutDownOnePRGracefully(PartitionedRegion partitionedRegion) {
    boolean acquiredLock = false;
    try {
      partitionedRegion.acquireDestroyLock();
      acquiredLock = true;

      synchronized (partitionedRegion.getRedundancyProvider()) {
        // the phased protocol below only applies to persistent data stores
        if (partitionedRegion.isDataStore() && partitionedRegion.getDataStore() != null
            && partitionedRegion.getDataPolicy() == DataPolicy.PERSISTENT_PARTITION) {
          int numBuckets = partitionedRegion.getTotalNumberOfBuckets();
          @SuppressWarnings("unchecked")
          Map<InternalDistributedMember, PersistentMemberID>[] bucketMaps = new Map[numBuckets];
          PartitionedRegionDataStore dataStore = partitionedRegion.getDataStore();

          // lock all the primary buckets
          Set<Entry<Integer, BucketRegion>> bucketEntries = dataStore.getAllLocalBuckets();
          for (Entry e : bucketEntries) {
            BucketRegion bucket = (BucketRegion) e.getValue();
            if (bucket == null || bucket.isDestroyed) {
              // bucket region could be destroyed in race condition
              continue;
            }
            bucket.getBucketAdvisor().tryLockIfPrimary();

            // get map <InternalDistributedMember, persistentID> for this bucket's
            // remote members
            bucketMaps[bucket.getId()] =
                bucket.getBucketAdvisor().adviseInitializedPersistentMembers();
            if (logger.isDebugEnabled()) {
              logger.debug("shutDownAll: PR {}: initialized persistent members for {}:{}",
                  partitionedRegion.getName(), bucket.getId(), bucketMaps[bucket.getId()]);
            }
          }
          if (logger.isDebugEnabled()) {
            logger.debug("shutDownAll: All buckets for PR {} are locked.",
                partitionedRegion.getName());
          }

          // send lock profile update to other members
          partitionedRegion.setShutDownAllStatus(PartitionedRegion.PRIMARY_BUCKETS_LOCKED);
          new UpdateAttributesProcessor(partitionedRegion).distribute(false);
          partitionedRegion.getRegionAdvisor()
              .waitForProfileStatus(PartitionedRegion.PRIMARY_BUCKETS_LOCKED);
          if (logger.isDebugEnabled()) {
            logger.debug("shutDownAll: PR {}: all bucketLock profiles received.",
                partitionedRegion.getName());
          }

          // if async write, do flush
          if (!partitionedRegion.getAttributes().isDiskSynchronous()) {
            // several PRs might share the same diskStore, we will only flush once
            // even flush is called several times.
            partitionedRegion.getDiskStore().forceFlush();
            // send flush profile update to other members
            partitionedRegion.setShutDownAllStatus(PartitionedRegion.DISK_STORE_FLUSHED);
            new UpdateAttributesProcessor(partitionedRegion).distribute(false);
            partitionedRegion.getRegionAdvisor()
                .waitForProfileStatus(PartitionedRegion.DISK_STORE_FLUSHED);
            if (logger.isDebugEnabled()) {
              logger.debug("shutDownAll: PR {}: all flush profiles received.",
                  partitionedRegion.getName());
            }
          } // async write

          // persist other members to OFFLINE_EQUAL for each bucket region
          // iterate through all the bucketMaps and exclude the items whose
          // idm is no longer online
          Set<InternalDistributedMember> membersToPersistOfflineEqual =
              partitionedRegion.getRegionAdvisor().adviseDataStore();
          for (Entry e : bucketEntries) {
            BucketRegion bucket = (BucketRegion) e.getValue();
            if (bucket == null || bucket.isDestroyed) {
              // bucket region could be destroyed in race condition
              continue;
            }
            Map<InternalDistributedMember, PersistentMemberID> persistMap =
                getSubMapForLiveMembers(membersToPersistOfflineEqual, bucketMaps[bucket.getId()]);
            if (persistMap != null) {
              bucket.getPersistenceAdvisor().persistMembersOfflineAndEqual(persistMap);
              if (logger.isDebugEnabled()) {
                logger.debug("shutDownAll: PR {}: persisting bucket {}:{}",
                    partitionedRegion.getName(), bucket.getId(), persistMap);
              }
            }
          }

          // send persisted profile update to other members, let all members to persist
          // before close the region
          partitionedRegion.setShutDownAllStatus(PartitionedRegion.OFFLINE_EQUAL_PERSISTED);
          new UpdateAttributesProcessor(partitionedRegion).distribute(false);
          partitionedRegion.getRegionAdvisor()
              .waitForProfileStatus(PartitionedRegion.OFFLINE_EQUAL_PERSISTED);
          if (logger.isDebugEnabled()) {
            logger.debug("shutDownAll: PR {}: all offline_equal profiles received.",
                partitionedRegion.getName());
          }
        } // dataStore

        // after done all steps for buckets, close partitionedRegion
        // close accessor directly
        RegionEventImpl event = new RegionEventImpl(partitionedRegion, Operation.REGION_CLOSE, null,
            false, getMyId(), true);
        try {
          // not to acquire lock
          partitionedRegion.basicDestroyRegion(event, false, false, true);
        } catch (CacheWriterException e) {
          // not possible with local operation, CacheWriter not called
          throw new Error(
              "CacheWriterException should not be thrown in localDestroyRegion",
              e);
        } catch (TimeoutException e) {
          // not possible with local operation, no distributed locks possible
          throw new Error(
              "TimeoutException should not be thrown in localDestroyRegion",
              e);
        }
      } // synchronized
    } catch (CacheClosedException cce) {
      logger.debug("Encounter CacheClosedException when shutDownAll is closing PR: {}:{}",
          partitionedRegion.getFullPath(), cce.getMessage());
    } catch (CancelException ce) {
      logger.debug("Encounter CancelException when shutDownAll is closing PR: {}:{}",
          partitionedRegion.getFullPath(), ce.getMessage());
    } catch (RegionDestroyedException rde) {
      logger.debug("Encounter CacheDestroyedException when shutDownAll is closing PR: {}:{}",
          partitionedRegion.getFullPath(), rde.getMessage());
    } finally {
      if (acquiredLock) {
        partitionedRegion.releaseDestroyLock();
      }
    }
  }
| |
| private static Map<InternalDistributedMember, PersistentMemberID> getSubMapForLiveMembers( |
| Set<InternalDistributedMember> membersToPersistOfflineEqual, |
| Map<InternalDistributedMember, PersistentMemberID> bucketMap) { |
| if (bucketMap == null) { |
| return null; |
| } |
| Map<InternalDistributedMember, PersistentMemberID> persistMap = new HashMap<>(); |
| for (InternalDistributedMember member : membersToPersistOfflineEqual) { |
| if (bucketMap.containsKey(member)) { |
| persistMap.put(member, bucketMap.get(member)); |
| } |
| } |
| return persistMap; |
| } |
| |
  @Override
  public void close() {
    // default close: no keepAlive (client subscription state is not preserved)
    close(false);
  }
| |
  @Override
  public void close(String reason, boolean keepAlive, boolean keepDS) {
    // no system-failure cause for this variant
    close(reason, null, keepAlive, keepDS);
  }
| |
  @Override
  public void close(boolean keepAlive) {
    // ordinary shutdown: also disconnects the distributed system (keepDS=false)
    close("Normal disconnect", null, keepAlive, false);
  }
| |
  @Override
  public void close(String reason, Throwable optionalCause) {
    // failure-driven close: neither keepAlive nor keepDS
    close(reason, optionalCause, false, false);
  }
| |
| /** |
| * Gets or lazily creates the PartitionedRegion distributed lock service. This call will |
| * synchronize on this GemFireCache. |
| * |
| * @return the PartitionedRegion distributed lock service |
| */ |
| @Override |
| public DistributedLockService getPartitionedRegionLockService() { |
| synchronized (prLockServiceLock) { |
| stopper.checkCancelInProgress(null); |
| if (prLockService == null) { |
| try { |
| prLockService = |
| DLockService.create(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME, |
| getInternalDistributedSystem(), true /* distributed */, |
| true /* destroyOnDisconnect */, true /* automateFreeResources */); |
| } catch (IllegalArgumentException e) { |
| prLockService = DistributedLockService |
| .getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME); |
| if (prLockService == null) { |
| throw e; // PARTITION_LOCK_SERVICE_NAME must be illegal! |
| } |
| } |
| } |
| return prLockService; |
| } |
| } |
| |
| /** |
| * Gets or lazily creates the GatewaySender distributed lock service. |
| * |
| * @return the GatewaySender distributed lock service |
| */ |
| @Override |
| public DistributedLockService getGatewaySenderLockService() { |
| if (gatewayLockService == null) { |
| synchronized (gatewayLockServiceLock) { |
| stopper.checkCancelInProgress(null); |
| if (gatewayLockService == null) { |
| try { |
| gatewayLockService = DLockService.create(AbstractGatewaySender.LOCK_SERVICE_NAME, |
| getInternalDistributedSystem(), true /* distributed */, |
| true /* destroyOnDisconnect */, true /* automateFreeResources */); |
| } catch (IllegalArgumentException e) { |
| gatewayLockService = |
| DistributedLockService.getServiceNamed(AbstractGatewaySender.LOCK_SERVICE_NAME); |
| if (gatewayLockService == null) { |
| throw e; // AbstractGatewaySender.LOCK_SERVICE_NAME must be illegal! |
| } |
| } |
| } |
| } |
| } |
| return gatewayLockService; |
| } |
| |
| /** |
| * Destroys the PartitionedRegion distributed lock service when closing the cache. Caller must be |
| * synchronized on this GemFireCache. |
| */ |
| private void destroyPartitionedRegionLockService() { |
| try { |
| DistributedLockService.destroy(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME); |
| } catch (IllegalArgumentException ignore) { |
| // DistributedSystem.disconnect may have already destroyed the DLS |
| } |
| } |
| |
| /** |
| * Destroys the GatewaySender distributed lock service when closing the cache. Caller must be |
| * synchronized on this GemFireCache. |
| */ |
| private void destroyGatewaySenderLockService() { |
| if (DistributedLockService.getServiceNamed(AbstractGatewaySender.LOCK_SERVICE_NAME) != null) { |
| try { |
| DistributedLockService.destroy(AbstractGatewaySender.LOCK_SERVICE_NAME); |
| } catch (IllegalArgumentException ignore) { |
| // DistributedSystem.disconnect may have already destroyed the DLS |
| } |
| } |
| } |
| |
| public HeapEvictor getHeapEvictor() { |
| synchronized (heapEvictorLock) { |
| stopper.checkCancelInProgress(null); |
| if (heapEvictor == null) { |
| heapEvictor = new HeapEvictor(this); |
| } |
| return heapEvictor; |
| } |
| } |
| |
| public OffHeapEvictor getOffHeapEvictor() { |
| synchronized (offHeapEvictorLock) { |
| stopper.checkCancelInProgress(null); |
| if (offHeapEvictor == null) { |
| offHeapEvictor = new OffHeapEvictor(this); |
| } |
| return offHeapEvictor; |
| } |
| } |
| |
  /** Used by test to inject an evictor; bypasses the lazy creation in getOffHeapEvictor(). */
  void setOffHeapEvictor(OffHeapEvictor evictor) {
    offHeapEvictor = evictor;
  }
| |
  /** Used by test to inject an evictor; bypasses the lazy creation in getHeapEvictor(). */
  void setHeapEvictor(HeapEvictor evictor) {
    heapEvictor = evictor;
  }
| |
  /** Returns the manager tracking persistent members of this cache's disk stores. */
  @Override
  public PersistentMemberManager getPersistentMemberManager() {
    return persistentMemberManager;
  }
| |
  /** Returns the client metadata service; fails fast if the cache is closing. */
  @Override
  public ClientMetadataService getClientMetadataService() {
    stopper.checkCancelInProgress(null);

    return clientMetadataService;
  }
| |
  // when the "gemfire.DISABLE_DISCONNECT_DS_ON_CACHE_CLOSE" system property is set, a normal
  // cache close leaves the DistributedSystem connected instead of disconnecting it
  private final boolean DISABLE_DISCONNECT_DS_ON_CACHE_CLOSE = Boolean
      .getBoolean(DistributionConfig.GEMFIRE_PREFIX + "DISABLE_DISCONNECT_DS_ON_CACHE_CLOSE");
| |
  /**
   * Closes this cache. Performs the full shutdown sequence: management service removal, gateway
   * senders, servers and other services, region close (PR meta-root last), disk stores, CQ
   * service, a final CloseCacheMessage to peers, then statistics/timers/transaction cleanup and
   * (unless keepDS) distributed system disconnect. All work runs under static class
   * synchronization; statement order below is significant.
   *
   * @param reason human-readable reason recorded for the close
   * @param systemFailureCause non-null when the close is driven by a system failure; suppresses
   *        some cleanup (e.g. PR id map clearing) that could hang
   * @param keepAlive whether client pools should preserve server-side subscription state
   * @param keepDS whether to leave the distributed system connected (used by shutDownAll)
   */
  @Override
  public void close(String reason, Throwable systemFailureCause, boolean keepAlive,
      boolean keepDS) {
    securityService.close();

    if (isClosed()) {
      return;
    }

    // a normal close while a reconnect is in progress instead tears down the reconnect
    // attempt (and any already-reconnected system) and returns without closing this cache
    if (!keepDS && systemFailureCause == null // normal cache close
        && (isReconnecting() || system.getReconnectedSystem() != null)) {
      logger.debug(
          "Cache is shutting down distributed system connection. "
              + "isReconnecting={} reconnectedSystem={} keepAlive={} keepDS={}",
          isReconnecting(), system.getReconnectedSystem(), keepAlive, keepDS);

      system.stopReconnectingNoDisconnect();
      if (system.getReconnectedSystem() != null) {
        system.getReconnectedSystem().disconnect();
      }
      return;
    }

    final boolean isDebugEnabled = logger.isDebugEnabled();

    synchronized (GemFireCacheImpl.class) {
      // fix for bug 36512 "GemFireCache.close is not thread safe"
      // ALL CODE FOR CLOSE SHOULD NOW BE UNDER STATIC SYNCHRONIZATION
      // OF synchronized (GemFireCache.class) {
      // static synchronization is necessary due to static resources
      if (isClosed()) {
        return;
      }

      /*
       * First close the ManagementService as it uses a lot of infra which will be closed by
       * cache.close()
       */
      system.handleResourceEvent(ResourceEvent.CACHE_REMOVE, this);
      if (resourceEventsListener != null) {
        system.removeResourceListener(resourceEventsListener);
        resourceEventsListener = null;
      }

      if (systemFailureCause != null) {
        forcedDisconnect = systemFailureCause instanceof ForcedDisconnectException;
        if (forcedDisconnect) {
          disconnectCause = new ForcedDisconnectException(reason);
        } else {
          disconnectCause = systemFailureCause;
        }
      }

      this.keepAlive = keepAlive;
      isClosing = true;
      logger.info("{}: Now closing.", this);

      // we don't clear the prID map if there is a system failure. Other
      // threads may be hung trying to communicate with the map locked
      if (systemFailureCause == null) {
        PartitionedRegion.clearPRIdMap();
      }
      TXStateProxy tx = null;
      try {

        if (transactionManager != null) {
          tx = transactionManager.pauseTransaction();
        }

        // do this before closing regions
        resourceManager.close();

        try {
          resourceAdvisor.close();
        } catch (CancelException ignore) {
          // ignore
        }
        try {
          jmxAdvisor.close();
        } catch (CancelException ignore) {
          // ignore
        }

        for (GatewaySender sender : allGatewaySenders) {
          try {
            sender.stop();
            GatewaySenderAdvisor advisor = ((AbstractGatewaySender) sender).getSenderAdvisor();
            if (advisor != null) {
              if (isDebugEnabled) {
                logger.debug("Stopping the GatewaySender advisor");
              }
              advisor.close();
            }
          } catch (CancelException ignore) {
          }
        }

        destroyGatewaySenderLockService();

        if (eventThreadPool != null) {
          if (isDebugEnabled) {
            logger.debug("{}: stopping event thread pool...", this);
          }
          eventThreadPool.shutdown();
        }

        /*
         * IMPORTANT: any operation during shut down that can time out (create a CancelException)
         * must be inside of this try block. If all else fails, we *must* ensure that the cache gets
         * closed!
         */
        try {
          stopServers();

          stopServices();

          httpService.ifPresent(HttpService::stop);

          // no need to track PR instances since we won't create any more
          // cacheServers or gatewayHubs
          if (isDebugEnabled) {
            logger.debug("{}: clearing partitioned regions...", this);
          }
          synchronized (partitionedRegions) {
            int prSize = -partitionedRegions.size();
            partitionedRegions.clear();
            getCachePerfStats().incPartitionedRegions(prSize);
          }

          prepareDiskStoresForClose();

          // snapshot under lock, then close outside the lock
          List<InternalRegion> rootRegionValues;
          synchronized (rootRegions) {
            rootRegionValues = new ArrayList<>(rootRegions.values());
          }
          {
            final Operation op;
            if (forcedDisconnect) {
              op = Operation.FORCED_DISCONNECT;
            } else if (isReconnecting()) {
              op = Operation.CACHE_RECONNECT;
            } else {
              op = Operation.CACHE_CLOSE;
            }

            InternalRegion prRoot = null;

            for (InternalRegion lr : rootRegionValues) {
              if (isDebugEnabled) {
                logger.debug("{}: processing region {}", this, lr.getFullPath());
              }
              if (PartitionedRegionHelper.PR_ROOT_REGION_NAME.equals(lr.getName())) {
                prRoot = lr;
              } else {
                if (lr.getName().contains(ParallelGatewaySenderQueue.QSTRING)) {
                  continue; // this region will be closed internally by parent region
                }
                if (isDebugEnabled) {
                  logger.debug("{}: closing region {}...", this, lr.getFullPath());
                }
                try {
                  lr.handleCacheClose(op);
                } catch (RuntimeException e) {
                  if (isDebugEnabled || !forcedDisconnect) {
                    logger.warn(String.format("%s: error closing region %s",
                        this, lr.getFullPath()), e);
                  }
                }
              }
            } // for

            try {
              if (isDebugEnabled) {
                logger.debug("{}: finishing partitioned region close...", this);
              }
              PartitionedRegion.afterRegionsClosedByCacheClose(this);
              if (prRoot != null) {
                // do the PR meta root region last
                prRoot.handleCacheClose(op);
              }
            } catch (CancelException e) {
              logger.warn(String.format("%s: error in last stage of PartitionedRegion cache close",
                  this),
                  e);
            }
            destroyPartitionedRegionLockService();
          }

          closeDiskStores();
          diskMonitor.close();

          // Close the CqService Handle.
          try {
            if (isDebugEnabled) {
              logger.debug("{}: closing CQ service...", this);
            }
            cqService.close();
          } catch (RuntimeException ignore) {
            logger.info("Failed to get the CqService, to close during cache close (1).");
          }

          PoolManager.close(keepAlive);

          if (isDebugEnabled) {
            logger.debug("{}: notifying admins of close...", this);
          }
          try {
            SystemMemberCacheEventProcessor.send(this, Operation.CACHE_CLOSE);
          } catch (CancelException ignore) {
            if (logger.isDebugEnabled()) {
              logger.debug("Ignored cancellation while notifying admins");
            }
          }

          if (isDebugEnabled) {
            logger.debug("{}: stopping destroyed entries processor...", this);
          }
          tombstoneService.stop();

          // NOTICE: the CloseCache message is the *last* message you can send!
          DistributionManager distributionManager = null;
          try {
            distributionManager = system.getDistributionManager();
            distributionManager.removeMembershipListener(transactionManager);
          } catch (CancelException ignore) {
            // distributionManager = null;
          }

          if (distributionManager != null) { // Send CacheClosedMessage (and NOTHING ELSE) here
            if (isDebugEnabled) {
              logger.debug("{}: sending CloseCache to peers...", this);
            }
            Set<? extends DistributedMember> otherMembers =
                distributionManager.getOtherDistributionManagerIds();
            ReplyProcessor21 processor = new ReplyProcessor21(system, otherMembers);
            CloseCacheMessage msg = new CloseCacheMessage();
            msg.setRecipients(otherMembers);
            msg.setProcessorId(processor.getProcessorId());
            distributionManager.putOutgoing(msg);
            try {
              processor.waitForReplies();
            } catch (InterruptedException ignore) {
              // Thread.currentThread().interrupt(); // TODO ??? should we reset this bit later?
              // Keep going, make best effort to shut down.
            } catch (ReplyException ignore) {
              // keep going
            }
            // set closed state after telling others and getting responses
            // to avoid complications with others still in the process of
            // sending messages
          }
          // NO MORE Distributed Messaging AFTER THIS POINT!!!!

          ClientMetadataService cms = clientMetadataService;
          if (cms != null) {
            cms.close();
          }
          closeHeapEvictor();
          closeOffHeapEvictor();
        } catch (CancelException ignore) {
          // make sure the disk stores get closed
          closeDiskStores();
          // NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
        }

        // Close the CqService Handle.
        try {
          cqService.close();
        } catch (RuntimeException ignore) {
          logger.info("Failed to get the CqService, to close during cache close (2).");
        }

        cachePerfStats.close();
        TXLockService.destroyServices();
        getEventTrackerTask().cancel();

        synchronized (ccpTimerMutex) {
          if (ccpTimer != null) {
            ccpTimer.cancel();
          }
        }

        expirationScheduler.cancel();

        // Stop QueryMonitor if running.
        if (queryMonitor != null) {
          queryMonitor.stopMonitoring();
        }

      } finally {
        // NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
        if (transactionManager != null) {
          transactionManager.close();
        }
        ((DynamicRegionFactoryImpl) DynamicRegionFactory.get()).close();
        if (transactionManager != null) {
          transactionManager.unpauseTransaction(tx);
        }
        TXCommitMessage.getTracker().clearForCacheClose();
      }
      // Added to close the TransactionManager's cleanup thread
      TransactionManagerImpl.refresh();

      if (!keepDS) {
        // keepDS is used by ShutdownAll. It will override DISABLE_DISCONNECT_DS_ON_CACHE_CLOSE
        if (!DISABLE_DISCONNECT_DS_ON_CACHE_CLOSE) {
          system.disconnect();
        }
      }
      TypeRegistry.close();
      // do this late to prevent 43412
      TypeRegistry.setPdxSerializer(null);

      for (CacheLifecycleListener listener : cacheLifecycleListeners) {
        listener.cacheClosed(this);
      }
      // Fix for #49856
      SequenceLoggerImpl.signalCacheClose();
      SystemFailure.signalCacheClose();

    } // static synchronization on GemFireCache.class

  }
| |
| private void stopServices() { |
| for (CacheService service : services.values()) { |
| try { |
| service.close(); |
| } catch (Throwable t) { |
| logger.warn("Error stopping service " + service, t); |
| } |
| } |
| } |
| |
| private void closeOffHeapEvictor() { |
| OffHeapEvictor evictor = offHeapEvictor; |
| if (evictor != null) { |
| evictor.close(); |
| } |
| } |
| |
| private void closeHeapEvictor() { |
| HeapEvictor evictor = heapEvictor; |
| if (evictor != null) { |
| evictor.close(); |
| } |
| } |
| |
  /** Delegates to the distributed system's reconnect state. */
  @Override
  public boolean isReconnecting() {
    return system.isReconnecting();
  }
| |
| @Override |
| public boolean waitUntilReconnected(long time, TimeUnit units) throws InterruptedException { |
| try { |
| boolean systemReconnected = system.waitUntilReconnected(time, units); |
| if (!systemReconnected) { |
| return false; |
| } |
| GemFireCacheImpl cache = getInstance(); |
| return cache != null && cache.isInitialized(); |
| } catch (CancelException e) { |
| throw new CacheClosedException("Cache could not be recreated", e); |
| } |
| } |
| |
  /** Aborts any in-progress reconnect attempt on the distributed system. */
  @Override
  public void stopReconnecting() {
    system.stopReconnecting();
  }
| |
| @Override |
| public Cache getReconnectedCache() { |
| GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); |
| if (cache == this || cache != null && !cache.isInitialized()) { |
| cache = null; |
| } |
| return cache; |
| } |
| |
| private void prepareDiskStoresForClose() { |
| String pdxDSName = TypeRegistry.getPdxDiskStoreName(this); |
| DiskStoreImpl pdxDiskStore = null; |
| for (DiskStoreImpl dsi : diskStores.values()) { |
| if (dsi.getName().equals(pdxDSName)) { |
| pdxDiskStore = dsi; |
| } else { |
| dsi.prepareForClose(); |
| } |
| } |
| if (pdxDiskStore != null) { |
| pdxDiskStore.prepareForClose(); |
| } |
| } |
| |
  // shared (user-created) disk stores, keyed by name
  private final ConcurrentMap<String, DiskStoreImpl> diskStores = new ConcurrentHashMap<>();

  // disk stores owned by a single region, keyed by name; tracked separately so they are
  // excluded from listDiskStores() but included in listDiskStoresIncludingRegionOwned()
  private final ConcurrentMap<String, DiskStoreImpl> regionOwnedDiskStores =
      new ConcurrentHashMap<>();
| |
  /** Registers a shared disk store; online stores are also registered for disk monitoring. */
  @Override
  public void addDiskStore(DiskStoreImpl dsi) {
    diskStores.put(dsi.getName(), dsi);
    if (!dsi.isOffline()) {
      diskMonitor.addDiskStore(dsi);
    }
  }
| |
| @Override |
| public void removeDiskStore(DiskStoreImpl diskStore) { |
| diskStores.remove(diskStore.getName()); |
| regionOwnedDiskStores.remove(diskStore.getName()); |
| // Added for M&M |
| if (!diskStore.getOwnedByRegion()) |
| system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, diskStore); |
| } |
| |
  /** Registers a region-owned disk store; online stores are also registered for monitoring. */
  @Override
  public void addRegionOwnedDiskStore(DiskStoreImpl dsi) {
    regionOwnedDiskStores.put(dsi.getName(), dsi);
    if (!dsi.isOffline()) {
      diskMonitor.addDiskStore(dsi);
    }
  }
| |
| @Override |
| public void closeDiskStores() { |
| Iterator<DiskStoreImpl> it = diskStores.values().iterator(); |
| while (it.hasNext()) { |
| try { |
| DiskStoreImpl dsi = it.next(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("closing {}", dsi); |
| } |
| dsi.close(); |
| // Added for M&M |
| system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, dsi); |
| } catch (RuntimeException e) { |
| logger.fatal("Cache close caught an exception during disk store close", e); |
| } |
| it.remove(); |
| } |
| } |
| |
| /** |
| * Used by unit tests to allow them to change the default disk store name. |
| */ |
| public static void setDefaultDiskStoreName(String dsName) { |
| defaultDiskStoreName = dsName; |
| } |
| |
  /** Returns the name used for the default disk store (test-overridable). */
  public static String getDefaultDiskStoreName() {
    return defaultDiskStoreName;
  }
| |
  // TODO: remove static from defaultDiskStoreName and move methods to InternalCache
  // (being static, the value is shared across all cache instances in the JVM)
  @MakeNotStatic
  private static String defaultDiskStoreName = DiskStoreFactory.DEFAULT_DISK_STORE_NAME;
| |
  /**
   * Returns the default disk store, creating it on first use. Uses check/lock/re-check on
   * {@code this} so at most one creation happens under concurrent calls.
   */
  @Override
  public DiskStoreImpl getOrCreateDefaultDiskStore() {
    DiskStoreImpl result = (DiskStoreImpl) findDiskStore(null);
    if (result == null) {
      synchronized (this) {
        // re-check under the lock: another thread may have created it meanwhile
        result = (DiskStoreImpl) findDiskStore(null);
        if (result == null) {
          result = (DiskStoreImpl) createDiskStoreFactory().create(defaultDiskStoreName);
        }
      }
    }
    return result;
  }
| |
| /** |
| * Returns the DiskStore by name |
| * |
| * @since GemFire prPersistSprint2 |
| */ |
| @Override |
| public DiskStore findDiskStore(String name) { |
| if (name == null) { |
| name = defaultDiskStoreName; |
| } |
| return diskStores.get(name); |
| } |
| |
| /** |
| * Returns the DiskStore list |
| * |
| * @since GemFire prPersistSprint2 |
| */ |
| @Override |
| public Collection<DiskStore> listDiskStores() { |
| return Collections.unmodifiableCollection(diskStores.values()); |
| } |
| |
| @Override |
| public Collection<DiskStore> listDiskStoresIncludingRegionOwned() { |
| Collection<DiskStore> allDiskStores = new HashSet<>(); |
| allDiskStores.addAll(diskStores.values()); |
| allDiskStores.addAll(regionOwnedDiskStores.values()); |
| return allDiskStores; |
| } |
| |
  /**
   * Stops all cache servers and the gateway receiver server, releases the shared comm-buffer
   * pool if any server was actually stopped, then shuts down HA services and the client health
   * monitor. Cancellation during any individual stop is logged and ignored so the remaining
   * teardown still runs.
   */
  private void stopServers() {
    final boolean isDebugEnabled = logger.isDebugEnabled();
    if (isDebugEnabled) {
      logger.debug("{}: stopping cache servers...", this);
    }

    boolean stoppedCacheServer = false;

    for (InternalCacheServer cacheServer : allCacheServers) {
      if (isDebugEnabled) {
        logger.debug("stopping bridge {}", cacheServer);
      }
      try {
        cacheServer.stop();
      } catch (CancelException e) {
        if (isDebugEnabled) {
          logger.debug("Ignored cache closure while closing bridge {}", cacheServer, e);
        }
      }
      // NOTE(review): removing while iterating is only safe if allCacheServers is a
      // concurrent/copy-on-write collection -- confirm at the field declaration
      allCacheServers.remove(cacheServer);
      stoppedCacheServer = true;
    }

    // atomically claim the receiver server so it is stopped exactly once
    InternalCacheServer receiverServer = gatewayReceiverServer.getAndSet(null);
    if (receiverServer != null) {
      if (isDebugEnabled) {
        logger.debug("stopping gateway receiver server {}", receiverServer);
      }
      try {
        receiverServer.stop();
      } catch (CancelException e) {
        if (isDebugEnabled) {
          logger.debug("Ignored cache closure while closing gateway receiver server {}",
              receiverServer, e);
        }
      }
      stoppedCacheServer = true;
    }

    if (stoppedCacheServer) {
      // now that all the cache servers have stopped empty the static pool of commBuffers it might
      // have used.
      ServerConnection.emptyCommBufferPool();
    }

    // stop HA services if they had been started
    if (isDebugEnabled) {
      logger.debug("{}: stopping HA services...", this);
    }
    try {
      HARegionQueue.stopHAServices();
    } catch (CancelException e) {
      if (isDebugEnabled) {
        logger.debug("Ignored cache closure while closing HA services", e);
      }
    }

    if (isDebugEnabled) {
      logger.debug("{}: stopping client health monitor...", this);
    }
    try {
      ClientHealthMonitor.shutdownInstance();
    } catch (CancelException e) {
      if (isDebugEnabled) {
        logger.debug("Ignored cache closure while closing client health monitor", e);
      }
    }

    // Reset the unique id counter for durable clients.
    // If a durable client stops/starts its cache, it needs
    // to maintain the same unique id.
    ClientProxyMembershipID.resetUniqueIdCounter();
  }
| |
  /** Returns the distributed system this cache is connected to (public-API view). */
  @Override
  public DistributedSystem getDistributedSystem() {
    return system;
  }
| |
  /** Returns the same system as getDistributedSystem() but typed for internal use. */
  @Override
  public InternalDistributedSystem getInternalDistributedSystem() {
    return system;
  }
| |
| /** |
| * Returns the member id of my distributed system |
| * |
| * @since GemFire 5.0 |
| */ |
| @Override |
| public InternalDistributedMember getMyId() { |
| return system.getDistributedMember(); |
| } |
| |
  /** Returns the other "normal" members of the system (excludes this member), read-only. */
  @Override
  public Set<DistributedMember> getMembers() {
    return Collections
        .unmodifiableSet(dm.getOtherNormalDistributionManagerIds());
  }
| |
  /** Returns the admin members of the system, widened to the public member type. */
  @Override
  public Set<DistributedMember> getAdminMembers() {
    return asDistributedMemberSet(dm.getAdminMemberSet());
  }
| |
  // widening cast via a raw Set: InternalDistributedMember implements DistributedMember, so
  // reading through the returned set is safe; due to erasure, callers must not insert
  // elements that are not InternalDistributedMember into it
  @SuppressWarnings("unchecked")
  private Set<DistributedMember> asDistributedMemberSet(
      Set<InternalDistributedMember> internalDistributedMembers) {
    return (Set) internalDistributedMembers;
  }
| |
| @Override |
| public Set<DistributedMember> getMembers(Region region) { |
| if (region instanceof DistributedRegion) { |
| DistributedRegion distributedRegion = (DistributedRegion) region; |
| return asDistributedMemberSet(distributedRegion.getDistributionAdvisor().adviseCacheOp()); |
| } else if (region instanceof PartitionedRegion) { |
| PartitionedRegion partitionedRegion = (PartitionedRegion) region; |
| return asDistributedMemberSet(partitionedRegion.getRegionAdvisor().adviseAllPRNodes()); |
| } else { |
| return Collections.emptySet(); |
| } |
| } |
| |
| @Override |
| public Set<InetSocketAddress> getCurrentServers() { |
| Map<String, Pool> pools = PoolManager.getAll(); |
| Set<InetSocketAddress> result = null; |
| for (Pool pool : pools.values()) { |
| PoolImpl poolImpl = (PoolImpl) pool; |
| for (ServerLocation serverLocation : poolImpl.getCurrentServers()) { |
| if (result == null) { |
| result = new HashSet<>(); |
| } |
| result.add(new InetSocketAddress(serverLocation.getHostName(), serverLocation.getPort())); |
| } |
| } |
| if (result == null) { |
| return Collections.emptySet(); |
| } else { |
| return result; |
| } |
| } |
| |
  /** Returns the distributed system's log writer. */
  @Override
  public LogWriter getLogger() {
    return system.getLogWriter();
  }

  /** Returns the distributed system's security log writer. */
  @Override
  public LogWriter getSecurityLogger() {
    return system.getSecurityLogWriter();
  }

  /** Returns the i18n view of the internal log writer (same writer as getInternalLogWriter()). */
  @Override
  public LogWriterI18n getLoggerI18n() {
    return system.getInternalLogWriter();
  }

  /** Returns the i18n view of the internal security log writer. */
  @Override
  public LogWriterI18n getSecurityLoggerI18n() {
    return system.getSecurityInternalLogWriter();
  }

  /** Returns the internal (non-i18n) log writer. */
  @Override
  public InternalLogWriter getInternalLogWriter() {
    return system.getInternalLogWriter();
  }

  /** Returns the internal security log writer. */
  @Override
  public InternalLogWriter getSecurityInternalLogWriter() {
    return system.getSecurityInternalLogWriter();
  }
| |
| /** |
| * get the threadId/sequenceId sweeper task for this cache |
| * |
| * @return the sweeper task |
| */ |
| @Override |
| public EventTrackerExpiryTask getEventTrackerTask() { |
| return recordedEventSweeper; |
| } |
| |
| @Override |
| public CachePerfStats getCachePerfStats() { |
| return cachePerfStats; |
| } |
| |
| @Override |
| public String getName() { |
| return system.getName(); |
| } |
| |
| /** |
| * Get the list of all instances of properties for Declarables with the given class name. |
| * |
| * @param className Class name of the declarable |
| * @return List of all instances of properties found for the given declarable |
| */ |
| @Override |
| public List<Properties> getDeclarableProperties(final String className) { |
| List<Properties> propertiesList = new ArrayList<>(); |
| synchronized (declarablePropertiesMap) { |
| for (Entry<Declarable, Properties> entry : declarablePropertiesMap.entrySet()) { |
| if (entry.getKey().getClass().getName().equals(className)) { |
| propertiesList.add(entry.getValue()); |
| } |
| } |
| } |
| return propertiesList; |
| } |
| |
| /** |
| * Get the properties for the given declarable. |
| * |
| * @param declarable The declarable |
| * @return Properties found for the given declarable |
| */ |
| @Override |
| public Properties getDeclarableProperties(final Declarable declarable) { |
| return declarablePropertiesMap.get(declarable); |
| } |
| |
| /** |
| * Returns the number of seconds that have elapsed since the Cache was created. |
| * |
| * @since GemFire 3.5 |
| */ |
| @Override |
| public int getUpTime() { |
| return (int) (System.currentTimeMillis() - creationDate.getTime()) / 1000; |
| } |
| |
| /** |
| * All entry and region operations should be using this time rather than |
| * System.currentTimeMillis(). Specially all version stamps/tags must be populated with this |
| * timestamp. |
| * |
| * @return distributed cache time. |
| */ |
| @Override |
| public long cacheTimeMillis() { |
| if (system != null) { |
| return system.getClock().cacheTimeMillis(); |
| } else { |
| return System.currentTimeMillis(); |
| } |
| } |
| |
  /** Creates a VM region; delegates to {@code createRegion} with the same arguments. */
  @Override
  public <K, V> Region<K, V> createVMRegion(String name, RegionAttributes<K, V> aRegionAttributes)
      throws RegionExistsException, TimeoutException {
    return createRegion(name, aRegionAttributes);
  }
| |
| private PoolFactory createDefaultPF() { |
| PoolFactory defaultPoolFactory = PoolManager.createFactory(); |
| try { |
| String localHostName = SocketCreator.getHostName(SocketCreator.getLocalHost()); |
| defaultPoolFactory.addServer(localHostName, CacheServer.DEFAULT_PORT); |
| } catch (UnknownHostException ex) { |
| throw new IllegalStateException("Could not determine local host name", ex); |
| } |
| return defaultPoolFactory; |
| } |
| |
| private Pool findFirstCompatiblePool(Map<String, Pool> pools) { |
| // act as if the default pool was configured |
| // and see if we can find an existing one that is compatible |
| PoolFactoryImpl pfi = (PoolFactoryImpl) createDefaultPF(); |
| for (Pool p : pools.values()) { |
| if (((PoolImpl) p).isCompatible(pfi.getPoolAttributes())) { |
| return p; |
| } |
| } |
| return null; |
| } |
| |
| private void addLocalHostAsServer(PoolFactory poolFactory) { |
| PoolFactoryImpl poolFactoryImpl = (PoolFactoryImpl) poolFactory; |
| if (poolFactoryImpl.getPoolAttributes().locators.isEmpty() |
| && poolFactoryImpl.getPoolAttributes().servers.isEmpty()) { |
| try { |
| String localHostName = SocketCreator.getHostName(SocketCreator.getLocalHost()); |
| poolFactoryImpl.addServer(localHostName, CacheServer.DEFAULT_PORT); |
| } catch (UnknownHostException ex) { |
| throw new IllegalStateException("Could not determine local host name", ex); |
| } |
| } |
| } |
| |
| /** |
| * Used to set the default pool on a new GemFireCache. |
| */ |
| @Override |
| public synchronized void determineDefaultPool() { |
| if (!isClient()) { |
| throw new UnsupportedOperationException(); |
| } |
| PoolFactory defaultPoolFactory = poolFactory; |
| |
| Pool pool = null; |
| // create the pool if it does not already exist |
| if (defaultPoolFactory == null) { |
| Map<String, Pool> pools = PoolManager.getAll(); |
| if (pools.isEmpty()) { |
| defaultPoolFactory = createDefaultPF(); |
| } else if (pools.size() == 1) { |
| // otherwise use a singleton. |
| pool = pools.values().iterator().next(); |
| } else { |
| pool = findFirstCompatiblePool(pools); |
| if (pool == null) { |
| // if pool is still null then we will not have a default pool for this ClientCache |
| defaultPool = null; |
| return; |
| } |
| } |
| } else { |
| addLocalHostAsServer(defaultPoolFactory); |
| |
| // look for a pool that already exists that is compatible with |
| // our PoolFactory. |
| // If we don't find one we will create a new one that meets our needs. |
| Map<String, Pool> pools = PoolManager.getAll(); |
| for (Pool p : pools.values()) { |
| if (((PoolImpl) p) |
| .isCompatible(((PoolFactoryImpl) defaultPoolFactory).getPoolAttributes())) { |
| pool = p; |
| break; |
| } |
| } |
| } |
| if (pool == null) { |
| // create our pool with a unique name |
| String poolName = DEFAULT_POOL_NAME; |
| int count = 1; |
| Map<String, Pool> pools = PoolManager.getAll(); |
| while (pools.containsKey(poolName)) { |
| poolName = DEFAULT_POOL_NAME + count; |
| count++; |
| } |
| pool = defaultPoolFactory.create(poolName); |
| } |
| defaultPool = pool; |
| } |
| |
| /** |
| * Determine whether the specified pool factory matches the pool factory used by this cache. |
| * |
| * @param poolFactory Prospective pool factory. |
| * @throws IllegalStateException When the specified pool factory does not match. |
| */ |
| @Override |
| public void validatePoolFactory(PoolFactory poolFactory) { |
| // If the specified pool factory is null, by definition there is no pool factory to validate. |
| if (poolFactory != null && !Objects.equals(this.poolFactory, poolFactory)) { |
| throw new IllegalStateException("Existing cache's default pool was not compatible"); |
| } |
| } |
| |
  /**
   * Creates a peer region. Disallowed on client caches; delegates to basicCreateRegion.
   */
  @Override
  public <K, V> Region<K, V> createRegion(String name, RegionAttributes<K, V> aRegionAttributes)
      throws RegionExistsException, TimeoutException {
    throwIfClient();
    return basicCreateRegion(name, aRegionAttributes);
  }
| |
  /**
   * Creates a root region with standard internal arguments (destroy-lock on, no recreate,
   * no snapshot, no image target), forwarding user-specified indexes when present.
   */
  @Override
  public <K, V> Region<K, V> basicCreateRegion(String name, RegionAttributes<K, V> attrs)
      throws RegionExistsException, TimeoutException {
    try {
      InternalRegionArguments ira = new InternalRegionArguments().setDestroyLockFlag(true)
          .setRecreateFlag(false).setSnapshotInputStream(null).setImageTarget(null);

      if (attrs instanceof UserSpecifiedRegionAttributes) {
        ira.setIndexes(((UserSpecifiedRegionAttributes) attrs).getIndexes());
      }
      return createVMRegion(name, attrs, ira);
    } catch (IOException | ClassNotFoundException e) {
      // only if loading snapshot, not here
      throw new InternalGemFireError(
          "unexpected exception", e);
    }
  }
| |
  /** Narrows a raw Region reference to the caller's requested type parameters (no copy). */
  @SuppressWarnings("unchecked")
  private static <K, V> Region<K, V> uncheckedRegion(Region region) {
    return region;
  }
| |
  /**
   * Creates and initializes a root region, retrying while a region of the same name is
   * reinitializing. On any initialization failure the partially created region is cleaned up and
   * removed from the root-region map before the original exception propagates.
   */
  @Override
  public <K, V> Region<K, V> createVMRegion(String name, RegionAttributes<K, V> p_attrs,
      InternalRegionArguments internalRegionArgs)
      throws RegionExistsException, TimeoutException, IOException, ClassNotFoundException {

    // Locators only host internal meta regions; reject ordinary region creation.
    if (getMyId().getVmKind() == ClusterDistributionManager.LOCATOR_DM_TYPE) {
      if (!internalRegionArgs.isUsedForMetaRegion()
          && internalRegionArgs.getInternalMetaRegion() == null) {
        throw new IllegalStateException("Regions can not be created in a locator.");
      }
    }
    stopper.checkCancelInProgress(null);
    RegionNameValidation.validate(name, internalRegionArgs);
    RegionAttributes<K, V> attrs = p_attrs;
    // Listeners may replace the attributes entirely; null means a listener vetoed them.
    attrs = invokeRegionBefore(null, name, attrs, internalRegionArgs);
    if (attrs == null) {
      throw new IllegalArgumentException(
          "Attributes must not be null");
    }

    InternalRegion region;
    final InputStream snapshotInputStream = internalRegionArgs.getSnapshotInputStream();
    InternalDistributedMember imageTarget = internalRegionArgs.getImageTarget();
    final boolean recreate = internalRegionArgs.getRecreateFlag();

    final boolean isPartitionedRegion = attrs.getPartitionAttributes() != null;
    final boolean isReInitCreate = snapshotInputStream != null || imageTarget != null || recreate;

    try {
      // Loop: either claim the name and create the region, or wait on a reinitializing
      // region's future and throw RegionExistsException once it completes.
      for (;;) {
        getCancelCriterion().checkCancelInProgress(null);

        Future<InternalRegion> future = null;
        synchronized (rootRegions) {
          region = rootRegions.get(name);
          if (region != null) {
            throw new RegionExistsException(region);
          }
          // check for case where a root region is being reinitialized and we
          // didn't
          // find a region, i.e. the new region is about to be created

          if (!isReInitCreate) { // fix bug 33523
            String fullPath = Region.SEPARATOR + name;
            future = reinitializingRegions.get(fullPath);
          }
          if (future == null) {
            if (internalRegionArgs.getInternalMetaRegion() != null) {
              region = internalRegionArgs.getInternalMetaRegion();
            } else if (isPartitionedRegion) {
              region = new PartitionedRegion(name, attrs, null, this, internalRegionArgs);
            } else {
              // Abstract region depends on the default pool existing so lazily initialize it
              // if necessary.
              if (Objects.equals(attrs.getPoolName(), DEFAULT_POOL_NAME)) {
                determineDefaultPool();
              }
              if (attrs.getScope().isLocal()) {
                region = new LocalRegion(name, attrs, null, this, internalRegionArgs);
              } else {
                region = new DistributedRegion(name, attrs, null, this, internalRegionArgs);
              }
            }

            rootRegions.put(name, region);
            if (isReInitCreate) {
              regionReinitialized(region);
            }
            break;
          }
        } // synchronized

        // A reinitialization is in flight for this name: block on its future and surface
        // the completed region as an "already exists" condition.
        boolean interrupted = Thread.interrupted();
        try { // future != null
          throw new RegionExistsException(future.get());
        } catch (InterruptedException ignore) {
          interrupted = true;
        } catch (ExecutionException e) {
          throw new Error("unexpected exception",
              e);
        } catch (CancellationException e) {
          // future was cancelled
          if (logger.isTraceEnabled()) {
            logger.trace("future cancelled", e);
          }
        } finally {
          if (interrupted) {
            Thread.currentThread().interrupt();
          }
        }
      } // for

      boolean success = false;
      try {
        setRegionByPath(region.getFullPath(), region);
        region.preInitialize();
        region.initialize(snapshotInputStream, imageTarget, internalRegionArgs);
        success = true;
      } catch (CancelException | RedundancyAlreadyMetException e) {
        // don't print a call stack
        throw e;
      } catch (RuntimeException validationException) {
        logger.warn(String.format("Initialization failed for Region %s", region.getFullPath()),
            validationException);
        throw validationException;
      } finally {
        if (!success) {
          try {
            // do this before removing the region from
            // the root set to fix bug 41982.
            region.cleanupFailedInitialization();
          } catch (VirtualMachineError e) {
            SystemFailure.initiateFailure(e);
            throw e;
          } catch (Throwable t) {
            SystemFailure.checkFailure();
            stopper.checkCancelInProgress(t);

            // bug #44672 - log the failure but don't override the original exception
            logger.warn(String.format("Initialization failed for Region %s",
                region.getFullPath()),
                t);

          } finally {
            // clean up if initialize fails for any reason
            setRegionByPath(region.getFullPath(), null);
            synchronized (rootRegions) {
              Region rootRegion = rootRegions.get(name);
              if (rootRegion == region) {
                rootRegions.remove(name);
              }
            } // synchronized
          }
        } // success
      }

      region.postCreateRegion();
    } catch (RegionExistsException ex) {
      // outside of sync make sure region is initialized to fix bug 37563
      InternalRegion internalRegion = (InternalRegion) ex.getRegion();
      internalRegion.waitOnInitialization(); // don't give out ref until initialized
      throw ex;
    }

    invokeRegionAfter(region);

    // Added for M&M . Putting the callback here to avoid creating RegionMBean in case of Exception
    if (!region.isInternalRegion()) {
      system.handleResourceEvent(ResourceEvent.REGION_CREATE, region);
    }

    return uncheckedRegion(region);
  }
| |
  /** Narrows raw RegionAttributes to the caller's requested type parameters (no copy). */
  @SuppressWarnings("unchecked")
  private static <K, V> RegionAttributes<K, V> uncheckedRegionAttributes(RegionAttributes region) {
    return region;
  }
| |
| @Override |
| public <K, V> RegionAttributes<K, V> invokeRegionBefore(InternalRegion parent, String name, |
| RegionAttributes<K, V> attrs, InternalRegionArguments internalRegionArgs) { |
| for (RegionListener listener : regionListeners) { |
| attrs = |
| uncheckedRegionAttributes(listener.beforeCreate(parent, name, attrs, internalRegionArgs)); |
| } |
| return attrs; |
| } |
| |
| @Override |
| public void invokeRegionAfter(InternalRegion region) { |
| for (RegionListener listener : regionListeners) { |
| listener.afterCreate(region); |
| } |
| } |
| |
| @Override |
| public void invokeBeforeDestroyed(InternalRegion region) { |
| for (RegionListener listener : regionListeners) { |
| listener.beforeDestroyed(region); |
| } |
| } |
| |
| @Override |
| public void invokeCleanupFailedInitialization(InternalRegion region) { |
| for (RegionListener listener : regionListeners) { |
| listener.cleanupFailedInitialization(region); |
| } |
| } |
| |
  /** Returns the region at the given path, excluding destroyed regions. */
  @Override
  @SuppressWarnings("unchecked")
  public Region getRegion(String path) {
    return getRegion(path, false);
  }
| |
| /** |
| * returns a set of all current regions in the cache, including buckets |
| * |
| * @since GemFire 6.0 |
| */ |
| @Override |
| public Set<InternalRegion> getAllRegions() { |
| Set<InternalRegion> result = new HashSet<>(); |
| synchronized (rootRegions) { |
| for (Region region : rootRegions.values()) { |
| if (region instanceof PartitionedRegion) { |
| PartitionedRegion partitionedRegion = (PartitionedRegion) region; |
| PartitionedRegionDataStore dataStore = partitionedRegion.getDataStore(); |
| if (dataStore != null) { |
| Set<Entry<Integer, BucketRegion>> bucketEntries = |
| partitionedRegion.getDataStore().getAllLocalBuckets(); |
| for (Entry entry : bucketEntries) { |
| result.add((InternalRegion) entry.getValue()); |
| } |
| } |
| } else if (region instanceof InternalRegion) { |
| InternalRegion internalRegion = (InternalRegion) region; |
| result.add(internalRegion); |
| result.addAll(internalRegion.basicSubregions(true)); |
| } |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| public Set<InternalRegion> getApplicationRegions() { |
| Set<InternalRegion> result = new HashSet<>(); |
| synchronized (rootRegions) { |
| for (Object region : rootRegions.values()) { |
| InternalRegion internalRegion = (InternalRegion) region; |
| if (internalRegion.isInternalRegion()) { |
| continue; // Skip internal regions |
| } |
| result.add(internalRegion); |
| result.addAll(internalRegion.basicSubregions(true)); |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| public boolean hasPersistentRegion() { |
| synchronized (rootRegions) { |
| for (InternalRegion region : rootRegions.values()) { |
| if (region.getDataPolicy().withPersistence()) { |
| return true; |
| } |
| for (InternalRegion subRegion : region.basicSubregions(true)) { |
| if (subRegion.getDataPolicy().withPersistence()) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| } |
| |
| @Override |
| public void setRegionByPath(String path, InternalRegion r) { |
| if (r == null) { |
| pathToRegion.remove(path); |
| } else { |
| pathToRegion.put(path, r); |
| } |
| } |
| |
| /** |
| * @throws IllegalArgumentException if path is not valid |
| */ |
| private static void validatePath(String path) { |
| if (path == null) { |
| throw new IllegalArgumentException( |
| "path cannot be null"); |
| } |
| if (path.isEmpty()) { |
| throw new IllegalArgumentException( |
| "path cannot be empty"); |
| } |
| if (path.equals(Region.SEPARATOR)) { |
| throw new IllegalArgumentException( |
| String.format("path cannot be ' %s '", Region.SEPARATOR)); |
| } |
| } |
| |
  /**
   * Looks up a region by full path, first consulting any in-progress reinitialization (blocking
   * until it completes) and only then the path-to-region map.
   */
  @Override
  public InternalRegion getRegionByPath(String path) {
    validatePath(path); // fix for bug 34892

    // do this before checking the pathToRegion map
    InternalRegion result = getReinitializingRegion(path);
    if (result != null) {
      return result;
    }
    return pathToRegion.get(path);
  }
| |
  /**
   * Like getRegionByPath, but when the map lookup misses, walks from the root region with the
   * thread's initialization-level requirement relaxed so initialization latches are bypassed.
   */
  @Override
  public InternalRegion getRegionByPathForProcessing(String path) {
    InternalRegion result = getRegionByPath(path);
    if (result == null) {
      stopper.checkCancelInProgress(null);
      final InitializationLevel oldLevel =
          LocalRegion.setThreadInitLevelRequirement(InitializationLevel.ANY_INIT); // go through
      // initialization latches
      try {
        String[] pathParts = parsePath(path);
        InternalRegion rootRegion;
        synchronized (rootRegions) {
          rootRegion = rootRegions.get(pathParts[0]);
          if (rootRegion == null)
            return null;
        }
        if (logger.isDebugEnabled()) {
          logger.debug("GemFireCache.getRegion, calling getSubregion on rootRegion({}): {}",
              pathParts[0], pathParts[1]);
        }
        result = (InternalRegion) rootRegion.getSubregion(pathParts[1], true);
      } finally {
        // Always restore the caller's original init-level requirement.
        LocalRegion.setThreadInitLevelRequirement(oldLevel);
      }
    }
    return result;
  }
| |
| /** |
| * @param returnDestroyedRegion if true, okay to return a destroyed region |
| */ |
| @Override |
| public Region getRegion(String path, boolean returnDestroyedRegion) { |
| stopper.checkCancelInProgress(null); |
| |
| InternalRegion result = getRegionByPath(path); |
| // Do not waitOnInitialization() for PR |
| if (result != null) { |
| result.waitOnInitialization(); |
| if (!returnDestroyedRegion && result.isDestroyed()) { |
| stopper.checkCancelInProgress(null); |
| return null; |
| } else { |
| return result; |
| } |
| } |
| |
| String[] pathParts = parsePath(path); |
| InternalRegion rootRegion; |
| synchronized (rootRegions) { |
| rootRegion = rootRegions.get(pathParts[0]); |
| if (rootRegion == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("GemFireCache.getRegion, no region found for {}", pathParts[0]); |
| } |
| stopper.checkCancelInProgress(null); |
| return null; |
| } |
| if (!returnDestroyedRegion && rootRegion.isDestroyed()) { |
| stopper.checkCancelInProgress(null); |
| return null; |
| } |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("GemFireCache.getRegion, calling getSubregion on rootRegion({}): {}", |
| pathParts[0], pathParts[1]); |
| } |
| return rootRegion.getSubregion(pathParts[1], returnDestroyedRegion); |
| } |
| |
  /**
   * Return true if this region is initializing. Relaxes the thread's init-level requirement so
   * the lookup itself does not block on the region's initialization latch.
   */
  @Override
  public boolean isGlobalRegionInitializing(String fullPath) {
    stopper.checkCancelInProgress(null);
    final InitializationLevel oldLevel =
        LocalRegion.setThreadInitLevelRequirement(InitializationLevel.ANY_INIT); // go through
    // initialization latches
    try {
      return isGlobalRegionInitializing((InternalRegion) getRegion(fullPath));
    } finally {
      LocalRegion.setThreadInitLevelRequirement(oldLevel);
    }
  }
| |
| /** Return true if this region is initializing */ |
| private boolean isGlobalRegionInitializing(InternalRegion region) { |
| boolean result = region != null && region.getScope().isGlobal() && !region.isInitialized(); |
| if (result) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("GemFireCache.isGlobalRegionInitializing ({})", region.getFullPath()); |
| } |
| } |
| return result; |
| } |
| |
  /** Returns the user-visible root regions, excluding PR admin regions. */
  @Override
  public Set<Region<?, ?>> rootRegions() {
    return rootRegions(false);
  }

  /** Returns the root regions, always waiting for each region's initialization. */
  @Override
  public Set<Region<?, ?>> rootRegions(boolean includePRAdminRegions) {
    return rootRegions(includePRAdminRegions, true);
  }
| |
| private Set<Region<?, ?>> rootRegions(boolean includePRAdminRegions, boolean waitForInit) { |
| stopper.checkCancelInProgress(null); |
| Set<Region<?, ?>> regions = new HashSet<>(); |
| synchronized (rootRegions) { |
| for (InternalRegion region : rootRegions.values()) { |
| // If this is an internal meta-region, don't return it to end user |
| if (region.isSecret() || region.isUsedForMetaRegion() |
| || !includePRAdminRegions && (region.isUsedForPartitionedRegionAdmin() |
| || region.isUsedForPartitionedRegionBucket())) { |
| continue; // Skip administrative PartitionedRegions |
| } |
| regions.add(region); |
| } |
| } |
| if (waitForInit) { |
| for (Iterator<Region<?, ?>> iterator = regions.iterator(); iterator.hasNext();) { |
| InternalRegion region = (InternalRegion) iterator.next(); |
| if (!region.checkForInitialization()) { |
| iterator.remove(); |
| } |
| } |
| } |
| return Collections.unmodifiableSet(regions); |
| } |
| |
| /** |
| * Called by notifier when a client goes away |
| * |
| * @since GemFire 5.7 |
| */ |
| @Override |
| public void cleanupForClient(CacheClientNotifier ccn, ClientProxyMembershipID client) { |
| try { |
| if (isClosed()) { |
| return; |
| } |
| for (Object region : rootRegions(false, false)) { |
| InternalRegion internalRegion = (InternalRegion) region; |
| internalRegion.cleanupForClient(ccn, client); |
| } |
| } catch (DistributedSystemDisconnectedException ignore) { |
| } |
| } |
| |
  /** Returns true once the cache has completed construction-time initialization. */
  private boolean isInitialized() {
    return isInitialized;
  }

  /** Returns true once a close of this cache has begun (not necessarily completed). */
  @Override
  public boolean isClosed() {
    return isClosing;
  }
| |
  /** Returns the distributed lock timeout, in seconds. */
  @Override
  public int getLockTimeout() {
    return lockTimeout;
  }

  /** Sets the distributed lock timeout, in seconds. Not permitted on client caches. */
  @Override
  public void setLockTimeout(int seconds) {
    throwIfClient();
    stopper.checkCancelInProgress(null);
    lockTimeout = seconds;
  }

  /** Returns the distributed lock lease time, in seconds. */
  @Override
  public int getLockLease() {
    return lockLease;
  }

  /** Sets the distributed lock lease time, in seconds. Not permitted on client caches. */
  @Override
  public void setLockLease(int seconds) {
    throwIfClient();
    stopper.checkCancelInProgress(null);
    lockLease = seconds;
  }

  /** Returns the netSearch timeout, in seconds. */
  @Override
  public int getSearchTimeout() {
    return searchTimeout;
  }

  /** Sets the netSearch timeout, in seconds. Not permitted on client caches. */
  @Override
  public void setSearchTimeout(int seconds) {
    throwIfClient();
    stopper.checkCancelInProgress(null);
    searchTimeout = seconds;
  }
| |
  /** Returns the HA queue message sync interval, in seconds (a process-wide setting). */
  @Override
  public int getMessageSyncInterval() {
    return HARegionQueue.getMessageSyncInterval();
  }

  /**
   * Sets the HA queue message sync interval, in seconds. Not permitted on client caches.
   *
   * @throws IllegalArgumentException if seconds is negative
   */
  @Override
  public void setMessageSyncInterval(int seconds) {
    throwIfClient();
    stopper.checkCancelInProgress(null);
    if (seconds < 0) {
      throw new IllegalArgumentException(
          "The 'messageSyncInterval' property for cache cannot be negative");
    }
    HARegionQueue.setMessageSyncInterval(seconds);
  }
| |
| /** |
| * Get a reference to a Region that is reinitializing, or null if that Region is not |
| * reinitializing or this thread is interrupted. If a reinitializing region is found, then this |
| * method blocks until reinitialization is complete and then returns the region. |
| */ |
| @Override |
| public InternalRegion getReinitializingRegion(String fullPath) { |
| Future<InternalRegion> future = reinitializingRegions.get(fullPath); |
| if (future == null) { |
| return null; |
| } |
| try { |
| InternalRegion region = future.get(); |
| region.waitOnInitialization(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Returning manifested future for: {}", fullPath); |
| } |
| return region; |
| } catch (InterruptedException ignore) { |
| Thread.currentThread().interrupt(); |
| return null; |
| } catch (ExecutionException e) { |
| throw new Error("unexpected exception", e); |
| } catch (CancellationException ignore) { |
| // future was cancelled |
| logger.debug("future cancelled, returning null"); |
| return null; |
| } |
| } |
| |
| /** |
| * Register the specified region name as reinitializing, creating and adding a Future for it to |
| * the map. |
| * |
| * @throws IllegalStateException if there is already a region by that name registered. |
| */ |
| @Override |
| public void regionReinitializing(String fullPath) { |
| Object old = reinitializingRegions.putIfAbsent(fullPath, new FutureResult<>(stopper)); |
| if (old != null) { |
| throw new IllegalStateException( |
| String.format("Found an existing reinitalizing region named %s", |
| fullPath)); |
| } |
| } |
| |
| /** |
| * Set the reinitialized region and unregister it as reinitializing. |
| * |
| * @throws IllegalStateException if there is no region by that name registered as reinitializing. |
| */ |
| @Override |
| public void regionReinitialized(Region region) { |
| String regionName = region.getFullPath(); |
| FutureResult<InternalRegion> future = reinitializingRegions.get(regionName); |
| if (future == null) { |
| throw new IllegalStateException( |
| String.format("Could not find a reinitializing region named %s", |
| regionName)); |
| } |
| future.set((InternalRegion) region); |
| unregisterReinitializingRegion(regionName); |
| } |
| |
| /** |
| * Clear a reinitializing region, e.g. reinitialization failed. |
| * |
| * @throws IllegalStateException if cannot find reinitializing region registered by that name. |
| */ |
| @Override |
| public void unregisterReinitializingRegion(String fullPath) { |
| reinitializingRegions.remove(fullPath); |
| } |
| |
| /** |
| * Returns true if get should give a copy; false if a reference. |
| * |
| * @since GemFire 4.0 |
| */ |
| @Override |
| public boolean isCopyOnRead() { |
| return copyOnRead; |
| } |
| |
| /** |
| * Implementation of {@link Cache#setCopyOnRead} |
| * |
| * @since GemFire 4.0 |
| */ |
| @Override |
| public void setCopyOnRead(boolean copyOnRead) { |
| this.copyOnRead = copyOnRead; |
| } |
| |
| /** |
| * Implementation of {@link Cache#getCopyOnRead} |
| * |
| * @since GemFire 4.0 |
| */ |
| @Override |
| public boolean getCopyOnRead() { |
| return copyOnRead; |
| } |
| |
| /** |
| * Remove the specified root region |
| * |
| * @param rootRgn the region to be removed |
| * @return true if root region was removed, false if not found |
| */ |
| @Override |
| public boolean removeRoot(InternalRegion rootRgn) { |
| synchronized (rootRegions) { |
| String regionName = rootRgn.getName(); |
| InternalRegion found = rootRegions.get(regionName); |
| if (found == rootRgn) { |
| InternalRegion previous = rootRegions.remove(regionName); |
| Assert.assertTrue(previous == rootRgn); |
| return true; |
| } else |
| return false; |
| } |
| } |
| |
| /** |
| * @return array of two Strings, the root name and the relative path from root. If there is no |
| * relative path from root, then String[1] will be an empty string |
| */ |
| static String[] parsePath(String path) { |
| validatePath(path); |
| String[] result = new String[2]; |
| result[1] = ""; |
| // strip off root name from path |
| int slashIndex = path.indexOf(Region.SEPARATOR_CHAR); |
| if (slashIndex == 0) { |
| path = path.substring(1); |
| slashIndex = path.indexOf(Region.SEPARATOR_CHAR); |
| } |
| result[0] = path; |
| if (slashIndex > 0) { |
| result[0] = path.substring(0, slashIndex); |
| result[1] = path.substring(slashIndex + 1); |
| } |
| return result; |
| } |
| |
| /** |
| * Makes note of a {@code CacheLifecycleListener} |
| */ |
| public static void addCacheLifecycleListener(CacheLifecycleListener listener) { |
| synchronized (GemFireCacheImpl.class) { |
| cacheLifecycleListeners.add(listener); |
| } |
| } |
| |
| /** |
| * Removes a {@code CacheLifecycleListener} |
| * |
| * @return Whether or not the listener was removed |
| */ |
| public static boolean removeCacheLifecycleListener(CacheLifecycleListener listener) { |
| synchronized (GemFireCacheImpl.class) { |
| return cacheLifecycleListeners.remove(listener); |
| } |
| } |
| |
  /** Registers a listener to be notified of region lifecycle events. */
  @Override
  public void addRegionListener(RegionListener regionListener) {
    regionListeners.add(regionListener);
  }

  /** Unregisters a previously added region listener. */
  @Override
  public void removeRegionListener(RegionListener regionListener) {
    regionListeners.remove(regionListener);
  }

  /** Returns an unmodifiable view of the registered region listeners. */
  @Override
  public Set<RegionListener> getRegionListeners() {
    return Collections.unmodifiableSet(regionListeners);
  }

  /** Returns the cache service registered under the given interface, or null. */
  @Override
  public <T extends CacheService> T getService(Class<T> clazz) {
    return clazz.cast(services.get(clazz));
  }

  /** Returns an unmodifiable view of all registered cache services. */
  @Override
  public Collection<CacheService> getServices() {
    return Collections.unmodifiableCollection(services.values());
  }
| |
| /** |
| * Creates the single instance of the Transaction Manager for this cache. Returns the existing one |
| * upon request. |
| * |
| * @return the CacheTransactionManager instance. |
| * |
| * @since GemFire 4.0 |
| */ |
| @Override |
| public CacheTransactionManager getCacheTransactionManager() { |
| return transactionManager; |
| } |
| |
| /** |
| * GuardedBy {@link #ccpTimerMutex} |
| * |
| * @see CacheClientProxy |
| */ |
| private SystemTimer ccpTimer; |
| |
| /** |
| * @see #ccpTimer |
| */ |
| private final Object ccpTimerMutex = new Object(); |
| |
| /** |
| * Get cache-wide CacheClientProxy SystemTimer |
| * |
| * @return the timer, lazily created |
| */ |
| @Override |
| public SystemTimer getCCPTimer() { |
| synchronized (ccpTimerMutex) { |
| if (ccpTimer != null) { |
| return ccpTimer; |
| } |
| ccpTimer = new SystemTimer(getDistributedSystem(), true); |
| if (isClosing) { |
| ccpTimer.cancel(); // poison it, don't throw. |
| } |
| return ccpTimer; |
| } |
| } |
| |
| /** |
| * For use by unit tests to inject a mocked ccpTimer |
| */ |
| void setCCPTimer(SystemTimer ccpTimer) { |
| this.ccpTimer = ccpTimer; |
| } |
| |
| static final int PURGE_INTERVAL = 1000; |
| |
| private int cancelCount = 0; |
| |
| /** |
| * Does a periodic purge of the CCPTimer to prevent a large number of cancelled tasks from |
| * building up in it. See GEODE-2485. |
| */ |
| @Override |
| public void purgeCCPTimer() { |
| synchronized (ccpTimerMutex) { |
| if (ccpTimer != null) { |
| cancelCount++; |
| if (cancelCount == PURGE_INTERVAL) { |
| cancelCount = 0; |
| ccpTimer.timerPurge(); |
| } |
| } |
| } |
| } |
| |
  // Cache-wide scheduler for entry/region expiration tasks; created at cache construction.
  private final ExpirationScheduler expirationScheduler;

  /**
   * Get cache-wide ExpirationScheduler
   *
   * @return the scheduler
   */
  @Override
  public ExpirationScheduler getExpirationScheduler() {
    return expirationScheduler;
  }

  /** Returns the transaction manager typed as its concrete implementation. */
  @Override
  public TXManagerImpl getTXMgr() {
    return transactionManager;
  }

  /**
   * Returns the {@code Executor} (thread pool) that is used to execute cache event listeners.
   * Returns {@code null} if no pool exists.
   *
   * @since GemFire 3.5
   */
  @Override
  public Executor getEventThreadPool() {
    return eventThreadPool;
  }
| |
  /**
   * Creates and registers a new cache server, then announces it to other members.
   * Not permitted on client caches.
   */
  @Override
  public CacheServer addCacheServer() {
    throwIfClient();
    stopper.checkCancelInProgress(null);

    InternalCacheServer server = new ServerBuilder(this, securityService).createServer();
    allCacheServers.add(server);

    sendAddCacheServerProfileMessage();
    return server;
  }

  /**
   * Unregisters the given cache server and announces its removal to other members.
   *
   * @return true if the server was registered and removed
   */
  @Override
  public boolean removeCacheServer(CacheServer cacheServer) {
    boolean removed = allCacheServers.remove(cacheServer);
    sendRemoveCacheServerProfileMessage();
    return removed;
  }
| |
  /**
   * Registers a gateway sender with this cache: distributes the sender's profile, publishes the
   * updated (copy-on-write) sender set, notifies serial-sender regions, and raises the create
   * resource event. Not permitted on client caches.
   *
   * @throws IllegalStateException if a sender with the same id is already registered
   */
  @Override
  public void addGatewaySender(GatewaySender sender) {
    throwIfClient();

    stopper.checkCancelInProgress(null);

    synchronized (allGatewaySendersLock) {
      if (!allGatewaySenders.contains(sender)) {
        new UpdateAttributesProcessor((DistributionAdvisee) sender).distribute(true);
        // Copy-on-write: readers of allGatewaySenders never see a partially built set.
        Set<GatewaySender> newSenders = new HashSet<>(allGatewaySenders.size() + 1);
        if (!allGatewaySenders.isEmpty()) {
          newSenders.addAll(allGatewaySenders);
        }
        newSenders.add(sender);
        allGatewaySenders = Collections.unmodifiableSet(newSenders);
      } else {
        throw new IllegalStateException(
            String.format("A GatewaySender with id %s is already defined in this cache.",
                sender.getId()));
      }
    }

    synchronized (rootRegions) {
      Set<InternalRegion> applicationRegions = getApplicationRegions();
      for (InternalRegion region : applicationRegions) {
        Set<String> senders = region.getAllGatewaySenderIds();
        // Only serial senders require per-region notification here.
        if (senders.contains(sender.getId()) && !sender.isParallel()) {
          region.senderCreated();
        }
      }
    }

    if (!sender.isParallel()) {
      Region dynamicMetaRegion = getRegion(DynamicRegionFactory.DYNAMIC_REGION_LIST_NAME);
      if (dynamicMetaRegion == null) {
        if (logger.isDebugEnabled()) {
          logger.debug(" The dynamic region is null. ");
        }
      } else {
        dynamicMetaRegion.getAttributesMutator().addGatewaySenderId(sender.getId());
      }
    }
    // Negative remote DS id marks an incomplete/placeholder sender: suppress the event.
    if (!(sender.getRemoteDSId() < 0)) {
      system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_CREATE, sender);
    }
  }
| |
| @Override |
| public void removeGatewaySender(GatewaySender sender) { |
| throwIfClient(); |
| |
| stopper.checkCancelInProgress(null); |
| |
| synchronized (allGatewaySendersLock) { |
| if (allGatewaySenders.contains(sender)) { |
| new UpdateAttributesProcessor((DistributionAdvisee) sender, true).distribute(true); |
| Set<GatewaySender> newSenders = new HashSet<>(allGatewaySenders.size() - 1); |
| if (!allGatewaySenders.isEmpty()) { |
| newSenders.addAll(allGatewaySenders); |
| } |
| newSenders.remove(sender); |
| allGatewaySenders = Collections.unmodifiableSet(newSenders); |
| } |
| } |
| if (!(sender.getRemoteDSId() < 0)) { |
| system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_REMOVE, sender); |
| } |
| } |
| |
| @Override |
| public InternalCacheServer addGatewayReceiverServer(GatewayReceiver receiver) { |
| throwIfClient(); |
| stopper.checkCancelInProgress(null); |
| |
| requireNonNull(receiver, "GatewayReceiver must be supplied to add a server endpoint."); |
| requireNonNull(gatewayReceiver.get(), |
| "GatewayReceiver must be added before adding a server endpoint."); |
| |
| InternalCacheServer receiverServer = new ServerBuilder(this, securityService) |
| .forGatewayReceiver(receiver).createServer(); |
| gatewayReceiverServer.set(receiverServer); |
| |
| sendAddCacheServerProfileMessage(); |
| return receiverServer; |
| } |
| |
/** Clears the receiver's server endpoint, but only if it is still the one passed in. */
@Override
public boolean removeGatewayReceiverServer(InternalCacheServer receiverServer) {
  boolean removed = gatewayReceiverServer.compareAndSet(receiverServer, null);
  // NOTE(review): the removal message is broadcast even when the CAS failed and
  // nothing changed — confirm whether the unconditional send is intentional.
  sendRemoveCacheServerProfileMessage();
  return removed;
}
| |
/**
 * Registers the cache's gateway receiver. Only one receiver is tracked; a new
 * registration replaces any previous one.
 */
@Override
public void addGatewayReceiver(GatewayReceiver receiver) {
  throwIfClient();
  stopper.checkCancelInProgress(null);
  requireNonNull(receiver, "GatewayReceiver must be supplied.");
  gatewayReceiver.set(receiver);
}
| |
/**
 * Unregisters the cache's gateway receiver.
 * NOTE(review): the {@code receiver} argument is ignored — whatever receiver is
 * currently registered is cleared unconditionally; confirm this is intended.
 */
@Override
public void removeGatewayReceiver(GatewayReceiver receiver) {
  throwIfClient();
  stopper.checkCancelInProgress(null);
  gatewayReceiver.set(null);
}
| |
| @Override |
| public void addAsyncEventQueue(AsyncEventQueueImpl asyncQueue) { |
| allAsyncEventQueues.add(asyncQueue); |
| if (!asyncQueue.isMetaQueue()) { |
| allVisibleAsyncEventQueues.add(asyncQueue); |
| } |
| system.handleResourceEvent(ResourceEvent.ASYNCEVENTQUEUE_CREATE, asyncQueue); |
| } |
| |
| /** |
| * Returns List of GatewaySender (excluding the senders for internal use) |
| * |
| * @return List List of GatewaySender objects |
| */ |
| @Override |
| public Set<GatewaySender> getGatewaySenders() { |
| Set<GatewaySender> senders = new HashSet<>(); |
| for (GatewaySender sender : allGatewaySenders) { |
| if (!((AbstractGatewaySender) sender).isForInternalUse()) { |
| senders.add(sender); |
| } |
| } |
| return senders; |
| } |
| |
| /** |
| * Returns List of all GatewaySenders (including the senders for internal use) |
| * |
| * @return List List of GatewaySender objects |
| */ |
| @Override |
| public Set<GatewaySender> getAllGatewaySenders() { |
| return allGatewaySenders; |
| } |
| |
| @Override |
| public GatewaySender getGatewaySender(String id) { |
| for (GatewaySender sender : allGatewaySenders) { |
| if (sender.getId().equals(id)) { |
| return sender; |
| } |
| } |
| return null; |
| } |
| |
| @Override |
| public Set<GatewayReceiver> getGatewayReceivers() { |
| GatewayReceiver receiver = gatewayReceiver.get(); |
| if (receiver == null) { |
| return Collections.emptySet(); |
| } |
| return Collections.singleton(receiver); |
| } |
| |
/** Returns the user-visible async event queues (delegates with visibleOnly = true). */
@Override
public Set<AsyncEventQueue> getAsyncEventQueues() {
  return getAsyncEventQueues(true);
}
| |
| @Override |
| public Set<AsyncEventQueue> getAsyncEventQueues(boolean visibleOnly) { |
| return visibleOnly ? allVisibleAsyncEventQueues : allAsyncEventQueues; |
| } |
| |
| @Override |
| public AsyncEventQueue getAsyncEventQueue(String id) { |
| for (AsyncEventQueue asyncEventQueue : allAsyncEventQueues) { |
| if (asyncEventQueue.getId().equals(id)) { |
| return asyncEventQueue; |
| } |
| } |
| return null; |
| } |
| |
/**
 * Unregisters an async event queue: removes its underlying gateway sender first,
 * then drops the queue from both tracking sets, then raises a management event.
 */
@Override
public void removeAsyncEventQueue(AsyncEventQueue asyncQueue) {
  throwIfClient();
  // first remove the gateway sender of the queue
  if (asyncQueue instanceof AsyncEventQueueImpl) {
    removeGatewaySender(((AsyncEventQueueImpl) asyncQueue).getSender());
  }
  // using gateway senders lock since async queue uses a gateway sender
  synchronized (allGatewaySendersLock) {
    allAsyncEventQueues.remove(asyncQueue);
    allVisibleAsyncEventQueues.remove(asyncQueue);
  }
  system.handleResourceEvent(ResourceEvent.ASYNCEVENTQUEUE_REMOVE, asyncQueue);
}
| |
/** get the conflict resolver for WAN; reads under allGatewayHubsLock for visibility */
@Override
public GatewayConflictResolver getGatewayConflictResolver() {
  synchronized (allGatewayHubsLock) {
    return gatewayConflictResolver;
  }
}
| |
/** set the conflict resolver for WAN; writes under allGatewayHubsLock for visibility */
@Override
public void setGatewayConflictResolver(GatewayConflictResolver resolver) {
  synchronized (allGatewayHubsLock) {
    gatewayConflictResolver = resolver;
  }
}
| |
/** Returns an unmodifiable view of the registered cache servers. */
@Override
public List<CacheServer> getCacheServers() {
  return unmodifiableAllCacheServers;
}
| |
| @Override |
| public List<InternalCacheServer> getCacheServersAndGatewayReceiver() { |
| List<InternalCacheServer> allServers = new ArrayList<>(allCacheServers); |
| |
| InternalCacheServer receiverServer = gatewayReceiverServer.get(); |
| if (receiverServer != null) { |
| allServers.add(receiverServer); |
| } |
| |
| return Collections.unmodifiableList(allServers); |
| } |
| |
| /** |
| * add a partitioned region to the set of tracked partitioned regions. This is used to notify the |
| * regions when this cache requires, or does not require notification of all region/entry events. |
| */ |
| @Override |
| public void addPartitionedRegion(PartitionedRegion region) { |
| synchronized (partitionedRegions) { |
| if (region.isDestroyed()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("GemFireCache#addPartitionedRegion did not add destroyed {}", region); |
| } |
| return; |
| } |
| if (partitionedRegions.add(region)) { |
| getCachePerfStats().incPartitionedRegions(1); |
| } |
| } |
| } |
| |
| /** |
| * Returns a set of all current partitioned regions for test hook. |
| */ |
| @Override |
| public Set<PartitionedRegion> getPartitionedRegions() { |
| synchronized (partitionedRegions) { |
| return new HashSet<>(partitionedRegions); |
| } |
| } |
| |
| private SortedMap<String, Map<String, PartitionedRegion>> getPRTrees() { |
| // prTree will save a sublist of PRs who are under the same root |
| SortedMap<String, PartitionedRegion> prMap = getPartitionedRegionMap(); |
| boolean hasColocatedRegion = false; |
| for (PartitionedRegion pr : prMap.values()) { |
| List<PartitionedRegion> childList = ColocationHelper.getColocatedChildRegions(pr); |
| if (childList != null && !childList.isEmpty()) { |
| hasColocatedRegion = true; |
| break; |
| } |
| } |
| |
| TreeMap<String, Map<String, PartitionedRegion>> prTrees = new TreeMap<>(); |
| if (hasColocatedRegion) { |
| Map<String, PartitionedRegion> orderedPrMap = orderByColocation(prMap); |
| prTrees.put("ROOT", orderedPrMap); |
| } else { |
| for (PartitionedRegion pr : prMap.values()) { |
| String rootName = pr.getRoot().getName(); |
| Map<String, PartitionedRegion> prSubMap = |
| prTrees.computeIfAbsent(rootName, k -> new TreeMap<>()); |
| prSubMap.put(pr.getFullPath(), pr); |
| } |
| } |
| |
| return prTrees; |
| } |
| |
| private SortedMap<String, PartitionedRegion> getPartitionedRegionMap() { |
| SortedMap<String, PartitionedRegion> prMap = new TreeMap<>(); |
| for (Entry<String, InternalRegion> entry : pathToRegion.entrySet()) { |
| String regionName = entry.getKey(); |
| InternalRegion region = entry.getValue(); |
| |
| // Don't wait for non partitioned regions |
| if (!(region instanceof PartitionedRegion)) { |
| continue; |
| } |
| // Do a getRegion to ensure that we wait for the partitioned region |
| // to finish initialization |
| try { |
| Region pr = getRegion(regionName); |
| if (pr instanceof PartitionedRegion) { |
| prMap.put(regionName, (PartitionedRegion) pr); |
| } |
| } catch (CancelException ignore) { |
| // if some region throws cancel exception during initialization, |
| // then no need to shutDownAll them gracefully |
| } |
| } |
| |
| return prMap; |
| } |
| |
| private Map<String, PartitionedRegion> orderByColocation(Map<String, PartitionedRegion> prMap) { |
| LinkedHashMap<String, PartitionedRegion> orderedPrMap = new LinkedHashMap<>(); |
| for (PartitionedRegion pr : prMap.values()) { |
| addColocatedChildRecursively(orderedPrMap, pr); |
| } |
| return orderedPrMap; |
| } |
| |
/**
 * Inserts {@code pr} and its colocated descendants into {@code prMap} in post-order:
 * children are inserted before the region itself, so each child precedes its parent
 * in the LinkedHashMap's iteration order. Duplicate paths simply overwrite.
 */
private void addColocatedChildRecursively(LinkedHashMap<String, PartitionedRegion> prMap,
    PartitionedRegion pr) {
  for (PartitionedRegion colocatedRegion : ColocationHelper.getColocatedChildRegions(pr)) {
    addColocatedChildRecursively(prMap, colocatedRegion);
  }
  prMap.put(pr.getFullPath(), pr);
}
| |
| /** |
| * check to see if any cache components require notification from a partitioned region. |
| * Notification adds to the messaging a PR must do on each put/destroy/invalidate operation and |
| * should be kept to a minimum |
| * |
| * @param region the partitioned region |
| * @return true if the region should deliver all of its events to this cache |
| */ |
| @Override |
| public boolean requiresNotificationFromPR(PartitionedRegion region) { |
| boolean hasSerialSenders = hasSerialSenders(region); |
| |
| if (!hasSerialSenders) { |
| for (InternalCacheServer server : allCacheServers) { |
| if (!server.getNotifyBySubscription()) { |
| hasSerialSenders = true; |
| break; |
| } |
| } |
| } |
| |
| if (!hasSerialSenders) { |
| InternalCacheServer receiverServer = gatewayReceiverServer.get(); |
| if (receiverServer != null && !receiverServer.getNotifyBySubscription()) { |
| hasSerialSenders = true; |
| } |
| } |
| |
| return hasSerialSenders; |
| } |
| |
| private boolean hasSerialSenders(PartitionedRegion region) { |
| boolean hasSenders = false; |
| Set<String> senders = region.getAllGatewaySenderIds(); |
| for (String sender : senders) { |
| GatewaySender gatewaySender = getGatewaySender(sender); |
| if (gatewaySender != null && !gatewaySender.isParallel()) { |
| hasSenders = true; |
| break; |
| } |
| } |
| return hasSenders; |
| } |
| |
| /** |
| * remove a partitioned region from the set of tracked instances. |
| * |
| * @see #addPartitionedRegion(PartitionedRegion) |
| */ |
| @Override |
| public void removePartitionedRegion(PartitionedRegion region) { |
| synchronized (partitionedRegions) { |
| if (partitionedRegions.remove(region)) { |
| getCachePerfStats().incPartitionedRegions(-1); |
| } |
| } |
| } |
| |
/** Explicitly marks (or unmarks) this cache as a server. Not allowed on clients. */
@Override
public void setIsServer(boolean isServer) {
  throwIfClient();
  stopper.checkCancelInProgress(null);

  this.isServer = isServer;
}
| |
/**
 * Returns whether this cache acts as a server: either explicitly marked via
 * {@link #setIsServer(boolean)} or at least one cache server has been added.
 * A client cache is never a server.
 */
@Override
public boolean isServer() {
  if (isClient()) {
    return false;
  }
  stopper.checkCancelInProgress(null);

  return isServer || !allCacheServers.isEmpty();
}
| |
| @Override |
| public InternalQueryService getQueryService() { |
| if (!isClient()) { |
| return new DefaultQueryService(this); |
| } |
| Pool defaultPool = getDefaultPool(); |
| if (defaultPool == null) { |
| throw new IllegalStateException( |
| "Client cache does not have a default pool. Use getQueryService(String poolName) instead."); |
| } |
| return (InternalQueryService) defaultPool.getQueryService(); |
| } |
| |
/** Returns a fresh JSONFormatter. */
@Override
public JSONFormatter getJsonFormatter() {
  // only ProxyCache implementation needs a JSONFormatter that has reference to userAttributes
  return new JSONFormatter();
}
| |
/** Returns a query service that always executes locally, even on a client cache. */
@Override
public QueryService getLocalQueryService() {
  return new DefaultQueryService(this);
}
| |
| /** |
| * @return Context jndi context associated with the Cache. |
| * @since GemFire 4.0 |
| */ |
| @Override |
| public Context getJNDIContext() { |
| return JNDIInvoker.getJNDIContext(); |
| } |
| |
| /** |
| * @return JTA TransactionManager associated with the Cache. |
| * @since GemFire 4.0 |
| */ |
| @Override |
| public TransactionManager getJTATransactionManager() { |
| return JNDIInvoker.getTransactionManager(); |
| } |
| |
| /** |
| * return the cq/interest information for a given region name, creating one if it doesn't exist |
| */ |
| @Override |
| public FilterProfile getFilterProfile(String regionName) { |
| InternalRegion r = (InternalRegion) getRegion(regionName, true); |
| if (r != null) { |
| return r.getFilterProfile(); |
| } |
| return null; |
| } |
| |
// Erasure-level cast of the wildcard-valued attributes map to a caller-chosen K/V.
// Safe only because callers treat the values through the RegionAttributes interface.
@SuppressWarnings("unchecked")
private static <K, V> Map<String, RegionAttributes<K, V>> uncheckedCast(
    Map<String, RegionAttributes<?, ?>> namedRegionAttributes) {
  return (Map) namedRegionAttributes;
}
| |
/** Returns the named region attributes registered under {@code id}, or null. */
@Override
public <K, V> RegionAttributes<K, V> getRegionAttributes(String id) {
  return GemFireCacheImpl.<K, V>uncheckedCast(namedRegionAttributes).get(id);
}
| |
| @Override |
| public <K, V> void setRegionAttributes(String id, RegionAttributes<K, V> attrs) { |
| if (attrs == null) { |
| namedRegionAttributes.remove(id); |
| } else { |
| namedRegionAttributes.put(id, attrs); |
| } |
| } |
| |
/** Returns an unmodifiable view of all named region attributes. */
@Override
public <K, V> Map<String, RegionAttributes<K, V>> listRegionAttributes() {
  return Collections.unmodifiableMap(uncheckedCast(namedRegionAttributes));
}
| |
// Cache being populated from XML, exposed to callbacks initialized during xml create.
private static final ThreadLocal<GemFireCacheImpl> xmlCache = new ThreadLocal<>();

/**
 * Populates this cache from a cache.xml stream. When XML parameterization is enabled,
 * the whole stream is first read as ISO-8859-1 text, system-property placeholders are
 * substituted, and the result is re-encoded before parsing so an "encoding" attribute
 * in the XML prolog still works.
 *
 * @throws CacheXmlException if the stream cannot be read for property substitution
 */
@Override
public void loadCacheXml(InputStream is)
    throws TimeoutException, CacheWriterException, GatewayException, RegionExistsException {
  // make this cache available to callbacks being initialized during xml create
  final GemFireCacheImpl oldValue = xmlCache.get();
  xmlCache.set(this);

  Reader reader = null;
  Writer stringWriter = null;
  OutputStreamWriter writer = null;

  try {
    CacheXmlParser xml;

    if (XML_PARAMETERIZATION_ENABLED) {
      char[] buffer = new char[1024];
      reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.ISO_8859_1));
      stringWriter = new StringWriter();

      int numChars;
      while ((numChars = reader.read(buffer)) != -1) {
        stringWriter.write(buffer, 0, numChars);
      }

      /*
       * Now replace all replaceable system properties here using {@code PropertyResolver}
       */
      String replacedXmlString = resolver.processUnresolvableString(stringWriter.toString());
      /*
       * Turn the string back into the default encoding so that the XML parser can work correctly
       * in the presence of an "encoding" attribute in the XML prolog.
       */
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      writer = new OutputStreamWriter(baos, StandardCharsets.ISO_8859_1);
      writer.write(replacedXmlString);
      writer.flush();

      xml = CacheXmlParser.parse(new ByteArrayInputStream(baos.toByteArray()));
    } else {
      xml = CacheXmlParser.parse(is);
    }
    xml.create(this);
  } catch (IOException e) {
    throw new CacheXmlException(
        "Input Stream could not be read for system property substitutions.", e);
  } finally {
    // Restore the previous thread-local value even on failure, then best-effort close.
    xmlCache.set(oldValue);
    closeQuietly(reader);
    closeQuietly(stringWriter);
    closeQuietly(writer);
  }
}
| |
| private static void closeQuietly(Closeable closeable) { |
| try { |
| if (closeable != null) { |
| closeable.close(); |
| } |
| } catch (IOException ignore) { |
| } |
| } |
| |
| @Override |
| public void readyForEvents() { |
| if (isClient()) { |
| // If a durable client has been configured... |
| if (Objects.nonNull(system) && Objects.nonNull(system.getConfig()) |
| && !Objects.equals(DistributionConfig.DEFAULT_DURABLE_CLIENT_ID, |
| Objects.toString(system.getConfig().getDurableClientId(), |
| DistributionConfig.DEFAULT_DURABLE_CLIENT_ID))) { |
| // Ensure that there is a pool to use for readyForEvents(). |
| if (Objects.isNull(defaultPool)) { |
| determineDefaultPool(); |
| } |
| } |
| } |
| PoolManagerImpl.readyForEvents(system, false); |
| } |
| |
// Files to include in a backup; replaced wholesale by setBackupFiles().
private List<File> backupFiles = Collections.emptyList();

/** Returns the cache's ResourceManager, after checking for cancellation in progress. */
@Override
public ResourceManager getResourceManager() {
  return getInternalResourceManager(true);
}
| |
/** Returns the internal ResourceManager, after checking for cancellation in progress. */
@Override
public InternalResourceManager getInternalResourceManager() {
  return getInternalResourceManager(true);
}
| |
/**
 * Returns the internal ResourceManager.
 *
 * @param checkCancellationInProgress when true, throws if the cache is shutting down
 */
@Override
public InternalResourceManager getInternalResourceManager(boolean checkCancellationInProgress) {
  if (checkCancellationInProgress) {
    stopper.checkCancelInProgress(null);
  }
  return resourceManager;
}
| |
| @Override |
| public void setBackupFiles(List<File> backups) { |
| backupFiles = backups; |
| } |
| |
| @Override |
| public List<File> getBackupFiles() { |
| return Collections.unmodifiableList(backupFiles); |
| } |
| |
/** Returns the cache's backup service. */
@Override
public BackupService getBackupService() {
  return backupService;
}
| |
// Count of in-flight registerInterest operations.
// TODO make this a simple int guarded by riWaiters and get rid of the double-check
private final AtomicInteger registerInterestsInProgress = new AtomicInteger();

// Threads blocked in waitForRegisterInterestsInProgress(); guarded by the list itself.
private final List<SimpleWaiter> riWaiters = new ArrayList<>();

// never changes but is currently only initialized in constructor by unit tests
private TypeRegistry pdxRegistry;
| |
| /** |
| * update stats for completion of a registerInterest operation |
| */ |
| @Override |
| public void registerInterestCompleted() { |
| // Don't do a cancellation check, it's just a moot point, that's all |
| if (isClosing) { |
| return; // just get out, all of the SimpleWaiters will die of their own accord |
| } |
| int numInProgress = registerInterestsInProgress.decrementAndGet(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("registerInterestCompleted: new value = {}", numInProgress); |
| } |
| if (numInProgress == 0) { |
| synchronized (riWaiters) { |
| // TODO: get rid of double-check |
| numInProgress = registerInterestsInProgress.get(); |
| if (numInProgress == 0) { // all clear |
| if (logger.isDebugEnabled()) { |
| logger.debug("registerInterestCompleted: Signalling end of register-interest"); |
| } |
| for (SimpleWaiter sw : riWaiters) { |
| sw.doNotify(); |
| } |
| riWaiters.clear(); |
| } // all clear |
| } // synchronized |
| } |
| } |
| |
/** Updates stats for the start of a registerInterest operation (increments the count). */
@Override
public void registerInterestStarted() {
  // Don't do a cancellation check, it's just a moot point, that's all
  int newVal = registerInterestsInProgress.incrementAndGet();
  if (logger.isDebugEnabled()) {
    logger.debug("registerInterestsStarted: new count = {}", newVal);
  }
}
| |
| /** |
| * Blocks until no register interests are in progress. |
| */ |
| @Override |
| public void waitForRegisterInterestsInProgress() { |
| // In *this* particular context, let the caller know that |
| // its cache has been cancelled. doWait below would do that as |
| // well, so this is just an early out. |
| getCancelCriterion().checkCancelInProgress(null); |
| |
| int count = registerInterestsInProgress.get(); |
| if (count > 0) { |
| SimpleWaiter simpleWaiter = null; |
| synchronized (riWaiters) { |
| // TODO double-check |
| count = registerInterestsInProgress.get(); |
| if (count > 0) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("waitForRegisterInterestsInProgress: count ={}", count); |
| } |
| simpleWaiter = new SimpleWaiter(); |
| riWaiters.add(simpleWaiter); |
| } |
| } // synchronized |
| if (simpleWaiter != null) { |
| simpleWaiter.doWait(); |
| } |
| } |
| } |
| |
/**
 * Marks whether the ResourceManager requires a query monitor. Note this writes a
 * static field from an instance method (hence the suppression).
 */
@Override
@SuppressWarnings("ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
public void setQueryMonitorRequiredForResourceManager(boolean required) {
  queryMonitorRequiredForResourceManager = required;
}
| |
/** Returns whether query monitoring has been disabled for low-memory conditions. */
@Override
public boolean isQueryMonitorDisabledForLowMemory() {
  return queryMonitorDisabledForLowMem;
}
| |
| /** |
| * Returns the QueryMonitor instance based on system property MAX_QUERY_EXECUTION_TIME. |
| * |
| * @since GemFire 6.0 |
| */ |
| @Override |
| public QueryMonitor getQueryMonitor() { |
| // Check to see if monitor is required if ResourceManager critical heap percentage is set |
| // or whether we override it with the system variable; |
| boolean monitorRequired = |
| !queryMonitorDisabledForLowMem && queryMonitorRequiredForResourceManager; |
| // Added for DUnit test purpose, which turns-on and off the this.testMaxQueryExecutionTime. |
| if (!(MAX_QUERY_EXECUTION_TIME > 0 || monitorRequired)) { |
| // if this.testMaxQueryExecutionTime is set, send the QueryMonitor. |
| // Else send null, so that the QueryMonitor is turned-off. |
| return null; |
| } |
| |
| // Return the QueryMonitor service if MAX_QUERY_EXECUTION_TIME is set or it is required by the |
| // ResourceManager and not overridden by system property. |
| if (queryMonitor == null) { |
| synchronized (queryMonitorLock) { |
| if (queryMonitor == null) { |
| int maxTime = MAX_QUERY_EXECUTION_TIME; |
| |
| if (monitorRequired && maxTime < 0) { |
| // this means that the resource manager is being used and we need to monitor query |
| // memory usage |
| // If no max execution time has been set, then we will default to five hours |
| maxTime = FIVE_HOURS; |
| } |
| |
| queryMonitor = |
| new QueryMonitor((ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool( |
| QUERY_MONITOR_THREAD_POOL_SIZE, |
| (runnable) -> new LoggingThread("QueryMonitor Thread", runnable)), |
| this, |
| maxTime); |
| if (logger.isDebugEnabled()) { |
| logger.debug("QueryMonitor thread started."); |
| } |
| } |
| } |
| } |
| return queryMonitor; |
| } |
| |
| /** |
| * Simple class to allow waiters for register interest. Has at most one thread that ever calls |
| * wait. |
| * |
| * @since GemFire 5.7 |
| */ |
| private class SimpleWaiter { |
| private boolean notified; |
| |
| SimpleWaiter() {} |
| |
| void doWait() { |
| synchronized (this) { |
| while (!notified) { |
| getCancelCriterion().checkCancelInProgress(null); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| wait(1000); |
| } catch (InterruptedException ignore) { |
| interrupted = true; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| } |
| } |
| |
| void doNotify() { |
| synchronized (this) { |
| notified = true; |
| notifyAll(); |
| } |
| } |
| } |
| |
/**
 * Applies the add-cache-server profile locally, then distributes it to all other
 * members and blocks until they reply (re-interrupting on interruption).
 */
private void sendAddCacheServerProfileMessage() {
  Set<InternalDistributedMember> otherMembers = dm.getOtherDistributionManagerIds();
  AddCacheServerProfileMessage message = new AddCacheServerProfileMessage();
  message.operateOnLocalCache(this);
  if (!otherMembers.isEmpty()) {
    if (logger.isDebugEnabled()) {
      logger.debug("Sending add cache server profile message to other members.");
    }
    ReplyProcessor21 replyProcessor = new ReplyProcessor21(dm, otherMembers);
    message.setRecipients(otherMembers);
    message.processorId = replyProcessor.getProcessorId();
    dm.putOutgoing(message);

    // Wait for replies.
    try {
      replyProcessor.waitForReplies();
    } catch (InterruptedException ignore) {
      Thread.currentThread().interrupt();
    }
  }
}
| |
| |
/**
 * Applies the remove-cache-server profile locally, then distributes it to members
 * that understand the message (1.5.0+) and blocks until they reply.
 * Mirrors sendAddCacheServerProfileMessage() except for the version filter.
 */
private void sendRemoveCacheServerProfileMessage() {
  Set<InternalDistributedMember> otherMembers = dm.getOtherDistributionManagerIds();
  RemoveCacheServerProfileMessage message = new RemoveCacheServerProfileMessage();
  message.operateOnLocalCache(this);

  // Remove this while loop when we release GEODE 2.0
  // This block prevents sending a message to old members that do not know about
  // the RemoveCacheServerProfileMessage
  otherMembers.removeIf(member -> Version.GEODE_1_5_0.compareTo(member.getVersionObject()) > 0);

  if (!otherMembers.isEmpty()) {
    if (logger.isDebugEnabled()) {
      logger.debug("Sending remove cache server profile message to other members.");
    }
    ReplyProcessor21 replyProcessor = new ReplyProcessor21(dm, otherMembers);
    message.setRecipients(otherMembers);
    message.processorId = replyProcessor.getProcessorId();
    dm.putOutgoing(message);

    // Wait for replies.
    try {
      replyProcessor.waitForReplies();
    } catch (InterruptedException ignore) {
      Thread.currentThread().interrupt();
    }
  }
}
| |
/** Returns the cache transaction manager. (Duplicate accessor of {@link #getTXMgr()}.) */
@Override
public TXManagerImpl getTxManager() {
  return transactionManager;
}
| |
| /** |
| * @since GemFire 6.5 |
| */ |
| @Override |
| public <K, V> RegionFactory<K, V> createRegionFactory(RegionShortcut shortcut) { |
| throwIfClient(); |
| return new RegionFactoryImpl<>(this, shortcut); |
| } |
| |
| /** |
| * @since GemFire 6.5 |
| */ |
| @Override |
| public <K, V> RegionFactory<K, V> createRegionFactory() { |
| throwIfClient(); |
| return new RegionFactoryImpl<>(this); |
| } |
| |
| /** |
| * @since GemFire 6.5 |
| */ |
| @Override |
| public <K, V> RegionFactory<K, V> createRegionFactory(String regionAttributesId) { |
| throwIfClient(); |
| return new RegionFactoryImpl<>(this, regionAttributesId); |
| } |
| |
| /** |
| * @since GemFire 6.5 |
| */ |
| @Override |
| public <K, V> RegionFactory<K, V> createRegionFactory(RegionAttributes<K, V> regionAttributes) { |
| throwIfClient(); |
| return new RegionFactoryImpl<>(this, regionAttributes); |
| } |
| |
| /** |
| * @since GemFire 6.5 |
| */ |
| @Override |
| public <K, V> ClientRegionFactory<K, V> createClientRegionFactory(ClientRegionShortcut shortcut) { |
| return new ClientRegionFactoryImpl<>(this, shortcut); |
| } |
| |
/** Creates a client region factory from named region attributes. */
@Override
public <K, V> ClientRegionFactory<K, V> createClientRegionFactory(String regionAttributesId) {
  return new ClientRegionFactoryImpl<>(this, regionAttributesId);
}
| |
| /** |
| * @since GemFire 6.5 |
| */ |
| @Override |
| public QueryService getQueryService(String poolName) { |
| Pool pool = PoolManager.find(poolName); |
| if (pool == null) { |
| throw new IllegalStateException("Could not find a pool named " + poolName); |
| } else { |
| return pool.getQueryService(); |
| } |
| } |
| |
/**
 * Creates a per-user authenticated view backed by the default pool.
 *
 * @throws IllegalStateException if the cache has no default pool
 */
@Override
public RegionService createAuthenticatedView(Properties userSecurityProperties) {
  Pool pool = getDefaultPool();
  if (pool == null) {
    throw new IllegalStateException("This cache does not have a default pool");
  }
  return createAuthenticatedCacheView(pool, userSecurityProperties);
}
| |
/**
 * Creates a per-user authenticated view backed by the named pool.
 *
 * @throws IllegalStateException if no pool with that name exists
 */
@Override
public RegionService createAuthenticatedView(Properties userSecurityProperties, String poolName) {
  Pool pool = PoolManager.find(poolName);
  if (pool == null) {
    throw new IllegalStateException("Pool " + poolName + " does not exist");
  }
  return createAuthenticatedCacheView(pool, userSecurityProperties);
}
| |
| private static RegionService createAuthenticatedCacheView(Pool pool, Properties properties) { |
| if (pool.getMultiuserAuthentication()) { |
| return ((PoolImpl) pool).createAuthenticatedCacheView(properties); |
| } else { |
| throw new IllegalStateException( |
| "The pool " + pool.getName() + " did not have multiuser-authentication set to true"); |
| } |
| } |
| |
| public static void initializeRegionShortcuts(Cache cache) { |
| for (RegionShortcut shortcut : RegionShortcut.values()) { |
| switch (shortcut) { |
| case PARTITION: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.PARTITION); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| af.setPartitionAttributes(paf.create()); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case PARTITION_REDUNDANT: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.PARTITION); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(1); |
| af.setPartitionAttributes(paf.create()); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case PARTITION_PERSISTENT: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| af.setPartitionAttributes(paf.create()); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case PARTITION_REDUNDANT_PERSISTENT: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(1); |
| af.setPartitionAttributes(paf.create()); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case PARTITION_OVERFLOW: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.PARTITION); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| af.setPartitionAttributes(paf.create()); |
| af.setEvictionAttributes( |
| EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK)); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case PARTITION_REDUNDANT_OVERFLOW: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.PARTITION); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(1); |
| af.setPartitionAttributes(paf.create()); |
| af.setEvictionAttributes( |
| EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK)); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case PARTITION_PERSISTENT_OVERFLOW: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| af.setPartitionAttributes(paf.create()); |
| af.setEvictionAttributes( |
| EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK)); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case PARTITION_REDUNDANT_PERSISTENT_OVERFLOW: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(1); |
| af.setPartitionAttributes(paf.create()); |
| af.setEvictionAttributes( |
| EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK)); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case PARTITION_HEAP_LRU: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.PARTITION); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| af.setPartitionAttributes(paf.create()); |
| af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes()); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case PARTITION_REDUNDANT_HEAP_LRU: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.PARTITION); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(1); |
| af.setPartitionAttributes(paf.create()); |
| af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes()); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case REPLICATE: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.REPLICATE); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case REPLICATE_PERSISTENT: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case REPLICATE_OVERFLOW: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.REPLICATE); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| af.setEvictionAttributes( |
| EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK)); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case REPLICATE_PERSISTENT_OVERFLOW: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| af.setEvictionAttributes( |
| EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK)); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case REPLICATE_HEAP_LRU: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.REPLICATE); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes()); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case LOCAL: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.NORMAL); |
| af.setScope(Scope.LOCAL); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case LOCAL_PERSISTENT: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE); |
| af.setScope(Scope.LOCAL); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case LOCAL_HEAP_LRU: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.NORMAL); |
| af.setScope(Scope.LOCAL); |
| af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes()); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case LOCAL_OVERFLOW: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.NORMAL); |
| af.setScope(Scope.LOCAL); |
| af.setEvictionAttributes( |
| EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK)); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case LOCAL_PERSISTENT_OVERFLOW: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE); |
| af.setScope(Scope.LOCAL); |
| af.setEvictionAttributes( |
| EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK)); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case PARTITION_PROXY: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.PARTITION); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setLocalMaxMemory(0); |
| af.setPartitionAttributes(paf.create()); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case PARTITION_PROXY_REDUNDANT: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.PARTITION); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setLocalMaxMemory(0); |
| paf.setRedundantCopies(1); |
| af.setPartitionAttributes(paf.create()); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case REPLICATE_PROXY: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.EMPTY); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| default: |
| throw new IllegalStateException("unhandled enum " + shortcut); |
| } |
| } |
| } |
| |
| public static void initializeClientRegionShortcuts(Cache cache) { |
| for (ClientRegionShortcut shortcut : ClientRegionShortcut.values()) { |
| switch (shortcut) { |
| case LOCAL: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.NORMAL); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case LOCAL_PERSISTENT: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case LOCAL_HEAP_LRU: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.NORMAL); |
| af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes()); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case LOCAL_OVERFLOW: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.NORMAL); |
| af.setEvictionAttributes( |
| EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK)); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case LOCAL_PERSISTENT_OVERFLOW: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE); |
| af.setEvictionAttributes( |
| EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK)); |
| cache.setRegionAttributes(shortcut.toString(), af.create()); |
| break; |
| } |
| case PROXY: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.EMPTY); |
| UserSpecifiedRegionAttributes<?, ?> attributes = |
| (UserSpecifiedRegionAttributes) af.create(); |
| attributes.requiresPoolName = true; |
| cache.setRegionAttributes(shortcut.toString(), attributes); |
| break; |
| } |
| case CACHING_PROXY: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.NORMAL); |
| UserSpecifiedRegionAttributes<?, ?> attributes = |
| (UserSpecifiedRegionAttributes) af.create(); |
| attributes.requiresPoolName = true; |
| cache.setRegionAttributes(shortcut.toString(), attributes); |
| break; |
| } |
| case CACHING_PROXY_HEAP_LRU: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.NORMAL); |
| af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes()); |
| UserSpecifiedRegionAttributes<?, ?> attributes = |
| (UserSpecifiedRegionAttributes) af.create(); |
| attributes.requiresPoolName = true; |
| cache.setRegionAttributes(shortcut.toString(), attributes); |
| break; |
| } |
| case CACHING_PROXY_OVERFLOW: { |
| AttributesFactory<?, ?> af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.NORMAL); |
| af.setEvictionAttributes( |
| EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK)); |
| UserSpecifiedRegionAttributes<?, ?> attributes = |
| (UserSpecifiedRegionAttributes) af.create(); |
| attributes.requiresPoolName = true; |
| cache.setRegionAttributes(shortcut.toString(), attributes); |
| break; |
| } |
| default: |
| throw new IllegalStateException("unhandled enum " + shortcut); |
| } |
| } |
| } |
| |
  /**
   * Records that {@code region} is being destroyed, keyed by its path. If a destroy is already
   * registered for this path, the existing registration is kept ({@code putIfAbsent}).
   */
  @Override
  public void beginDestroy(String path, DistributedRegion region) {
    regionsInDestroy.putIfAbsent(path, region);
  }
| |
  /**
   * Clears the in-destroy registration for {@code path}, but only if it still maps to this exact
   * {@code region} (two-argument remove).
   */
  @Override
  public void endDestroy(String path, DistributedRegion region) {
    regionsInDestroy.remove(path, region);
  }
| |
  /** Returns the region currently registered as being destroyed for {@code path}, or null. */
  @Override
  public DistributedRegion getRegionInDestroy(String path) {
    return regionsInDestroy.get(path);
  }
| |
  /** Returns this cache's tombstone service. */
  @Override
  public TombstoneService getTombstoneService() {
    return tombstoneService;
  }
| |
  /** Returns this cache's PDX type registry. */
  @Override
  public TypeRegistry getPdxRegistry() {
    return pdxRegistry;
  }
| |
  /** Returns the configured pdx-read-serialized setting from the cache config. */
  @Override
  public boolean getPdxReadSerialized() {
    return cacheConfig.pdxReadSerialized;
  }
| |
  /** Returns the configured PDX serializer, or null if none was configured. */
  @Override
  public PdxSerializer getPdxSerializer() {
    return cacheConfig.pdxSerializer;
  }
| |
  /** Returns the name of the disk store configured for PDX metadata, or null. */
  @Override
  public String getPdxDiskStore() {
    return cacheConfig.pdxDiskStore;
  }
| |
  /** Returns whether PDX metadata is configured to be persisted. */
  @Override
  public boolean getPdxPersistent() {
    return cacheConfig.pdxPersistent;
  }
| |
  /** Returns whether unread PDX fields are configured to be ignored. */
  @Override
  public boolean getPdxIgnoreUnreadFields() {
    return cacheConfig.pdxIgnoreUnreadFields;
  }
| |
| /** |
| * Returns true if any of the GemFire services prefers PdxInstance. And application has not |
| * requested getObject() on the PdxInstance. |
| */ |
| @Override |
| public boolean getPdxReadSerializedByAnyGemFireServices() { |
| TypeRegistry pdxRegistry = getPdxRegistry(); |
| boolean pdxReadSerializedOverriden = false; |
| if (pdxRegistry != null) { |
| pdxReadSerializedOverriden = pdxRegistry.getPdxReadSerializedOverride(); |
| } |
| |
| return (getPdxReadSerialized() || pdxReadSerializedOverriden) |
| && PdxInstanceImpl.getPdxReadSerialized(); |
| } |
| |
  /** Returns this cache's configuration object. */
  @Override
  public CacheConfig getCacheConfig() {
    return cacheConfig;
  }
| |
  /** Returns the distribution manager this cache is attached to. */
  @Override
  public DistributionManager getDistributionManager() {
    return dm;
  }
| |
  /** Creates a new gateway sender factory via the WAN service provider. */
  @Override
  public GatewaySenderFactory createGatewaySenderFactory() {
    return WANServiceProvider.createGatewaySenderFactory(this);
  }
| |
  /** Creates a new gateway receiver factory via the WAN service provider. */
  @Override
  public GatewayReceiverFactory createGatewayReceiverFactory() {
    return WANServiceProvider.createGatewayReceiverFactory(this);
  }
| |
  /** Creates a new async event queue factory bound to this cache. */
  @Override
  public AsyncEventQueueFactory createAsyncEventQueueFactory() {
    return new AsyncEventQueueFactoryImpl(this);
  }
| |
  /** Returns the distribution advisor for this advisee; delegates to the resource advisor. */
  @Override
  public DistributionAdvisor getDistributionAdvisor() {
    return getResourceAdvisor();
  }
| |
  /** Returns this cache's resource advisor. */
  @Override
  public ResourceAdvisor getResourceAdvisor() {
    return resourceAdvisor;
  }
| |
  /** Creates and returns a fresh profile from the resource advisor. */
  @Override
  public Profile getProfile() {
    return resourceAdvisor.createProfile();
  }
| |
  /** Returns null: this advisee has no parent. */
  @Override
  public DistributionAdvisee getParentAdvisee() {
    return null;
  }
| |
  /** Returns the distributed system this cache belongs to. */
  @Override
  public InternalDistributedSystem getSystem() {
    return system;
  }
| |
  /** Returns the fixed advisee name, {@code "ResourceManager"}. */
  @Override
  public String getFullPath() {
    return "ResourceManager";
  }
| |
  /** Delegates profile population to the resource manager. */
  @Override
  public void fillInProfile(Profile profile) {
    resourceManager.fillInProfile(profile);
  }
| |
  /** Returns this advisee's serial number. */
  @Override
  public int getSerialNumber() {
    return serialNumber;
  }
| |
  /** Returns the factory used to create transaction entry state. */
  @Override
  public TXEntryStateFactory getTXEntryStateFactory() {
    return txEntryStateFactory;
  }
| |
  // test hook: installs a PDX serializer both in the cache config and in the
  // static TypeRegistry state (via basicSetPdxSerializer)
  public void setPdxSerializer(PdxSerializer serializer) {
    cacheConfig.setPdxSerializer(serializer);
    basicSetPdxSerializer(serializer);
  }
| |
| private void basicSetPdxSerializer(PdxSerializer serializer) { |
| TypeRegistry.setPdxSerializer(serializer); |
| if (serializer instanceof ReflectionBasedAutoSerializer) { |
| AutoSerializableManager autoSerializableManager = |
| (AutoSerializableManager) ((ReflectionBasedAutoSerializer) serializer).getManager(); |
| if (autoSerializableManager != null) { |
| autoSerializableManager.setRegionService(this); |
| } |
| } |
| } |
| |
  /**
   * Sets PdxInstanceImpl's read-serialized flag and the pdx registry override to {@code value}.
   */
  @Override
  public void setReadSerializedForCurrentThread(boolean value) {
    PdxInstanceImpl.setPdxReadSerialized(value);
    setPdxReadSerializedOverride(value);
  }
| |
  // test hook: overwrites the cache config's pdx-read-serialized flag directly
  @Override
  public void setReadSerializedForTest(boolean value) {
    cacheConfig.setPdxReadSerialized(value);
  }
| |
  /**
   * Merges the declarative (cache.xml) configuration into this cache's config and installs the
   * resulting PDX serializer.
   */
  @Override
  public void setDeclarativeCacheConfig(CacheConfig cacheConfig) {
    this.cacheConfig.setDeclarativeConfig(cacheConfig);
    basicSetPdxSerializer(this.cacheConfig.getPdxSerializer());
  }
| |
| /** |
| * Add to the map of declarable properties. Any properties that exactly match existing properties |
| * for a class in the list will be discarded (no duplicate Properties allowed). |
| * |
| * @param mapOfNewDeclarableProps Map of the declarable properties to add |
| */ |
| @Override |
| public void addDeclarableProperties(final Map<Declarable, Properties> mapOfNewDeclarableProps) { |
| synchronized (declarablePropertiesMap) { |
| for (Entry<Declarable, Properties> newEntry : mapOfNewDeclarableProps.entrySet()) { |
| // Find and remove a Declarable from the map if an "equal" version is already stored |
| Class<? extends Declarable> clazz = newEntry.getKey().getClass(); |
| |
| Declarable matchingDeclarable = null; |
| for (Entry<Declarable, Properties> oldEntry : declarablePropertiesMap.entrySet()) { |
| |
| BiPredicate<Declarable, Declarable> isKeyIdentifiableAndSameIdPredicate = |
| (Declarable oldKey, Declarable newKey) -> newKey instanceof Identifiable |
| && ((Identifiable) oldKey).getId().equals(((Identifiable) newKey).getId()); |
| |
| Supplier<Boolean> isKeyClassSame = |
| () -> clazz.getName().equals(oldEntry.getKey().getClass().getName()); |
| Supplier<Boolean> isValueEqual = () -> newEntry.getValue().equals(oldEntry.getValue()); |
| Supplier<Boolean> isKeyIdentifiableAndSameId = |
| () -> isKeyIdentifiableAndSameIdPredicate.test(oldEntry.getKey(), newEntry.getKey()); |
| |
| if (isKeyClassSame.get() && (isValueEqual.get() || isKeyIdentifiableAndSameId.get())) { |
| matchingDeclarable = oldEntry.getKey(); |
| break; |
| } |
| } |
| if (matchingDeclarable != null) { |
| declarablePropertiesMap.remove(matchingDeclarable); |
| } |
| |
| // Now add the new/replacement properties to the map |
| declarablePropertiesMap.put(newEntry.getKey(), newEntry.getValue()); |
| } |
| } |
| } |
| |
  // Declarable installed via setInitializer; may be null
  private Declarable initializer;

  // Properties supplied alongside the initializer via setInitializer; may be null
  private Properties initializerProps;
| |
  /** Returns the initializer set via {@code setInitializer}, or null. */
  @Override
  public Declarable getInitializer() {
    return initializer;
  }
| |
  /** Returns the properties set via {@code setInitializer}, or null. */
  @Override
  public Properties getInitializerProps() {
    return initializerProps;
  }
| |
  /** Stores the cache initializer and its properties. */
  @Override
  public void setInitializer(Declarable initializer, Properties initializerProps) {
    this.initializer = initializer;
    this.initializerProps = initializerProps;
  }
| |
  /** Creates a PdxInstance factory for {@code className}, expecting a domain class. */
  @Override
  public PdxInstanceFactory createPdxInstanceFactory(String className) {
    return PdxInstanceFactoryImpl.newCreator(className, true, this);
  }
| |
  /** Creates a PdxInstance factory for {@code className} with explicit domain-class expectation. */
  @Override
  public PdxInstanceFactory createPdxInstanceFactory(String className, boolean expectDomainClass) {
    return PdxInstanceFactoryImpl.newCreator(className, expectDomainClass, this);
  }
| |
  /** Creates a PdxInstance representing the given enum constant. */
  @Override
  public PdxInstance createPdxEnum(String className, String enumName, int enumOrdinal) {
    return PdxInstanceFactoryImpl.createPdxEnum(className, enumName, enumOrdinal, this);
  }
| |
  /** Returns the JMX manager advisor for this cache. */
  @Override
  public JmxManagerAdvisor getJmxManagerAdvisor() {
    return jmxAdvisor;
  }
| |
  /** Creates a new snapshot service bound to this cache (a new instance per call). */
  @Override
  public CacheSnapshotService getSnapshotService() {
    return new CacheSnapshotServiceImpl(this);
  }
| |
  /**
   * If a locator is running in this JVM, asks it to start the JMX manager location service for
   * this cache; a no-op otherwise.
   */
  private void startColocatedJmxManagerLocator() {
    InternalLocator loc = InternalLocator.getLocator();
    if (loc != null) {
      loc.startJmxManagerLocationService(this);
    }
  }
| |
  /** Returns the distributed system's off-heap memory allocator. */
  @Override
  public MemoryAllocator getOffHeapStore() {
    return getSystem().getOffHeapStore();
  }
| |
  /** Returns the disk store monitor for this cache. */
  @Override
  public DiskStoreMonitor getDiskStoreMonitor() {
    return diskMonitor;
  }
| |
| /** |
| * @see Extensible#getExtensionPoint() |
| * @since GemFire 8.1 |
| */ |
| @Override |
| public ExtensionPoint<Cache> getExtensionPoint() { |
| return extensionPoint; |
| } |
| |
  /** Returns the continuous-query service for this cache. */
  @Override
  public CqService getCqService() {
    return cqService;
  }
| |
  /** Registers a listener to be invoked after region entry synchronization. */
  private void addRegionEntrySynchronizationListener(RegionEntrySynchronizationListener listener) {
    synchronizationListeners.add(listener);
  }
| |
  /**
   * Notifies every registered synchronization listener that entries from {@code sender} have been
   * synchronized into {@code region}. A failure in one listener is logged and does not prevent
   * the remaining listeners from being invoked.
   */
  @Override
  public void invokeRegionEntrySynchronizationListenersAfterSynchronization(
      InternalDistributedMember sender, InternalRegion region,
      List<InitialImageOperation.Entry> entriesToSynchronize) {
    for (RegionEntrySynchronizationListener listener : synchronizationListeners) {
      try {
        listener.afterSynchronization(sender, region, entriesToSynchronize);
      } catch (Throwable t) {
        // Broad catch isolates each listener; NOTE(review): this also swallows Errors such as
        // OutOfMemoryError — confirm this is intended before narrowing.
        logger.warn(String.format(
            "Caught the following exception attempting to synchronize events from member=%s; regionPath=%s; entriesToSynchronize=%s:",
            sender, region.getFullPath(), entriesToSynchronize), t);
      }
    }
  }
| |
| @Override |
| public Object convertPdxInstanceIfNeeded(Object obj, boolean preferCD) { |
| Object result = obj; |
| if (obj instanceof InternalPdxInstance) { |
| InternalPdxInstance pdxInstance = (InternalPdxInstance) obj; |
| if (preferCD) { |
| try { |
| result = new PreferBytesCachedDeserializable(pdxInstance.toBytes()); |
| } catch (IOException ignore) { |
| // Could not convert pdx to bytes here; it will be tried again later |
| // and an exception will be thrown there. |
| } |
| } else if (!getPdxReadSerialized()) { |
| result = pdxInstance.getObject(); |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| public Boolean getPdxReadSerializedOverride() { |
| TypeRegistry pdxRegistry = getPdxRegistry(); |
| if (pdxRegistry != null) { |
| return pdxRegistry.getPdxReadSerializedOverride(); |
| } |
| return false; |
| } |
| |
| @Override |
| public void setPdxReadSerializedOverride(boolean pdxReadSerialized) { |
| TypeRegistry pdxRegistry = getPdxRegistry(); |
| if (pdxRegistry != null) { |
| pdxRegistry.setPdxReadSerializedOverride(pdxReadSerialized); |
| } |
| } |
| |
| @Override |
| public void registerPdxMetaData(Object instance) { |
| try { |
| byte[] blob = BlobHelper.serializeToBlob(instance); |
| if (blob.length == 0 || blob[0] != DSCODE.PDX.toByte()) { |
| throw new SerializationException("The instance is not PDX serializable"); |
| } |
| } catch (IOException e) { |
| throw new SerializationException("Serialization failed", e); |
| } |
| } |
| |
  /**
   * Guard for server-only operations.
   *
   * @throws UnsupportedOperationException if this cache is a client cache
   */
  private void throwIfClient() {
    if (isClient()) {
      throw new UnsupportedOperationException("operation is not supported on a client cache");
    }
  }
| |
  // wrapper view of this cache handed out for processing client requests
  private final InternalCacheForClientAccess cacheForClients =
      new InternalCacheForClientAccess(this);
| |
  /** Returns the wrapper view of this cache used when processing client requests. */
  @Override
  public InternalCacheForClientAccess getCacheForProcessingClientRequests() {
    return cacheForClients;
  }
| |
| private ThreadsMonitoring getThreadMonitorObj() { |
| if (dm != null) { |
| return dm.getThreadMonitoring(); |
| } else { |
| return null; |
| } |
| } |
| |
| } |