/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*========================================================================
*/
package com.gemstone.gemfire.cache.hdfs.internal;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributes;
import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributesFactory;
import com.gemstone.gemfire.cache.hdfs.HDFSStore;
import com.gemstone.gemfire.i18n.LogWriterI18n;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;

/**
 * Utility functions for integrating GemFire regions with HDFS: creates and
 * registers the async event queues that buffer region events before they are
 * flushed to an {@link HDFSStore}.
*
* @author Hemant Bhanawat
*
*/
public class HDFSIntegrationUtil {
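  /**
   * Creates an async event queue for HDFS using default
   * {@link HDFSEventQueueAttributes}, as produced by
   * {@link HDFSEventQueueAttributesFactory#create()}.
   */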
  public static AsyncEventQueue createDefaultAsyncQueueForHDFS(Cache cache,
      boolean writeOnly, String regionPath) {
    // Create the queue with default HDFS event queue attributes
    HDFSEventQueueAttributesFactory hdfsqueueFactory = new HDFSEventQueueAttributesFactory();
    return createAsyncQueueForHDFS(cache, regionPath, writeOnly,
        hdfsqueueFactory.create());
  }
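
  /**
   * Creates the async event queue that buffers region events destined for
   * HDFS. The queue is always parallel. For read/write regions it is bucket
   * sorted and drained by an {@link HDFSEventListener}; write-only regions get
   * an unsorted queue drained by an {@link HDFSWriteOnlyStoreEventListener}.
   */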
  public static AsyncEventQueue createAsyncQueueForHDFS(Cache cache,
      String regionPath, boolean writeOnly, HDFSEventQueueAttributes eventAttribs) {
LogWriterI18n logger = cache.getLoggerI18n();
String defaultAsyncQueueName = HDFSStoreFactoryImpl.getEventQueueName(regionPath);
AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
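    // For HDFS queues the batch size is taken from getBatchSizeMB(), i.e. it
    // is specified in megabytes rather than as an entry count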
factory.setBatchSize(eventAttribs.getBatchSizeMB());
factory.setPersistent(eventAttribs.isPersistent());
factory.setDiskStoreName(eventAttribs.getDiskStoreName());
factory.setMaximumQueueMemory(eventAttribs.getMaximumQueueMemory());
factory.setBatchTimeInterval(eventAttribs.getBatchTimeInterval());
    factory.setDiskSynchronous(eventAttribs.isDiskSynchronous());
factory.setDispatcherThreads(eventAttribs.getDispatcherThreads());
    // HDFS queues are always parallel; the filter drops events that should
    // not reach HDFS
    factory.setParallel(true);
    factory.addGatewayEventFilter(new HDFSEventQueueFilter(logger));
    // Internal, HDFS-specific settings not exposed on the public factory
    ((AsyncEventQueueFactoryImpl) factory).setBucketSorted(!writeOnly);
    ((AsyncEventQueueFactoryImpl) factory).setIsHDFSQueue(true);
    AsyncEventQueue asyncQ = null;
    if (!writeOnly) {
      asyncQ = factory.create(defaultAsyncQueueName, new HDFSEventListener(cache.getLoggerI18n()));
    } else {
      asyncQ = factory.create(defaultAsyncQueueName, new HDFSWriteOnlyStoreEventListener(cache.getLoggerI18n()));
    }
    logger.fine("HDFS: async queue created for HDFS. Id: " + asyncQ.getId()
        + ". Disk store: " + asyncQ.getDiskStoreName() + ". Batch size: "
        + asyncQ.getBatchSize() + ". Bucket sorted: " + !writeOnly);
return asyncQ;
}
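
  /**
   * Creates and registers the HDFS async event queue for a region, provided
   * the region's data policy includes HDFS and no queue exists yet for its
   * leader region (the root of its colocation chain). Accessors, i.e.
   * partitioned regions with a local-max-memory of zero, host no buckets and
   * therefore get no queue.
   * <p>
   * A minimal usage sketch; the region below is illustrative, not part of
   * this class:
   * <pre>
   * Region region = cache.getRegion("orders");
   * HDFSIntegrationUtil.createAndAddAsyncQueue(region.getFullPath(),
   *     region.getAttributes(), cache);
   * </pre>
   */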
public static void createAndAddAsyncQueue(String regionPath,
RegionAttributes regionAttributes, Cache cache) {
    if (!regionAttributes.getDataPolicy().withHDFS()) {
return;
}
String leaderRegionPath = getLeaderRegionPath(regionPath, regionAttributes, cache);
String defaultAsyncQueueName = HDFSStoreFactoryImpl.getEventQueueName(leaderRegionPath);
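    // The queue is shared by all regions colocated with the leader, so create
    // it only if it does not already exist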
if (cache.getAsyncEventQueue(defaultAsyncQueueName) == null) {
      if (regionAttributes.getHDFSStoreName() != null && regionAttributes.getPartitionAttributes() != null
          && regionAttributes.getPartitionAttributes().getLocalMaxMemory() != 0) {
HDFSStore store = ((GemFireCacheImpl)cache).findHDFSStore(regionAttributes.getHDFSStoreName());
if (store == null) {
throw new IllegalStateException(
LocalizedStrings.HOPLOG_HDFS_STORE_NOT_FOUND
.toLocalizedString(regionAttributes.getHDFSStoreName()));
}
HDFSEventQueueAttributes queueAttrs = store.getHDFSEventQueueAttributes();
        if (queueAttrs == null) {
          // No async queue attributes were specified for this HDFS region, so
          // create a queue with default properties; bucket sorting is enabled
          // unless the region is HDFS write-only.
          HDFSIntegrationUtil.createDefaultAsyncQueueForHDFS(cache, regionAttributes.getHDFSWriteOnly(), leaderRegionPath);
        } else {
HDFSIntegrationUtil.createAsyncQueueForHDFS(cache, leaderRegionPath, regionAttributes.getHDFSWriteOnly(), queueAttrs);
}
}
}
}
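
  /**
   * Walks the colocation chain of a partitioned region and returns the full
   * path of its leader region (the root of the colocated-with hierarchy).
   * For regions that are not colocated, the given path is returned unchanged.
   */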
private static String getLeaderRegionPath(String regionPath,
RegionAttributes regionAttributes, Cache cache) {
String colocated;
    while (regionAttributes.getPartitionAttributes() != null
        && (colocated = regionAttributes.getPartitionAttributes().getColocatedWith()) != null) {
      // Pass false so we do not waitOnInitialization() for the colocated PR
      GemFireCacheImpl gfc = (GemFireCacheImpl) cache;
      Region colocatedRegion = gfc.getPartitionedRegion(colocated, false);
      if (colocatedRegion == null) {
Assert.fail("Could not find parent region " + colocated + " for " + regionPath);
}
regionAttributes = colocatedRegion.getAttributes();
regionPath = colocatedRegion.getFullPath();
}
return regionPath;
}
}