blob: edee3b47ee0fe9b18b11a5413845c4012636e1c6 [file] [log] [blame]
/**
*
*/
package com.gemstone.org.jgroups.protocols;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
import com.gemstone.gemfire.distributed.internal.DSClock;
import com.gemstone.gemfire.distributed.internal.DSClock.DSClockTestHook;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.NetMember;
import com.gemstone.gemfire.distributed.internal.membership.NetView;
import com.gemstone.gemfire.distributed.internal.membership.jgroup.JGroupMember;
import com.gemstone.gemfire.distributed.internal.membership.jgroup.JGroupMembershipManager;
import com.gemstone.gemfire.distributed.internal.membership.jgroup.MembershipManagerHelper;
import com.gemstone.org.jgroups.Event;
import com.gemstone.org.jgroups.JChannel;
import com.gemstone.org.jgroups.Message;
import com.gemstone.org.jgroups.protocols.GemFireTimeSync.GFTimeSyncHeader;
import com.gemstone.org.jgroups.stack.IpAddress;
import com.gemstone.org.jgroups.stack.ProtocolStack;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.SerializableCallable;
import dunit.VM;
/**
* @author shobhit
*
*/
public class CacheTimeSlowDownDUnitTest extends DistributedTestCase {
/**
*
*/
private static final long serialVersionUID = 6040213401971040968L;
/**
* @param name
*/
public CacheTimeSlowDownDUnitTest(String name) {
super(name);
}
@Override
public void tearDown2() throws Exception {
super.tearDown2();
if (!logPerTest) {
// always disconnect from DS at the end of this test so it
// doesn't leave a tampered one for the next test
disconnectAllFromDS();
}
}
public void testCacheClockSlowDownUsingTimeTask() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
disconnectAllFromDS();
int port = getDUnitLocatorPort();
// Check the new member's clock offset received from locator (co-ordinator)
final long joinTimeOffset = (Long) vm0.invoke(new SerializableCallable("Check join time offset") {
@Override
public Object call() throws CacheException {
InternalDistributedSystem system = getSystem();
// Just to make sure the cache time is started and accepts dm timer task.
Cache cache = CacheFactory.create(system);
DistributionManager dm = (DistributionManager) system.getDistributionManager();
DSClock clock = dm.getSystem().getClock();
clock.setTestHook(new DistManagerTestHook());
return clock.getCacheTimeOffset();
}
});
// Send a custom GemfireTimeSync Message to reduce the offset and in turn slow down the cache time.
VM locator = Host.getLocator();
locator.invoke(new CacheSerializableRunnable("Send custom time sync message") {
@Override
public void run2() {
InternalDistributedSystem system = (InternalDistributedSystem) InternalDistributedSystem.getAnyInstance();
JGroupMembershipManager jgmm = MembershipManagerHelper.getMembershipManager(system);
JChannel jchannel = MembershipManagerHelper.getJChannel(system);
NetMember locMem = system.getDistributedMember().getNetMember();
// Check if protocol statck has GemfireTimeSync protocol in it in
// correct position.
if (jchannel != null && jchannel.isConnected()) {
ProtocolStack pstack = jchannel.getProtocolStack();
GemFireTimeSync gts = (GemFireTimeSync) pstack.findProtocol("GemFireTimeSync");
NetView nw = jgmm.getView();
IpAddress memAdd = null;
for (Object member : nw) {
InternalDistributedMember iMem = (InternalDistributedMember) member;
NetMember netMem = iMem.getNetMember();
if (!netMem.equals(locMem)) {
memAdd = ((JGroupMember)netMem).getAddress();
}
}
Message offsetMessage = new Message();
offsetMessage.setDest(memAdd);
offsetMessage.isHighPriority = true;
offsetMessage.putHeader("GemFireTimeSync", new GFTimeSyncHeader(0,
/* GFTimeSyncHeader.OP_TIME_OFFSET */(byte) 2, (joinTimeOffset - 19)));
gts.passDown(new Event(Event.MSG, offsetMessage));
getLogWriter().info("Sent a GemfireTimeSync message to apply lower offset: " + (joinTimeOffset - 19) + "ms");
}
}
});
// Check if member received the message and started a timer task or not.
long newTimeOffset = (Long) vm0.invoke(new SerializableCallable("Verify timer task and final offset") {
@Override
public Object call() throws CacheException {
InternalDistributedSystem system = getSystem();
DSClock clock = system.getClock();
DSClockTestHook testHook = clock.getTestHook();
assertNotNull(testHook);
while(testHook.getInformation("TimerTaskCancelled") == null) {
pause(100);
}
boolean isCancelled = (Boolean) testHook.getInformation("TimerTaskCancelled");
List<Long> cacheTimes = (List) testHook.getInformation("CacheTime");
List<Long> awaitedTimes = (List) testHook.getInformation("AwaitedTime");
assertTrue(isCancelled);
assertEquals(cacheTimes.size(), awaitedTimes.size());
for (int i=0; i<cacheTimes.size()-1; i++) { //Last one wont satisfy condition.
assertTrue((cacheTimes.get(i) - awaitedTimes.get(i)) >= 0);
}
clock.setTestHook(null);
return clock.getCacheTimeOffset();
}
});
assertEquals((joinTimeOffset - 19), newTimeOffset);
}
public class DistManagerTestHook implements DSClockTestHook {
Map info = new HashMap();
@Override
public void suspendAtBreakPoint(int breakPoint) {
switch (breakPoint) {
case 1:
if (info.get("CacheTime") == null) {
info.put("CacheTime", new ArrayList());
}
if (info.get("AwaitedTime") == null) {
info.put("AwaitedTime", new ArrayList());
}
break;
case 2:
break;
case 3:
break;
default:
}
}
@Override
public void addInformation(Object key, Object value) {
String sKey = (String)key;
if ("CacheTime".equals(key)) {
List cacheTimes = (List)info.get(key);
cacheTimes.add(value);
} else if ("AwaitedTime".equals(key)) {
List awaitedTimes = (List)info.get(key);
awaitedTimes.add(value);
} else {
info.put(key, value);
}
}
@Override
public Object getInformation(Object key) {
return info.get(key);
}
}
}