| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.distributed; |
| |
| import com.gemstone.gemfire.CancelException; |
| import com.gemstone.gemfire.GemFireConfigException; |
| import com.gemstone.gemfire.SystemConnectException; |
| import com.gemstone.gemfire.internal.AvailablePort; |
| import com.gemstone.gemfire.internal.SocketCreator; |
| import com.gemstone.gemfire.distributed.internal.*; |
| import com.gemstone.gemfire.cache.*; |
| import com.gemstone.gemfire.cache.server.CacheServer; |
| import com.gemstone.gemfire.cache30.CacheSerializableRunnable; |
| |
| import dunit.*; |
| |
| import java.net.Inet4Address; |
| import java.net.Inet6Address; |
| import java.net.InetAddress; |
| import java.net.NetworkInterface; |
| import java.util.*; |
| import java.util.concurrent.TimeoutException; |
| |
| import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; |
| import com.gemstone.gemfire.distributed.internal.membership.jgroup.JGroupMembershipManager; |
| import com.gemstone.gemfire.distributed.internal.membership.jgroup.MembershipManagerHelper; |
| import com.gemstone.org.jgroups.protocols.pbcast.ClientGmsImpl; |
| import com.gemstone.org.jgroups.util.ExternalStrings; |
| |
| /** |
| * Tests the functionality of the {@link DistributedSystem} class. |
| * |
| * @see InternalDistributedSystemJUnitTest |
| * |
| * @author David Whitlock |
| */ |
| public class DistributedSystemDUnitTest extends DistributedTestCase { |
| |
| public DistributedSystemDUnitTest(String name) { |
| super(name); |
| } |
| |
| public void setUp() throws Exception { |
| super.setUp(); |
| disconnectAllFromDS(); |
| } |
| |
| //////// Test methods |
| |
| /** |
| * ensure that waitForMemberDeparture correctly flushes the serial message queue for |
| * the given member |
| */ |
| public void testWaitForDeparture() throws Exception { |
| disconnectAllFromDS(); |
| Properties p = getDistributedSystemProperties(); |
| p.put(DistributionConfig.LOCATORS_NAME, ""); |
| p.put(DistributionConfig.MCAST_PORT_NAME, ""+AvailablePort.getRandomAvailablePort(AvailablePort.JGROUPS)); |
| p.put(DistributionConfig.DISABLE_TCP_NAME, "true"); |
| InternalDistributedSystem ds = (InternalDistributedSystem)DistributedSystem.connect(p); |
| try { |
| // construct a member ID that will represent a departed member |
| InternalDistributedMember mbr = new InternalDistributedMember("localhost", 12345, "", ""); |
| final DistributionManager mgr = (DistributionManager)ds.getDistributionManager(); |
| // schedule a message in order to create a queue for the fake member |
| final FakeMessage msg = new FakeMessage(null); |
| mgr.getExecutor(DistributionManager.SERIAL_EXECUTOR, mbr).execute(new SizeableRunnable(100) { |
| public void run() { |
| msg.doAction(mgr, false); |
| } |
| public String toString() { |
| return "Processing fake message"; |
| } |
| }); |
| try { |
| assertTrue("expected the serial queue to be flushed", mgr.getMembershipManager().waitForDeparture(mbr)); |
| } catch (InterruptedException e) { |
| fail("interrupted"); |
| } catch (TimeoutException e) { |
| fail("timed out - increase this test's member-timeout setting"); |
| } |
| } finally { |
| ds.disconnect(); |
| } |
| } |
| |
| static class FakeMessage extends SerialDistributionMessage { |
| volatile boolean[] blocked; |
| volatile boolean processed; |
| |
| FakeMessage(boolean[] blocked) { |
| this.blocked = blocked; |
| } |
| public void doAction(DistributionManager dm, boolean block) { |
| processed = true; |
| if (block) { |
| synchronized(blocked) { |
| blocked[0] = true; |
| blocked.notify(); |
| try { |
| blocked.wait(60000); |
| } catch (InterruptedException e) {} |
| } |
| } |
| } |
| public int getDSFID() { |
| return 0; // never serialized |
| } |
| protected void process(DistributionManager dm) { |
| // this is never called |
| } |
| public String toString() { |
| return "FakeMessage(blocking="+(blocked!=null)+")"; |
| } |
| } |
| |
| /** |
| * Tests that we can get a DistributedSystem with the same |
| * configuration twice. |
| */ |
| public void testGetSameSystemTwice() { |
| Properties config = new Properties(); |
| |
| // int unusedPort = AvailablePort.getRandomAvailablePort(AvailablePort.JGROUPS); |
| // config.setProperty("mcast-port", String.valueOf(unusedPort)); |
| // a loner is all this test needs |
| config.setProperty("mcast-port", "0"); |
| config.setProperty("locators", ""); |
| // set a flow-control property for the test (bug 37562) |
| config.setProperty("mcast-flow-control", "3000000,0.20,3000"); |
| |
| DistributedSystem system1 = DistributedSystem.connect(config); |
| DistributedSystem system2 = DistributedSystem.connect(config); |
| assertSame(system1, system2); |
| system1.disconnect(); |
| } |
| |
| /** |
| * Tests that getting a <code>DistributedSystem</code> with a |
| * different configuration after one has already been obtained |
| * throws an exception. |
| */ |
| public void testGetDifferentSystem() { |
| Properties config = new Properties(); |
| |
| // int unusedPort = AvailablePort.getRandomAvailablePort(AvailablePort.JGROUPS); |
| // config.setProperty("mcast-port", String.valueOf(unusedPort)); |
| // a loner is all this test needs |
| config.setProperty("mcast-port", "0"); |
| config.setProperty("locators", ""); |
| config.setProperty("mcast-flow-control", "3000000,0.20,3000"); |
| |
| |
| DistributedSystem system1 = DistributedSystem.connect(config); |
| config.setProperty("mcast-address", "224.0.0.1"); |
| try { |
| DistributedSystem.connect(config); |
| if (System.getProperty("gemfire.mcast-address") == null) { |
| fail("Should have thrown an IllegalStateException"); |
| } |
| } |
| catch (IllegalStateException ex) { |
| // pass... |
| } |
| finally { |
| system1.disconnect(); |
| } |
| } |
| |
| /** |
| * Tests getting a system with a different configuration after |
| * another system has been closed. |
| */ |
| public void testGetDifferentSystemAfterClose() { |
| Properties config = new Properties(); |
| |
| // int unusedPort = AvailablePort.getRandomAvailablePort(AvailablePort.JGROUPS); |
| // config.setProperty("mcast-port", String.valueOf(unusedPort)); |
| // a loner is all this test needs |
| config.setProperty("mcast-port", "0"); |
| config.setProperty("locators", ""); |
| |
| DistributedSystem system1 = DistributedSystem.connect(config); |
| system1.disconnect(); |
| int time = DistributionConfig.DEFAULT_ACK_WAIT_THRESHOLD + 17; |
| config.put(DistributionConfig.ACK_WAIT_THRESHOLD_NAME, |
| String.valueOf(time)); |
| DistributedSystem system2 = DistributedSystem.connect(config); |
| system2.disconnect(); |
| } |
| |
| |
| public void testGetProperties() { |
| Properties config = new Properties(); |
| |
| // int unusedPort = AvailablePort.getRandomAvailablePort(AvailablePort.JGROUPS); |
| // config.setProperty("mcast-port", String.valueOf(unusedPort)); |
| // a loner is all this test needs |
| int unusedPort = 0; |
| config.setProperty("mcast-port", "0"); |
| config.setProperty("locators", ""); |
| |
| DistributedSystem system1 = DistributedSystem.connect(config); |
| |
| assertTrue(config != system1.getProperties()); |
| assertEquals(unusedPort, Integer.parseInt(system1.getProperties().getProperty("mcast-port"))); |
| |
| system1.disconnect(); |
| |
| assertTrue(config != system1.getProperties()); |
| assertEquals(unusedPort, Integer.parseInt(system1.getProperties().getProperty("mcast-port"))); |
| } |
| |
| |
| public void testIsolatedDistributedSystem() throws Exception { |
| Properties config = new Properties(); |
| config.setProperty("mcast-port", "0"); |
| config.setProperty("locators", ""); |
| system = (InternalDistributedSystem)DistributedSystem.connect(config); |
| try { |
| // make sure isolated distributed system can still create a cache and region |
| Cache cache = CacheFactory.create(getSystem()); |
| Region r = cache.createRegion(getUniqueName(), new AttributesFactory().create()); |
| r.put("test", "value"); |
| assertEquals("value", r.get("test")); |
| } finally { |
| getSystem().disconnect(); |
| } |
| } |
| |
| |
| /** test the ability to set the port used to listen for tcp/ip connections */ |
| public void testSpecificTcpPort() throws Exception { |
| Properties config = new Properties(); |
| int mcastPort = AvailablePort.getRandomAvailablePort(AvailablePort.JGROUPS); |
| int tcpPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); |
| config.setProperty("mcast-port", String.valueOf(mcastPort)); |
| config.setProperty("locators", ""); |
| config.setProperty("tcp-port", String.valueOf(tcpPort)); |
| system = (InternalDistributedSystem)DistributedSystem.connect(config); |
| DistributionManager dm = (DistributionManager)system.getDistributionManager(); |
| JGroupMembershipManager mgr = (JGroupMembershipManager)dm.getMembershipManager(); |
| int actualPort = mgr.getDirectChannelPort(); |
| system.disconnect(); |
| assertEquals(tcpPort, actualPort); |
| } |
| |
| /** test that loopback cannot be used as a bind address when a locator w/o a bind address is being used */ |
| public void testLoopbackNotAllowed() throws Exception { |
| // DISABLED for bug #49926 |
| InetAddress loopback = null; |
| for (Enumeration<NetworkInterface> it = NetworkInterface.getNetworkInterfaces(); it.hasMoreElements(); ) { |
| NetworkInterface nif = it.nextElement(); |
| for (Enumeration<InetAddress> ait = nif.getInetAddresses(); ait.hasMoreElements(); ) { |
| InetAddress a = ait.nextElement(); |
| Class theClass = SocketCreator.getLocalHost() instanceof Inet4Address? Inet4Address.class : Inet6Address.class; |
| if (a.isLoopbackAddress() && (a.getClass().isAssignableFrom(theClass))) { |
| loopback = a; |
| break; |
| } |
| } |
| } |
| if (loopback != null) { |
| Properties config = new Properties(); |
| config.put(DistributionConfig.MCAST_PORT_NAME, "0"); |
| String locators = InetAddress.getLocalHost().getHostName()+":"+getDUnitLocatorPort(); |
| config.put(DistributionConfig.LOCATORS_NAME, locators); |
| config.setProperty(DistributionConfig.BIND_ADDRESS_NAME, loopback.getHostAddress()); |
| getLogWriter().info("attempting to connect with " + loopback +" and locators=" + locators); |
| try { |
| system = (InternalDistributedSystem)DistributedSystem.connect(config); |
| system.disconnect(); |
| fail("expected a configuration exception disallowing use of loopback address"); |
| } catch (GemFireConfigException e) { |
| // expected |
| } catch (DistributionException e) { |
| // expected |
| } |
| } |
| } |
| |
| /** test jgroups handling of a request to retransmit a large message */ |
| public void testBug43652() throws Exception { |
| DistributedSystem sys = getSystem(); |
| JGroupMembershipManager mgr = MembershipManagerHelper.getMembershipManager(sys); |
| boolean result = mgr.getNakAck().testRetransmitLargeMessage(new byte[75000]); |
| assertTrue(result); |
| sys.disconnect(); |
| } |
| |
| public void testUDPPortRange() throws Exception { |
| Properties config = new Properties(); |
| int mcastPort = AvailablePort.getRandomAvailablePort(AvailablePort.JGROUPS); |
| int unicastPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); |
| config.setProperty("mcast-port", String.valueOf(mcastPort)); |
| config.setProperty("locators", ""); |
| // Minimum 3 ports required in range for UDP, FD_SOCK and TcpConduit. |
| config.setProperty(DistributionConfig.MEMBERSHIP_PORT_RANGE_NAME, |
| ""+unicastPort+"-"+(unicastPort+2)); |
| system = (InternalDistributedSystem)DistributedSystem.connect(config); |
| DistributionManager dm = (DistributionManager)system.getDistributionManager(); |
| InternalDistributedMember idm = dm.getDistributionManagerId(); |
| system.disconnect(); |
| assertTrue(unicastPort <= idm.getPort() && idm.getPort() <= unicastPort+2); |
| assertTrue(unicastPort <= idm.getPort() && idm.getDirectChannelPort() <= unicastPort+2); |
| } |
| |
| public void testMembershipPortRange() throws Exception { |
| Properties config = new Properties(); |
| int mcastPort = AvailablePort.getRandomAvailablePort(AvailablePort.JGROUPS); |
| int unicastPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); |
| config.setProperty("mcast-port", String.valueOf(mcastPort)); |
| config.setProperty("locators", ""); |
| config.setProperty(DistributionConfig.MEMBERSHIP_PORT_RANGE_NAME, |
| ""+unicastPort+"-"+(unicastPort+1)); |
| try { |
| system = (InternalDistributedSystem)DistributedSystem.connect(config); |
| } catch (Exception e) { |
| assertTrue("The exception must be IllegalArgumentException", e instanceof IllegalArgumentException); |
| return; |
| } |
| fail("IllegalArgumentException must have been thrown by DistributedSystem.connect() as port-range: " |
| + config.getProperty(DistributionConfig.MEMBERSHIP_PORT_RANGE_NAME) |
| + " must have at least 3 values in range"); |
| } |
| |
| public void testMembershipPortRangeWithExactThreeValues() throws Exception { |
| Properties config = new Properties(); |
| int mcastPort = AvailablePort.getRandomAvailablePort(AvailablePort.JGROUPS); |
| config.setProperty("mcast-port", String.valueOf(mcastPort)); |
| config.setProperty("locators", ""); |
| config.setProperty(DistributionConfig.MEMBERSHIP_PORT_RANGE_NAME, "" |
| + (DistributionConfig.DEFAULT_MEMBERSHIP_PORT_RANGE[1] - 2) + "-" |
| + (DistributionConfig.DEFAULT_MEMBERSHIP_PORT_RANGE[1])); |
| system = (InternalDistributedSystem)DistributedSystem.connect(config); |
| Cache cache = CacheFactory.create(system); |
| cache.addCacheServer(); |
| DistributionManager dm = (DistributionManager) system.getDistributionManager(); |
| InternalDistributedMember idm = dm.getDistributionManagerId(); |
| system.disconnect(); |
| assertTrue(idm.getPort() <= DistributionConfig.DEFAULT_MEMBERSHIP_PORT_RANGE[1]); |
| assertTrue(idm.getPort() >= DistributionConfig.DEFAULT_MEMBERSHIP_PORT_RANGE[0]); |
| assertTrue(idm.getDirectChannelPort() <= DistributionConfig.DEFAULT_MEMBERSHIP_PORT_RANGE[1]); |
| assertTrue(idm.getDirectChannelPort() >= DistributionConfig.DEFAULT_MEMBERSHIP_PORT_RANGE[0]); |
| } |
| |
| public void testConflictingUDPPort() throws Exception { |
| final Properties config = new Properties(); |
| final int mcastPort = AvailablePort.getRandomAvailablePort(AvailablePort.JGROUPS); |
| final int unicastPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); |
| config.setProperty("mcast-port", String.valueOf(mcastPort)); |
| config.setProperty("locators", ""); |
| config.setProperty(DistributionConfig.MEMBERSHIP_PORT_RANGE_NAME, |
| ""+unicastPort+"-"+(unicastPort+2)); |
| system = (InternalDistributedSystem)DistributedSystem.connect(config); |
| DistributionManager dm = (DistributionManager)system.getDistributionManager(); |
| InternalDistributedMember idm = dm.getDistributionManagerId(); |
| VM vm = Host.getHost(0).getVM(1); |
| vm.invoke(new CacheSerializableRunnable("start conflicting system") { |
| public void run2() { |
| try { |
| DistributedSystem system = DistributedSystem.connect(config); |
| system.disconnect(); |
| } catch (SystemConnectException e) { |
| return; // |
| } |
| fail("expected a SystemConnectException but didn't get one"); |
| } |
| }); |
| system.disconnect(); |
| } |
| |
| /** |
| * Tests that configuring a distributed system with a cache-xml-file |
| * of "" does not initialize a cache. See bug 32254. |
| * |
| * @since 4.0 |
| */ |
| public void testEmptyCacheXmlFile() throws Exception { |
| Properties config = new Properties(); |
| |
| // int unusedPort = |
| // AvailablePort.getRandomAvailablePort(AvailablePort.JGROUPS); |
| // config.setProperty("mcast-port", String.valueOf(unusedPort)); |
| // a loner is all this test needs |
| config.setProperty("mcast-port", "0"); |
| config.setProperty("locators", ""); |
| config.setProperty(DistributionConfig.CACHE_XML_FILE_NAME, ""); |
| |
| DistributedSystem sys = DistributedSystem.connect(config); |
| |
| try { |
| try { |
| CacheFactory.getInstance(sys); |
| fail("Should have thrown a CancelException"); |
| } |
| catch (CancelException expected) { |
| } |
| // now make sure we can create the cache |
| CacheFactory.create(sys); |
| |
| } finally { |
| sys.disconnect(); |
| } |
| } |
| |
| static volatile String problem; |
| |
| public void testInterruptedWhileConnecting() throws Exception { |
| Runnable r = new Runnable() { |
| public void run() { |
| ClientGmsImpl.SLOW_JOIN_LOCK = new Object(); |
| ClientGmsImpl.SLOW_JOIN = true; |
| try { |
| assertTrue("should be disconnected at this point", InternalDistributedSystem.getConnectedInstance() == null); |
| getSystem(); |
| problem = "a connection to the distributed system was established but it should have failed"; |
| } catch (SystemConnectException e) { |
| if (!e.getMessage().endsWith(ExternalStrings.ClientGmsImpl_JOIN_INTERRUPTED.getRawText())) { |
| problem = "got a system connect exception but it was for the wrong reason"; |
| getLogWriter().info("wrong exception thrown: '" + e.getMessage() + "' (wanted '"+ |
| ExternalStrings.ClientGmsImpl_JOIN_INTERRUPTED.getRawText()+"')", e); |
| } |
| } finally { |
| ClientGmsImpl.SLOW_JOIN = false; |
| ClientGmsImpl.SLOW_JOIN_LOCK = null; |
| } |
| } |
| }; |
| Thread connectThread = new Thread(r, "testInterruptedWhileConnecting connect thread"); |
| ClientGmsImpl.SLOW_JOIN = false; |
| connectThread.start(); |
| while (ClientGmsImpl.SLOW_JOIN == false) { |
| pause(1000); |
| } |
| pause(5000); |
| connectThread.interrupt(); |
| connectThread.join(60000); |
| getLogWriter().info("done waiting for connectThread. Thread is " + |
| (connectThread.isAlive()? "still alive" : "stopped")); |
| if (ClientGmsImpl.SLOW_JOIN) { |
| problem = "failed to either connect or get an exception - one of these should have happened"; |
| dumpMyThreads(getLogWriter()); |
| } |
| if (problem != null) { |
| fail(problem); |
| } |
| } |
| |
| |
| } |