blob: 37a3c9b79e679c1fdd864267b4f1ca169bd2069d [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.pdx.internal;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import com.gemstone.gemfire.InternalGemFireError;
import com.gemstone.gemfire.InternalGemFireException;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheWriterException;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionExistsException;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.TimeoutException;
import com.gemstone.gemfire.cache.TransactionException;
import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
import com.gemstone.gemfire.cache.client.Pool;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.cache.util.CacheWriterAdapter;
import com.gemstone.gemfire.cache.wan.GatewaySender;
import com.gemstone.gemfire.distributed.DistributedLockService;
import com.gemstone.gemfire.distributed.LockServiceDestroyedException;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.locks.DLockService;
import com.gemstone.gemfire.internal.CopyOnWriteHashSet;
import com.gemstone.gemfire.internal.cache.DiskStoreImpl;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.TXManagerImpl;
import com.gemstone.gemfire.internal.cache.TXStateProxy;
import com.gemstone.gemfire.internal.util.concurrent.CopyOnWriteHashMap;
import com.gemstone.gemfire.pdx.JSONFormatter;
import com.gemstone.gemfire.pdx.PdxInitializationException;
import com.gemstone.gemfire.pdx.PdxRegistryMismatchException;
/**
* @author dsmith
*
*/
public class PeerTypeRegistration implements TypeRegistration {
/**
*
*/
private static final int MAX_TRANSACTION_FAILURES = 10;
public static final String LOCK_SERVICE_NAME = "__PDX";
/**
* The region name. Public for tests only.
*/
public static final String REGION_NAME = "PdxTypes";
public static final String REGION_FULL_PATH = "/" + REGION_NAME;
private int nextTypeId;
private final int maxTypeId;
private int nextEnumId;
private final int maxEnumId;
private volatile DistributedLockService dls;
private final Object dlsLock = new Object();
private GemFireCacheImpl cache;
/**
* The region where the PDX metadata is stored.
* Because this region is transactional for our internal updates
* but we don't want to participate in the users transactions,
* all operations on this region must suspend any existing
* transactions with suspendTX/resumeTX.
*/
private Region<Object/*Integer or EnumCode*/, Object/*PdxType or enum info*/> idToType;
/** This map serves two purposes. It lets us look
* up an id based on a type, if we previously found that type
* in the region. And, if a type is present in this map, that means
* we read the type while holding the dlock, which means the type
* was distributed to all members.
*/
private Map<PdxType, Integer> typeToId = Collections.synchronizedMap(new HashMap<PdxType, Integer>());
private Map<EnumInfo, EnumId> enumToId = Collections.synchronizedMap(new HashMap<EnumInfo, EnumId>());
private final Map<String, Set<PdxType>> classToType = new CopyOnWriteHashMap<String, Set<PdxType>>();
private volatile boolean typeRegistryInUse = false;
public PeerTypeRegistration(GemFireCacheImpl cache) {
this.cache = cache;
int distributedSystemId = cache.getDistributedSystem().getDistributionManager().getDistributedSystemId();
if(distributedSystemId == -1) {
distributedSystemId = 0;
}
this.nextTypeId = distributedSystemId << 24;
this.maxTypeId = distributedSystemId << 24 | 0xFFFFFF;
this.nextEnumId = distributedSystemId << 24;
this.maxEnumId = distributedSystemId << 24 | 0xFFFFFF;
}
private Region<Object/*Integer or EnumCode*/, Object/*PdxType or enum info*/> getIdToType() {
if (this.idToType != null) {
return this.idToType;
} else {
if (this.cache.getPdxPersistent()
&& this.cache.getCacheConfig().pdxDiskStoreUserSet) {
throw new PdxInitializationException("PDX registry could not be initialized because the disk store " + this.cache.getPdxDiskStore() + " was not created.");
} else {
throw new PdxInitializationException("PDX registry was not initialized.");
}
}
}
public void initialize() {
AttributesFactory<Object, Object> factory = new AttributesFactory<Object, Object>();
factory.setScope(Scope.DISTRIBUTED_ACK);
if(cache.getPdxPersistent()) {
if(cache.getCacheConfig().pdxDiskStoreUserSet) {
factory.setDiskStoreName(cache.getPdxDiskStore());
} else {
factory.setDiskStoreName(cache.getOrCreateDefaultDiskStore().getName());
}
factory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
} else {
factory.setDataPolicy(DataPolicy.REPLICATE);
}
//Add a listener that makes sure that if anyone in the DS is using PDX
//Our PDX configuration is valid for this member. This is important if
//we are the gateway, we need to validate that we have a distributed system
//id.
factory.addCacheListener(new CacheListenerAdapter<Object, Object>() {
@Override
public void afterCreate(EntryEvent<Object, Object> event) {
verifyConfiguration();
//update a local map with the pdxtypes registered
Object value = event.getNewValue();
if(value instanceof PdxType){
updateClassToTypeMap((PdxType) value);
}
}
});
factory.setCacheWriter(new CacheWriterAdapter<Object, Object>() {
@Override
public void beforeUpdate(EntryEvent<Object, Object> event)
throws CacheWriterException {
if(!event.getOldValue().equals(event.getNewValue())) {
PdxRegistryMismatchException ex = new PdxRegistryMismatchException("Trying to add a PDXType with the same id as an existing PDX type. id=" + event.getKey() + ", existing pdx type " + event.getOldValue() + ", new type " + event.getNewValue());
throw new CacheWriterException(ex);
}
}
});
RegionAttributes<Object, Object> regionAttrs = factory.create();
InternalRegionArguments internalArgs = new InternalRegionArguments();
internalArgs.setIsUsedForMetaRegion(true);
internalArgs.setMetaRegionWithTransactions(true);
try {
this.idToType = cache.createVMRegion(REGION_NAME, regionAttrs,
internalArgs);
} catch (IOException ex) {
throw new PdxInitializationException("Could not create pdx registry", ex);
} catch (TimeoutException ex) {
throw new PdxInitializationException("Could not create pdx registry", ex);
} catch (RegionExistsException ex) {
throw new PdxInitializationException("Could not create pdx registry", ex);
} catch (ClassNotFoundException ex) {
throw new PdxInitializationException("Could not create pdx registry", ex);
}
//If there is anything in the id to type registry,
//we should validate our configuration now.
//And send those types to any existing gateways.
if(!getIdToType().isEmpty()) {
verifyConfiguration();
}
}
private DistributedLockService getLockService() {
if (this.dls != null) {
return this.dls;
}
synchronized (this.dlsLock) {
if (this.dls == null) {
try {
this.dls = DLockService.create(LOCK_SERVICE_NAME,
this.cache.getDistributedSystem(), true /* distributed */,
true /* destroyOnDisconnect */, true /* automateFreeResources */);
} catch (IllegalArgumentException e) {
this.dls = DistributedLockService
.getServiceNamed(LOCK_SERVICE_NAME);
if (this.dls == null) {
throw e;
}
}
}
return this.dls;
}
}
private static final String LOCK_NAME = "PDX_LOCK";
private int allocateTypeId() {
TXStateProxy currentState = suspendTX();
Region<Object, Object> r = getIdToType();
try {
//Find the next available type id.
do {
this.nextTypeId++;
if(this.nextTypeId == maxTypeId) {
throw new InternalGemFireError("Used up all of the PDX type ids for this distributed system. The maximum number of PDX types is " + maxTypeId);
}
} while(r.get(nextTypeId) != null);
this.lastAllocatedTypeId = this.nextTypeId;
return this.nextTypeId;
} finally {
resumeTX(currentState);
}
}
private EnumId allocateEnumId() {
TXStateProxy currentState = suspendTX();
Region<Object, Object> r = getIdToType();
try {
//Find the next available type id.
do {
this.nextEnumId++;
if(this.nextEnumId == maxEnumId) {
throw new InternalGemFireError("Used up all of the PDX enum ids for this distributed system. The maximum number of PDX types is " + maxEnumId);
}
} while(r.get(new EnumId(nextEnumId)) != null);
this.lastAllocatedEnumId = this.nextEnumId;
return new EnumId(this.nextEnumId);
} finally {
resumeTX(currentState);
}
}
private void unlock() {
try {
DistributedLockService dls = getLockService();
dls.unlock(LOCK_NAME);
} catch(LockServiceDestroyedException e) {
// fix for bug 43574
cache.getCancelCriterion().checkCancelInProgress(e);
throw e;
}
}
private void lock() {
DistributedLockService dls = getLockService();
try {
if (!dls.lock(LOCK_NAME, -1, -1)) {
// this should be impossible
throw new InternalGemFireException("Could not obtain pdx lock");
}
} catch (LockServiceDestroyedException e) {
// fix for bug 43172
cache.getCancelCriterion().checkCancelInProgress(e);
throw e;
}
}
/**
* For bug #43690 we cannot use shared sockets when doing pdx type registration
*/
private boolean useUDPMessagingIfNecessary() {
boolean result = false;
InternalDistributedSystem sys = cache.getDistributedSystem();
if (sys != null && !sys.threadOwnsResources()) {
sys.getDistributionManager().forceUDPMessagingForCurrentThread();
result = true;
}
return result;
}
private void releaseUDPMessaging(boolean release) {
if (release) {
InternalDistributedSystem sys = cache.getDistributedSystem();
if (sys != null) {
sys.getDistributionManager().releaseUDPMessagingForCurrentThread();
}
}
}
private int lastAllocatedTypeId; // for unit tests
private int lastAllocatedEnumId; // for unit tests
/**
* Test hook that returns the most recently allocated type id
*
* @return the most recently allocated type id
*/
public int getLastAllocatedTypeId() {
verifyConfiguration();
return this.lastAllocatedTypeId;
}
/**
* Test hook that returns the most recently allocated enum id
*
* @return the most recently allocated enum id
*/
public int getLastAllocatedEnumId() {
verifyConfiguration();
return this.lastAllocatedEnumId;
}
public int defineType(PdxType newType) {
verifyConfiguration();
Integer existingId = typeToId.get(newType);
if(existingId != null) {
return existingId.intValue();
}
lock();
try {
int id = getExistingIdForType(newType);
if(id != -1) {
return id;
}
id = allocateTypeId();
newType.setTypeId(id);
updateIdToTypeRegion(newType);
typeToId.put(newType, Integer.valueOf(id));
//this.cache.getLogger().info("Defining: " + newType, new RuntimeException("STACK"));
return newType.getTypeId();
} finally {
unlock();
}
}
private void updateIdToTypeRegion(PdxType newType) {
updateRegion(newType.getTypeId(), newType);
}
private void updateIdToEnumRegion(EnumId id, EnumInfo ei) {
updateRegion(id, ei);
}
private void updateRegion(Object k, Object v) {
Region<Object, Object> r = getIdToType();
Cache c = (Cache)r.getRegionService();
checkDistributedTypeRegistryState();
TXManagerImpl txManager = (TXManagerImpl) c.getCacheTransactionManager();
TXStateProxy currentState = suspendTX();
boolean state = useUDPMessagingIfNecessary();
try {
//The loop might not be necessary because we're
//updating a replicated region inside a dlock,
//but just in case we'll make sure to retry the transaction.
int failureCount = 0;
while(true) {
txManager.begin();
try {
r.put(k, v);
txManager.commit();
return;
} catch(TransactionException e) {
// even put can now throw a TransactionException, rollback if required
if (txManager.exists()) {
txManager.rollback();
}
//Let's just make sure things don't get out of hand.
if(++failureCount > MAX_TRANSACTION_FAILURES) {
throw e;
}
}
}
} finally {
releaseUDPMessaging(state);
resumeTX(currentState);
}
}
private void checkDistributedTypeRegistryState() {
CheckTypeRegistryState.send(cache.getDistributionManager());
}
public PdxType getType(int typeId) {
verifyConfiguration();
TXStateProxy currentState = suspendTX();
try {
return (PdxType)getIdToType().get(typeId);
} finally {
resumeTX(currentState);
}
}
public void addRemoteType(int typeId, PdxType type) {
verifyConfiguration();
TXStateProxy currentState = suspendTX();
Region<Object, Object> r = getIdToType();
try {
if(!r.containsKey(typeId)) {
//This type could actually be for this distributed system,
//so we need to make sure the type is published while holding
//the distributed lock.
lock();
try {
r.put(typeId, type);
} finally {
unlock();
}
}
} finally {
resumeTX(currentState);
}
}
public void gatewaySenderStarted(GatewaySender gatewaySender) {
if(!typeRegistryInUse || this.idToType == null) {
return;
}
checkAllowed(true, hasPersistentRegions());
}
public void creatingPersistentRegion() {
//Anything is allowed until the registry is in use.
if(!typeRegistryInUse) {
return;
}
checkAllowed(hasGatewaySender(), true);
}
public boolean hasGatewaySender(){
Set<GatewaySender> sendersAndAsyncQueues = cache.getGatewaySenders();
Iterator<GatewaySender> itr = sendersAndAsyncQueues.iterator();
while(itr.hasNext()){
GatewaySender sender = itr.next();
if(AsyncEventQueueImpl.isAsyncEventQueue(sender.getId())){
itr.remove();
}
}
return !sendersAndAsyncQueues.isEmpty();
}
public void creatingPool() {
if(typeRegistryInUse) {
throw new PdxInitializationException("The PDX metadata has already been created as a peer metadata region. Please create your pools first");
}
}
void verifyConfiguration() {
if(typeRegistryInUse) {
return;
} else {
boolean hasPersistentRegions = hasPersistentRegions();
checkAllowed(hasGatewaySender(), hasPersistentRegions);
for(Pool pool : PoolManager.getAll().values()) {
if(!((PoolImpl) pool).isUsedByGateway()) {
throw new PdxInitializationException("The PDX metadata has already been " +
"created as a peer metadata region. " +
"Please use ClientCacheFactory to create clients.");
}
}
typeRegistryInUse = true;
}
}
public boolean hasPersistentRegions() {
Collection<DiskStoreImpl> diskStores = cache.listDiskStoresIncludingRegionOwned();
boolean hasPersistentRegions = false;
for(DiskStoreImpl store : diskStores) {
hasPersistentRegions |= store.hasPersistedData();
}
return hasPersistentRegions;
}
private void checkAllowed(boolean hasGatewaySender, boolean hasDiskStore) {
if(hasDiskStore && !cache.getPdxPersistent()) {
throw new PdxInitializationException("The PDX metadata must be persistent in a member that has persistent data. See CacheFactory.setPdxPersistent.");
}
int distributedSystemId = cache.getDistributedSystem().getDistributionManager().getDistributedSystemId();
if(hasGatewaySender && distributedSystemId == -1) {
throw new PdxInitializationException("When using PDX with a WAN gateway sender, you must set the distributed-system-id gemfire property for your distributed system. See the javadocs for DistributedSystem.");
}
}
/** Should be called holding the dlock */
private int getExistingIdForType(PdxType newType) {
TXStateProxy currentState = suspendTX();
try {
int result = -1;
for (Map.Entry<Object, Object> entry : getIdToType().entrySet()) {
Object v = entry.getValue();
Object k = entry.getKey();
if (k instanceof EnumId) {
EnumId id = (EnumId)k;
EnumInfo info = (EnumInfo)v;
enumToId.put(info, id);
} else {
PdxType foundType = (PdxType)v;
Integer id = (Integer)k;
typeToId.put(foundType, id);
if (foundType.equals(newType)) {
result = foundType.getTypeId();
}
}
}
return result;
} finally {
resumeTX(currentState);
}
}
/** Should be called holding the dlock */
private EnumId getExistingIdForEnum(EnumInfo ei) {
TXStateProxy currentState = suspendTX();
try {
EnumId result = null;
for (Map.Entry<Object, Object> entry : getIdToType().entrySet()) {
Object v = entry.getValue();
Object k = entry.getKey();
if (k instanceof EnumId) {
EnumId id = (EnumId)k;
EnumInfo info = (EnumInfo)v;
enumToId.put(info, id);
// TODO: do I need to keep track of multiple ids for the same enum?
if (ei.equals(info)) {
result = id;
}
} else {
typeToId.put((PdxType)v, (Integer)k);
}
}
return result;
} finally {
resumeTX(currentState);
}
}
private TXStateProxy suspendTX() {
Cache c = (Cache)getIdToType().getRegionService();
TXManagerImpl txManager = (TXManagerImpl) c.getCacheTransactionManager();
TXStateProxy currentState = txManager.internalSuspend();
return currentState;
}
private void resumeTX(TXStateProxy state) {
if(state != null) {
TXManagerImpl txManager = state.getTxMgr();
txManager.resume(state);
}
}
public int getEnumId(Enum<?> v) {
verifyConfiguration();
EnumInfo ei = new EnumInfo(v);
EnumId existingId = enumToId.get(ei);
if(existingId != null) {
return existingId.intValue();
}
lock();
try {
EnumId id = getExistingIdForEnum(ei);
if(id != null) {
return id.intValue();
}
id = allocateEnumId();
updateIdToEnumRegion(id, ei);
enumToId.put(ei, id);
return id.intValue();
} finally {
unlock();
}
}
public void addRemoteEnum(int id, EnumInfo enumInfo) {
verifyConfiguration();
TXStateProxy currentState = suspendTX();
EnumId enumId = new EnumId(id);
Region<Object, Object> r = getIdToType();
try {
if(!r.containsKey(enumId)) {
//This enum could actually be for this distributed system,
//so we need to make sure the enum is published while holding
//the distributed lock.
lock();
try {
r.put(enumId, enumInfo);
} finally {
unlock();
}
}
} finally {
resumeTX(currentState);
}
}
public int defineEnum(EnumInfo newInfo) {
verifyConfiguration();
EnumId existingId = enumToId.get(newInfo);
if(existingId != null) {
return existingId.intValue();
}
lock();
try {
EnumId id = getExistingIdForEnum(newInfo);
if(id != null) {
return id.intValue();
}
id = allocateEnumId();
updateIdToEnumRegion(id, newInfo);
enumToId.put(newInfo, id);
return id.intValue();
} finally {
unlock();
}
}
public EnumInfo getEnumById(int id) {
verifyConfiguration();
EnumId enumId = new EnumId(id);
TXStateProxy currentState = suspendTX();
try {
return (EnumInfo)getIdToType().get(enumId);
} finally {
resumeTX(currentState);
}
}
@Override
public Map<Integer, PdxType> types() {
// ugh, I don't think we can rely on the local map to contain all types
Map<Integer, PdxType> types = new HashMap<Integer, PdxType>();
for (Entry<Object, Object> type : getIdToType().entrySet()) {
Object id = type.getKey();
if (type.getValue() instanceof PdxType) {
types.put((Integer) id, (PdxType) type.getValue());
}
}
return types;
}
@Override
public Map<Integer, EnumInfo> enums() {
// ugh, I don't think we can rely on the local map to contain all types
Map<Integer, EnumInfo> enums = new HashMap<Integer, EnumInfo>();
for (Entry<Object, Object> type : getIdToType().entrySet()) {
Object id = type.getKey();
if (type.getValue() instanceof EnumInfo) {
enums.put(((EnumId) id).intValue(), (EnumInfo) type.getValue());
}
}
return enums;
}
/**
* adds a PdxType for a field to a {@code className => Set<PdxType>} map
*
* @param type
*/
private void updateClassToTypeMap(PdxType type) {
if(type != null){
synchronized (this.classToType){
if(type.getClassName().equals(JSONFormatter.JSON_CLASSNAME) )
return;//no need to include here
Set<PdxType> pdxTypeSet = this.classToType.get(type.getClassName());
if (pdxTypeSet == null) {
pdxTypeSet = new CopyOnWriteHashSet<PdxType>();
}
pdxTypeSet.add(type);
classToType.put(type.getClassName(), pdxTypeSet);
}
}
}
@Override
public PdxType getPdxTypeForField(String fieldName, String className) {
Set<PdxType> pdxTypes = classToType.get(className);
if (pdxTypes != null) {
for (PdxType pdxType : pdxTypes) {
if (pdxType.getPdxField(fieldName) != null) {
return pdxType;
}
}
}
return null;
}
/*
* For testing purpose
*/
public Map<String, Set<PdxType>> getClassToType() {
return classToType;
}
/*
* test hook
*/
@Override
public void testClearRegistry(){
idToType.clear();
enumToId.clear();
typeToId.clear();
}
@Override
public boolean isClient() {
return false;
}
@Override
public void addImportedType(int typeId, PdxType importedType) {
addRemoteType(typeId, importedType);
}
@Override
public void addImportedEnum(int id, EnumInfo importedInfo) {
addRemoteEnum(id, importedInfo);
}
public static int getPdxRegistrySize() {
GemFireCacheImpl cache = GemFireCacheImpl.getExisting();
if(cache == null) {
return 0;
}
TypeRegistry registry = cache.getPdxRegistry();
if(registry == null) {
return 0;
}
return registry.getLocalSize();
}
@Override
public int getLocalSize() {
return idToType.size();
}
}