blob: 626dd1e775f8ee08cbeaf8d7ea99d5327b23ec02 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.functions;
import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.execute.FunctionAdapter;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.internal.Assert;
import dunit.DistributedTestCase;
import dunit.DistributedTestCase.WaitCriterion;
@SuppressWarnings("serial")
public class DistributedRegionFunction extends FunctionAdapter {
@Override
public void execute(FunctionContext context) {
RegionFunctionContext rcontext = (RegionFunctionContext)context;
Region<Object, Object> region = rcontext.getDataSet();
InternalDistributedSystem sys = InternalDistributedSystem
.getConnectedInstance();
sys.getLogWriter().fine(
"DistributedRegionFunction#execute( " + rcontext + " )");
Assert.assertTrue(region.getAttributes().getDataPolicy().withStorage());
Assert.assertTrue(region.getAttributes()
.getDataPolicy() != DataPolicy.NORMAL);
Assert.assertTrue(rcontext.getFilter().size() == 20);
long startTime = System.currentTimeMillis();
// Boolean.TRUE dummy argument indicates that CacheClose has to be done from
// the body itself
if (Boolean.TRUE.equals(rcontext.getArguments())) {
// do not close cache in retry
if (!rcontext.isPossibleDuplicate()) {
sys.disconnect();
throw new CacheClosedException("Throwing CacheClosedException "
+ "to simulate failover during function exception");
}
}
else {
WaitCriterion wc = new WaitCriterion() {
String excuse;
public boolean done() {
return false;
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, 12000, 500, false);
}
long endTime = System.currentTimeMillis();
// intentionally doing region operation to cause cacheClosedException
region.put("execKey-201", new Integer(201));
if (rcontext.isPossibleDuplicate()) { // Below operation is done when the
// function is reexecuted
region.put("execKey-202", new Integer(202));
region.put("execKey-203", new Integer(203));
}
sys.getLogWriter().fine(
"Time wait for Function Execution = " + (endTime - startTime));
for (int i = 0; i < 5000; i++) {
context.getResultSender().sendResult(Boolean.TRUE);
}
context.getResultSender().lastResult(Boolean.TRUE);
}
@Override
public String getId() {
return "DistributedRegionFunction";
}
@Override
public boolean isHA() {
return true;
}
}