blob: 76488dd65cc87ce23b0b9facc5963eb5806291f1 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.Reader;
import java.io.StringBufferInputStream;
import java.io.StringWriter;
import java.io.Writer;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.naming.Context;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelCriterion;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.ForcedDisconnectException;
import com.gemstone.gemfire.GemFireCacheException;
import com.gemstone.gemfire.InternalGemFireError;
import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.admin.internal.SystemMemberCacheEventProcessor;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheExistsException;
import com.gemstone.gemfire.cache.CacheRuntimeException;
import com.gemstone.gemfire.cache.CacheTransactionManager;
import com.gemstone.gemfire.cache.CacheWriterException;
import com.gemstone.gemfire.cache.CacheXmlException;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.DiskStore;
import com.gemstone.gemfire.cache.DiskStoreFactory;
import com.gemstone.gemfire.cache.DynamicRegionFactory;
import com.gemstone.gemfire.cache.EvictionAction;
import com.gemstone.gemfire.cache.EvictionAttributes;
import com.gemstone.gemfire.cache.GatewayException;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.RegionExistsException;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.RegionService;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.TimeoutException;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
import com.gemstone.gemfire.cache.client.ClientCache;
import com.gemstone.gemfire.cache.client.ClientRegionFactory;
import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
import com.gemstone.gemfire.cache.client.Pool;
import com.gemstone.gemfire.cache.client.PoolFactory;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.client.internal.ClientMetadataService;
import com.gemstone.gemfire.cache.client.internal.ClientRegionFactoryImpl;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
import com.gemstone.gemfire.cache.execute.FunctionService;
import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSIntegrationUtil;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreCreation;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSFlushQueueFunction;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionFunction;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSLastCompactionTimeFunction;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSStoreDirector;
import com.gemstone.gemfire.cache.lucene.LuceneService;
import com.gemstone.gemfire.cache.lucene.LuceneServiceProvider;
import com.gemstone.gemfire.cache.query.QueryService;
import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
import com.gemstone.gemfire.cache.query.internal.QueryMonitor;
import com.gemstone.gemfire.cache.query.internal.cq.CqService;
import com.gemstone.gemfire.cache.query.internal.cq.CqServiceProvider;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.cache.snapshot.CacheSnapshotService;
import com.gemstone.gemfire.cache.util.BridgeServer;
import com.gemstone.gemfire.cache.util.GatewayConflictResolver;
import com.gemstone.gemfire.cache.util.ObjectSizer;
import com.gemstone.gemfire.cache.wan.GatewayReceiver;
import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
import com.gemstone.gemfire.cache.wan.GatewaySender;
import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
import com.gemstone.gemfire.distributed.DistributedLockService;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
import com.gemstone.gemfire.distributed.Locator;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.InternalLocator;
import com.gemstone.gemfire.distributed.internal.PooledExecutorWithDMStats;
import com.gemstone.gemfire.distributed.internal.ReplyException;
import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
import com.gemstone.gemfire.distributed.internal.ResourceEvent;
import com.gemstone.gemfire.distributed.internal.ResourceEventsListener;
import com.gemstone.gemfire.distributed.internal.ServerLocation;
import com.gemstone.gemfire.distributed.internal.locks.DLockService;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.i18n.LogWriterI18n;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.ClassPathLoader;
import com.gemstone.gemfire.internal.JarDeployer;
import com.gemstone.gemfire.internal.SocketCreator;
import com.gemstone.gemfire.internal.SocketIOWithTimeout;
import com.gemstone.gemfire.internal.SystemTimer;
import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType;
import com.gemstone.gemfire.internal.cache.control.ResourceAdvisor;
import com.gemstone.gemfire.internal.cache.execute.util.FindRestEnabledServersFunction;
import com.gemstone.gemfire.internal.cache.extension.Extensible;
import com.gemstone.gemfire.internal.cache.extension.ExtensionPoint;
import com.gemstone.gemfire.internal.cache.extension.SimpleExtensionPoint;
import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
import com.gemstone.gemfire.internal.cache.locks.TXLockService;
import com.gemstone.gemfire.internal.cache.lru.HeapEvictor;
import com.gemstone.gemfire.internal.cache.lru.OffHeapEvictor;
import com.gemstone.gemfire.internal.cache.partitioned.RedundancyAlreadyMetException;
import com.gemstone.gemfire.internal.cache.persistence.BackupManager;
import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberID;
import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberManager;
import com.gemstone.gemfire.internal.cache.persistence.query.TemporaryResultSetFactory;
import com.gemstone.gemfire.internal.cache.snapshot.CacheSnapshotServiceImpl;
import com.gemstone.gemfire.internal.cache.tier.sockets.AcceptorImpl;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientHealthMonitor;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor;
import com.gemstone.gemfire.internal.cache.wan.WANServiceProvider;
import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
import com.gemstone.gemfire.internal.cache.xmlcache.CacheXmlParser;
import com.gemstone.gemfire.internal.cache.xmlcache.CacheXmlPropertyResolver;
import com.gemstone.gemfire.internal.cache.xmlcache.PropertyResolver;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.jndi.JNDIInvoker;
import com.gemstone.gemfire.internal.jta.TransactionManagerImpl;
import com.gemstone.gemfire.internal.logging.InternalLogWriter;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.offheap.MemoryAllocator;
import com.gemstone.gemfire.internal.process.ClusterConfigurationNotAvailableException;
import com.gemstone.gemfire.internal.sequencelog.SequenceLoggerImpl;
import com.gemstone.gemfire.internal.tcp.ConnectionTable;
import com.gemstone.gemfire.internal.util.concurrent.FutureResult;
import com.gemstone.gemfire.lang.Identifiable;
import com.gemstone.gemfire.management.internal.JmxManagerAdvisee;
import com.gemstone.gemfire.management.internal.JmxManagerAdvisor;
import com.gemstone.gemfire.management.internal.RestAgent;
import com.gemstone.gemfire.management.internal.beans.ManagementListener;
import com.gemstone.gemfire.management.internal.configuration.messages.ConfigurationResponse;
import com.gemstone.gemfire.memcached.GemFireMemcachedServer;
import com.gemstone.gemfire.memcached.GemFireMemcachedServer.Protocol;
import com.gemstone.gemfire.pdx.PdxInstance;
import com.gemstone.gemfire.pdx.PdxInstanceFactory;
import com.gemstone.gemfire.pdx.PdxSerializer;
import com.gemstone.gemfire.pdx.ReflectionBasedAutoSerializer;
import com.gemstone.gemfire.pdx.internal.AutoSerializableManager;
import com.gemstone.gemfire.pdx.internal.PdxInstanceFactoryImpl;
import com.gemstone.gemfire.pdx.internal.PdxInstanceImpl;
import com.gemstone.gemfire.pdx.internal.TypeRegistry;
import com.gemstone.gemfire.redis.GemFireRedisServer;
import com.sun.jna.Native;
import com.sun.jna.Platform;
// @todo somebody Come up with more reasonable values for {@link #DEFAULT_LOCK_TIMEOUT}, etc.
/**
* GemFire's implementation of a distributed {@link com.gemstone.gemfire.cache.Cache}.
*
* @author Darrel Schneider
*/
@SuppressWarnings("deprecation")
public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee, Extensible<Cache> {
private static final Logger logger = LogService.getLogger();
// moved *SERIAL_NUMBER stuff to DistributionAdvisor
/** The default number of seconds to wait for a distributed lock */
public static final int DEFAULT_LOCK_TIMEOUT = Integer.getInteger("gemfire.Cache.defaultLockTimeout", 60).intValue();
/**
* The default duration (in seconds) of a lease on a distributed lock
*/
public static final int DEFAULT_LOCK_LEASE = Integer.getInteger("gemfire.Cache.defaultLockLease", 120).intValue();
/** The default "copy on read" attribute value */
public static final boolean DEFAULT_COPY_ON_READ = false;
/** the last instance of GemFireCache created */
private static volatile GemFireCacheImpl instance = null;
/**
* Just like instance but is valid for a bit longer so that pdx can still find the cache during a close.
*/
private static volatile GemFireCacheImpl pdxInstance = null;
/**
* The default amount of time to wait for a <code>netSearch</code> to complete
*/
public static final int DEFAULT_SEARCH_TIMEOUT = Integer.getInteger("gemfire.Cache.defaultSearchTimeout", 300).intValue();
/**
* The <code>CacheLifecycleListener</code> s that have been registered in this VM
*/
private static final Set<CacheLifecycleListener> cacheLifecycleListeners = new HashSet<CacheLifecycleListener>();
/**
* Define LocalRegion.ASYNC_EVENT_LISTENERS=true to invoke event listeners in the background
*/
public static final boolean ASYNC_EVENT_LISTENERS = Boolean.getBoolean("gemfire.Cache.ASYNC_EVENT_LISTENERS");
/**
* If true then when a delta is applied the size of the entry value will be recalculated. If false (the default) then
* the size of the entry value is unchanged by a delta application. Not a final so that tests can change this value.
*
* @since hitachi 6.1.2.9
*/
public static boolean DELTAS_RECALCULATE_SIZE = Boolean.getBoolean("gemfire.DELTAS_RECALCULATE_SIZE");
public static final int EVENT_QUEUE_LIMIT = Integer.getInteger("gemfire.Cache.EVENT_QUEUE_LIMIT", 4096).intValue();
/**
* System property to limit the max query-execution time. By default its turned off (-1), the time is set in MiliSecs.
*/
public static final int MAX_QUERY_EXECUTION_TIME = Integer.getInteger("gemfire.Cache.MAX_QUERY_EXECUTION_TIME", -1).intValue();
/**
* System property to disable query monitor even if resource manager is in use
*/
public final boolean QUERY_MONITOR_DISABLED_FOR_LOW_MEM = Boolean.getBoolean("gemfire.Cache.DISABLE_QUERY_MONITOR_FOR_LOW_MEMORY");
/**
* Property set to true if resource manager heap percentage is set and query monitor is required
*/
public static Boolean QUERY_MONITOR_REQUIRED_FOR_RESOURCE_MANAGER = Boolean.FALSE;
/**
* This property defines internal function that will get executed on each node to fetch active REST service endpoints (servers).
*/
public static final String FIND_REST_ENABLED_SERVERS_FUNCTION_ID = FindRestEnabledServersFunction.class.getName();
/**
* True if the user is allowed lock when memory resources appear to be overcommitted.
*/
public static final boolean ALLOW_MEMORY_LOCK_WHEN_OVERCOMMITTED = Boolean.getBoolean("gemfire.Cache.ALLOW_MEMORY_OVERCOMMIT");
//time in ms
private static final int FIVE_HOURS = 5 * 60 * 60 * 1000;
/** To test MAX_QUERY_EXECUTION_TIME option. */
public int TEST_MAX_QUERY_EXECUTION_TIME = -1;
public boolean TEST_MAX_QUERY_EXECUTION_TIME_OVERRIDE_EXCEPTION = false;
// ///////////////////// Instance Fields ///////////////////////
private final InternalDistributedSystem system;
private final DM dm;
// This is a HashMap because I know that clear() on it does
// not allocate objects.
private final HashMap rootRegions;
/**
* True if this cache is being created by a ClientCacheFactory.
*/
private final boolean isClient;
private PoolFactory clientpf;
/**
* It is not final to allow cache.xml parsing to set it.
*/
private Pool defaultPool;
private final ConcurrentMap pathToRegion = new ConcurrentHashMap();
protected volatile boolean isClosing = false;
protected volatile boolean closingGatewaySendersByShutdownAll = false;
protected volatile boolean closingGatewayReceiversByShutdownAll = false;
/** Amount of time (in seconds) to wait for a distributed lock */
private int lockTimeout = DEFAULT_LOCK_TIMEOUT;
/** Amount of time a lease of a distributed lock lasts */
private int lockLease = DEFAULT_LOCK_LEASE;
/** Amount of time to wait for a <code>netSearch</code> to complete */
private int searchTimeout = DEFAULT_SEARCH_TIMEOUT;
private final CachePerfStats cachePerfStats;
/** Date on which this instances was created */
private final Date creationDate;
/** thread pool for event dispatching */
private final ThreadPoolExecutor eventThreadPool;
/** indicates whether this is a SQLFabric system */
private boolean sqlfSystem;
/**
* SQLFabric's static distribution advisee.
*/
private volatile DistributionAdvisee sqlfAdvisee;
/**
* the list of all bridge servers. CopyOnWriteArrayList is used to allow concurrent add, remove and retrieval
* operations. It is assumed that the traversal operations on bridge servers list vastly outnumber the mutative
* operations such as add, remove.
*/
private volatile List allBridgeServers = new CopyOnWriteArrayList();
/**
* Controls updates to the list of all gateway senders
*
* @see #allGatewaySenders
*/
public final Object allGatewaySendersLock = new Object();
/**
* the set of all gateway senders. It may be fetched safely (for enumeration), but updates must by synchronized via
* {@link #allGatewaySendersLock}
*/
private volatile Set<GatewaySender> allGatewaySenders = Collections.emptySet();
/**
* The list of all async event queues added to the cache.
* CopyOnWriteArrayList is used to allow concurrent add, remove and retrieval operations.
*/
private volatile Set<AsyncEventQueue> allAsyncEventQueues = new CopyOnWriteArraySet<AsyncEventQueue>();
/**
* Controls updates to the list of all gateway receivers
*
* @see #allGatewayReceivers
*/
public final Object allGatewayReceiversLock = new Object();
/**
* the list of all gateway Receivers. It may be fetched safely (for enumeration), but updates must by synchronized via
* {@link #allGatewayReceiversLock}
*/
private volatile Set<GatewayReceiver> allGatewayReceivers = Collections.emptySet();
/** PartitionedRegion instances (for required-events notification */
// This is a HashSet because I know that clear() on it does not
// allocate any objects.
private final HashSet<PartitionedRegion> partitionedRegions = new HashSet<PartitionedRegion>();
/**
* Fix for 42051 This is a map of regions that are in the process of being destroyed. We could potentially leave the
* regions in the pathToRegion map, but that would entail too many changes at this point in the release. We need to
* know which regions are being destroyed so that a profile exchange can get the persistent id of the destroying
* region and know not to persist that ID if it receives it as part of the persistent view.
*/
private final ConcurrentMap<String, DistributedRegion> regionsInDestroy = new ConcurrentHashMap<String, DistributedRegion>();
public final Object allGatewayHubsLock = new Object();
/**
* conflict resolver for WAN, if any
* @guarded.By {@link #allGatewayHubsLock}
*/
private GatewayConflictResolver gatewayConflictResolver;
/** Is this is "server" cache? */
private boolean isServer = false;
/** transaction manager for this cache */
private final TXManagerImpl txMgr;
private RestAgent restAgent;
private boolean isRESTServiceRunning = false;
/** Copy on Read feature for all read operations e.g. get */
private volatile boolean copyOnRead = DEFAULT_COPY_ON_READ;
/** The named region attributes registered with this cache. */
private final Map namedRegionAttributes = Collections.synchronizedMap(new HashMap());
/**
* if this cache was forced to close due to a forced-disconnect, we retain a ForcedDisconnectException that can be
* used as the cause
*/
private boolean forcedDisconnect;
/**
* if this cache was forced to close due to a forced-disconnect or system failure, this keeps track of the reason
*/
protected volatile Throwable disconnectCause = null;
/** context where this cache was created -- for debugging, really... */
public Exception creationStack = null;
/**
* a system timer task for cleaning up old bridge thread event entries
*/
private EventTracker.ExpiryTask recordedEventSweeper;
private TombstoneService tombstoneService;
/**
* DistributedLockService for PartitionedRegions. Remains null until the first PartitionedRegion is created. Destroyed
* by GemFireCache when closing the cache. Protected by synchronization on this GemFireCache.
*
* @guarded.By prLockServiceLock
*/
private DistributedLockService prLockService;
/**
* lock used to access prLockService
*/
private final Object prLockServiceLock = new Object();
/**
* DistributedLockService for GatewaySenders. Remains null until the
* first GatewaySender is created. Destroyed by GemFireCache when closing
* the cache.
* @guarded.By gatewayLockServiceLock
*/
private volatile DistributedLockService gatewayLockService;
/**
* Lock used to access gatewayLockService
*/
private final Object gatewayLockServiceLock = new Object();
private final InternalResourceManager resourceManager;
private final AtomicReference<BackupManager> backupManager = new AtomicReference<BackupManager>();
private HeapEvictor heapEvictor = null;
private OffHeapEvictor offHeapEvictor = null;
private final Object heapEvictorLock = new Object();
private final Object offHeapEvictorLock = new Object();
private ResourceEventsListener listener;
/**
* Enabled when CacheExistsException issues arise in debugging
*
* @see #creationStack
*/
private static final boolean DEBUG_CREATION_STACK = false;
private volatile QueryMonitor queryMonitor;
private final Object queryMonitorLock = new Object();
private final PersistentMemberManager persistentMemberManager;
private ClientMetadataService clientMetadatService = null;
private final Object clientMetaDatServiceLock = new Object();
private volatile boolean isShutDownAll = false;
/**
* Set of members that are not yet ready. Currently used by SQLFabric during initial DDL replay to indicate that the
* member should not be chosen for primary buckets.
*/
private final HashSet<InternalDistributedMember> unInitializedMembers = new HashSet<InternalDistributedMember>();
/**
* Set of {@link BucketAdvisor}s for this node that are pending for volunteer for primary due to uninitialized node
* (SQLFabric DDL replay in progress).
*/
private final LinkedHashSet<BucketAdvisor> deferredVolunteerForPrimary = new LinkedHashSet<BucketAdvisor>();
private final ResourceAdvisor resourceAdvisor;
private final JmxManagerAdvisor jmxAdvisor;
private final int serialNumber;
/** system property to indicate SQLFabric product */
public static final String SQLFABRIC_PRODUCT_PROP = "sqlfabric.product";
private final TXEntryStateFactory txEntryStateFactory;
static final String SQLF_ENTRY_FACTORY_PROVIDER = "com.gemstone.sqlfabric."
+ "internal.engine.store.entry.RegionEntryFactoryProvider";
private final CacheConfig cacheConfig;
private final DiskStoreMonitor diskMonitor;
// Stores the properties used to initialize declarables.
private final Map<Declarable, Properties> declarablePropertiesMap = new ConcurrentHashMap<Declarable, Properties>();
/** {@link PropertyResolver} to resolve ${} type property strings */
protected static PropertyResolver resolver;
protected static boolean xmlParameterizationEnabled = !Boolean.getBoolean("gemfire.xml.parameterization.disabled");
/**
* the memcachedServer instance that is started when {@link DistributionConfig#getMemcachedPort()}
* is specified
*/
private GemFireMemcachedServer memcachedServer;
/**
* Redis server is started when {@link DistributionConfig#getRedisPort()} is set
*/
private GemFireRedisServer redisServer;
/**
* {@link ExtensionPoint} support.
* @since 8.1
*/
private SimpleExtensionPoint<Cache> extensionPoint = new SimpleExtensionPoint<Cache>(this, this);
private final CqService cqService;
private final LuceneService luceneService;
public static final int DEFAULT_CLIENT_FUNCTION_TIMEOUT = 0;
private static int clientFunctionTimeout;
private final static Boolean DISABLE_AUTO_EVICTION = Boolean.getBoolean("gemfire.disableAutoEviction");
static {
// this works around jdk bug 6427854, reported in ticket #44434
String propertyName = "sun.nio.ch.bugLevel";
String value = System.getProperty(propertyName);
if (value == null) {
System.setProperty(propertyName, "");
}
}
/**
* Invokes mlockall(). Locks all pages mapped into the address space of the
* calling process. This includes the pages of the code, data and stack segment,
* as well as shared libraries, user space kernel data, shared memory, and
* memory-mapped files. All mapped pages are guaranteed to be resident in RAM
* when the call returns successfully; the pages are guaranteed to stay in RAM
* until later unlocked.
*
* @param flags
* MCL_CURRENT 1 - Lock all pages which are currently mapped into the
* address space of the process.
*
* MCL_FUTURE 2 - Lock all pages which will become mapped into the address
* space of the process in the future. These could be for instance new
* pages required by a growing heap and stack as well as new memory mapped
* files or shared memory regions.
*
* @return
* 0 if success, non-zero if error and errno set
*
*/
private static native int mlockall(int flags);
public static void lockMemory() {
int result = 0;
try {
Native.register(Platform.C_LIBRARY_NAME);
result = mlockall(1);
if (result == 0) {
return;
}
} catch (Throwable t) {
throw new IllegalStateException("Error trying to lock memory", t);
}
int errno = Native.getLastError();
String msg = "mlockall failed: " + errno;
if (errno == 1 || errno == 12) { // EPERM || ENOMEM
msg = "Unable to lock memory due to insufficient free space or privileges. "
+ "Please check the RLIMIT_MEMLOCK soft resource limit (ulimit -l) and "
+ "increase the available memory if needed";
}
throw new IllegalStateException(msg);
}
/**
* This is for debugging cache-open issues (esp. {@link com.gemstone.gemfire.cache.CacheExistsException})
*/
@Override
public String toString() {
final StringBuffer sb = new StringBuffer();
sb.append("GemFireCache[");
sb.append("id = " + System.identityHashCode(this));
sb.append("; isClosing = " + this.isClosing);
sb.append("; isShutDownAll = " + this.isShutDownAll);
sb.append("; created = " + this.creationDate);
sb.append("; server = " + this.isServer);
sb.append("; copyOnRead = " + this.copyOnRead);
sb.append("; lockLease = " + this.lockLease);
sb.append("; lockTimeout = " + this.lockTimeout);
// sb.append("; rootRegions = (" + this.rootRegions + ")");
// sb.append("; bridgeServers = (" + this.bridgeServers + ")");
// sb.append("; regionAttributes = (" + this.listRegionAttributes());
// sb.append("; gatewayHub = " + gatewayHub);
if (this.creationStack != null) {
sb.append("\nCreation context:\n");
OutputStream os = new OutputStream() {
@Override
public void write(int i) {
sb.append((char) i);
}
};
PrintStream ps = new PrintStream(os);
this.creationStack.printStackTrace(ps);
}
sb.append("]");
return sb.toString();
}
// ////////////////////// Constructors /////////////////////////
/** Map of Futures used to track Regions that are being reinitialized */
private final ConcurrentMap reinitializingRegions = new ConcurrentHashMap();
/** Returns the last created instance of GemFireCache */
public static GemFireCacheImpl getInstance() {
return instance;
}
/**
* Returns an existing instance. If a cache does not exist
* throws a cache closed exception.
*
* @return the existing cache
* @throws CacheClosedException
* if an existing cache can not be found.
*/
public static final GemFireCacheImpl getExisting() {
final GemFireCacheImpl result = instance;
if (result != null && !result.isClosing) {
return result;
}
if (result != null) {
throw result.getCacheClosedException(LocalizedStrings
.CacheFactory_THE_CACHE_HAS_BEEN_CLOSED.toLocalizedString(), null);
}
throw new CacheClosedException(LocalizedStrings
.CacheFactory_A_CACHE_HAS_NOT_YET_BEEN_CREATED.toLocalizedString());
}
/**
* Returns an existing instance. If a cache does not exist throws an exception.
*
* @param reason
* the reason an existing cache is being requested.
* @return the existing cache
* @throws CacheClosedException
* if an existing cache can not be found.
*/
public static GemFireCacheImpl getExisting(String reason) {
GemFireCacheImpl result = getInstance();
if (result == null) {
throw new CacheClosedException(reason);
}
return result;
}
/**
* Pdx is allowed to obtain the cache even while it is being closed
*/
public static GemFireCacheImpl getForPdx(String reason) {
GemFireCacheImpl result = pdxInstance;
if (result == null) {
throw new CacheClosedException(reason);
}
return result;
}
// /**
// * @deprecated remove when Lise allows a Hydra VM to
// * be re-created
// */
// public static void clearInstance() {
// System.err.println("DEBUG: do not commit GemFireCache#clearInstance");
// instance = null;
// }
public static GemFireCacheImpl create(boolean isClient, PoolFactory pf, DistributedSystem system, CacheConfig cacheConfig) {
return new GemFireCacheImpl(true, pf, system, cacheConfig).init();
}
public static GemFireCacheImpl create(DistributedSystem system, CacheConfig cacheConfig) {
return new GemFireCacheImpl(false, null, system, cacheConfig).init();
}
public static Cache create(DistributedSystem system, boolean existingOk, CacheConfig cacheConfig)
throws CacheExistsException, TimeoutException, CacheWriterException,
GatewayException,
RegionExistsException
{
GemFireCacheImpl instance = getInstance();
if (instance != null && !instance.isClosed()) {
if (existingOk) {
// Check if cache configuration matches.
cacheConfig.validateCacheConfig(instance);
return instance;
} else {
// instance.creationStack argument is for debugging...
throw new CacheExistsException(instance, LocalizedStrings.CacheFactory_0_AN_OPEN_CACHE_ALREADY_EXISTS.toLocalizedString(instance), instance.creationStack);
}
}
return create(system, cacheConfig);
}
/**
* Creates a new instance of GemFireCache and populates it according to the <code>cache.xml</code>, if appropriate.
*/
private GemFireCacheImpl(boolean isClient, PoolFactory pf, DistributedSystem system, CacheConfig cacheConfig) {
this.isClient = isClient;
this.clientpf = pf;
this.cacheConfig = cacheConfig; // do early for bug 43213
// Synchronized to prevent a new cache from being created
// before an old one has finished closing
synchronized (GemFireCacheImpl.class) {
// start JTA transaction manager within this synchronized block
// to prevent race with cache close. fixes bug 43987
JNDIInvoker.mapTransactions();
this.system = (InternalDistributedSystem) system;
this.dm = this.system.getDistributionManager();
if (!this.isClient && PoolManager.getAll().isEmpty()) {
// We only support management on members of a distributed system
// Should do this: if (!getSystem().isLoner()) {
// but it causes quickstart.CqClientTest to hang
this.listener = new ManagementListener();
this.system.addResourceListener(listener);
} else {
this.listener = null;
}
// Don't let admin-only VMs create Cache's just yet.
DM dm = this.system.getDistributionManager();
if (dm instanceof DistributionManager) {
if (((DistributionManager) dm).getDMType() == DistributionManager.ADMIN_ONLY_DM_TYPE) {
throw new IllegalStateException(LocalizedStrings.GemFireCache_CANNOT_CREATE_A_CACHE_IN_AN_ADMINONLY_VM
.toLocalizedString());
}
}
this.rootRegions = new HashMap();
this.cqService = CqServiceProvider.create(this);
this.luceneService = LuceneServiceProvider.create(this);
initReliableMessageQueueFactory();
// Create the CacheStatistics
this.cachePerfStats = new CachePerfStats(system);
CachePerfStats.enableClockStats = this.system.getConfig().getEnableTimeStatistics();
this.txMgr = new TXManagerImpl(this.cachePerfStats, this);
dm.addMembershipListener(this.txMgr);
this.creationDate = new Date();
this.persistentMemberManager = new PersistentMemberManager();
if (ASYNC_EVENT_LISTENERS) {
final ThreadGroup group = LoggingThreadGroup.createThreadGroup("Message Event Threads",logger);
ThreadFactory tf = new ThreadFactory() {
public Thread newThread(final Runnable command) {
final Runnable r = new Runnable() {
public void run() {
ConnectionTable.threadWantsSharedResources();
command.run();
}
};
Thread thread = new Thread(group, r, "Message Event Thread");
thread.setDaemon(true);
return thread;
}
};
// @todo darrel: add stats
// this.cachePerfStats.getEventQueueHelper());
ArrayBlockingQueue q = new ArrayBlockingQueue(EVENT_QUEUE_LIMIT);
this.eventThreadPool = new PooledExecutorWithDMStats(q, 16, this.cachePerfStats.getEventPoolHelper(), tf, 1000,
new CallerRunsPolicy());
} else {
this.eventThreadPool = null;
}
// Initialize the advisor here, but wait to exchange profiles until cache is fully built
this.resourceAdvisor = ResourceAdvisor.createResourceAdvisor(this);
// Initialize the advisor here, but wait to exchange profiles until cache is fully built
this.jmxAdvisor = JmxManagerAdvisor.createJmxManagerAdvisor(new JmxManagerAdvisee(this));
resourceManager = InternalResourceManager.createResourceManager(this);
this.serialNumber = DistributionAdvisor.createSerialNumber();
getResourceManager().addResourceListener(ResourceType.HEAP_MEMORY, getHeapEvictor());
/*
* Only bother creating an off-heap evictor if we have off-heap memory enabled.
*/
if(null != getOffHeapStore()) {
getResourceManager().addResourceListener(ResourceType.OFFHEAP_MEMORY, getOffHeapEvictor());
}
recordedEventSweeper = EventTracker.startTrackerServices(this);
tombstoneService = TombstoneService.initialize(this);
TypeRegistry.init();
basicSetPdxSerializer(this.cacheConfig.getPdxSerializer());
TypeRegistry.open();
if (!isClient()) {
// Initialize the QRM thread freqeuncy to default (1 second )to prevent spill
// over from previous Cache , as the interval is stored in a static
// volatile field.
HARegionQueue.setMessageSyncInterval(HARegionQueue.DEFAULT_MESSAGE_SYNC_INTERVAL);
}
FunctionService.registerFunction(new PRContainsValueFunction());
FunctionService.registerFunction(new HDFSLastCompactionTimeFunction());
FunctionService.registerFunction(new HDFSForceCompactionFunction());
FunctionService.registerFunction(new HDFSFlushQueueFunction());
this.expirationScheduler = new ExpirationScheduler(this.system);
// uncomment following line when debugging CacheExistsException
if (DEBUG_CREATION_STACK) {
this.creationStack = new Exception(LocalizedStrings.GemFireCache_CREATED_GEMFIRECACHE_0.toLocalizedString(toString()));
}
// set custom entry factories for SQLFabric
this.sqlfSystem = Boolean.getBoolean(SQLFABRIC_PRODUCT_PROP);
if (this.sqlfSystem) {
String provider = SQLF_ENTRY_FACTORY_PROVIDER;
try {
Class<?> factoryProvider = Class.forName(provider);
Method method = factoryProvider.getDeclaredMethod("getTXEntryStateFactory", new Class[0]);
TXEntryStateFactory ref = (TXEntryStateFactory) method.invoke(null, new Object[0]);
this.txEntryStateFactory = ref;
} catch (Exception e) {
throw new CacheRuntimeException("Exception in obtaining SQLFabric " + "RegionEntry Factory provider class", e) {
private static final long serialVersionUID = -6456778743822843838L;
};
}
} else {
this.txEntryStateFactory = TXEntryState.getFactory();
}
if (xmlParameterizationEnabled) {
/** If gemfire prperties file is available replace properties from there */
Properties userProps = this.system.getConfig().getUserDefinedProps();
if (userProps != null && !userProps.isEmpty()) {
resolver = new CacheXmlPropertyResolver(false, PropertyResolver.NO_SYSTEM_PROPERTIES_OVERRIDE, userProps);
} else {
resolver = new CacheXmlPropertyResolver(false, PropertyResolver.NO_SYSTEM_PROPERTIES_OVERRIDE, null);
}
}
SystemFailure.signalCacheCreate();
diskMonitor = new DiskStoreMonitor();
} // synchronized
}
public boolean isRESTServiceRunning() {
return isRESTServiceRunning;
}
public void setRESTServiceRunning(boolean isRESTServiceRunning) {
this.isRESTServiceRunning = isRESTServiceRunning;
}
/**
* Used by Hydra tests to get handle of Rest Agent
* @return RestAgent
*/
public RestAgent getRestAgent() {
return restAgent;
}
/*****
* Request the shared configuration from the locator(s) which have the Cluster config service running
* Applies the shared configuration to this cache, only if its a GEMFIRE && NON-LOCATOR && NON-CLIENT cache
*/
public void requestAndApplySharedConfiguration() {
//Request the shared configuration from the locator(s)
final DistributionConfig config = this.system.getConfig();
if (dm instanceof DistributionManager) {
if (!this.sqlfSystem
&& ((DistributionManager) dm).getDMType() != DistributionManager.LOCATOR_DM_TYPE
&& !isClient
&& Locator.getLocator() == null
) {
boolean useSharedConfiguration = config.getUseSharedConfiguration();
if (useSharedConfiguration) {
Map<InternalDistributedMember, Collection<String>> scl = this.getDistributionManager().getAllHostedLocatorsWithSharedConfiguration();
//If there are no locators with Shared configuration, that means the system has been started without shared configuration
//then do not make requests to the locators
if (!scl.isEmpty()) {
String groupsString = config.getGroups();
ConfigurationResponse response = null;
List<String> locatorConnectionStrings = getSharedConfigLocatorConnectionStringList();
try {
response = ClusterConfigurationLoader.requestConfigurationFromLocators(ClusterConfigurationLoader.getGroups(groupsString), locatorConnectionStrings);
//log the configuration received from the locator
logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCache_RECEIVED_SHARED_CONFIGURATION_FROM_LOCATORS));
logger.info(response.describeConfig());
//deploy the Jars
ClusterConfigurationLoader.deployJarsReceivedFromClusterConfiguration(this, response);
//Apply the xml configuration
ClusterConfigurationLoader.applyClusterConfiguration(this, response, ClusterConfigurationLoader.getGroups(groupsString));
} catch (ClusterConfigurationNotAvailableException e) {
throw new CacheRuntimeException(LocalizedStrings.GemFireCache_SHARED_CONFIGURATION_NOT_AVAILABLE.toLocalizedString(), e) {
private static final long serialVersionUID = 1L;
};
} catch (IOException e) {
throw new CacheRuntimeException(LocalizedStrings.GemFireCache_EXCEPTION_OCCURED_WHILE_DEPLOYING_JARS_FROM_SHARED_CONDFIGURATION.toLocalizedString(), e) {
private static final long serialVersionUID = 1L;
};
} catch (ClassNotFoundException e) {
throw new CacheRuntimeException(LocalizedStrings.GemFireCache_EXCEPTION_OCCURED_WHILE_DEPLOYING_JARS_FROM_SHARED_CONDFIGURATION.toLocalizedString(), e) {
private static final long serialVersionUID = 1L;
};
}
} else {
logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCache_NO_LOCATORS_FOUND_WITH_SHARED_CONFIGURATION));
}
} else {
logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCache_NOT_USING_SHARED_CONFIGURATION));
}
}
}
}
public List<String> getSharedConfigLocatorConnectionStringList() {
List<String> locatorConnectionStringList = new ArrayList<String>();
Map<InternalDistributedMember, Collection<String>> scl = this.getDistributionManager().getAllHostedLocatorsWithSharedConfiguration();
//If there are no locators with Shared configuration, that means the system has been started without shared configuration
//then do not make requests to the locators
if (!scl.isEmpty()) {
Set<Entry<InternalDistributedMember, Collection<String>>> locs = scl.entrySet();
for (Entry<InternalDistributedMember, Collection<String>> loc : locs) {
Collection<String> locStrings = loc.getValue();
Iterator<String> locStringIter = locStrings.iterator();
while (locStringIter.hasNext()) {
locatorConnectionStringList.add(locStringIter.next());
}
}
}
return locatorConnectionStringList;
}
/**
* Used by unit tests to force cache creation to use a test generated cache.xml
*/
public static File testCacheXml = null;
/**
* @return true if cache is created using a ClientCacheFactory
* @see #hasPool()
*/
public boolean isClient() {
return this.isClient;
}
/**
* Method to check for GemFire client. In addition to checking for ClientCacheFactory, this method checks for any
* defined pools.
*
* @return true if the cache has pools declared
*/
public boolean hasPool() {
return this.isClient || !getAllPools().isEmpty();
}
private Collection<Pool> getAllPools() {
Collection<Pool> pools = PoolManagerImpl.getPMI().getMap().values();
for (Iterator<Pool> itr = pools.iterator(); itr.hasNext();) {
PoolImpl pool = (PoolImpl) itr.next();
if (pool.isUsedByGateway()) {
itr.remove();
}
}
return pools;
}
/**
* May return null (even on a client).
*/
public Pool getDefaultPool() {
return this.defaultPool;
}
private void setDefaultPool(Pool v) {
this.defaultPool = v;
}
/**
* Perform initialization, solve the early escaped reference problem by putting publishing references to this instance
* in this method (vs. the constructor).
*
* @return the initialized instance of the cache
*/
private GemFireCacheImpl init() {
if (GemFireCacheImpl.instance != null) {
Assert.assertTrue(GemFireCacheImpl.instance == null, "Cache instance already in place: " + instance);
}
GemFireCacheImpl.instance = this;
GemFireCacheImpl.pdxInstance = this;
MinimumSystemRequirements.checkAndLog();
for (Iterator<CacheLifecycleListener> iter = cacheLifecycleListeners.iterator(); iter.hasNext();) {
CacheLifecycleListener listener = (CacheLifecycleListener) iter.next();
listener.cacheCreated(this);
}
ClassPathLoader.setLatestToDefault();
SystemMemberCacheEventProcessor.send(this, Operation.CACHE_CREATE);
this.resourceAdvisor.initializationGate();
//Register function that we need to execute to fetch available REST service endpoints in DS
FunctionService.registerFunction(new FindRestEnabledServersFunction());
// moved this after initializeDeclarativeCache because in the future
// distributed system creation will not happen until we have read
// cache.xml file.
// For now this needs to happen before cache.xml otherwise
// we will not be ready for all the events that cache.xml
// processing can deliver (region creation, etc.).
// This call may need to be moved inside initializeDeclarativeCache.
/** Entry to GemFire Management service **/
this.jmxAdvisor.initializationGate();
system.handleResourceEvent(ResourceEvent.CACHE_CREATE, this);
boolean completedCacheXml = false;
try {
//Deploy all the jars from the deploy working dir.
new JarDeployer(this.system.getConfig().getDeployWorkingDir()).loadPreviouslyDeployedJars();
requestAndApplySharedConfiguration();
initializeDeclarativeCache();
completedCacheXml = true;
} finally {
if (!completedCacheXml) {
// so initializeDeclarativeCache threw an exception
try {
close(); // fix for bug 34041
} catch (Throwable ignore) {
// I don't want init to throw an exception that came from the close.
// I want it to throw the original exception that came from initializeDeclarativeCache.
}
}
}
this.clientpf = null;
startColocatedJmxManagerLocator();
startMemcachedServer();
startRedisServer();
startRestAgentServer(this);
int time = Integer.getInteger("gemfire.CLIENT_FUNCTION_TIMEOUT",
DEFAULT_CLIENT_FUNCTION_TIMEOUT);
clientFunctionTimeout = time >= 0 ? time : DEFAULT_CLIENT_FUNCTION_TIMEOUT;
return this;
}
private boolean isNotJmxManager(){
return (this.system.getConfig().getJmxManagerStart() != true);
}
private boolean isServerNode(){
return (this.system.getDistributedMember().getVmKind() != DistributionManager.LOCATOR_DM_TYPE
&& this.system.getDistributedMember().getVmKind() != DistributionManager.ADMIN_ONLY_DM_TYPE
&& !isClient());
}
private void startRestAgentServer(GemFireCacheImpl cache) {
if (this.system.getConfig().getStartDevRestApi()
&& isNotJmxManager()
&& isServerNode()) {
this.restAgent = new RestAgent(this.system.getConfig());
restAgent.start(cache);
} else {
this.restAgent = null;
}
}
private void startMemcachedServer() {
int port = system.getConfig().getMemcachedPort();
if (port != 0) {
String protocol = system.getConfig().getMemcachedProtocol();
assert protocol != null;
String bindAddress = system.getConfig().getMemcachedBindAddress();
assert bindAddress != null;
if (bindAddress.equals(DistributionConfig.DEFAULT_MEMCACHED_BIND_ADDRESS)) {
logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_STARTING_GEMFIRE_MEMCACHED_SERVER_ON_PORT_0_FOR_1_PROTOCOL,
new Object[] { port, protocol }));
} else {
logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_STARTING_GEMFIRE_MEMCACHED_SERVER_ON_BIND_ADDRESS_0_PORT_1_FOR_2_PROTOCOL,
new Object[] { bindAddress, port, protocol }));
}
this.memcachedServer = new GemFireMemcachedServer(bindAddress, port, Protocol.valueOf(protocol.toUpperCase()));
this.memcachedServer.start();
}
}
private void startRedisServer() {
int port = system.getConfig().getRedisPort();
if (port != 0) {
String bindAddress = system.getConfig().getRedisBindAddress();
assert bindAddress != null;
if (bindAddress.equals(DistributionConfig.DEFAULT_REDIS_BIND_ADDRESS)) {
getLoggerI18n().info(LocalizedStrings.GemFireCacheImpl_STARTING_GEMFIRE_REDIS_SERVER_ON_PORT_0,
new Object[] { port });
} else {
getLoggerI18n().info(LocalizedStrings.GemFireCacheImpl_STARTING_GEMFIRE_REDIS_SERVER_ON_BIND_ADDRESS_0_PORT_1,
new Object[] { bindAddress, port });
}
this.redisServer = new GemFireRedisServer(bindAddress, port);
this.redisServer.start();
}
}
public URL getCacheXmlURL() {
if (this.getMyId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
return null;
}
File xmlFile = testCacheXml;
if (xmlFile == null) {
xmlFile = this.system.getConfig().getCacheXmlFile();
}
if ("".equals(xmlFile.getName())) {
return null;
}
URL url = null;
if (!xmlFile.exists() || !xmlFile.isFile()) {
// do a resource search
String resource = xmlFile.getPath();
resource = resource.replaceAll("\\\\", "/");
if (resource.length() > 1 && resource.startsWith("/")) {
resource = resource.substring(1);
}
url = ClassPathLoader.getLatest().getResource(getClass(), resource);
} else {
try {
url = xmlFile.toURL();
} catch (IOException ex) {
throw new CacheXmlException(
LocalizedStrings.GemFireCache_COULD_NOT_CONVERT_XML_FILE_0_TO_AN_URL.toLocalizedString(xmlFile), ex);
}
}
if (url == null) {
File defaultFile = DistributionConfig.DEFAULT_CACHE_XML_FILE;
if (!xmlFile.equals(defaultFile)) {
if (!xmlFile.exists()) {
throw new CacheXmlException(LocalizedStrings.GemFireCache_DECLARATIVE_CACHE_XML_FILERESOURCE_0_DOES_NOT_EXIST
.toLocalizedString(xmlFile));
} else /* if (!xmlFile.isFile()) */{
throw new CacheXmlException(LocalizedStrings.GemFireCache_DECLARATIVE_XML_FILE_0_IS_NOT_A_FILE.toLocalizedString(xmlFile));
}
}
}
return url;
}
/**
* Initializes the contents of this <code>Cache</code> according to the declarative caching XML file specified by the
* given <code>DistributedSystem</code>. Note that this operation cannot be performed in the constructor because
* creating regions in the cache, etc. uses the cache itself (which isn't initialized until the constructor returns).
*
* @throws CacheXmlException
* If something goes wrong while parsing the declarative caching XML file.
* @throws TimeoutException
* If a {@link com.gemstone.gemfire.cache.Region#put(Object, Object)}times out while initializing the cache.
* @throws CacheWriterException
* If a <code>CacheWriterException</code> is thrown while initializing the cache.
* @throws RegionExistsException
* If the declarative caching XML file desribes a region that already exists (including the root region).
* @throws GatewayException
* If a <code>GatewayException</code> is thrown while initializing the cache.
*
* @see #loadCacheXml
*/
private void initializeDeclarativeCache() throws TimeoutException, CacheWriterException, GatewayException, RegionExistsException {
URL url = getCacheXmlURL();
String cacheXmlDescription = this.cacheConfig.getCacheXMLDescription();
if (url == null && cacheXmlDescription == null) {
if (isClient()) {
determineDefaultPool();
initializeClientRegionShortcuts(this);
} else {
initializeRegionShortcuts(this);
}
initializePdxRegistry();
readyDynamicRegionFactory();
return; // nothing needs to be done
}
try {
InputStream stream = null;
if (cacheXmlDescription != null) {
if (logger.isTraceEnabled()) {
logger.trace("initializing cache with generated XML: {}", cacheXmlDescription);
}
stream = new StringBufferInputStream(cacheXmlDescription);
} else {
stream = url.openStream();
}
loadCacheXml(stream);
try {
stream.close();
} catch (IOException ignore) {
}
if (cacheXmlDescription == null) {
StringBuilder sb = new StringBuilder();
try {
final String EOLN = System.getProperty("line.separator");
BufferedReader br = new BufferedReader(new InputStreamReader(url.openStream()));
String l = br.readLine();
while (l != null) {
if (!l.isEmpty()) {
sb.append(EOLN).append(l);
}
l = br.readLine();
}
br.close();
} catch (IOException ignore) {
}
logger.info(LocalizedMessage.create(
LocalizedStrings.GemFireCache_CACHE_INITIALIZED_USING__0__1, new Object[] {url.toString(), sb.toString()}));
} else {
logger.info(LocalizedMessage.create(
LocalizedStrings.GemFireCache_CACHE_INITIALIZED_USING__0__1, new Object[] {"generated description from old cache", cacheXmlDescription}));
}
} catch (IOException ex) {
throw new CacheXmlException(LocalizedStrings.GemFireCache_WHILE_OPENING_CACHE_XML_0_THE_FOLLOWING_ERROR_OCCURRED_1
.toLocalizedString(new Object[] { url.toString(), ex }));
} catch (CacheXmlException ex) {
CacheXmlException newEx = new CacheXmlException(LocalizedStrings.GemFireCache_WHILE_READING_CACHE_XML_0_1
.toLocalizedString(new Object[] { url, ex.getMessage() }));
newEx.setStackTrace(ex.getStackTrace());
newEx.initCause(ex.getCause());
throw newEx;
}
}
public synchronized void initializePdxRegistry() {
if (this.pdxRegistry == null) {
//The member with locator is initialized with a NullTypePdxRegistration
if (this.getMyId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
this.pdxRegistry = new TypeRegistry(this, true);
} else {
this.pdxRegistry = new TypeRegistry(this, false);
}
this.pdxRegistry.initialize();
}
}
/**
* Call to make this vm's dynamic region factory ready. Public so it can be called from CacheCreation during xml
* processing
*/
public void readyDynamicRegionFactory() {
try {
((DynamicRegionFactoryImpl) DynamicRegionFactory.get()).internalInit(this);
} catch (CacheException ce) {
throw new GemFireCacheException(LocalizedStrings.GemFireCache_DYNAMIC_REGION_INITIALIZATION_FAILED.toLocalizedString(), ce);
}
}
/**
* create diskstore factory with default attributes
*
* @since prPersistSprint2
*/
public DiskStoreFactory createDiskStoreFactory() {
return new DiskStoreFactoryImpl(this);
}
/**
* create diskstore factory with predefined attributes
*
* @since prPersistSprint2
*/
public DiskStoreFactory createDiskStoreFactory(DiskStoreAttributes attrs) {
return new DiskStoreFactoryImpl(this, attrs);
}
protected class Stopper extends CancelCriterion {
/*
* (non-Javadoc)
*
* @see com.gemstone.gemfire.CancelCriterion#cancelInProgress()
*/
@Override
public String cancelInProgress() {
String reason = GemFireCacheImpl.this.getDistributedSystem().getCancelCriterion().cancelInProgress();
if (reason != null) {
return reason;
}
if (GemFireCacheImpl.this.disconnectCause != null) {
return disconnectCause.getMessage();
}
if (GemFireCacheImpl.this.isClosing) {
return "The cache is closed."; // this + ": closed";
}
return null;
}
/*
* (non-Javadoc)
*
* @see com.gemstone.gemfire.CancelCriterion#generateCancelledException(java.lang.Throwable)
*/
@Override
public RuntimeException generateCancelledException(Throwable e) {
String reason = cancelInProgress();
if (reason == null) {
return null;
}
RuntimeException result = getDistributedSystem().getCancelCriterion().generateCancelledException(e);
if (result != null) {
return result;
}
if (GemFireCacheImpl.this.disconnectCause == null) {
// No root cause, specify the one given and be done with it.
return new CacheClosedException(reason, e);
}
if (e == null) {
// Caller did not specify any root cause, so just use our own.
return new CacheClosedException(reason, GemFireCacheImpl.this.disconnectCause);
}
// Attempt to stick rootCause at tail end of the exception chain.
Throwable nt = e;
while (nt.getCause() != null) {
nt = nt.getCause();
}
try {
nt.initCause(GemFireCacheImpl.this.disconnectCause);
return new CacheClosedException(reason, e);
} catch (IllegalStateException e2) {
// Bug 39496 (Jrockit related) Give up. The following
// error is not entirely sane but gives the correct general picture.
return new CacheClosedException(reason, GemFireCacheImpl.this.disconnectCause);
}
}
}
private final Stopper stopper = new Stopper();
public CancelCriterion getCancelCriterion() {
return stopper;
}
/** return true if the cache was closed due to being shunned by other members */
public boolean forcedDisconnect() {
return this.forcedDisconnect || this.system.forcedDisconnect();
}
/** return a CacheClosedException with the given reason */
public CacheClosedException getCacheClosedException(String reason, Throwable cause) {
CacheClosedException result;
if (cause != null) {
result = new CacheClosedException(reason, cause);
} else if (this.disconnectCause != null) {
result = new CacheClosedException(reason, this.disconnectCause);
} else {
result = new CacheClosedException(reason);
}
return result;
}
/** if the cache was forcibly closed this exception will reflect the cause */
public Throwable getDisconnectCause() {
return this.disconnectCause;
}
/**
* Set to true during a cache close if user requested durable subscriptions to be kept.
*
* @since 5.7
*/
private boolean keepAlive;
/**
* Returns true if durable subscriptions (registrations and queries) should be preserved.
*
* @since 5.7
*/
public boolean keepDurableSubscriptionsAlive() {
return this.keepAlive;
}
/**
* break any potential circularity in {@link #loadEmergencyClasses()}
*/
private static volatile boolean emergencyClassesLoaded = false;
/**
* Ensure that all the necessary classes for closing the cache are loaded
*
* @see SystemFailure#loadEmergencyClasses()
*/
static public void loadEmergencyClasses() {
if (emergencyClassesLoaded)
return;
emergencyClassesLoaded = true;
InternalDistributedSystem.loadEmergencyClasses();
AcceptorImpl.loadEmergencyClasses();
PoolManagerImpl.loadEmergencyClasses();
}
/**
* Close the distributed system, bridge servers, and gateways. Clears the rootRegions and partitionedRegions map.
* Marks the cache as closed.
*
* @see SystemFailure#emergencyClose()
*/
static public void emergencyClose() {
final boolean DEBUG = SystemFailure.TRACE_CLOSE;
GemFireCacheImpl inst = GemFireCacheImpl.instance;
if (inst == null) {
if (DEBUG) {
System.err.println("GemFireCache#emergencyClose: no instance");
}
return;
}
GemFireCacheImpl.instance = null;
GemFireCacheImpl.pdxInstance = null;
// leave the PdxSerializer set if we have one to prevent 43412
// TypeRegistry.setPdxSerializer(null);
// Shut down messaging first
InternalDistributedSystem ids = inst.system;
if (ids != null) {
if (DEBUG) {
System.err.println("DEBUG: emergencyClose InternalDistributedSystem");
}
ids.emergencyClose();
}
inst.disconnectCause = SystemFailure.getFailure();
inst.isClosing = true;
// Clear bridge servers
if (DEBUG) {
System.err.println("DEBUG: Close bridge servers");
}
{
Iterator allBridgeServersItr = inst.allBridgeServers.iterator();
while (allBridgeServersItr.hasNext()) {
BridgeServerImpl bs = (BridgeServerImpl) allBridgeServersItr.next();
AcceptorImpl ai = bs.getAcceptor();
if (ai != null) {
ai.emergencyClose();
}
}
}
if (DEBUG) {
System.err.println("DEBUG: closing client resources");
}
PoolManagerImpl.emergencyClose();
if (DEBUG) {
System.err.println("DEBUG: closing gateway hubs");
}
// These are synchronized sets -- avoid potential deadlocks
// instance.pathToRegion.clear(); // garbage collection
// instance.gatewayHubs.clear();
// rootRegions is intentionally *not* synchronized. The
// implementation of clear() does not currently allocate objects.
inst.rootRegions.clear();
// partitionedRegions is intentionally *not* synchronized, The
// implementation of clear() does not currently allocate objects.
inst.partitionedRegions.clear();
if (DEBUG) {
System.err.println("DEBUG: done with cache emergency close");
}
}
public boolean isCacheAtShutdownAll() {
return isShutDownAll;
}
/**
* Number of threads used to close PRs in shutdownAll. By default is the number of PRs in the cache
*/
private static final int shutdownAllPoolSize = Integer.getInteger("gemfire.SHUTDOWN_ALL_POOL_SIZE", -1);
void shutdownSubTreeGracefully(Map<String, PartitionedRegion> prSubMap) {
for (final PartitionedRegion pr : prSubMap.values()) {
shutDownOnePRGracefully(pr);
}
}
public synchronized void shutDownAll() {
boolean testIGE = Boolean.getBoolean("TestInternalGemFireError");
if (testIGE) {
InternalGemFireError assErr = new InternalGemFireError(LocalizedStrings.GemFireCache_UNEXPECTED_EXCEPTION.toLocalizedString());
throw assErr;
}
if (isCacheAtShutdownAll()) {
// it's already doing shutdown by another thread
return;
}
this.isShutDownAll = true;
// bug 44031 requires multithread shutdownall should be grouped
// by root region. However, shutDownAllDuringRecovery.conf test revealed that
// we have to close colocated child regions first.
// Now check all the PR, if anyone has colocate-with attribute, sort all the
// PRs by colocation relationship and close them sequentially, otherwise still
// group them by root region.
TreeMap<String, Map<String, PartitionedRegion>> prTrees = getPRTrees();
if (prTrees.size() > 1 && shutdownAllPoolSize != 1) {
ExecutorService es = getShutdownAllExecutorService(prTrees.size());
for (final Map<String, PartitionedRegion> prSubMap : prTrees.values()) {
es.execute(new Runnable() {
public void run() {
ConnectionTable.threadWantsSharedResources();
shutdownSubTreeGracefully(prSubMap);
}
});
} // for each root
es.shutdown();
try {
es.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.debug("Shutdown all interrupted while waiting for PRs to be shutdown gracefully.");
}
} else {
for (final Map<String, PartitionedRegion> prSubMap : prTrees.values()) {
shutdownSubTreeGracefully(prSubMap);
}
}
close("Shut down all members", null, false, true);
}
private ExecutorService getShutdownAllExecutorService(int size) {
final ThreadGroup thrGrp = LoggingThreadGroup.createThreadGroup("ShutdownAllGroup", logger);
ThreadFactory thrFactory = new ThreadFactory() {
private final AtomicInteger threadCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
Thread t = new Thread(thrGrp, r, "ShutdownAll-" + threadCount.getAndIncrement());
t.setDaemon(true);
return t;
}
};
ExecutorService es = Executors.newFixedThreadPool(shutdownAllPoolSize == -1 ? size : shutdownAllPoolSize, thrFactory);
return es;
}
private void shutDownOnePRGracefully(PartitionedRegion pr) {
boolean acquiredLock = false;
try {
pr.acquireDestroyLock();
acquiredLock = true;
synchronized(pr.getRedundancyProvider()) {
if (pr.isDataStore() && pr.getDataStore() != null && pr.getDataPolicy() == DataPolicy.PERSISTENT_PARTITION) {
int numBuckets = pr.getTotalNumberOfBuckets();
Map<InternalDistributedMember, PersistentMemberID> bucketMaps[] = new Map[numBuckets];
PartitionedRegionDataStore prds = pr.getDataStore();
// lock all the primary buckets
Set<Entry<Integer, BucketRegion>> bucketEntries = prds.getAllLocalBuckets();
for (Map.Entry e : bucketEntries) {
BucketRegion br = (BucketRegion) e.getValue();
if (br == null || br.isDestroyed) {
// bucket region could be destroyed in race condition
continue;
}
br.getBucketAdvisor().tryLockIfPrimary();
// get map <InternalDistriutedMemeber, persistentID> for this bucket's
// remote members
bucketMaps[br.getId()] = br.getBucketAdvisor().adviseInitializedPersistentMembers();
if (logger.isDebugEnabled()) {
logger.debug("shutDownAll: PR {}: initialized persistent members for {}:{}", pr.getName(), br.getId(), bucketMaps[br.getId()]);
}
}
if (logger.isDebugEnabled()) {
logger.debug("shutDownAll: All buckets for PR {} are locked.", pr.getName());
}
// send lock profile update to other members
pr.setShutDownAllStatus(PartitionedRegion.PRIMARY_BUCKETS_LOCKED);
new UpdateAttributesProcessor(pr).distribute(false);
pr.getRegionAdvisor().waitForProfileStatus(PartitionedRegion.PRIMARY_BUCKETS_LOCKED);
if (logger.isDebugEnabled()) {
logger.debug("shutDownAll: PR {}: all bucketlock profiles received.", pr.getName());
}
// if async write, do flush
if (!pr.getAttributes().isDiskSynchronous()) {
// several PRs might share the same diskstore, we will only flush once
// even flush is called several times.
pr.getDiskStore().forceFlush();
// send flush profile update to other members
pr.setShutDownAllStatus(PartitionedRegion.DISK_STORE_FLUSHED);
new UpdateAttributesProcessor(pr).distribute(false);
pr.getRegionAdvisor().waitForProfileStatus(PartitionedRegion.DISK_STORE_FLUSHED);
if (logger.isDebugEnabled()) {
logger.debug("shutDownAll: PR {}: all flush profiles received.", pr.getName());
}
} // async write
// persist other members to OFFLINE_EQUAL for each bucket region
// iterate through all the bucketMaps and exclude the items whose
// idm is no longer online
Set<InternalDistributedMember> membersToPersistOfflineEqual = pr.getRegionAdvisor().adviseDataStore();
for (Map.Entry e : bucketEntries) {
BucketRegion br = (BucketRegion) e.getValue();
if (br == null || br.isDestroyed) {
// bucket region could be destroyed in race condition
continue;
}
Map<InternalDistributedMember, PersistentMemberID> persistMap = getSubMapForLiveMembers(pr, membersToPersistOfflineEqual,
bucketMaps[br.getId()]);
if (persistMap != null) {
br.getPersistenceAdvisor().persistMembersOfflineAndEqual(persistMap);
if (logger.isDebugEnabled()) {
logger.debug("shutDownAll: PR {}: pesisting bucket {}:{}", pr.getName(), br.getId(), persistMap);
}
}
}
// send persited profile update to other members, let all members to persist
// before close the region
pr.setShutDownAllStatus(PartitionedRegion.OFFLINE_EQUAL_PERSISTED);
new UpdateAttributesProcessor(pr).distribute(false);
pr.getRegionAdvisor().waitForProfileStatus(PartitionedRegion.OFFLINE_EQUAL_PERSISTED);
if (logger.isDebugEnabled()) {
logger.debug("shutDownAll: PR {}: all offline_equal profiles received.", pr.getName());
}
} // datastore
// after done all steps for buckets, close pr
// close accessor directly
RegionEventImpl event = new RegionEventImpl(pr, Operation.REGION_CLOSE, null, false, getMyId(), true);
try {
// not to acquire lock
pr.basicDestroyRegion(event, false, false, true);
} catch (CacheWriterException e) {
// not possible with local operation, CacheWriter not called
throw new Error(LocalizedStrings.LocalRegion_CACHEWRITEREXCEPTION_SHOULD_NOT_BE_THROWN_IN_LOCALDESTROYREGION
.toLocalizedString(), e);
} catch (TimeoutException e) {
// not possible with local operation, no distributed locks possible
throw new Error(LocalizedStrings.LocalRegion_TIMEOUTEXCEPTION_SHOULD_NOT_BE_THROWN_IN_LOCALDESTROYREGION
.toLocalizedString(), e);
}
// pr.close();
} // synchronized
} catch (CacheClosedException cce) {
logger.debug("Encounter CacheClosedException when shutDownAll is closing PR: {}:{}", pr.getFullPath(), cce.getMessage());
} catch (CancelException ce) {
logger.debug("Encounter CancelException when shutDownAll is closing PR: {}:{}", pr.getFullPath(), ce.getMessage());
} catch (RegionDestroyedException rde) {
logger.debug("Encounter CacheDestroyedException when shutDownAll is closing PR: {}:{}", pr.getFullPath(), rde.getMessage());
} finally {
if (acquiredLock) {
pr.releaseDestroyLock();
}
}
}
private Map<InternalDistributedMember, PersistentMemberID> getSubMapForLiveMembers(PartitionedRegion pr,
Set<InternalDistributedMember> membersToPersistOfflineEqual, Map<InternalDistributedMember, PersistentMemberID> bucketMap) {
if (bucketMap == null) {
return null;
}
Map<InternalDistributedMember, PersistentMemberID> persistMap = new HashMap();
Iterator itor = membersToPersistOfflineEqual.iterator();
while (itor.hasNext()) {
InternalDistributedMember idm = (InternalDistributedMember) itor.next();
if (bucketMap.containsKey(idm)) {
persistMap.put(idm, bucketMap.get(idm));
}
}
return persistMap;
}
public void close() {
close(false);
}
public void close(boolean keepalive) {
close("Normal disconnect", null, keepalive, false);
}
public void close(String reason, Throwable optionalCause) {
close(reason, optionalCause, false, false);
}
/**
* Gets or lazily creates the PartitionedRegion distributed lock service. This call will synchronize on this
* GemFireCache.
*
* @return the PartitionedRegion distributed lock service
*/
protected DistributedLockService getPartitionedRegionLockService() {
synchronized (this.prLockServiceLock) {
stopper.checkCancelInProgress(null);
if (this.prLockService == null) {
try {
this.prLockService = DLockService.create(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME, getDistributedSystem(),
true /* distributed */, true /* destroyOnDisconnect */, true /* automateFreeResources */);
} catch (IllegalArgumentException e) {
this.prLockService = DistributedLockService.getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
if (this.prLockService == null) {
throw e; // PARTITION_LOCK_SERVICE_NAME must be illegal!
}
}
}
return this.prLockService;
}
}
/**
* Gets or lazily creates the GatewaySender distributed lock service.
* @return the GatewaySender distributed lock service
*/
public DistributedLockService getGatewaySenderLockService() {
if (this.gatewayLockService == null) {
synchronized (this.gatewayLockServiceLock) {
stopper.checkCancelInProgress(null);
if (this.gatewayLockService == null) {
try {
this.gatewayLockService = DLockService.create(
AbstractGatewaySender.LOCK_SERVICE_NAME,
getDistributedSystem(),
true /*distributed*/,
true /*destroyOnDisconnect*/,
true /*automateFreeResources*/);
}
catch (IllegalArgumentException e) {
this.gatewayLockService = DistributedLockService.getServiceNamed(
AbstractGatewaySender.LOCK_SERVICE_NAME);
if (this.gatewayLockService == null) {
throw e; // AbstractGatewaySender.LOCK_SERVICE_NAME must be illegal!
}
}
}
}
}
return this.gatewayLockService;
}
/**
* Destroys the PartitionedRegion distributed lock service when closing the cache. Caller must be synchronized on this
* GemFireCache.
*/
private void destroyPartitionedRegionLockService() {
try {
DistributedLockService.destroy(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
} catch (IllegalArgumentException e) {
// DistributedSystem.disconnect may have already destroyed the DLS
}
}
/**
* Destroys the GatewaySender distributed lock service when closing
* the cache. Caller must be synchronized on this GemFireCache.
*/
private void destroyGatewaySenderLockService() {
if (DistributedLockService
.getServiceNamed(AbstractGatewaySender.LOCK_SERVICE_NAME) != null) {
try {
DistributedLockService.destroy(AbstractGatewaySender.LOCK_SERVICE_NAME);
}
catch (IllegalArgumentException e) {
// DistributedSystem.disconnect may have already destroyed the DLS
}
}
}
public HeapEvictor getHeapEvictor() {
synchronized (this.heapEvictorLock) {
stopper.checkCancelInProgress(null);
if (this.heapEvictor == null) {
this.heapEvictor = new HeapEvictor(this);
}
return this.heapEvictor;
}
}
public OffHeapEvictor getOffHeapEvictor() {
synchronized (this.offHeapEvictorLock) {
stopper.checkCancelInProgress(null);
if (this.offHeapEvictor == null) {
this.offHeapEvictor = new OffHeapEvictor(this);
}
return this.offHeapEvictor;
}
}
public PersistentMemberManager getPersistentMemberManager() {
return persistentMemberManager;
}
public ClientMetadataService getClientMetadataService() {
synchronized (this.clientMetaDatServiceLock) {
stopper.checkCancelInProgress(null);
if (this.clientMetadatService == null) {
this.clientMetadatService = new ClientMetadataService(this);
}
return this.clientMetadatService;
}
}
private final boolean DISABLE_DISCONNECT_DS_ON_CACHE_CLOSE = Boolean.getBoolean("gemfire.DISABLE_DISCONNECT_DS_ON_CACHE_CLOSE");
/**
* close the cache
*
* @param reason
* the reason the cache is being closed
* @param systemFailureCause
* whether this member was ejected from the distributed system
* @param keepalive
* whoever added this should javadoc it
*/
public void close(String reason, Throwable systemFailureCause, boolean keepalive) {
close(reason, systemFailureCause, keepalive, false);
}
public void close(String reason, Throwable systemFailureCause, boolean keepalive, boolean keepDS) {
if (isClosed()) {
return;
}
final boolean isDebugEnabled = logger.isDebugEnabled();
synchronized (GemFireCacheImpl.class) {
// bugfix for bug 36512 "GemFireCache.close is not thread safe"
// ALL CODE FOR CLOSE SHOULD NOW BE UNDER STATIC SYNCHRONIZATION
// OF synchronized (GemFireCache.class) {
// static synchronization is necessary due to static resources
if (isClosed()) {
return;
}
/**
* First close the ManagementService as it uses a lot of infra which will be closed by cache.close()
**/
system.handleResourceEvent(ResourceEvent.CACHE_REMOVE, this);
if (this.listener != null) {
this.system.removeResourceListener(listener);
this.listener = null;
}
if (systemFailureCause != null) {
this.forcedDisconnect = systemFailureCause instanceof ForcedDisconnectException;
if (this.forcedDisconnect) {
this.disconnectCause = new ForcedDisconnectException(reason);
} else {
this.disconnectCause = systemFailureCause;
}
}
isClosing = true;
logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCache_0_NOW_CLOSING, this));
// Before anything else...make sure that this instance is not
// available to anyone "fishing" for a cache...
if (GemFireCacheImpl.instance == this) {
GemFireCacheImpl.instance = null;
}
// we don't clear the prID map if there is a system failure. Other
// threads may be hung trying to communicate with the map locked
if (systemFailureCause == null) {
PartitionedRegion.clearPRIdMap();
}
TXStateProxy tx = null;
try {
this.keepAlive = keepalive;
PoolManagerImpl.setKeepAlive(keepalive);
if (this.txMgr != null) {
tx = this.txMgr.internalSuspend();
}
// do this before closing regions
resourceManager.close();
try {
this.resourceAdvisor.close();
} catch (CancelException e) {
// ignore
}
try {
this.jmxAdvisor.close();
} catch (CancelException e) {
// ignore
}
try {
GatewaySenderAdvisor advisor = null;
for (GatewaySender sender : this.getAllGatewaySenders()) {
((AbstractGatewaySender) sender).stop();
advisor = ((AbstractGatewaySender) sender).getSenderAdvisor();
if (advisor != null) {
if (isDebugEnabled) {
logger.debug("Stopping the GatewaySender advisor");
}
advisor.close();
}
}
} catch (CancelException ce) {
}
destroyGatewaySenderLockService();
if (ASYNC_EVENT_LISTENERS) {
if (isDebugEnabled) {
logger.debug("{}: stopping event thread pool...", this);
}
this.eventThreadPool.shutdown();
}
/*
* IMPORTANT: any operation during shut down that can time out (create a CancelException) must be inside of this
* try block. If all else fails, we *must* ensure that the cache gets closed!
*/
try {
this.stopServers();
stopMemcachedServer();
stopRedisServer();
// no need to track PR instances since we won't create any more
// bridgeServers or gatewayHubs
if (this.partitionedRegions != null) {
if (isDebugEnabled) {
logger.debug("{}: clearing partitioned regions...", this);
}
synchronized (this.partitionedRegions) {
int prSize = -this.partitionedRegions.size();
this.partitionedRegions.clear();
getCachePerfStats().incPartitionedRegions(prSize);
}
}
prepareDiskStoresForClose();
if (GemFireCacheImpl.pdxInstance == this) {
GemFireCacheImpl.pdxInstance = null;
}
List rootRegionValues = null;
synchronized (this.rootRegions) {
rootRegionValues = new ArrayList(this.rootRegions.values());
}
{
final Operation op;
if (this.forcedDisconnect) {
op = Operation.FORCED_DISCONNECT;
} else if (isReconnecting()) {
op = Operation.CACHE_RECONNECT;
} else {
op = Operation.CACHE_CLOSE;
}
LocalRegion prRoot = null;
for (Iterator itr = rootRegionValues.iterator(); itr.hasNext();) {
LocalRegion lr = (LocalRegion) itr.next();
if (isDebugEnabled) {
logger.debug("{}: processing region {}", this, lr.getFullPath());
}
if (PartitionedRegionHelper.PR_ROOT_REGION_NAME.equals(lr.getName())) {
prRoot = lr;
} else {
if(lr.getName().contains(ParallelGatewaySenderQueue.QSTRING)){
continue; //this region will be closed internally by parent region
}
if (isDebugEnabled) {
logger.debug("{}: closing region {}...", this, lr.getFullPath());
}
try {
lr.handleCacheClose(op);
} catch (Exception e) {
if (isDebugEnabled || !forcedDisconnect) {
logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCache_0_ERROR_CLOSING_REGION_1,
new Object[] { this, lr.getFullPath() }), e);
}
}
}
} // for
try {
if (isDebugEnabled) {
logger.debug("{}: finishing partitioned region close...", this);
}
PartitionedRegion.afterRegionsClosedByCacheClose(this);
if (prRoot != null) {
// do the PR meta root region last
prRoot.handleCacheClose(op);
}
} catch (CancelException e) {
logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCache_0_ERROR_IN_LAST_STAGE_OF_PARTITIONEDREGION_CACHE_CLOSE,
this), e);
}
destroyPartitionedRegionLockService();
}
closeDiskStores();
diskMonitor.close();
closeHDFSStores();
// Close the CqService Handle.
try {
if (isDebugEnabled) {
logger.debug("{}: closing CQ service...", this);
}
cqService.close();
} catch (Exception ex) {
logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCache_FAILED_TO_GET_THE_CQSERVICE_TO_CLOSE_DURING_CACHE_CLOSE_1));
}
PoolManager.close(keepalive);
if (isDebugEnabled) {
logger.debug("{}: closing reliable message queue...", this);
}
try {
getReliableMessageQueueFactory().close(true);
} catch (CancelException e) {
if (isDebugEnabled) {
logger.debug("Ignored cancellation while closing reliable message queue", e);
}
}
if (isDebugEnabled) {
logger.debug("{}: notifying admins of close...", this);
}
try {
SystemMemberCacheEventProcessor.send(this, Operation.CACHE_CLOSE);
} catch (CancelException e) {
if (logger.isDebugEnabled()) {
logger.debug("Ignored cancellation while notifying admins");
}
}
if (isDebugEnabled) {
logger.debug("{}: stopping destroyed entries processor...", this);
}
this.tombstoneService.stop();
// NOTICE: the CloseCache message is the *last* message you can send!
DM dm = null;
try {
dm = system.getDistributionManager();
dm.removeMembershipListener(this.txMgr);
} catch (CancelException e) {
// dm = null;
}
if (dm != null) { // Send CacheClosedMessage (and NOTHING ELSE) here
if (isDebugEnabled) {
logger.debug("{}: sending CloseCache to peers...", this);
}
Set otherMembers = dm.getOtherDistributionManagerIds();
ReplyProcessor21 processor = new ReplyProcessor21(system, otherMembers);
CloseCacheMessage msg = new CloseCacheMessage();
// [bruce] if multicast is available, use it to send the message to
// avoid race conditions with cache content operations that might
// also be multicast
msg.setMulticast(system.getConfig().getMcastPort() != 0);
msg.setRecipients(otherMembers);
msg.setProcessorId(processor.getProcessorId());
dm.putOutgoing(msg);
try {
processor.waitForReplies();
} catch (InterruptedException ex) {
// Thread.currentThread().interrupt(); // TODO ??? should we reset this bit later?
// Keep going, make best effort to shut down.
} catch (ReplyException ex) {
// keep going
}
// set closed state after telling others and getting responses
// to avoid complications with others still in the process of
// sending messages
}
// NO MORE Distributed Messaging AFTER THIS POINT!!!!
{
ClientMetadataService cms = this.clientMetadatService;
if (cms != null) {
cms.close();
}
HeapEvictor he = this.heapEvictor;
if (he != null) {
he.close();
}
}
} catch (CancelException e) {
// make sure the disk stores get closed
closeDiskStores();
closeHDFSStores();
// NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
// okay, we're taking too long to do this stuff, so let's
// be mean to other processes and skip the rest of the messaging
// phase
// [bruce] the following code is unnecessary since someone put the
// same actions in a finally block
// if (!this.closed) {
// this.closed = true;
// this.txMgr.close();
// if (GemFireCache.instance == this) {
// GemFireCache.instance = null;
// }
// ((DynamicRegionFactoryImpl)DynamicRegionFactory.get()).close();
// }
}
// Close the CqService Handle.
try {
cqService.close();
} catch (Exception ex) {
logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCache_FAILED_TO_GET_THE_CQSERVICE_TO_CLOSE_DURING_CACHE_CLOSE_2));
}
this.cachePerfStats.close();
TXLockService.destroyServices();
EventTracker.stopTrackerServices(this);
synchronized (ccpTimerMutex) {
if (this.ccpTimer != null) {
this.ccpTimer.cancel();
}
}
this.expirationScheduler.cancel();
// Stop QueryMonitor if running.
if (this.queryMonitor != null) {
this.queryMonitor.stopMonitoring();
}
stopDiskStoreTaskPool();
} finally {
// NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
if (this.txMgr != null) {
this.txMgr.close();
}
((DynamicRegionFactoryImpl) DynamicRegionFactory.get()).close();
if (this.txMgr != null) {
this.txMgr.resume(tx);
}
TXCommitMessage.getTracker().clearForCacheClose();
}
// Added to close the TransactionManager's cleanup thread
TransactionManagerImpl.refresh();
if (!keepDS) {
// keepDS is used by ShutdownAll. It will override DISABLE_DISCONNECT_DS_ON_CACHE_CLOSE
if (!DISABLE_DISCONNECT_DS_ON_CACHE_CLOSE) {
this.system.disconnect();
}
}
TypeRegistry.close();
// do this late to prevent 43412
TypeRegistry.setPdxSerializer(null);
for (Iterator iter = cacheLifecycleListeners.iterator(); iter.hasNext();) {
CacheLifecycleListener listener = (CacheLifecycleListener) iter.next();
listener.cacheClosed(this);
}
stopRestAgentServer();
// Fix for #49856
SequenceLoggerImpl.signalCacheClose();
SystemFailure.signalCacheClose();
SocketIOWithTimeout.stopSelectorCleanUpThread();
} // static synchronization on GemFireCache.class
}
// see Cache.isReconnecting()
public boolean isReconnecting() {
return this.system.isReconnecting();
}
// see Cache.waitUntilReconnected(long, TimeUnit)
public boolean waitUntilReconnected(long time, TimeUnit units) throws InterruptedException {
return this.system.waitUntilReconnected(time, units);
}
// see Cache.stopReconnecting()
public void stopReconnecting() {
this.system.stopReconnecting();
}
// see Cache.getReconnectedCache()
public Cache getReconnectedCache() {
Cache c = GemFireCacheImpl.getInstance();
if (c == this) {
c = null;
}
return c;
}
private void stopMemcachedServer() {
if (this.memcachedServer != null) {
logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_MEMCACHED_SERVER_ON_PORT_0_IS_SHUTTING_DOWN,
new Object[] { this.system.getConfig().getMemcachedPort() }));
this.memcachedServer.shutdown();
}
}
private void stopRedisServer() {
if (redisServer != null)
this.redisServer.shutdown();
}
private void stopRestAgentServer() {
if ( this.restAgent != null) {
logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_REST_SERVER_ON_PORT_0_IS_SHUTTING_DOWN,
new Object[] { this.system.getConfig().getHttpServicePort() }));
this.restAgent.stop();
}
}
private void prepareDiskStoresForClose() {
String pdxDSName = TypeRegistry.getPdxDiskStoreName(this);
DiskStoreImpl pdxdsi = null;
for (DiskStoreImpl dsi : this.diskStores.values()) {
if (dsi.getName().equals(pdxDSName)) {
pdxdsi = dsi;
} else {
dsi.prepareForClose();
}
}
if (pdxdsi != null) {
pdxdsi.prepareForClose();
}
}
/**
* Used to guard access to compactorPool and set to true when cache is shutdown.
*/
private final AtomicBoolean diskStoreTaskSync = new AtomicBoolean(false);
/**
* Lazily initialized.
*/
private ThreadPoolExecutor diskStoreTaskPool = null;
private void createDiskStoreTaskPool() {
int MAXT = DiskStoreImpl.MAX_CONCURRENT_COMPACTIONS;
final ThreadGroup compactThreadGroup = LoggingThreadGroup.createThreadGroup("Oplog Compactor Thread Group", logger);
/*final ThreadFactory compactThreadFactory = new ThreadFactory() {
public Thread newThread(Runnable command) {
Thread thread = new Thread(compactThreadGroup, command, "Idle OplogCompactor");
thread.setDaemon(true);
return thread;
}
};*/
final ThreadFactory compactThreadFactory = GemfireCacheHelper.CreateThreadFactory(compactThreadGroup, "Idle OplogCompactor");
this.diskStoreTaskPool = new ThreadPoolExecutor(MAXT, MAXT, 1, TimeUnit.SECONDS,
new LinkedBlockingQueue(),
compactThreadFactory);
}
private final ConcurrentMap<String, DiskStoreImpl> diskStores = new ConcurrentHashMap<String, DiskStoreImpl>();
private final ConcurrentMap<String, DiskStoreImpl> regionOwnedDiskStores = new ConcurrentHashMap<String, DiskStoreImpl>();
public void addDiskStore(DiskStoreImpl dsi) {
this.diskStores.put(dsi.getName(), dsi);
if (!dsi.isOffline()) {
getDiskStoreMonitor().addDiskStore(dsi);
}
}
public void removeDiskStore(DiskStoreImpl dsi) {
this.diskStores.remove(dsi.getName());
this.regionOwnedDiskStores.remove(dsi.getName());
/** Added for M&M **/
if(!dsi.getOwnedByRegion())
system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, dsi);
}
public void addRegionOwnedDiskStore(DiskStoreImpl dsi) {
this.regionOwnedDiskStores.put(dsi.getName(), dsi);
if (!dsi.isOffline()) {
getDiskStoreMonitor().addDiskStore(dsi);
}
}
public void closeDiskStores() {
Iterator<DiskStoreImpl> it = this.diskStores.values().iterator();
while (it.hasNext()) {
try {
DiskStoreImpl dsi = it.next();
if (logger.isDebugEnabled()) {
logger.debug("closing {}", dsi);
}
dsi.close();
/** Added for M&M **/
system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, dsi);
} catch (Exception e) {
logger.fatal(LocalizedMessage.create(LocalizedStrings.Disk_Store_Exception_During_Cache_Close), e);
}
it.remove();
}
}
/**
* Used by unit tests to allow them to change the default disk store name.
*/
public static void setDefaultDiskStoreName(String dsName) {
DEFAULT_DS_NAME = dsName;
}
/**
* Used by unit tests to undo a change to the default disk store name.
*/
public static void unsetDefaultDiskStoreName() {
DEFAULT_DS_NAME = DiskStoreFactory.DEFAULT_DISK_STORE_NAME;
}
public static String getDefaultDiskStoreName() {
return DEFAULT_DS_NAME;
}
public static String DEFAULT_DS_NAME = DiskStoreFactory.DEFAULT_DISK_STORE_NAME;
public DiskStoreImpl getOrCreateDefaultDiskStore() {
DiskStoreImpl result = (DiskStoreImpl) findDiskStore(null);
if (result == null) {
synchronized (this) {
result = (DiskStoreImpl) findDiskStore(null);
if (result == null) {
result = (DiskStoreImpl) createDiskStoreFactory().create(DEFAULT_DS_NAME);
}
}
}
return result;
}
/**
* Returns the DiskStore by name
*
* @since prPersistSprint2
*/
public DiskStore findDiskStore(String name) {
if (name == null) {
name = DEFAULT_DS_NAME;
}
return this.diskStores.get(name);
}
/**
* Returns the DiskStore list
*
* @since prPersistSprint2
*/
public Collection<DiskStoreImpl> listDiskStores() {
return Collections.unmodifiableCollection(this.diskStores.values());
}
public Collection<DiskStoreImpl> listDiskStoresIncludingDefault() {
return Collections.unmodifiableCollection(listDiskStores());
}
public Collection<DiskStoreImpl> listDiskStoresIncludingRegionOwned() {
HashSet<DiskStoreImpl> allDiskStores = new HashSet<DiskStoreImpl>();
allDiskStores.addAll(this.diskStores.values());
allDiskStores.addAll(this.regionOwnedDiskStores.values());
return allDiskStores;
}
public boolean executeDiskStoreTask(DiskStoreTask r) {
synchronized (this.diskStoreTaskSync) {
if (!this.diskStoreTaskSync.get()) {
if (this.diskStoreTaskPool == null) {
createDiskStoreTaskPool();
}
try {
this.diskStoreTaskPool.execute(r);
return true;
} catch (RejectedExecutionException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Ignored compact schedule during shutdown", ex);
}
}
}
}
return false;
}
/* private static class DiskStoreFuture extends FutureTask {
private DiskStoreTask task;
public DiskStoreFuture(DiskStoreTask r) {
super(r, null);
this.task = r;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean result = super.cancel(mayInterruptIfRunning);
if (result) {
task.taskCancelled();
}
return result;
}
}*/
private void stopDiskStoreTaskPool() {
synchronized (this.diskStoreTaskSync) {
this.diskStoreTaskSync.set(true);
// All the regions have already been closed
// so this pool shouldn't be doing anything.
if (this.diskStoreTaskPool != null) {
List<Runnable> l = this.diskStoreTaskPool.shutdownNow();
for (Runnable runnable : l) {
if (l instanceof DiskStoreTask) {
((DiskStoreTask) l).taskCancelled();
}
}
}
//this.diskStoreTaskPool = null;
}
}
public int stopGatewaySenders(boolean byShutdownAll) {
final boolean isDebugEnabled = logger.isDebugEnabled();
int cnt = 0;
closingGatewaySendersByShutdownAll = byShutdownAll;
synchronized (allGatewaySendersLock) {
GatewaySenderAdvisor advisor = null;
Iterator<GatewaySender> itr = allGatewaySenders.iterator();
while (itr.hasNext()) {
GatewaySender sender = itr.next();
if (isDebugEnabled) {
logger.debug("{}: stopping gateway sender {}", this, sender);
}
try {
sender.stop();
advisor = ((AbstractGatewaySender)sender).getSenderAdvisor();
if (advisor != null) {
if (isDebugEnabled) {
logger.debug("Stopping the GatewaySender advisor");
}
advisor.close();
}
cnt++;
}
catch (CancelException e) {
if (isDebugEnabled) {
logger.debug("Ignored cache closure while closing sender {}", sender, e);
}
}
}
} // synchronized
destroyGatewaySenderLockService();
if (isDebugEnabled) {
logger.debug("{}: finished stopping {} gateway sender(s), total is {}", this, cnt, allGatewaySenders.size());
}
return cnt;
}
public int stopGatewayReceivers(boolean byShutdownAll) {
int cnt = 0;
closingGatewayReceiversByShutdownAll = byShutdownAll;
synchronized (allGatewayReceiversLock) {
Iterator<GatewayReceiver> itr = allGatewayReceivers.iterator();
while (itr.hasNext()) {
GatewayReceiver receiver = itr.next();
if (logger.isDebugEnabled()) {
logger.debug("{}: stopping gateway receiver {}", this, receiver);
}
try {
receiver.stop();
cnt++;
}
catch (CancelException e) {
if (logger.isDebugEnabled()) {
logger.debug("Ignored cache closure while closing receiver {}", receiver, e);
}
}
}
} // synchronized
if (logger.isDebugEnabled()) {
logger.debug("{}: finished stopping {} gateway receiver(s), total is {}", this, cnt, allGatewayReceivers.size());
}
return cnt;
}
void stopServers() {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("{}: stopping bridge servers...", this);
}
boolean stoppedBridgeServer = false;
Iterator allBridgeServersIterator = this.allBridgeServers.iterator();
while (allBridgeServersIterator.hasNext()) {
BridgeServerImpl bridge = (BridgeServerImpl) allBridgeServersIterator.next();
if (isDebugEnabled) {
logger.debug("stopping bridge {}", bridge);
}
try {
bridge.stop();
} catch (CancelException e) {
if (isDebugEnabled) {
logger.debug("Ignored cache closure while closing bridge {}", bridge, e);
}
}
allBridgeServers.remove(bridge);
stoppedBridgeServer = true;
}
if (stoppedBridgeServer) {
// now that all the bridge servers have stopped empty the static pool of commBuffers it might have used.
ServerConnection.emptyCommBufferPool();
}
// stop HA services if they had been started
if (isDebugEnabled) {
logger.debug("{}: stopping HA services...", this);
}
try {
HARegionQueue.stopHAServices();
} catch (CancelException e) {
if (isDebugEnabled) {
logger.debug("Ignored cache closure while closing HA services", e);
}
}
if (isDebugEnabled) {
logger.debug("{}: stopping client health monitor...", this);
}
try {
ClientHealthMonitor.shutdownInstance();
} catch (CancelException e) {
if (isDebugEnabled) {
logger.debug("Ignored cache closure while closing client health monitor", e);
}
}
// Reset the unique id counter for durable clients.
// If a durable client stops/starts its cache, it needs
// to maintain the same unique id.
ClientProxyMembershipID.resetUniqueIdCounter();
}
public final InternalDistributedSystem getDistributedSystem() {
return this.system;
}
/**
* Returns the member id of my distributed system
*
* @since 5.0
*/
public InternalDistributedMember getMyId() {
return this.system.getDistributedMember();
}
/*
* (non-Javadoc)
*
* @see com.gemstone.gemfire.cache.Cache#getMembers()
*/
public Set<DistributedMember> getMembers() {
return Collections.unmodifiableSet(this.dm.getOtherNormalDistributionManagerIds());
}
/*
* (non-Javadoc)
*
* @see com.gemstone.gemfire.cache.Cache#getAdminMembers()
*/
public Set<DistributedMember> getAdminMembers() {
return this.dm.getAdminMemberSet();
}
/*
* (non-Javadoc)
*
* @see com.gemstone.gemfire.cache.Cache#getMembers(com.gemstone.gemfire.cache.Region)
*/
public Set<DistributedMember> getMembers(Region r) {
if (r instanceof DistributedRegion) {
DistributedRegion d = (DistributedRegion) r;
return d.getDistributionAdvisor().adviseCacheOp();
} else if (r instanceof PartitionedRegion) {
PartitionedRegion p = (PartitionedRegion) r;
return p.getRegionAdvisor().adviseAllPRNodes();
} else {
return Collections.EMPTY_SET;
}
}
/*
* (non-Javadoc)
*
* @see com.gemstone.gemfire.cache.client.ClientCache#getCurrentServers()
*/
public Set<InetSocketAddress> getCurrentServers() {
Map<String, Pool> pools = PoolManager.getAll();
Set result = null;
for (Pool p : pools.values()) {
PoolImpl pi = (PoolImpl) p;
for (Object o : pi.getCurrentServers()) {
ServerLocation sl = (ServerLocation) o;
if (result == null) {
result = new HashSet<DistributedMember>();
}
result.add(new InetSocketAddress(sl.getHostName(), sl.getPort()));
}
}
if (result == null) {
return Collections.EMPTY_SET;
} else {
return result;
}
}
public LogWriter getLogger() {
return this.system.getLogWriter();
}
public LogWriter getSecurityLogger() {
return this.system.getSecurityLogWriter();
}
public LogWriterI18n getLoggerI18n() {
return this.system.getInternalLogWriter();
}
public LogWriterI18n getSecurityLoggerI18n() {
return this.system.getSecurityInternalLogWriter();
}
public InternalLogWriter getInternalLogWriter() {
return this.system.getInternalLogWriter();
}
public InternalLogWriter getSecurityInternalLogWriter() {
return this.system.getSecurityInternalLogWriter();
}
/**
* get the threadid/sequenceid sweeper task for this cache
*
* @return the sweeper task
*/
protected EventTracker.ExpiryTask getEventTrackerTask() {
return this.recordedEventSweeper;
}
public CachePerfStats getCachePerfStats() {
return this.cachePerfStats;
}
public String getName() {
return this.system.getName();
}
/**
* Get the list of all instances of properties for Declarables with the given class name.
*
* @param className Class name of the declarable
* @return List of all instances of properties found for the given declarable
*/
public List<Properties> getDeclarableProperties(final String className) {
List<Properties> propertiesList = new ArrayList<Properties>();
synchronized (this.declarablePropertiesMap) {
for (Map.Entry<Declarable, Properties> entry : this.declarablePropertiesMap.entrySet()) {
if (entry.getKey().getClass().getName().equals(className)) {
propertiesList.add(entry.getValue());
}
}
}
return propertiesList;
}
/**
* Get the properties for the given declarable.
*
* @param declarable The declarable
* @return Properties found for the given declarable
*/
public Properties getDeclarableProperties(final Declarable declarable) {
return this.declarablePropertiesMap.get(declarable);
}
/**
* Returns the date and time that this cache was created.
*
* @since 3.5
*/
public Date getCreationDate() {
return this.creationDate;
}
/**
* Returns the number of seconds that have elapsed since the Cache was created.
*
* @since 3.5
*/
public int getUpTime() {
return (int) ((System.currentTimeMillis() - this.creationDate.getTime()) / 1000);
}
/**
* All entry and region operations should be using this time rather than
* System.currentTimeMillis(). Specially all version stamps/tags must be
* populated with this timestamp.
*
* @return distributed cache time.
*/
public long cacheTimeMillis() {
if (this.system != null) {
return this.system.getClock().cacheTimeMillis();
} else {
return System.currentTimeMillis();
}
}
public Region createVMRegion(String name, RegionAttributes attrs) throws RegionExistsException, TimeoutException {
return createRegion(name, attrs);
}
private PoolFactory createDefaultPF() {
PoolFactory defpf = PoolManager.createFactory();
try {
String localHostName = SocketCreator.getHostName(SocketCreator.getLocalHost());
defpf.addServer(localHostName, CacheServer.DEFAULT_PORT);
} catch (UnknownHostException ex) {
throw new IllegalStateException("Could not determine local host name");
}
return defpf;
}
/**
* Used to set the default pool on a new GemFireCache.
*/
public void determineDefaultPool() {
if (!isClient()) {
throw new UnsupportedOperationException();
}
Pool pool = null;
// create the pool if it does not already exist
if (this.clientpf == null) {
Map<String, Pool> pools = PoolManager.getAll();
if (pools.isEmpty()) {
this.clientpf = createDefaultPF();
} else if (pools.size() == 1) {
// otherwise use a singleton.
pool = pools.values().iterator().next();
} else {
if (pool == null) {
// act as if the default pool was configured
// and see if we can find an existing one that is compatible
PoolFactoryImpl pfi = (PoolFactoryImpl) createDefaultPF();
for (Pool p : pools.values()) {
if (((PoolImpl) p).isCompatible(pfi.getPoolAttributes())) {
pool = p;
break;
}
}
if (pool == null) {
// if pool is still null then we will not have a default pool for this ClientCache
setDefaultPool(null);
return;
}
}
}
} else {
PoolFactoryImpl pfi = (PoolFactoryImpl) this.clientpf;
if (pfi.getPoolAttributes().locators.isEmpty() && pfi.getPoolAttributes().servers.isEmpty()) {
try {
String localHostName = SocketCreator.getHostName(SocketCreator.getLocalHost());
pfi.addServer(localHostName, CacheServer.DEFAULT_PORT);
} catch (UnknownHostException ex) {
throw new IllegalStateException("Could not determine local host name");
}
}
// look for a pool that already exists that is compatible with
// our PoolFactory.
// If we don't find one we will create a new one that meets our needs.
Map<String, Pool> pools = PoolManager.getAll();
for (Pool p : pools.values()) {
if (((PoolImpl) p).isCompatible(pfi.getPoolAttributes())) {
pool = p;
break;
}
}
}
if (pool == null) {
// create our pool with a unique name
String poolName = "DEFAULT";
int count = 1;
Map<String, Pool> pools = PoolManager.getAll();
while (pools.containsKey(poolName)) {
poolName = "DEFAULT" + count;
count++;
}
pool = this.clientpf.create(poolName);
}
setDefaultPool(pool);
}
/**
* Used to see if a existing cache's pool is compatible with us.
*
* @return the default pool that is right for us
*/
public Pool determineDefaultPool(PoolFactory pf) {
Pool pool = null;
// create the pool if it does not already exist
if (pf == null) {
Map<String, Pool> pools = PoolManager.getAll();
if (pools.isEmpty()) {
throw new IllegalStateException("Since a cache already existed a pool should also exist.");
} else if (pools.size() == 1) {
// otherwise use a singleton.
pool = pools.values().iterator().next();
if (getDefaultPool() != pool) {
throw new IllegalStateException("Existing cache's default pool was not the same as the only existing pool");
}
} else {
// just use the current default pool if one exists
pool = getDefaultPool();
if (pool == null) {
// act as if the default pool was configured
// and see if we can find an existing one that is compatible
PoolFactoryImpl pfi = (PoolFactoryImpl) createDefaultPF();
for (Pool p : pools.values()) {
if (((PoolImpl) p).isCompatible(pfi.getPoolAttributes())) {
pool = p;
break;
}
}
if (pool == null) {
// if pool is still null then we will not have a default pool for this ClientCache
return null;
}
}
}
} else {
PoolFactoryImpl pfi = (PoolFactoryImpl) pf;
if (pfi.getPoolAttributes().locators.isEmpty() && pfi.getPoolAttributes().servers.isEmpty()) {
try {
String localHostName = SocketCreator.getHostName(SocketCreator.getLocalHost());
pfi.addServer(localHostName, CacheServer.DEFAULT_PORT);
} catch (UnknownHostException ex) {
throw new IllegalStateException("Could not determine local host name");
}
}
PoolImpl defPool = (PoolImpl) getDefaultPool();
if (defPool != null && defPool.isCompatible(pfi.getPoolAttributes())) {
pool = defPool;
} else {
throw new IllegalStateException("Existing cache's default pool was not compatible");
}
}
return pool;
}
public Region createRegion(String name, RegionAttributes attrs) throws RegionExistsException, TimeoutException {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
}
return basicCreateRegion(name, attrs);
}
public Region basicCreateRegion(String name, RegionAttributes attrs) throws RegionExistsException, TimeoutException {
try {
InternalRegionArguments ira = new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false)
.setSnapshotInputStream(null).setImageTarget(null);
if (attrs instanceof UserSpecifiedRegionAttributes) {
ira.setIndexes(((UserSpecifiedRegionAttributes) attrs).getIndexes());
}
return createVMRegion(name, attrs, ira);
} catch (IOException e) {
// only if loading snapshot, not here
InternalGemFireError assErr = new InternalGemFireError(LocalizedStrings.GemFireCache_UNEXPECTED_EXCEPTION.toLocalizedString());
assErr.initCause(e);
throw assErr;
} catch (ClassNotFoundException e) {
// only if loading snapshot, not here
InternalGemFireError assErr = new InternalGemFireError(LocalizedStrings.GemFireCache_UNEXPECTED_EXCEPTION.toLocalizedString());
assErr.initCause(e);
throw assErr;
}
}
public <K, V> Region<K, V> createVMRegion(String name, RegionAttributes<K, V> p_attrs, InternalRegionArguments internalRegionArgs)
throws RegionExistsException, TimeoutException, IOException, ClassNotFoundException {
if (getMyId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
if (!internalRegionArgs.isUsedForMetaRegion() && internalRegionArgs.getInternalMetaRegion() == null) {
throw new IllegalStateException("Regions can not be created in a locator.");
}
}
stopper.checkCancelInProgress(null);
LocalRegion.validateRegionName(name);
RegionAttributes<K, V> attrs = p_attrs;
if (attrs == null) {
throw new IllegalArgumentException(LocalizedStrings.GemFireCache_ATTRIBUTES_MUST_NOT_BE_NULL.toLocalizedString());
}
LocalRegion rgn = null;
// final boolean getDestroyLock = attrs.getDestroyLockFlag();
final InputStream snapshotInputStream = internalRegionArgs.getSnapshotInputStream();
InternalDistributedMember imageTarget = internalRegionArgs.getImageTarget();
final boolean recreate = internalRegionArgs.getRecreateFlag();
final boolean isPartitionedRegion = (attrs.getPartitionAttributes() == null) ? false : true;
final boolean isReinitCreate = snapshotInputStream != null || imageTarget != null || recreate;
final String regionPath = LocalRegion.calcFullPath(name, null);
try {
for (;;) {
getCancelCriterion().checkCancelInProgress(null);
Future future = null;
synchronized (this.rootRegions) {
rgn = (LocalRegion) this.rootRegions.get(name);
if (rgn != null) {
throw new RegionExistsException(rgn);
}
// check for case where a root region is being reinitialized and we
// didn't
// find a region, i.e. the new region is about to be created
if (!isReinitCreate) { // fix bug 33523
String fullPath = Region.SEPARATOR + name;
future = (Future) this.reinitializingRegions.get(fullPath);
}
if (future == null) {
HDFSIntegrationUtil.createAndAddAsyncQueue(regionPath, attrs, this);
attrs = setEvictionAttributesForLargeRegion(attrs);
if (internalRegionArgs.getInternalMetaRegion() != null) {
rgn = internalRegionArgs.getInternalMetaRegion();
} else if (isPartitionedRegion) {
rgn = new PartitionedRegion(name, attrs, null, this, internalRegionArgs);
} else {
/*for (String senderId : attrs.getGatewaySenderIds()) {
if (getGatewaySender(senderId) != null
&& getGatewaySender(senderId).isParallel()) {
throw new IllegalStateException(
LocalizedStrings.AttributesFactory_PARALLELGATEWAYSENDER_0_IS_INCOMPATIBLE_WITH_DISTRIBUTED_REPLICATION
.toLocalizedString(senderId));
}
}*/
if (attrs.getScope().isLocal()) {
rgn = new LocalRegion(name, attrs, null, this, internalRegionArgs);
} else {
rgn = new DistributedRegion(name, attrs, null, this, internalRegionArgs);
}
}
this.rootRegions.put(name, rgn);
if (isReinitCreate) {
regionReinitialized(rgn);
}
break;
}
} // synchronized
boolean interrupted = Thread.interrupted();
try { // future != null
LocalRegion region = (LocalRegion) future.get(); // wait on Future
throw new RegionExistsException(region);
} catch (InterruptedException e) {
interrupted = true;
} catch (ExecutionException e) {
throw new Error(LocalizedStrings.GemFireCache_UNEXPECTED_EXCEPTION.toLocalizedString(), e);
} catch (CancellationException e) {
// future was cancelled
} finally {
if (interrupted)
Thread.currentThread().interrupt();
}
} // for
boolean success = false;
try {
setRegionByPath(rgn.getFullPath(), rgn);
rgn.initialize(snapshotInputStream, imageTarget, internalRegionArgs);
success = true;
} catch (CancelException e) {
// don't print a call stack
throw e;
} catch (RedundancyAlreadyMetException e) {
// don't log this
throw e;
} catch (final RuntimeException validationException) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.GemFireCache_INITIALIZATION_FAILED_FOR_REGION_0, rgn.getFullPath()), validationException);
throw validationException;
} finally {
if (!success) {
try {
// do this before removing the region from
// the root set to fix bug 41982.
rgn.cleanupFailedInitialization();
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable t) {
SystemFailure.checkFailure();
stopper.checkCancelInProgress(t);
// bug #44672 - log the failure but don't override the original exception
logger.warn(LocalizedMessage.create(
LocalizedStrings.GemFireCache_INIT_CLEANUP_FAILED_FOR_REGION_0, rgn.getFullPath()), t);
} finally {
// clean up if initialize fails for any reason
setRegionByPath(rgn.getFullPath(), null);
synchronized (this.rootRegions) {
Region r = (Region) this.rootRegions.get(name);
if (r == rgn) {
this.rootRegions.remove(name);
}
} // synchronized
}
} // success
}
rgn.postCreateRegion();
} catch (RegionExistsException ex) {
// outside of sync make sure region is initialized to fix bug 37563
LocalRegion r = (LocalRegion) ex.getRegion();
r.waitOnInitialization(); // don't give out ref until initialized
throw ex;
}
/**
* Added for M&M . Putting the callback here to avoid creating RegionMBean in case of Exception
**/
if (!rgn.isInternalRegion()) {
system.handleResourceEvent(ResourceEvent.REGION_CREATE, rgn);
}
return rgn;
}
/**
* turn on eviction by default for HDFS regions
*/
@SuppressWarnings("deprecation")
public <K, V> RegionAttributes<K, V> setEvictionAttributesForLargeRegion(
RegionAttributes<K, V> attrs) {
RegionAttributes<K, V> ra = attrs;
if (DISABLE_AUTO_EVICTION) {
return ra;
}
if (attrs.getDataPolicy().withHDFS()
|| attrs.getHDFSStoreName() != null) {
// make the region overflow by default
EvictionAttributes evictionAttributes = attrs.getEvictionAttributes();
boolean hasNoEvictionAttrs = evictionAttributes == null
|| evictionAttributes.getAlgorithm().isNone();
AttributesFactory<K, V> af = new AttributesFactory<K, V>(attrs);
String diskStoreName = attrs.getDiskStoreName();
// set the local persistent directory to be the same as that for
// HDFS store
if (attrs.getHDFSStoreName() != null) {
HDFSStoreImpl hdfsStore = findHDFSStore(attrs.getHDFSStoreName());
if (attrs.getPartitionAttributes().getLocalMaxMemory() != 0 && hdfsStore == null) {
// HDFS store expected to be found at this point
throw new IllegalStateException(
LocalizedStrings.HOPLOG_HDFS_STORE_NOT_FOUND
.toLocalizedString(attrs.getHDFSStoreName()));
}
// if there is no disk store, use the one configured for hdfs queue
if (attrs.getPartitionAttributes().getLocalMaxMemory() != 0 && diskStoreName == null) {
diskStoreName = hdfsStore.getHDFSEventQueueAttributes().getDiskStoreName();
}
}
// set LRU heap eviction with overflow to disk for HDFS stores with
// local Oplog persistence
// set eviction attributes only if not set
if (hasNoEvictionAttrs) {
if (diskStoreName != null) {
af.setDiskStoreName(diskStoreName);
}
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(
ObjectSizer.DEFAULT, EvictionAction.OVERFLOW_TO_DISK));
}
ra = af.create();
}
return ra;
}
public final Region getRegion(String path) {
return getRegion(path, false);
}
/**
* returns a set of all current regions in the cache, including buckets
*
* @since 6.0
*/
public Set<LocalRegion> getAllRegions() {
Set<LocalRegion> result = new HashSet();
synchronized (this.rootRegions) {
for (Object r : this.rootRegions.values()) {
if (r instanceof PartitionedRegion) {
PartitionedRegion p = (PartitionedRegion) r;
PartitionedRegionDataStore prds = p.getDataStore();
if (prds != null) {
Set<Entry<Integer, BucketRegion>> bucketEntries = p.getDataStore().getAllLocalBuckets();
for (Map.Entry e : bucketEntries) {
result.add((LocalRegion) e.getValue());
}
}
} else if (r instanceof LocalRegion) {
LocalRegion l = (LocalRegion) r;
result.add(l);
result.addAll(l.basicSubregions(true));
}
}
}
return result;
}
public Set<LocalRegion> getApplicationRegions() {
Set<LocalRegion> result = new HashSet<LocalRegion>();
synchronized (this.rootRegions) {
for (Object r : this.rootRegions.values()) {
LocalRegion rgn = (LocalRegion) r;
if (rgn.isSecret() || rgn.isUsedForMetaRegion() || rgn instanceof HARegion || rgn.isUsedForPartitionedRegionAdmin()
|| rgn.isInternalRegion()/* rgn.isUsedForPartitionedRegionBucket() */) {
continue; // Skip administrative PartitionedRegions
}
result.add(rgn);
result.addAll(rgn.basicSubregions(true));
}
}
return result;
}
void setRegionByPath(String path, LocalRegion r) {
if (r == null) {
this.pathToRegion.remove(path);
} else {
this.pathToRegion.put(path, r);
}
}
/**
* @throws IllegalArgumentException
* if path is not valid
*/
private static void validatePath(String path) {
if (path == null) {
throw new IllegalArgumentException(LocalizedStrings.GemFireCache_PATH_CANNOT_BE_NULL.toLocalizedString());
}
if (path.length() == 0) {
throw new IllegalArgumentException(LocalizedStrings.GemFireCache_PATH_CANNOT_BE_EMPTY.toLocalizedString());
}
if (path.equals(Region.SEPARATOR)) {
throw new IllegalArgumentException(LocalizedStrings.GemFireCache_PATH_CANNOT_BE_0.toLocalizedString(Region.SEPARATOR));
}
}
public LocalRegion getRegionByPath(String path) {
validatePath(path); // fix for bug 34892
{ // do this before checking the pathToRegion map
LocalRegion result = getReinitializingRegion(path);
if (result != null) {
return result;
}
}
return (LocalRegion) this.pathToRegion.get(path);
}
public LocalRegion getRegionByPathForProcessing(String path) {
LocalRegion result = getRegionByPath(path);
if (result == null) {
stopper.checkCancelInProgress(null);
int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.ANY_INIT); // go through
// initialization
// latches
try {
String[] pathParts = parsePath(path);
LocalRegion root;
synchronized (this.rootRegions) {
root = (LocalRegion) this.rootRegions.get(pathParts[0]);
if (root == null)
return null;
}
if (logger.isDebugEnabled()) {
logger.debug("GemFireCache.getRegion, calling getSubregion on root({}): {}", pathParts[0], pathParts[1]);
}
result = (LocalRegion) root.getSubregion(pathParts[1], true);
} finally {
LocalRegion.setThreadInitLevelRequirement(oldLevel);
}
}
return result;
}
/**
* @param returnDestroyedRegion
* if true, okay to return a destroyed region
*/
public Region getRegion(String path, boolean returnDestroyedRegion) {
stopper.checkCancelInProgress(null);
{
LocalRegion result = getRegionByPath(path);
// Do not waitOnInitialization() for PR
// if (result != null && !(result instanceof PartitionedRegion)) {
if (result != null) {
result.waitOnInitialization();
if (!returnDestroyedRegion && result.isDestroyed()) {
stopper.checkCancelInProgress(null);
return null;
} else {
return result;
}
}
}
String[] pathParts = parsePath(path);
LocalRegion root;
synchronized (this.rootRegions) {
root = (LocalRegion) this.rootRegions.get(pathParts[0]);
if (root == null) {
if (logger.isDebugEnabled()) {
logger.debug("GemFireCache.getRegion, no region found for {}", pathParts[0]);
}
stopper.checkCancelInProgress(null);
return null;
}
if (!returnDestroyedRegion && root.isDestroyed()) {
stopper.checkCancelInProgress(null);
return null;
}
}
if (logger.isDebugEnabled()) {
logger.debug("GemFireCache.getRegion, calling getSubregion on root({}): {}", pathParts[0], pathParts[1]);
}
return root.getSubregion(pathParts[1], returnDestroyedRegion);
}
/**
* @param returnDestroyedRegion
* if true, okay to return a destroyed partitioned region
*/
public final Region getPartitionedRegion(String path, boolean returnDestroyedRegion) {
stopper.checkCancelInProgress(null);
{
LocalRegion result = getRegionByPath(path);
// Do not waitOnInitialization() for PR
if (result != null) {
if (!(result instanceof PartitionedRegion)) {
return null;
} else {
return result;
}
}
}
String[] pathParts = parsePath(path);
LocalRegion root;
LogWriterI18n logger = getLoggerI18n();
synchronized (this.rootRegions) {
root = (LocalRegion) this.rootRegions.get(pathParts[0]);
if (root == null) {
if (logger.fineEnabled()) {
logger.fine("GemFireCache.getRegion, no region found for " + pathParts[0]);
}
stopper.checkCancelInProgress(null);
return null;
}
if (!returnDestroyedRegion && root.isDestroyed()) {
stopper.checkCancelInProgress(null);
return null;
}
}
if (logger.fineEnabled()) {
logger.fine("GemFireCache.getPartitionedRegion, calling getSubregion on root(" + pathParts[0] + "): " + pathParts[1]);
}
Region result = root.getSubregion(pathParts[1], returnDestroyedRegion);
if (result != null && !(result instanceof PartitionedRegion)) {
return null;
} else {
return result;
}
}
/** Return true if this region is initializing */
boolean isGlobalRegionInitializing(String fullPath) {
stopper.checkCancelInProgress(null);
int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.ANY_INIT); // go through
// initialization
// latches
try {
return isGlobalRegionInitializing((LocalRegion) getRegion(fullPath));
} finally {
LocalRegion.setThreadInitLevelRequirement(oldLevel);
}
}
/** Return true if this region is initializing */
boolean isGlobalRegionInitializing(LocalRegion region) {
boolean result = region != null && region.scope.isGlobal() && !region.isInitialized();
if (result) {
if (logger.isDebugEnabled()) {
logger.debug("GemFireCache.isGlobalRegionInitializing ({})", region.getFullPath());
}
}
return result;
}
public Set rootRegions() {
return rootRegions(false);
}
public final Set rootRegions(boolean includePRAdminRegions) {
return rootRegions(includePRAdminRegions, true);
}
private final Set rootRegions(boolean includePRAdminRegions, boolean waitForInit) {
stopper.checkCancelInProgress(null);
Set regions = new HashSet();
synchronized (this.rootRegions) {
for (Iterator itr = this.rootRegions.values().iterator(); itr.hasNext();) {
LocalRegion r = (LocalRegion) itr.next();
// If this is an internal meta-region, don't return it to end user
if (r.isSecret() || r.isUsedForMetaRegion() || r instanceof HARegion || !includePRAdminRegions
&& (r.isUsedForPartitionedRegionAdmin() || r.isUsedForPartitionedRegionBucket())) {
continue; // Skip administrative PartitionedRegions
}
regions.add(r);
}
}
if (waitForInit) {
for (Iterator r = regions.iterator(); r.hasNext();) {
LocalRegion lr = (LocalRegion) r.next();
// lr.waitOnInitialization();
if (!lr.checkForInitialization()) {
r.remove();
}
}
}
return Collections.unmodifiableSet(regions);
}
/**
* Called by ccn when a client goes away
*
* @since 5.7
*/
public void cleanupForClient(CacheClientNotifier ccn, ClientProxyMembershipID client) {
try {
if (isClosed())
return;
Iterator it = rootRegions(false, false).iterator();
while (it.hasNext()) {
LocalRegion lr = (LocalRegion) it.next();
lr.cleanupForClient(ccn, client);
}
} catch (DistributedSystemDisconnectedException ignore) {
}
}
public boolean isClosed() {
return this.isClosing;
}
public int getLockTimeout() {
return this.lockTimeout;
}
public void setLockTimeout(int seconds) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
}
stopper.checkCancelInProgress(null);
this.lockTimeout = seconds;
}
public int getLockLease() {
return this.lockLease;
}
public void setLockLease(int seconds) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
}
stopper.checkCancelInProgress(null);
this.lockLease = seconds;
}
public int getSearchTimeout() {
return this.searchTimeout;
}
public void setSearchTimeout(int seconds) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
}
stopper.checkCancelInProgress(null);
this.searchTimeout = seconds;
}
public int getMessageSyncInterval() {
return HARegionQueue.getMessageSyncInterval();
}
public void setMessageSyncInterval(int seconds) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
}
stopper.checkCancelInProgress(null);
if (seconds < 0) {
throw new IllegalArgumentException(
LocalizedStrings.GemFireCache_THE_MESSAGESYNCINTERVAL_PROPERTY_FOR_CACHE_CANNOT_BE_NEGATIVE.toLocalizedString());
}
HARegionQueue.setMessageSyncInterval(seconds);
}
/**
* Get a reference to a Region that is reinitializing, or null if that Region is not reinitializing or this thread is
* interrupted. If a reinitializing region is found, then this method blocks until reinitialization is complete and
* then returns the region.
*/
LocalRegion getReinitializingRegion(String fullPath) {
Future future = (Future) this.reinitializingRegions.get(fullPath);
if (future == null) {
return null;
}
try {
LocalRegion region = (LocalRegion) future.get();
region.waitOnInitialization();
if (logger.isDebugEnabled()) {
logger.debug("Returning manifested future for: {}", fullPath);
}
return region;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException e) {
throw new Error(LocalizedStrings.GemFireCache_UNEXPECTED_EXCEPTION.toLocalizedString(), e);
} catch (CancellationException e) {
// future was cancelled
logger.debug("future cancelled, returning null");
return null;
}
}
/**
* Register the specified region name as reinitializing, creating and adding a Future for it to the map.
*
* @throws IllegalStateException
* if there is already a region by that name registered.
*/
void regionReinitializing(String fullPath) {
Object old = this.reinitializingRegions.putIfAbsent(fullPath, new FutureResult(this.stopper));
if (old != null) {
throw new IllegalStateException(LocalizedStrings.GemFireCache_FOUND_AN_EXISTING_REINITALIZING_REGION_NAMED_0
.toLocalizedString(fullPath));
}
}
/**
* Set the reinitialized region and unregister it as reinitializing.
*
* @throws IllegalStateException
* if there is no region by that name registered as reinitializing.
*/
void regionReinitialized(Region region) {
String regionName = region.getFullPath();
FutureResult future = (FutureResult) this.reinitializingRegions.get(regionName);
if (future == null) {
throw new IllegalStateException(LocalizedStrings.GemFireCache_COULD_NOT_FIND_A_REINITIALIZING_REGION_NAMED_0
.toLocalizedString(regionName));
}
future.set(region);
unregisterReinitializingRegion(regionName);
}
/**
* Clear a reinitializing region, e.g. reinitialization failed.
*
* @throws IllegalStateException
* if cannot find reinitializing region registered by that name.
*/
void unregisterReinitializingRegion(String fullPath) {
/* Object previous = */this.reinitializingRegions.remove(fullPath);
// if (previous == null) {
// throw new IllegalStateException("Could not find a reinitializing region
// named " +
// fullPath);
// }
}
// /////////////////////////////////////////////////////////////
/**
* Returns true if get should give a copy; false if a reference.
*
* @since 4.0
*/
final boolean isCopyOnRead() {
return this.copyOnRead;
}
/**
* Implementation of {@link com.gemstone.gemfire.cache.Cache#setCopyOnRead}
*
* @since 4.0
*/
public void setCopyOnRead(boolean copyOnRead) {
this.copyOnRead = copyOnRead;
}
/**
* Implementation of {@link com.gemstone.gemfire.cache.Cache#getCopyOnRead}
*
* @since 4.0
*/
final public boolean getCopyOnRead() {
return this.copyOnRead;
}
/**
* Remove the specified root region
*
* @param rootRgn
* the region to be removed
* @return true if root region was removed, false if not found
*/
boolean removeRoot(LocalRegion rootRgn) {
synchronized (this.rootRegions) {
String rgnName = rootRgn.getName();
LocalRegion found = (LocalRegion) this.rootRegions.get(rgnName);
if (found == rootRgn) {
LocalRegion previous = (LocalRegion) this.rootRegions.remove(rgnName);
Assert.assertTrue(previous == rootRgn);
return true;
} else
return false;
}
}
/**
* @return array of two Strings, the root name and the relative path from root If there is no relative path from root,
* then String[1] will be an empty string
*/
static String[] parsePath(String p_path) {
String path = p_path;
validatePath(path);
String[] result = new String[2];
result[1] = "";
// strip off root name from path
int slashIndex = path.indexOf(Region.SEPARATOR_CHAR);
if (slashIndex == 0) {
path = path.substring(1);
slashIndex = path.indexOf(Region.SEPARATOR_CHAR);
}
result[0] = path;
if (slashIndex > 0) {
result[0] = path.substring(0, slashIndex);
result[1] = path.substring(slashIndex + 1);
}
return result;
}
/**
* Makes note of a <code>CacheLifecycleListener</code>
*/
public static void addCacheLifecycleListener(CacheLifecycleListener l) {
synchronized (GemFireCacheImpl.class) {
cacheLifecycleListeners.add(l);
}
}
/**
* Removes a <code>CacheLifecycleListener</code>
*
* @return Whether or not the listener was removed
*/
public static boolean removeCacheLifecycleListener(CacheLifecycleListener l) {
synchronized (GemFireCacheImpl.class) {
return cacheLifecycleListeners.remove(l);
}
}
/**
* Creates the single instance of the Transation Manager for this cache. Returns the existing one upon request.
*
* @return the CacheTransactionManager instance.
*
* @since 4.0
*/
public CacheTransactionManager getCacheTransactionManager() {
return this.txMgr;
}
/**
* @see CacheClientProxy
* @guarded.By {@link #ccpTimerMutex}
*/
private SystemTimer ccpTimer;
/**
* @see #ccpTimer
*/
private final Object ccpTimerMutex = new Object();
/**
* Get cache-wide CacheClientProxy SystemTimer
*
* @return the timer, lazily created
*/
public SystemTimer getCCPTimer() {
synchronized (ccpTimerMutex) {
if (ccpTimer != null) {
return ccpTimer;
}
ccpTimer = new SystemTimer(getDistributedSystem(), true);
if (this.isClosing) {
ccpTimer.cancel(); // poison it, don't throw.
}
return ccpTimer;
}
}
/**
* @see LocalRegion
*/
private final ExpirationScheduler expirationScheduler;
/**
* Get cache-wide ExpirationScheduler
*
* @return the scheduler, lazily created
*/
public ExpirationScheduler getExpirationScheduler() {
return this.expirationScheduler;
}
TXManagerImpl getTXMgr() {
return this.txMgr;
}
/**
* Returns the <code>Executor</code> (thread pool) that is used to execute cache event listeners.
*
* @since 3.5
*/
Executor getEventThreadPool() {
Assert.assertTrue(this.eventThreadPool != null);
return this.eventThreadPool;
}
public BridgeServer addBridgeServer() {
return (BridgeServer) addCacheServer();
}
public CacheServer addCacheServer() {
return addCacheServer(false);
}
public CacheServer addCacheServer(boolean isGatewayReceiver) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
}
stopper.checkCancelInProgress(null);
BridgeServerImpl bridge = new BridgeServerImpl(this, isGatewayReceiver);
allBridgeServers.add(bridge);
sendAddCacheServerProfileMessage();
return bridge;
}
public void addGatewaySender(GatewaySender sender) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
}
stopper.checkCancelInProgress(null);
synchronized (allGatewaySendersLock) {
if (!allGatewaySenders.contains(sender)) {
new UpdateAttributesProcessor((AbstractGatewaySender) sender).distribute(true);
Set<GatewaySender> tmp = new HashSet<GatewaySender>(allGatewaySenders.size() + 1);
if (!allGatewaySenders.isEmpty()) {
tmp.addAll(allGatewaySenders);
}
tmp.add(sender);
this.allGatewaySenders = Collections.unmodifiableSet(tmp);
} else {
throw new IllegalStateException(LocalizedStrings.GemFireCache_A_GATEWAYSENDER_WITH_ID_0_IS_ALREADY_DEFINED_IN_THIS_CACHE
.toLocalizedString(sender.getId()));
}
}
synchronized (this.rootRegions) {
Set<LocalRegion> appRegions = getApplicationRegions();
for (LocalRegion r : appRegions) {
Set<String> senders = r.getAllGatewaySenderIds();
if (senders.contains(sender.getId()) && !sender.isParallel()) {
r.senderCreated();
}
}
}
if(!sender.isParallel()) {
Region dynamicMetaRegion = getRegion(DynamicRegionFactory.dynamicRegionListName);
if(dynamicMetaRegion == null) {
if(logger.isDebugEnabled()) {
logger.debug(" The dynamic region is null. ");
}
} else {
dynamicMetaRegion.getAttributesMutator().addGatewaySenderId(sender.getId());
}
}
if (!(sender.getRemoteDSId() < 0)) {
system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_CREATE, sender);
}
}
public void removeGatewaySender(GatewaySender sender) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
}
stopper.checkCancelInProgress(null);
synchronized (allGatewaySendersLock) {
if (allGatewaySenders.contains(sender)) {
new UpdateAttributesProcessor((AbstractGatewaySender) sender, true).distribute(true);
Set<GatewaySender> tmp = new HashSet<GatewaySender>(allGatewaySenders.size() - 1);
if (!allGatewaySenders.isEmpty()) {
tmp.addAll(allGatewaySenders);
}
tmp.remove(sender);
this.allGatewaySenders = Collections.unmodifiableSet(tmp);
}
}
}
public void addGatewayReceiver(GatewayReceiver recv) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
}
stopper.checkCancelInProgress(null);
synchronized (allGatewayReceiversLock) {
Set<GatewayReceiver> tmp = new HashSet<GatewayReceiver>(allGatewayReceivers.size() + 1);
if (!allGatewayReceivers.isEmpty()) {
tmp.addAll(allGatewayReceivers);
}
tmp.add(recv);
this.allGatewayReceivers = Collections.unmodifiableSet(tmp);
}
}
public void addAsyncEventQueue(AsyncEventQueue asyncQueue) {
this.allAsyncEventQueues.add(asyncQueue);
system
.handleResourceEvent(ResourceEvent.ASYNCEVENTQUEUE_CREATE, asyncQueue);
}
/**
* Returns List of GatewaySender (excluding the senders for internal use)
*
* @return List List of GatewaySender objects
*/
public Set<GatewaySender> getGatewaySenders() {
Set<GatewaySender> tempSet = new HashSet<GatewaySender>();
for (GatewaySender sender : allGatewaySenders) {
if (!((AbstractGatewaySender)sender).isForInternalUse()) {
tempSet.add(sender);
}
}
return tempSet;
}
/**
* Returns List of all GatewaySenders (including the senders for internal use)
*
* @return List List of GatewaySender objects
*/
public Set<GatewaySender> getAllGatewaySenders() {
return this.allGatewaySenders;
}
public GatewaySender getGatewaySender(String Id) {
for (GatewaySender sender : this.allGatewaySenders) {
if (sender.getId().equals(Id)) {
return sender;
}
}
return null;
}
public Set<GatewayReceiver> getGatewayReceivers() {
return this.allGatewayReceivers;
}
public Set<AsyncEventQueue> getAsyncEventQueues() {
return this.allAsyncEventQueues;
}
public AsyncEventQueue getAsyncEventQueue(String id) {
for (AsyncEventQueue asyncEventQueue : this.allAsyncEventQueues) {
if (asyncEventQueue.getId().equals(id)) {
return asyncEventQueue;
}
}
return null;
}
public void removeAsyncEventQueue(AsyncEventQueue asyncQueue) {
if (isClient()) {
throw new UnsupportedOperationException(
"operation is not supported on a client cache");
}
// first remove the gateway sender of the queue
if (asyncQueue instanceof AsyncEventQueueImpl) {
removeGatewaySender(((AsyncEventQueueImpl)asyncQueue).getSender());
}
// using gateway senders lock since async queue uses a gateway sender
synchronized (allGatewaySendersLock) {
this.allAsyncEventQueues.remove(asyncQueue);
}
}
/* Cache API - get the conflict resolver for WAN */
public GatewayConflictResolver getGatewayConflictResolver() {
synchronized (this.allGatewayHubsLock) {
return this.gatewayConflictResolver;
}
}
/* Cache API - set the conflict resolver for WAN */
public void setGatewayConflictResolver(GatewayConflictResolver resolver) {
synchronized (this.allGatewayHubsLock) {
this.gatewayConflictResolver = resolver;
}
}
public List getBridgeServers() {
return getCacheServers();
}
public List getCacheServers() {
List bridgeServersWithoutReceiver = null;
if (!allBridgeServers.isEmpty()) {
Iterator allBridgeServersIterator = allBridgeServers.iterator();
while (allBridgeServersIterator.hasNext()) {
BridgeServerImpl bridgeServer = (BridgeServerImpl) allBridgeServersIterator.next();
// If BridgeServer is a GatewayReceiver, don't return as part of CacheServers
if (!bridgeServer.isGatewayReceiver()) {
if (bridgeServersWithoutReceiver == null) {
bridgeServersWithoutReceiver = new ArrayList();
}
bridgeServersWithoutReceiver.add(bridgeServer);
}
}
}
if (bridgeServersWithoutReceiver == null) {
bridgeServersWithoutReceiver = Collections.emptyList();
}
return bridgeServersWithoutReceiver;
}
public List getBridgeServersAndGatewayReceiver() {
return allBridgeServers;
}
/**
* notify partitioned regions that this cache requires all of their events
*/
public void requiresPREvents() {
synchronized (this.partitionedRegions) {
for (Iterator it = this.partitionedRegions.iterator(); it.hasNext();) {
((PartitionedRegion) it.next()).cacheRequiresNotification();
}
}
}
/**
* add a partitioned region to the set of tracked partitioned regions. This is used to notify the regions when this
* cache requires, or does not require notification of all region/entry events.
*/
public void addPartitionedRegion(PartitionedRegion r) {
synchronized (GemFireCacheImpl.class) {
synchronized (this.partitionedRegions) {
if (r.isDestroyed()) {
if (logger.isDebugEnabled()) {
logger.debug("GemFireCache#addPartitionedRegion did not add destroyed {}", r);
}
return;
}
if (this.partitionedRegions.add(r)) {
getCachePerfStats().incPartitionedRegions(1);
}
}
}
}
/**
* Returns a set of all current partitioned regions for test hook.
*/
public Set<PartitionedRegion> getPartitionedRegions() {
synchronized (this.partitionedRegions) {
return new HashSet<PartitionedRegion>(this.partitionedRegions);
}
}
private TreeMap<String, Map<String, PartitionedRegion>> getPRTrees() {
// prTree will save a sublist of PRs who are under the same root
TreeMap<String, Map<String, PartitionedRegion>> prTrees = new TreeMap();
TreeMap<String, PartitionedRegion> prMap = getPartitionedRegionMap();
boolean hasColocatedRegion = false;
for (PartitionedRegion pr : prMap.values()) {
List<PartitionedRegion> childlist = ColocationHelper.getColocatedChildRegions(pr);
if (childlist != null && childlist.size() > 0) {
hasColocatedRegion = true;
break;
}
}
if (hasColocatedRegion) {
LinkedHashMap<String, PartitionedRegion> orderedPrMap = orderByColocation(prMap);
prTrees.put("ROOT", orderedPrMap);
} else {
for (PartitionedRegion pr : prMap.values()) {
String rootName = pr.getRoot().getName();
TreeMap<String, PartitionedRegion> prSubMap = (TreeMap<String, PartitionedRegion>) prTrees.get(rootName);
if (prSubMap == null) {
prSubMap = new TreeMap();
prTrees.put(rootName, prSubMap);
}
prSubMap.put(pr.getFullPath(), pr);
}
}
return prTrees;
}
private TreeMap<String, PartitionedRegion> getPartitionedRegionMap() {
TreeMap<String, PartitionedRegion> prMap = new TreeMap();
for (Map.Entry<String, Region> entry : ((Map<String,Region>)pathToRegion).entrySet()) {
String regionName = (String) entry.getKey();
Region region = entry.getValue();
//Don't wait for non partitioned regions
if(!(region instanceof PartitionedRegion)) {
continue;
}
// Do a getRegion to ensure that we wait for the partitioned region
//to finish initialization
try {
Region pr = getRegion(regionName);
if (pr instanceof PartitionedRegion) {
prMap.put(regionName, (PartitionedRegion) pr);
}
} catch (CancelException ce) {
// if some region throws cancel exception during initialization,
// then no need to shutdownall them gracefully
}
}
return prMap;
}
private LinkedHashMap<String, PartitionedRegion> orderByColocation(TreeMap<String, PartitionedRegion> prMap) {
LinkedHashMap<String, PartitionedRegion> orderedPrMap = new LinkedHashMap();
for (PartitionedRegion pr : prMap.values()) {
addColocatedChildRecursively(orderedPrMap, pr);
}
return orderedPrMap;
}
private void addColocatedChildRecursively(LinkedHashMap<String, PartitionedRegion> prMap, PartitionedRegion pr) {
for (PartitionedRegion colocatedRegion : ColocationHelper.getColocatedChildRegions(pr)) {
addColocatedChildRecursively(prMap, colocatedRegion);
}
prMap.put(pr.getFullPath(), pr);
}
/**
* check to see if any cache components require notification from a partitioned region. Notification adds to the
* messaging a PR must do on each put/destroy/invalidate operation and should be kept to a minimum
*
* @param r
* the partitioned region
* @return true if the region should deliver all of its events to this cache
*/
protected boolean requiresNotificationFromPR(PartitionedRegion r) {
synchronized (GemFireCacheImpl.class) {
boolean hasSerialSenders = hasSerialSenders(r);
boolean result = hasSerialSenders;
if (!result) {
Iterator allBridgeServersIterator = allBridgeServers.iterator();
while (allBridgeServersIterator.hasNext()) {
BridgeServerImpl server = (BridgeServerImpl) allBridgeServersIterator.next();
if (!server.getNotifyBySubscription()) {
result = true;
break;
}
}
}
return result;
}
}
private boolean hasSerialSenders(PartitionedRegion r) {
boolean hasSenders = false;
Set<String> senders = r.getAllGatewaySenderIds();
for (String sender : senders) {
GatewaySender gs = this.getGatewaySender(sender);
if (gs != null && !gs.isParallel()) {
hasSenders = true;
break;
}
}
return hasSenders;
}
/**
* remove a partitioned region from the set of tracked instances.
*
* @see #addPartitionedRegion(PartitionedRegion)
*/
public void removePartitionedRegion(PartitionedRegion r) {
synchronized (this.partitionedRegions) {
if (this.partitionedRegions.remove(r)) {
getCachePerfStats().incPartitionedRegions(-1);
}
}
}
public void setIsServer(boolean isServer) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
}
stopper.checkCancelInProgress(null);
this.isServer = isServer;
}
public boolean isServer() {
if (isClient()) {
return false;
}
stopper.checkCancelInProgress(null);
if (!this.isServer) {
return (this.allBridgeServers.size() > 0);
} else {
return true;
}
}
public QueryService getQueryService() {
if (isClient()) {
Pool p = getDefaultPool();
if (p == null) {
throw new IllegalStateException("Client cache does not have a default pool. Use getQueryService(String poolName) instead.");
} else {
return p.getQueryService();
}
} else {
return new DefaultQueryService(this);
}
}
public QueryService getLocalQueryService() {
return new DefaultQueryService(this);
}
/**
* @return Context jndi context associated with the Cache.
* @since 4.0
*/
public Context getJNDIContext() {
// if (isClient()) {
// throw new UnsupportedOperationException("operation is not supported on a client cache");
// }
return JNDIInvoker.getJNDIContext();
}
/**
* @return JTA TransactionManager associated with the Cache.
* @since 4.0
*/
public javax.transaction.TransactionManager getJTATransactionManager() {
// if (isClient()) {
// throw new UnsupportedOperationException("operation is not supported on a client cache");
// }
return JNDIInvoker.getTransactionManager();
}
/**
* return the cq/interest information for a given region name, creating one if it doesn't exist
*/
public FilterProfile getFilterProfile(String regionName) {
LocalRegion r = (LocalRegion) getRegion(regionName, true);
if (r != null) {
return r.getFilterProfile();
}
return null;
}
public RegionAttributes getRegionAttributes(String id) {
return (RegionAttributes) this.namedRegionAttributes.get(id);
}
public void setRegionAttributes(String id, RegionAttributes attrs) {
if (attrs == null) {
this.namedRegionAttributes.remove(id);
} else {
this.namedRegionAttributes.put(id, attrs);
}
}
public Map listRegionAttributes() {
return Collections.unmodifiableMap(this.namedRegionAttributes);
}
private static final ThreadLocal xmlCache = new ThreadLocal();
/**
* Returns the cache currently being xml initialized by the thread that calls this method. The result will be null if
* the thread is not initializing a cache.
*/
public static GemFireCacheImpl getXmlCache() {
return (GemFireCacheImpl) xmlCache.get();
}
public void loadCacheXml(InputStream stream) throws TimeoutException, CacheWriterException, GatewayException,
RegionExistsException {
// make this cache available to callbacks being initialized during xml create
final Object oldValue = xmlCache.get();
xmlCache.set(this);
try {
CacheXmlParser xml;
if (xmlParameterizationEnabled) {
char[] buffer = new char[1024];
Reader reader = new BufferedReader(new InputStreamReader(stream, "ISO-8859-1"));
Writer stringWriter = new StringWriter();
int n = -1;
while ((n = reader.read(buffer)) != -1) {
stringWriter.write(buffer, 0, n);
}
/** Now replace all replaceable system properties here using <code>PropertyResolver</code> */
String replacedXmlString = resolver.processUnresolvableString(stringWriter.toString());
/*
* Turn the string back into the default encoding so that the XML parser can work correctly in the presence of
* an "encoding" attribute in the XML prolog.
*/
ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputStreamWriter writer = new OutputStreamWriter(baos, "ISO-8859-1");
writer.write(replacedXmlString);
writer.flush();
xml = CacheXmlParser.parse(new ByteArrayInputStream(baos.toByteArray()));
} else {
xml = CacheXmlParser.parse(stream);
}
xml.create(this);
} catch (IOException e) {
throw new CacheXmlException("Input Stream could not be read for system property substitutions.");
} finally {
xmlCache.set(oldValue);
}
}
public void readyForEvents() {
PoolManagerImpl.readyForEvents(this.system, false);
}
/**
* This cache's reliable message queue factory. Should always have an instance of it.
*/
private ReliableMessageQueueFactory rmqFactory;
private List<File> backupFiles = Collections.emptyList();
/**
* Initializes the reliable message queue. Needs to be called at cache creation
*
* @throws IllegalStateException
* if the factory is in use
*/
private void initReliableMessageQueueFactory() {
synchronized (GemFireCacheImpl.class) {
if (this.rmqFactory != null) {
this.rmqFactory.close(false);
}
this.rmqFactory = new ReliableMessageQueueFactoryImpl();
}
}
/**
* Returns this cache's ReliableMessageQueueFactory.
*
* @since 5.0
*/
public ReliableMessageQueueFactory getReliableMessageQueueFactory() {
return this.rmqFactory;
}
public InternalResourceManager getResourceManager() {
return getResourceManager(true);
}
public InternalResourceManager getResourceManager(boolean checkCancellationInProgress) {
if (checkCancellationInProgress) {
stopper.checkCancelInProgress(null);
}
return this.resourceManager;
}
public void setBackupFiles(List<File> backups) {
this.backupFiles = backups;
}
public List<File> getBackupFiles() {
return Collections.unmodifiableList(this.backupFiles);
}
public BackupManager startBackup(InternalDistributedMember sender)
throws IOException {
BackupManager manager = new BackupManager(sender, this);
if (!this.backupManager.compareAndSet(null, manager)) {
// TODO prpersist internationalize this
throw new IOException("Backup already in progress");
}
manager.start();
return manager;
}
public void clearBackupManager() {
this.backupManager.set(null);
}
public BackupManager getBackupManager() {
return this.backupManager.get();
}
// //////////////////// Inner Classes //////////////////////
// TODO make this a simple int guarded by riWaiters and get rid of the double-check
private final AtomicInteger registerInterestsInProgress = new AtomicInteger();
private final ArrayList<SimpleWaiter> riWaiters = new ArrayList<SimpleWaiter>();
private TypeRegistry pdxRegistry; // never changes but is currently not
// initialized in constructor
/**
* update stats for completion of a registerInterest operation
*/
public void registerInterestCompleted() {
// Don't do a cancellation check, it's just a moot point, that's all
// GemFireCache.this.getCancelCriterion().checkCancelInProgress(null);
if (GemFireCacheImpl.this.isClosing) {
return; // just get out, all of the SimpleWaiters will die of their own accord
}
int cv = registerInterestsInProgress.decrementAndGet();
if (logger.isDebugEnabled()) {
logger.debug("registerInterestCompleted: new value = {}", cv);
}
if (cv == 0) {
synchronized (riWaiters) {
// TODO double-check
cv = registerInterestsInProgress.get();
if (cv == 0) { // all clear
if (logger.isDebugEnabled()) {
logger.debug("registerInterestCompleted: Signalling end of register-interest");
}
Iterator it = riWaiters.iterator();
while (it.hasNext()) {
SimpleWaiter sw = (SimpleWaiter) it.next();
sw.doNotify();
}
riWaiters.clear();
} // all clear
} // synchronized
}
}
public void registerInterestStarted() {
// Don't do a cancellation check, it's just a moot point, that's all
// GemFireCache.this.getCancelCriterion().checkCancelInProgress(null);
int newVal = registerInterestsInProgress.incrementAndGet();
if (logger.isDebugEnabled()) {
logger.debug("registerInterestsStarted: new count = {}", newVal);
}
}
/**
* update stats for initiation of a registerInterest operation
*/
/**
* Blocks until no register interests are in progress.
*/
public void waitForRegisterInterestsInProgress() {
// In *this* particular context, let the caller know that
// his cache has been cancelled. doWait below would do that as
// well, so this is just an early out.
GemFireCacheImpl.this.getCancelCriterion().checkCancelInProgress(null);
int count = registerInterestsInProgress.get();
SimpleWaiter sw = null;
if (count > 0) {
synchronized (riWaiters) {
// TODO double-check
count = registerInterestsInProgress.get();
if (count > 0) {
if (logger.isDebugEnabled()) {
logger.debug("waitForRegisterInterestsInProgress: count ={}", count);
}
sw = new SimpleWaiter();
riWaiters.add(sw);
}
} // synchronized
if (sw != null) {
sw.doWait();
}
}
}
/**
* Wait for given sender queue to flush for given timeout.
*
* @param id
* ID of GatewaySender or AsyncEventQueue
* @param isAsyncListener
* true if this is for an AsyncEventQueue and false if for a
* GatewaySender
* @param maxWaitTime
* maximum time to wait in seconds; zero or -ve means infinite wait
*
* @return zero if maxWaitTime was not breached, -1 if queue could not be
* found or is closed, and elapsed time if timeout was breached
*/
public int waitForSenderQueueFlush(String id, boolean isAsyncListener,
int maxWaitTime) {
getCancelCriterion().checkCancelInProgress(null);
AbstractGatewaySender gatewaySender = null;
if (isAsyncListener) {
AsyncEventQueueImpl asyncQueue = (AsyncEventQueueImpl)
getAsyncEventQueue(id);
if (asyncQueue != null) {
gatewaySender = (AbstractGatewaySender) asyncQueue.getSender();
}
}
else {
gatewaySender = (AbstractGatewaySender)getGatewaySender(id);
}
RegionQueue rq;
final long startTime = System.currentTimeMillis();
long elapsedTime;
if (maxWaitTime <= 0) {
maxWaitTime = Integer.MAX_VALUE;
}
while (gatewaySender != null && gatewaySender.isRunning()
&& (rq = gatewaySender.getQueue()) != null) {
if (rq.size() == 0) {
// return zero since it was not a timeout
return 0;
}
try {
Thread.sleep(500);
getCancelCriterion().checkCancelInProgress(null);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
getCancelCriterion().checkCancelInProgress(ie);
}
// clear interrupted flag before retry
Thread.interrupted();
elapsedTime = System.currentTimeMillis() - startTime;
if (elapsedTime >= (maxWaitTime * 1000L)) {
// return elapsed time
return (int)(elapsedTime / 1000L);
}
}
return -1;
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
public void setQueryMonitorRequiredForResourceManager(boolean required) {
QUERY_MONITOR_REQUIRED_FOR_RESOURCE_MANAGER = required;
}
public boolean isQueryMonitorDisabledForLowMemory() {
return QUERY_MONITOR_DISABLED_FOR_LOW_MEM;
}
/**
* Returns the QueryMonitor instance based on system property MAX_QUERY_EXECUTION_TIME.
* @since 6.0
*/
public QueryMonitor getQueryMonitor() {
//Check to see if monitor is required if ResourceManager critical heap percentage is set
//@see com.gemstone.gemfire.cache.control.ResourceManager#setCriticalHeapPercentage(int)
//or whether we override it with the system variable;
boolean monitorRequired = !QUERY_MONITOR_DISABLED_FOR_LOW_MEM && QUERY_MONITOR_REQUIRED_FOR_RESOURCE_MANAGER;
// Added for DUnit test purpose, which turns-on and off the this.TEST_MAX_QUERY_EXECUTION_TIME.
if (!(this.MAX_QUERY_EXECUTION_TIME > 0 || this.TEST_MAX_QUERY_EXECUTION_TIME > 0 || monitorRequired)) {
// if this.TEST_MAX_QUERY_EXECUTION_TIME is set, send the QueryMonitor.
// Else send null, so that the QueryMonitor is turned-off.
return null;
}
// Return the QueryMonitor service if MAX_QUERY_EXECUTION_TIME is set or it is required by the ResourceManager and not overriden by system property.
if ((this.MAX_QUERY_EXECUTION_TIME > 0 || this.TEST_MAX_QUERY_EXECUTION_TIME > 0 || monitorRequired) && this.queryMonitor == null) {
synchronized (queryMonitorLock) {
if (this.queryMonitor == null) {
int maxTime = MAX_QUERY_EXECUTION_TIME > TEST_MAX_QUERY_EXECUTION_TIME ? MAX_QUERY_EXECUTION_TIME
: TEST_MAX_QUERY_EXECUTION_TIME;
if (monitorRequired && maxTime < 0) {
//this means that the resource manager is being used and we need to monitor query memory usage
//If no max execution time has been set, then we will default to five hours
maxTime = FIVE_HOURS;
}
this.queryMonitor = new QueryMonitor(maxTime);
final LoggingThreadGroup group = LoggingThreadGroup.createThreadGroup("QueryMonitor Thread Group", logger);
Thread qmThread = new Thread(group, this.queryMonitor, "QueryMonitor Thread");
qmThread.setDaemon(true);
qmThread.start();
if (logger.isDebugEnabled()) {
logger.debug("QueryMonitor thread started.");
}
}
}
}
return this.queryMonitor;
}
/**
* Simple class to allow waiters for register interest. Has at most one thread that ever calls wait.
*
* @since 5.7
*/
private class SimpleWaiter {
private boolean notified = false;
SimpleWaiter() {
}
public void doWait() {
synchronized (this) {
while (!this.notified) {
GemFireCacheImpl.this.getCancelCriterion().checkCancelInProgress(null);
boolean interrupted = Thread.interrupted();
try {
this.wait(1000);
} catch (InterruptedException ex) {
interrupted = true;
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
}
}
public void doNotify() {
synchronized (this) {
this.notified = true;
this.notifyAll();
}
}
}
private void sendAddCacheServerProfileMessage() {
DM dm = this.getDistributedSystem().getDistributionManager();
Set otherMembers = dm.getOtherDistributionManagerIds();
AddCacheServerProfileMessage msg = new AddCacheServerProfileMessage();
msg.operateOnLocalCache(this);
if (!otherMembers.isEmpty()) {
if (logger.isDebugEnabled()) {
logger.debug("Sending add cache server profile message to other members.");
}
ReplyProcessor21 rp = new ReplyProcessor21(dm, otherMembers);
msg.setRecipients(otherMembers);
msg.processorId = rp.getProcessorId();
dm.putOutgoing(msg);
// Wait for replies.
try {
rp.waitForReplies();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
public TXManagerImpl getTxManager() {
return this.txMgr;
}
/**
* @since 6.5
*/
public <K, V> RegionFactory<K, V> createRegionFactory(RegionShortcut atts) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
} else {
return new RegionFactoryImpl<K, V>(this, atts);
}
}
/**
* @since 6.5
*/
public <K, V> RegionFactory<K, V> createRegionFactory() {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
}
return new RegionFactoryImpl<K, V>(this);
}
/**
* @since 6.5
*/
public <K, V> RegionFactory<K, V> createRegionFactory(String regionAttributesId) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
}
return new RegionFactoryImpl<K, V>(this, regionAttributesId);
}
/**
* @since 6.5
*/
public <K, V> RegionFactory<K, V> createRegionFactory(RegionAttributes<K, V> regionAttributes) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
}
return new RegionFactoryImpl<K, V>(this, regionAttributes);
}
/**
* @since 6.5
*/
public <K, V> ClientRegionFactory<K, V> createClientRegionFactory(ClientRegionShortcut atts) {
return new ClientRegionFactoryImpl<K, V>(this, atts);
}
public <K, V> ClientRegionFactory<K, V> createClientRegionFactory(String refid) {
return new ClientRegionFactoryImpl<K, V>(this, refid);
}
/**
* @since 6.5
*/
public QueryService getQueryService(String poolName) {
Pool p = PoolManager.find(poolName);
if (p == null) {
throw new IllegalStateException("Could not find a pool named " + poolName);
} else {
return p.getQueryService();
}
}
public RegionService createAuthenticatedView(Properties properties) {
Pool pool = getDefaultPool();
if (pool == null) {
throw new IllegalStateException("This cache does not have a default pool");
}
return createAuthenticatedCacheView(pool, properties);
}
public RegionService createAuthenticatedView(Properties properties, String poolName) {
Pool pool = PoolManager.find(poolName);
if (pool == null) {
throw new IllegalStateException("Pool " + poolName + " does not exist");
}
return createAuthenticatedCacheView(pool, properties);
}
public RegionService createAuthenticatedCacheView(Pool pool, Properties properties) {
if (pool.getMultiuserAuthentication()) {
return ((PoolImpl) pool).createAuthenticatedCacheView(properties);
} else {
throw new IllegalStateException("The pool " + pool.getName() + " did not have multiuser-authentication set to true");
}
}
public static void initializeRegionShortcuts(Cache c) {
// no shortcuts for SQLFabric since these are not used and some combinations
// are not supported
if (sqlfSystem()) {
return;
}
for (RegionShortcut pra : RegionShortcut.values()) {
switch (pra) {
case PARTITION: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PARTITION);
PartitionAttributesFactory paf = new PartitionAttributesFactory();
af.setPartitionAttributes(paf.create());
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case PARTITION_REDUNDANT: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PARTITION);
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(1);
af.setPartitionAttributes(paf.create());
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case PARTITION_PERSISTENT: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
PartitionAttributesFactory paf = new PartitionAttributesFactory();
af.setPartitionAttributes(paf.create());
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case PARTITION_REDUNDANT_PERSISTENT: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(1);
af.setPartitionAttributes(paf.create());
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case PARTITION_OVERFLOW: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PARTITION);
PartitionAttributesFactory paf = new PartitionAttributesFactory();
af.setPartitionAttributes(paf.create());
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case PARTITION_REDUNDANT_OVERFLOW: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PARTITION);
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(1);
af.setPartitionAttributes(paf.create());
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case PARTITION_PERSISTENT_OVERFLOW: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
PartitionAttributesFactory paf = new PartitionAttributesFactory();
af.setPartitionAttributes(paf.create());
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case PARTITION_REDUNDANT_PERSISTENT_OVERFLOW: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(1);
af.setPartitionAttributes(paf.create());
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case PARTITION_HEAP_LRU: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PARTITION);
PartitionAttributesFactory paf = new PartitionAttributesFactory();
af.setPartitionAttributes(paf.create());
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes());
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case PARTITION_REDUNDANT_HEAP_LRU: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PARTITION);
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(1);
af.setPartitionAttributes(paf.create());
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes());
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case REPLICATE: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.REPLICATE);
af.setScope(Scope.DISTRIBUTED_ACK);
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case REPLICATE_PERSISTENT: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
af.setScope(Scope.DISTRIBUTED_ACK);
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case REPLICATE_OVERFLOW: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.REPLICATE);
af.setScope(Scope.DISTRIBUTED_ACK);
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case REPLICATE_PERSISTENT_OVERFLOW: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
af.setScope(Scope.DISTRIBUTED_ACK);
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case REPLICATE_HEAP_LRU: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.REPLICATE);
af.setScope(Scope.DISTRIBUTED_ACK);
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes());
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case LOCAL: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.NORMAL);
af.setScope(Scope.LOCAL);
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case LOCAL_PERSISTENT: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
af.setScope(Scope.LOCAL);
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case LOCAL_HEAP_LRU: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.NORMAL);
af.setScope(Scope.LOCAL);
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes());
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case LOCAL_OVERFLOW: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.NORMAL);
af.setScope(Scope.LOCAL);
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case LOCAL_PERSISTENT_OVERFLOW: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
af.setScope(Scope.LOCAL);
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case PARTITION_PROXY: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PARTITION);
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setLocalMaxMemory(0);
af.setPartitionAttributes(paf.create());
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case PARTITION_PROXY_REDUNDANT: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PARTITION);
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setLocalMaxMemory(0);
paf.setRedundantCopies(1);
af.setPartitionAttributes(paf.create());
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case REPLICATE_PROXY: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.EMPTY);
af.setScope(Scope.DISTRIBUTED_ACK);
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case PARTITION_HDFS: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.HDFS_PARTITION);
PartitionAttributesFactory paf = new PartitionAttributesFactory();
af.setPartitionAttributes(paf.create());
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
af.setHDFSWriteOnly(false);
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case PARTITION_REDUNDANT_HDFS: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.HDFS_PARTITION);
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(1);
af.setPartitionAttributes(paf.create());
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
af.setHDFSWriteOnly(false);
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case PARTITION_WRITEONLY_HDFS_STORE: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.HDFS_PARTITION);
PartitionAttributesFactory paf = new PartitionAttributesFactory();
af.setPartitionAttributes(paf.create());
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
af.setHDFSWriteOnly(true);
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case PARTITION_REDUNDANT_WRITEONLY_HDFS_STORE: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.HDFS_PARTITION);
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(1);
af.setPartitionAttributes(paf.create());
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
af.setHDFSWriteOnly(true);
c.setRegionAttributes(pra.toString(), af.create());
break;
}
default:
throw new IllegalStateException("unhandled enum " + pra);
}
}
}
public static void initializeClientRegionShortcuts(Cache c) {
for (ClientRegionShortcut pra : ClientRegionShortcut.values()) {
switch (pra) {
case LOCAL: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.NORMAL);
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case LOCAL_PERSISTENT: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case LOCAL_HEAP_LRU: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.NORMAL);
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes());
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case LOCAL_OVERFLOW: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.NORMAL);
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case LOCAL_PERSISTENT_OVERFLOW: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
c.setRegionAttributes(pra.toString(), af.create());
break;
}
case PROXY: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.EMPTY);
UserSpecifiedRegionAttributes ra = (UserSpecifiedRegionAttributes) af.create();
ra.requiresPoolName = true;
c.setRegionAttributes(pra.toString(), ra);
break;
}
case CACHING_PROXY: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.NORMAL);
UserSpecifiedRegionAttributes ra = (UserSpecifiedRegionAttributes) af.create();
ra.requiresPoolName = true;
c.setRegionAttributes(pra.toString(), ra);
break;
}
case CACHING_PROXY_HEAP_LRU: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.NORMAL);
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes());
UserSpecifiedRegionAttributes ra = (UserSpecifiedRegionAttributes) af.create();
ra.requiresPoolName = true;
c.setRegionAttributes(pra.toString(), ra);
break;
}
case CACHING_PROXY_OVERFLOW: {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.NORMAL);
af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
UserSpecifiedRegionAttributes ra = (UserSpecifiedRegionAttributes) af.create();
ra.requiresPoolName = true;
c.setRegionAttributes(pra.toString(), ra);
break;
}
default:
throw new IllegalStateException("unhandled enum " + pra);
}
}
}
public void beginDestroy(String path, DistributedRegion region) {
this.regionsInDestroy.putIfAbsent(path, region);
}
public void endDestroy(String path, DistributedRegion region) {
this.regionsInDestroy.remove(path, region);
}
public DistributedRegion getRegionInDestroy(String path) {
return this.regionsInDestroy.get(path);
}
public DistributionAdvisee getSqlfAdvisee() {
return this.sqlfAdvisee;
}
public void setSqlfAdvisee(DistributionAdvisee advisee) {
this.sqlfAdvisee = advisee;
}
/**
* Mark a node as initialized or not initialized. Used by SQLFabric to avoid creation of buckets or routing of
* operations/functions on a node that is still in the DDL replay phase.
*/
public boolean updateNodeStatus(InternalDistributedMember member, boolean initialized) {
HashSet<BucketAdvisor> advisors = null;
synchronized (this.unInitializedMembers) {
if (initialized) {
if (this.unInitializedMembers.remove(member)) {
if (member.equals(getMyId())) {
// don't invoke volunteerForPrimary() inside the lock since
// BucketAdvisor will also require the lock after locking itself
advisors = new HashSet<BucketAdvisor>(this.deferredVolunteerForPrimary);
this.deferredVolunteerForPrimary.clear();
}
} else {
return false;
}
} else {
return this.unInitializedMembers.add(member);
}
}
if (advisors != null) {
for (BucketAdvisor advisor : advisors) {
if (logger.isDebugEnabled()) {
logger.debug("Invoking volunteer for primary for deferred bucket " + "post SQLFabric DDL replay for BucketAdvisor: {}", advisor);
}
advisor.volunteerForPrimary();
}
}
return true;
}
/**
* Return true if this node is still not initialized else false.
*/
public boolean isUnInitializedMember(InternalDistributedMember member) {
synchronized (this.unInitializedMembers) {
return this.unInitializedMembers.contains(member);
}
}
/**
* Return false for volunteer primary if this node is not currently initialized. Also adds the {@link BucketAdvisor}
* to a list that will be replayed once this node is initialized.
*/
public boolean doVolunteerForPrimary(BucketAdvisor advisor) {
synchronized (this.unInitializedMembers) {
if (!this.unInitializedMembers.contains(getMyId())) {
return true;
}
if (logger.isDebugEnabled()) {
logger.debug("Deferring volunteer for primary due to uninitialized " + "node (SQLFabric DDL replay) for BucketAdvisor: {}", advisor);
}
this.deferredVolunteerForPrimary.add(advisor);
return false;
}
}
/**
* Remove all the uninitialized members from the given collection.
*/
public final void removeUnInitializedMembers(Collection<InternalDistributedMember> members) {
synchronized (this.unInitializedMembers) {
for (final InternalDistributedMember m : this.unInitializedMembers) {
members.remove(m);
}
}
}
public final boolean isSqlfSystem() {
return this.sqlfSystem;
}
public static boolean sqlfSystem() {
return (instance != null && instance.isSqlfSystem());
}
public void setSqlfSystem() {
this.sqlfSystem = true;
}
public TombstoneService getTombstoneService() {
return this.tombstoneService;
}
public TypeRegistry getPdxRegistry() {
return this.pdxRegistry;
}
public boolean getPdxReadSerialized() {
return this.cacheConfig.pdxReadSerialized;
}
public PdxSerializer getPdxSerializer() {
return this.cacheConfig.pdxSerializer;
}
public String getPdxDiskStore() {
return this.cacheConfig.pdxDiskStore;
}
public boolean getPdxPersistent() {
return this.cacheConfig.pdxPersistent;
}
public boolean getPdxIgnoreUnreadFields() {
return this.cacheConfig.pdxIgnoreUnreadFields;
}
/**
* Returns true if any of the GemFire services prefers PdxInstance. And application has not requested getObject() on
* the PdxInstance.
*
*/
public boolean getPdxReadSerializedByAnyGemFireServices() {
if ((getPdxReadSerialized() || DefaultQuery.getPdxReadSerialized()) && PdxInstanceImpl.getPdxReadSerialized()) {
return true;
}
return false;
}
public CacheConfig getCacheConfig() {
return this.cacheConfig;
}
public DM getDistributionManager() {
return this.dm;
}
public GatewaySenderFactory createGatewaySenderFactory(){
return WANServiceProvider.createGatewaySenderFactory(this);
}
public GatewayReceiverFactory createGatewayReceiverFactory() {
return WANServiceProvider.createGatewayReceiverFactory(this);
}
public AsyncEventQueueFactory createAsyncEventQueueFactory() {
return new AsyncEventQueueFactoryImpl(this);
}
public DistributionAdvisor getDistributionAdvisor() {
return getResourceAdvisor();
}
public ResourceAdvisor getResourceAdvisor() {
return resourceAdvisor;
}
public Profile getProfile() {
return resourceAdvisor.createProfile();
}
public DistributionAdvisee getParentAdvisee() {
return null;
}
public InternalDistributedSystem getSystem() {
return this.system;
}
public String getFullPath() {
return "ResourceManager";
}
public void fillInProfile(Profile profile) {
resourceManager.fillInProfile(profile);
}
public int getSerialNumber() {
return this.serialNumber;
}
public TXEntryStateFactory getTXEntryStateFactory() {
return this.txEntryStateFactory;
}
// test hook
public void setPdxSerializer(PdxSerializer v) {
this.cacheConfig.setPdxSerializer(v);
basicSetPdxSerializer(v);
}
private void basicSetPdxSerializer(PdxSerializer v) {
TypeRegistry.setPdxSerializer(v);
if (v instanceof ReflectionBasedAutoSerializer) {
AutoSerializableManager asm = AutoSerializableManager.getInstance((ReflectionBasedAutoSerializer) v);
if (asm != null) {
asm.setRegionService(this);
}
}
}
// test hook
public void setReadSerialized(boolean v) {
this.cacheConfig.setPdxReadSerialized(v);
}
public void setDeclarativeCacheConfig(CacheConfig cacheConfig) {
this.cacheConfig.setDeclarativeConfig(cacheConfig);
basicSetPdxSerializer(this.cacheConfig.getPdxSerializer());
}
/**
* Add to the map of declarable properties. Any properties that exactly match existing
* properties for a class in the list will be discarded (no duplicate Properties allowed).
*
* @param mapOfNewDeclarableProps Map of the declarable properties to add
*/
public void addDeclarableProperties(final Map<Declarable, Properties> mapOfNewDeclarableProps) {
synchronized (this.declarablePropertiesMap) {
for (Map.Entry<Declarable, Properties> newEntry : mapOfNewDeclarableProps.entrySet()) {
// Find and remove a Declarable from the map if an "equal" version is already stored
Class clazz = newEntry.getKey().getClass();
Object matchingDeclarable = null;
for (Map.Entry<Declarable, Properties> oldEntry : this.declarablePropertiesMap.entrySet()) {
if (clazz.getName().equals(oldEntry.getKey().getClass().getName()) && (newEntry.getValue().equals(oldEntry.getValue()) ||
((newEntry.getKey() instanceof Identifiable) && (((Identifiable) oldEntry.getKey()).getId().equals(((Identifiable) newEntry.getKey()).getId()))))) {
matchingDeclarable = oldEntry.getKey();
break;
}
}
if (matchingDeclarable != null) {
this.declarablePropertiesMap.remove(matchingDeclarable);
}
// Now add the new/replacement properties to the map
this.declarablePropertiesMap.put(newEntry.getKey(), newEntry.getValue());
}
}
}
public static boolean isXmlParameterizationEnabled() {
return xmlParameterizationEnabled;
}
public static void setXmlParameterizationEnabled(boolean isXmlParameterizationEnabled) {
xmlParameterizationEnabled = isXmlParameterizationEnabled;
}
private Declarable initializer;
private Properties initializerProps;
/**
* A factory for temporary result sets than can overflow to disk.
*/
private TemporaryResultSetFactory resultSetFactory;
public Declarable getInitializer() {
return this.initializer;
}
public Properties getInitializerProps() {
return this.initializerProps;
}
public void setInitializer(Declarable initializer, Properties initializerProps) {
this.initializer = initializer;
this.initializerProps = initializerProps;
}
public PdxInstanceFactory createPdxInstanceFactory(String className) {
return PdxInstanceFactoryImpl.newCreator(className, true);
}
public PdxInstanceFactory createPdxInstanceFactory(String className, boolean b) {
return PdxInstanceFactoryImpl.newCreator(className, b);
}
public PdxInstance createPdxEnum(String className, String enumName, int enumOrdinal) {
return PdxInstanceFactoryImpl.createPdxEnum(className, enumName, enumOrdinal, this);
}
public JmxManagerAdvisor getJmxManagerAdvisor() {
return this.jmxAdvisor;
}
public CacheSnapshotService getSnapshotService() {
return new CacheSnapshotServiceImpl(this);
}
private void startColocatedJmxManagerLocator() {
InternalLocator loc = InternalLocator.getLocator();
if (loc != null) {
loc.startJmxManagerLocationService(this);
}
}
@Override
public HDFSStoreFactory createHDFSStoreFactory() {
// TODO Auto-generated method stub
return new HDFSStoreFactoryImpl(this);
}
public HDFSStoreFactory createHDFSStoreFactory(HDFSStoreCreation creation) {
return new HDFSStoreFactoryImpl(this, creation);
}
public void addHDFSStore(HDFSStoreImpl hsi) {
HDFSStoreDirector.getInstance().addHDFSStore(hsi);
//TODO:HDFS Add a resource event for hdfs store creation as well
// like the following disk store event
//system.handleResourceEvent(ResourceEvent.DISKSTORE_CREATE, dsi);
}
public void removeHDFSStore(HDFSStoreImpl hsi) {
//hsi.destroy();
HDFSStoreDirector.getInstance().removeHDFSStore(hsi.getName());
//TODO:HDFS Add a resource event for hdfs store as well
// like the following disk store event
//system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, dsi);
}
public void closeHDFSStores() {
HDFSRegionDirector.reset();
HDFSStoreDirector.getInstance().closeHDFSStores();
}
public HDFSStoreImpl findHDFSStore(String name) {
return HDFSStoreDirector.getInstance().getHDFSStore(name);
}
public Collection<HDFSStoreImpl> getHDFSStores() {
return HDFSStoreDirector.getInstance().getAllHDFSStores();
}
public TemporaryResultSetFactory getResultSetFactory() {
return this.resultSetFactory;
}
public MemoryAllocator getOffHeapStore() {
return this.getSystem().getOffHeapStore();
}
public DiskStoreMonitor getDiskStoreMonitor() {
return diskMonitor;
}
/**
* @see Extensible#getExtensionPoint()
* @since 8.1
*/
@Override
public ExtensionPoint<Cache> getExtensionPoint() {
return extensionPoint;
}
public static int getClientFunctionTimeout() {
return clientFunctionTimeout;
}
public CqService getCqService() {
return this.cqService;
}
/**
* get reference to LuceneService singleton
* @return LuceneService
*/
public LuceneService getLuceneService() {
return this.luceneService;
}
}