/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache.hdfs.internal;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import junit.framework.TestCase;

import org.apache.hadoop.fs.Path;
import org.junit.experimental.categories.Category;

import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.asyncqueue.internal.ParallelAsyncEventQueueImpl;
import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
import com.gemstone.gemfire.cache.hdfs.internal.SortedListForAsyncQueueJUnitTest.KeyValue;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.internal.cache.BucketRegion;
import com.gemstone.gemfire.internal.cache.CachedDeserializable;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion.IteratorType;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
import com.gemstone.gemfire.test.junit.categories.HoplogTest;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
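
/**
 * Tests that {@link HDFSEntriesSet} iterates over the union of the entries
 * still sitting in the async event queue and the entries already flushed to
 * HDFS hoplogs, yielding each value exactly once even when a key is present
 * in both places.
 */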
@SuppressWarnings("rawtypes")
@Category({IntegrationTest.class, HoplogTest.class})
public class HDFSEntriesSetJUnitTest extends TestCase {
  private GemFireCacheImpl cache;
  private HDFSStoreImpl store;
  private PartitionedRegion region;
  private BucketRegion bucket;
  private HDFSParallelGatewaySenderQueue queue;
  private HDFSBucketRegionQueue brq;
  private HoplogOrganizer hdfs;

  public void setUp() throws Exception {
    // allow the HDFS store to run against the local file system for testing
    System.setProperty(HoplogConfig.ALLOW_LOCAL_HDFS_PROP, "true");

    cache = (GemFireCacheImpl) new CacheFactory()
        .set("mcast-port", "0")
        .set("log-level", "info")
        .create();

    // HDFS store that writes hoplogs under the local "hoplogs" directory
    HDFSStoreFactory hsf = this.cache.createHDFSStoreFactory();
    hsf.setHomeDir("hoplogs");
    store = (HDFSStoreImpl) hsf.create("test");

    // single-bucket HDFS partitioned region so all data lands in bucket 0
    PartitionAttributesFactory paf = new PartitionAttributesFactory();
    paf.setTotalNumBuckets(1);

    RegionFactory rf = cache.createRegionFactory(RegionShortcut.PARTITION_HDFS);
    region = (PartitionedRegion) rf.setHDFSStoreName("test")
        .setPartitionAttributes(paf.create()).create("test");

    // prime the region so buckets get created
    region.put("test", "test");

    // parallel gateway sender queue that feeds events to HDFS
    GatewaySenderAttributes g = new GatewaySenderAttributes();
    g.isHDFSQueue = true;
    g.id = "HDFSEntriesSetJUnitTest_Queue";

    ParallelAsyncEventQueueImpl gatewaySender = new ParallelAsyncEventQueueImpl(cache, g);
    Set<Region> set = new HashSet<Region>();
    set.add(region);

    queue = new HDFSParallelGatewaySenderQueue(gatewaySender, set, 0, 1);
    brq = (HDFSBucketRegionQueue) ((PartitionedRegion) queue.getRegion())
        .getDataStore().getLocalBucketById(0);
    bucket = region.getDataStore().getLocalBucketById(0);

    // hoplog organizer used to flush events for bucket 0 to HDFS
    HdfsRegionManager mgr = HDFSRegionDirector.getInstance().manageRegion(region, "test", null);
    hdfs = mgr.<SortedHoplogPersistedEvent>create(0);
    AbstractHoplogOrganizer.JUNIT_TEST_RUN = true;
  }

  public void tearDown() throws Exception {
    // remove the hoplog directory created by the store
    store.getFileSystem().delete(new Path("hoplogs"), true);
    hdfs.close();
    cache.close();
  }
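
  // No queued or flushed entries: iteration should yield nothing.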
  public void testEmptyIterator() throws Exception {
    checkIteration(Collections.<String>emptyList(), new KeyValue[] { }, new KeyValue[] { });
  }
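
  // All entries are still in the queue; nothing has been flushed to HDFS.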
  public void testQueueOnlyIterator() throws Exception {
    KeyValue[] qvals = new KeyValue[] {
        new KeyValue("K0", "0"),
        new KeyValue("K1", "1"),
        new KeyValue("K2", "2"),
        new KeyValue("K3", "3"),
        new KeyValue("K4", "4")
    };
    checkIteration(getExpected(), qvals, new KeyValue[] { });
  }
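
  // All entries have been flushed to HDFS; the queue is empty.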
  public void testHdfsOnlyIterator() throws Exception {
    KeyValue[] hvals = new KeyValue[] {
        new KeyValue("K0", "0"),
        new KeyValue("K1", "1"),
        new KeyValue("K2", "2"),
        new KeyValue("K3", "3"),
        new KeyValue("K4", "4")
    };
    checkIteration(getExpected(), new KeyValue[] { }, hvals);
  }
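
  // Entries are split unevenly between the queue and HDFS.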
  public void testUnevenIterator() throws Exception {
    KeyValue[] qvals = new KeyValue[] {
        new KeyValue("K0", "0"),
        new KeyValue("K2", "2"),
    };
    KeyValue[] hvals = new KeyValue[] {
        new KeyValue("K1", "1"),
        new KeyValue("K3", "3"),
        new KeyValue("K4", "4")
    };
    checkIteration(getExpected(), qvals, hvals);
  }
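
  // Entries alternate between the queue and HDFS.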
  public void testEitherOrIterator() throws Exception {
    KeyValue[] qvals = new KeyValue[] {
        new KeyValue("K0", "0"),
        new KeyValue("K2", "2"),
        new KeyValue("K4", "4")
    };
    KeyValue[] hvals = new KeyValue[] {
        new KeyValue("K1", "1"),
        new KeyValue("K3", "3")
    };
    checkIteration(getExpected(), qvals, hvals);
  }
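
  // The same entries (including a duplicate of K4) appear in both the
  // queue and HDFS; each value should still be returned only once.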
  public void testDuplicateIterator() throws Exception {
    KeyValue[] qvals = new KeyValue[] {
        new KeyValue("K0", "0"),
        new KeyValue("K1", "1"),
        new KeyValue("K2", "2"),
        new KeyValue("K3", "3"),
        new KeyValue("K4", "4"),
        new KeyValue("K4", "4")
    };
    KeyValue[] hvals = new KeyValue[] {
        new KeyValue("K0", "0"),
        new KeyValue("K1", "1"),
        new KeyValue("K2", "2"),
        new KeyValue("K3", "3"),
        new KeyValue("K4", "4"),
        new KeyValue("K4", "4")
    };
    checkIteration(getExpected(), qvals, hvals);
  }
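
  // Values "0" through "4", in order.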
  private List<String> getExpected() {
    List<String> expected = new ArrayList<String>();
    expected.add("0");
    expected.add("1");
    expected.add("2");
    expected.add("3");
    expected.add("4");
    return expected;
  }
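
  // Flushes hvals to HDFS, enqueues qvals on the gateway queue, then
  // verifies that iterating the merged VALUES view yields exactly the
  // expected values.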
  private void checkIteration(List<String> expected, KeyValue[] qvals, KeyValue[] hvals)
      throws Exception {
    int seq = 0;

    // write hvals directly to HDFS via the hoplog organizer
    List<PersistedEventImpl> evts = new ArrayList<PersistedEventImpl>();
    for (KeyValue kv : hvals) {
      evts.add(new SortedHDFSQueuePersistedEvent(getNewEvent(kv.key, kv.value, seq++)));
    }
    hdfs.flush(evts.iterator(), evts.size());

    // leave qvals sitting in the gateway sender queue
    for (KeyValue kv : qvals) {
      queue.put(getNewEvent(kv.key, kv.value, seq++));
    }

    // iterate the merged view of queue and HDFS, deserializing as needed
    List<String> actual = new ArrayList<String>();
    Iterator vals = new HDFSEntriesSet(bucket, brq, hdfs, IteratorType.VALUES, null).iterator();
    while (vals.hasNext()) {
      Object val = vals.next();
      if (val instanceof CachedDeserializable) {
        val = ((CachedDeserializable) val).getDeserializedForReading();
      }
      actual.add((String) val);
    }

    assertEquals(expected, actual);
  }
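
  // Builds a gateway event representing a CREATE on the test region,
  // stamped with an increasing shadow key (seq) used for queue ordering.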
  private HDFSGatewayEventImpl getNewEvent(Object key, Object value, long seq) throws Exception {
    EntryEventImpl evt = EntryEventImpl.create(region, Operation.CREATE,
        key, value, null, false, (DistributedMember) cache.getMyId());
    evt.setEventId(new EventID(cache.getDistributedSystem()));

    HDFSGatewayEventImpl event = new HDFSGatewayEventImpl(EnumListenerEvent.AFTER_CREATE, evt, null, true, 0);
    event.setShadowKey(seq);

    return event;
  }
}