// blob: 38145d197b9ef4ab12359c9ab6aaf244cb70b552 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.management.bean.stats;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Set;
import junit.framework.TestCase;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.junit.experimental.categories.Category;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
import com.gemstone.gemfire.cache.hdfs.internal.SortedHDFSQueuePersistedEvent;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
import com.gemstone.gemfire.internal.cache.BucketRegion;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.execute.BucketMovedException;
import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics;
import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
import com.gemstone.gemfire.internal.cache.versions.DiskVersionTag;
import com.gemstone.gemfire.internal.util.BlobHelper;
import com.gemstone.gemfire.management.ManagementService;
import com.gemstone.gemfire.management.RegionMXBean;
import com.gemstone.gemfire.management.internal.ManagementConstants;
import com.gemstone.gemfire.test.junit.categories.HoplogTest;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest
;
/**
* Test for verifying HDFS related MBean attributes
* @author rishim
*
*/
@Category({IntegrationTest.class, HoplogTest.class})
/**
 * Integration test verifying the HDFS-related attributes exposed through the
 * {@link RegionMXBean} of an HDFS-backed partitioned region: after flushing a
 * bucket to the HDFS store, entry size is reported as "not available" while
 * disk usage reflects the persisted hoplog files.
 */
public class HDFSRegionMBeanAttributeJUnitTest extends TestCase {

  public static final String HDFS_STORE_NAME = "HDFSMBeanJUnitTestStore";
  public static final String REGION_NAME = "HDFSMBeanJUnitTest_Region";

  protected Path testDataDir;
  protected Cache cache;
  protected HDFSStoreFactory hsf;
  protected HDFSStoreImpl hdfsStore;
  protected Region<Object, Object> region;
  // NOTE(review): the three fields below are never referenced in this test;
  // presumably kept for subclasses or future assertions — confirm before removing.
  SortedOplogStatistics stats;
  HFileStoreStatistics storeStats;
  BlockCache blockCache;

  /**
   * Creates a cache, an HDFS store rooted at {@link #testDataDir}, and an
   * HDFS-backed partitioned region with 10 total buckets for the test to use.
   */
  @Override
  protected void setUp() throws Exception {
    super.setUp();
    // Allow the HDFS store to run against the local file system in unit tests.
    System.setProperty(HoplogConfig.ALLOW_LOCAL_HDFS_PROP, "true");
    testDataDir = new Path("test-case");

    cache = createCache();
    configureHdfsStoreFactory();
    hdfsStore = (HDFSStoreImpl) hsf.create(HDFS_STORE_NAME);

    RegionFactory<Object, Object> regionfactory =
        cache.createRegionFactory(RegionShortcut.PARTITION_HDFS);
    regionfactory.setHDFSStoreName(HDFS_STORE_NAME);
    // regionfactory.setCompressionCodec("Some");

    PartitionAttributesFactory<Object, Object> fac =
        new PartitionAttributesFactory<Object, Object>();
    fac.setTotalNumBuckets(10);
    regionfactory.setPartitionAttributes(fac.create());
    region = regionfactory.create(REGION_NAME);
  }

  /** Creates the HDFS store factory pointed at the test data directory. */
  protected void configureHdfsStoreFactory() throws Exception {
    hsf = this.cache.createHDFSStoreFactory();
    hsf.setHomeDir(testDataDir.toString());
  }

  /** Creates a standalone (mcast-port 0) cache and remembers it in {@link #cache}. */
  protected Cache createCache() {
    CacheFactory cf = new CacheFactory().set("mcast-port", "0").set("log-level", "info");
    cache = cf.create();
    return cache;
  }

  /**
   * Deletes the on-disk test data and closes the cache. Null-guarded and
   * wrapped in try/finally so a partially-completed {@link #setUp()} (or a
   * failing file-system delete) does not mask the original failure or leak
   * the cache.
   */
  @Override
  protected void tearDown() throws Exception {
    try {
      if (hdfsStore != null) {
        hdfsStore.getFileSystem().delete(testDataDir, true);
      }
    } finally {
      if (cache != null) {
        cache.close();
      }
      super.tearDown();
    }
  }

  /**
   * Puts 100 entries, flushes one primary bucket's events to the HDFS store,
   * then verifies the region MBean reports entry size as not-available and a
   * positive disk usage.
   */
  public void testStoreUsageStats() throws Exception {
    PartitionedRegion parRegion = (PartitionedRegion) region;

    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
    for (int i = 0; i < 100; i++) {
      String key = ("key-" + (i * 100 + i));
      String value = ("value-" + System.nanoTime());
      parRegion.put(key, value);
      items.add(new TestEvent(key, value));
    }

    // Flush a single local primary bucket so hoplog files exist on disk.
    Set<BucketRegion> localPrimaryBucketRegions =
        parRegion.getDataStore().getAllLocalPrimaryBucketRegions();
    BucketRegion flushingBucket = localPrimaryBucketRegions.iterator().next();
    HoplogOrganizer hoplogOrganizer = getOrganizer(parRegion, flushingBucket.getId());
    hoplogOrganizer.flush(items.iterator(), 100);

    // Local renamed from "cache" to avoid shadowing the instance field.
    GemFireCacheImpl cacheImpl = GemFireCacheImpl.getExisting();
    ManagementService service = ManagementService.getExistingManagementService(cacheImpl);
    RegionMXBean bean = service.getLocalRegionMBean(region.getFullPath());

    // NOTE(review): the entry-count assertion was already disabled in the
    // original; confirm the expected value before re-enabling.
    //assertEquals(ManagementConstants.ZERO, bean.getEntryCount());
    assertEquals(ManagementConstants.NOT_AVAILABLE_LONG, bean.getEntrySize());
    assertTrue("expected positive disk usage after flushing a bucket to HDFS",
        0 < bean.getDiskUsage());
  }

  /**
   * Returns the hoplog organizer for the given bucket of {@code region}.
   *
   * @throws BucketMovedException if the bucket is no longer hosted locally
   *         (e.g. it was rebalanced away)
   */
  private HoplogOrganizer getOrganizer(PartitionedRegion region, int bucketId) {
    BucketRegion br = region.getDataStore().getLocalBucketById(bucketId);
    if (br == null) {
      // got rebalanced or something
      throw new BucketMovedException("Bucket region is no longer available. BucketId: " +
          bucketId + " RegionPath: " + region.getFullPath());
    }
    return br.getHoplogOrganizer();
  }

  /**
   * Minimal persisted-queue event wrapping a key/value pair, used as input to
   * {@code HoplogOrganizer.flush()}.
   */
  public static class TestEvent extends SortedHDFSQueuePersistedEvent implements Serializable {
    private static final long serialVersionUID = 1L;

    Object key;

    public TestEvent(String k, String v) throws Exception {
      this(k, v, Operation.PUT_IF_ABSENT);
    }

    public TestEvent(String k, String v, Operation op) throws Exception {
      super(v, op, (byte) 0x02, false, new DiskVersionTag(), BlobHelper.serializeToBlob(k), 0);
      this.key = k;
    }
  }
}