blob: a84fb8de8ec462160d0e89b496931425765c6f47 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache;
import static java.lang.Math.abs;
import static java.lang.System.currentTimeMillis;
import static org.apache.geode.cache.RegionShortcut.REPLICATE;
import static org.apache.geode.cache.client.ClientRegionShortcut.CACHING_PROXY;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
import static org.apache.geode.test.dunit.Assert.fail;
import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter;
import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName;
import static org.apache.geode.test.dunit.Wait.pause;
import java.io.IOException;
import java.util.Properties;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache30.CacheTestCase;
import org.apache.geode.distributed.internal.DSClock;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.junit.categories.ClientServerTest;
@Category({ClientServerTest.class})
public class ClientServerTimeSyncDUnitTest extends JUnit4CacheTestCase {
public ClientServerTimeSyncDUnitTest() {
super();
}
@Ignore("Bug 52327")
@Test
public void testClientTimeAdvances() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0); // Server
VM vm1 = host.getVM(1); // Client
final String regionName = "testRegion";
final long TEST_OFFSET = 10000;
ClientCache cache = null;
try {
final int serverPort =
(Integer) vm0.invoke(new SerializableCallable("Start server with a region") {
@Override
public Object call() {
Cache cache = getCache();
cache.createRegionFactory(REPLICATE).create(regionName);
getLogWriter().info("Done creating region, now creating CacheServer");
CacheServer server = null;
try {
server = cache.addCacheServer();
server.setPort(getRandomAvailableTCPPort());
server.start();
} catch (IOException e) {
fail("Starting cache server failed.", e);
}
// now set an artificial time offset for the test
basicGetSystem().getClock().setCacheTimeOffset(null, TEST_OFFSET, true);
getLogWriter()
.info("Done creating and starting CacheServer on port " + server.getPort());
return server.getPort();
}
});
final String hostName = getServerHostName(vm0.getHost());
// Start client with proxy region and register interest
disconnectFromDS();
Properties props = new Properties();
props.setProperty(LOCATORS, "");
props = getSystem(props).getProperties();
cache = new ClientCacheFactory(props).setPoolSubscriptionEnabled(true)
.addPoolServer(hostName, serverPort).setPoolPingInterval(5000).create();
Region proxyRegion =
cache.createClientRegionFactory(CACHING_PROXY).create(regionName);
proxyRegion.registerInterestRegex(".*");
proxyRegion.put("testkey", "testValue1");
final DSClock clock = ((GemFireCacheImpl) cache).getSystem().getClock();
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
long clientTimeOffset = clock.getCacheTimeOffset();
getLogWriter()
.info("Client node's new time offset is: " + clientTimeOffset);
return clientTimeOffset >= TEST_OFFSET;
}
@Override
public String description() {
return "Waiting for cacheTimeOffset to be non-zero. PingOp should have set it to something";
}
};
GeodeAwaitility.await().untilAsserted(wc);
} finally {
cache.close();
vm1.invoke(() -> CacheTestCase.disconnectFromDS());
}
}
@Ignore("not yet implemented")
@Test
public void testClientTimeSlowsDown() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0); // Server
VM vm1 = host.getVM(1); // Client
final String regionName = "testRegion";
final long TEST_OFFSET = 10000;
ClientCache cache = null;
try {
final int serverPort =
(Integer) vm0.invoke(new SerializableCallable("Start server with a region") {
@Override
public Object call() {
Cache cache = getCache();
cache.createRegionFactory(REPLICATE).create(regionName);
getLogWriter().info("Done creating region, now creating CacheServer");
CacheServer server = null;
try {
server = cache.addCacheServer();
server.setPort(getRandomAvailableTCPPort());
server.start();
} catch (IOException e) {
fail("Starting cache server failed.", e);
}
// now set an artificial time offset for the test
basicGetSystem().getClock().setCacheTimeOffset(null, -TEST_OFFSET, true);
getLogWriter()
.info("Done creating and starting CacheServer on port " + server.getPort());
return server.getPort();
}
});
pause((int) TEST_OFFSET); // let cacheTimeMillis consume the time offset
final String hostName = getServerHostName(vm0.getHost());
// Start client with proxy region and register interest
disconnectFromDS();
Properties props = new Properties();
props.setProperty(LOCATORS, "");
props = getSystem(props).getProperties();
cache = new ClientCacheFactory(props).setPoolSubscriptionEnabled(true)
.addPoolServer(hostName, serverPort).setPoolPingInterval(5000).create();
Region proxyRegion =
cache.createClientRegionFactory(CACHING_PROXY).create(regionName);
proxyRegion.registerInterestRegex(".*");
proxyRegion.put("testkey", "testValue1");
final DSClock clock = ((GemFireCacheImpl) cache).getSystem().getClock();
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
long clientTimeOffset = clock.getCacheTimeOffset();
getLogWriter()
.info("Client node's new time offset is: " + clientTimeOffset);
if (clientTimeOffset >= 0) {
return false;
}
long cacheTime = clock.cacheTimeMillis();
return abs(currentTimeMillis() - (cacheTime - clientTimeOffset)) < 5;
}
@Override
public String description() {
return "Waiting for cacheTimeOffset to be negative and cacheTimeMillis to stabilize";
}
};
GeodeAwaitility.await().untilAsserted(wc);
} finally {
cache.close();
vm1.invoke(() -> CacheTestCase.disconnectFromDS());
}
}
}