// blob: 3420d36a2ce7cbf6739b7b95b817c5b34f060c64 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal.membership.gms;
import static org.apache.geode.distributed.internal.membership.api.MembershipConfig.DEFAULT_LOCATOR_WAIT_TIME;
import static org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave.FIND_LOCATOR_RETRY_SLEEP;
import static org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave.JOIN_RETRY_SLEEP;
import static org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave.getMinimumRetriesBeforeBecomingCoordinator;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.apache.geode.distributed.internal.membership.api.MemberIdentifier;
import org.apache.geode.distributed.internal.membership.api.MemberIdentifierFactoryImpl;
import org.apache.geode.distributed.internal.membership.api.MemberStartupException;
import org.apache.geode.distributed.internal.membership.api.Membership;
import org.apache.geode.distributed.internal.membership.api.MembershipBuilder;
import org.apache.geode.distributed.internal.membership.api.MembershipConfig;
import org.apache.geode.distributed.internal.membership.api.MembershipConfigurationException;
import org.apache.geode.distributed.internal.membership.api.MembershipLocator;
import org.apache.geode.distributed.internal.membership.api.MembershipLocatorBuilder;
import org.apache.geode.distributed.internal.tcpserver.TcpClient;
import org.apache.geode.distributed.internal.tcpserver.TcpSocketCreator;
import org.apache.geode.distributed.internal.tcpserver.TcpSocketCreatorImpl;
import org.apache.geode.distributed.internal.tcpserver.TcpSocketFactory;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.inet.LocalHostUtil;
import org.apache.geode.internal.serialization.DSFIDSerializer;
import org.apache.geode.internal.serialization.internal.DSFIDSerializerImpl;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;
/**
* Tests of using the membership APIs to make multiple Membership systems that communicate
* with each other and form a group
*/
public class MembershipIntegrationTest {
private InetAddress localHost;
private DSFIDSerializer dsfidSerializer;
private TcpSocketCreator socketCreator;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
/**
 * Builds the fixtures shared by every test: a socket creator, a serializer, and the local host
 * address that is later used when composing locator strings.
 */
@Before
public void before() throws IOException, MembershipConfigurationException {
  socketCreator = new TcpSocketCreatorImpl();
  dsfidSerializer = new DSFIDSerializerImpl();
  localHost = LocalHostUtil.getLocalHost();
}
/**
 * Starts a single locator with one membership attached to it and checks that the membership's
 * view contains exactly one member.
 *
 * <p>
 * Teardown runs in {@code finally} blocks so that a failed start or a failed assertion does not
 * leave the membership or the locator running. The stop order (membership first, then locator)
 * is the same as on the success path.
 */
@Test
public void oneMembershipCanStartWithALocator()
    throws IOException, MemberStartupException {
  final MembershipLocator<MemberIdentifier> locator = createLocator(0);
  locator.start();
  try {
    final Membership<MemberIdentifier> membership = createMembership(locator,
        locator.getPort());
    start(membership);
    try {
      assertThat(membership.getView().getMembers()).hasSize(1);
    } finally {
      // only reached once the membership has started, so there is something to disconnect
      stop(membership);
    }
  } finally {
    stop(locator);
  }
}
/**
 * Starts one locator with an attached membership, then a second membership that has no locator
 * of its own and is only given the first locator's port. Both views must eventually report two
 * members.
 */
@Test
public void twoMembershipsCanStartWithOneLocator()
    throws IOException, MemberStartupException {
  final MembershipLocator<MemberIdentifier> sharedLocator = createLocator(0);
  sharedLocator.start();
  final int sharedPort = sharedLocator.getPort();

  // first member is created together with the locator
  final Membership<MemberIdentifier> locatorMember = createMembership(sharedLocator, sharedPort);
  start(locatorMember);

  // second member is created without a locator (null) and only knows the locator's port
  final Membership<MemberIdentifier> plainMember = createMembership(null, sharedPort);
  start(plainMember);

  await().untilAsserted(() -> assertThat(locatorMember.getView().getMembers()).hasSize(2));
  await().untilAsserted(() -> assertThat(plainMember.getView().getMembers()).hasSize(2));

  stop(locatorMember, plainMember);
  stop(sharedLocator);
}
/**
 * Brings up two locators one after the other, each with its own membership. The second locator
 * is configured with the first locator's port, and the second membership is configured with both
 * ports. Both views must eventually report two members.
 */
@Test
public void twoLocatorsCanStartSequentially()
    throws IOException, MemberStartupException {
  // first locator and the membership attached to it
  final MembershipLocator<MemberIdentifier> firstLocator = createLocator(0);
  firstLocator.start();
  final int firstPort = firstLocator.getPort();
  final Membership<MemberIdentifier> firstMember = createMembership(firstLocator, firstPort);
  start(firstMember);

  // second locator is told about the first one
  final MembershipLocator<MemberIdentifier> secondLocator = createLocator(0, firstPort);
  secondLocator.start();
  final int secondPort = secondLocator.getPort();
  final Membership<MemberIdentifier> secondMember =
      createMembership(secondLocator, firstPort, secondPort);
  start(secondMember);

  await().untilAsserted(() -> assertThat(firstMember.getView().getMembers()).hasSize(2));
  await().untilAsserted(() -> assertThat(secondMember.getView().getMembers()).hasSize(2));

  stop(secondMember, firstMember);
  stop(secondLocator, firstLocator);
}
/**
 * Starts two locators, stops the first one, and then starts a second membership that is
 * configured with both locator ports. Both views must eventually report two members.
 */
@Test
public void secondMembershipCanJoinUsingTheSecondLocatorToStart()
    throws IOException, MemberStartupException {
  final MembershipLocator<MemberIdentifier> firstLocator = createLocator(0);
  firstLocator.start();
  final int firstPort = firstLocator.getPort();
  final Membership<MemberIdentifier> firstMember = createMembership(firstLocator, firstPort);
  start(firstMember);

  final MembershipLocator<MemberIdentifier> secondLocator = createLocator(0, firstPort);
  secondLocator.start();
  final int secondPort = secondLocator.getPort();

  // With the first locator stopped, the next membership has to go through the second locator
  stop(firstLocator);

  final Membership<MemberIdentifier> secondMember =
      createMembership(secondLocator, firstPort, secondPort);
  start(secondMember);

  await().untilAsserted(() -> assertThat(firstMember.getView().getMembers()).hasSize(2));
  await().untilAsserted(() -> assertThat(secondMember.getView().getMembers()).hasSize(2));

  stop(secondMember, firstMember);
  // the first locator was already stopped above; it is passed to stop() again here
  stop(secondLocator, firstLocator);
}
/**
 * Checks that a locator configured with a long locator-wait-time is still waiting after twice
 * the minimum join wait has elapsed, and that it finishes starting once the other configured
 * locator is launched.
 */
@Test
public void locatorWaitsForLocatorWaitTimeUntilAllLocatorsContacted()
    throws InterruptedException, TimeoutException, ExecutionException {
  final Supplier<ExecutorService> threadPools =
      () -> LoggingExecutors.newCachedThreadPool("membership", false);
  final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2);

  // five minutes, expressed in seconds
  final int waitSeconds = (int) TimeUnit.MINUTES.toSeconds(5);
  final MembershipConfig sharedConfig =
      createMembershipConfig(true, waitSeconds, ports[0], ports[1]);

  /*
   * Launch the first locator while nothing has been launched on the second configured port.
   *
   * The long locator-wait-time is meant to keep it from becoming coordinator right away, which
   * leaves time for the other member to start.
   */
  final CompletableFuture<Membership<MemberIdentifier>> firstLaunch =
      launchLocator(threadPools, ports[0], sharedConfig);

  // Minimum time a locator waits before becoming coordinator, regardless of locator-wait-time:
  // (sleep per retry in GMSJoinLeave.join()) x (expected number of retries)
  final Duration minimumJoinWait = Duration
      .ofMillis(JOIN_RETRY_SLEEP + FIND_LOCATOR_RETRY_SLEEP)
      .multipliedBy(getMinimumRetriesBeforeBecomingCoordinator(ports.length));

  /*
   * Pause for twice the minimum join wait: longer than the minimum, shorter than the
   * locator-wait-time. If the first launch is still pending afterwards, it is honouring the
   * locator-wait-time rather than only the minimum wait.
   */
  final long pauseMillis = 2 * minimumJoinWait.toMillis();
  Thread.sleep(pauseMillis);
  assertThat(firstLaunch.getNow(null)).isNull();

  // Only now launch the second locator
  final CompletableFuture<Membership<MemberIdentifier>> secondLaunch =
      launchLocator(threadPools, ports[1], sharedConfig);

  // Both launches must complete within two minutes, i.e. well inside the locator-wait-time
  final Membership<MemberIdentifier> firstMember = firstLaunch.get(2, TimeUnit.MINUTES);
  final Membership<MemberIdentifier> secondMember = secondLaunch.get(2, TimeUnit.MINUTES);

  // Each member must end up seeing the other in its view
  await().untilAsserted(() -> assertThat(firstMember.getView().getMembers()).hasSize(2));
  await().untilAsserted(() -> assertThat(secondMember.getView().getMembers()).hasSize(2));

  stop(firstMember, secondMember);
}
/**
 * Asynchronously creates and starts a locator on {@code locatorPort}, then creates and starts a
 * membership that uses that locator, all on a thread provided by {@code executorServiceRule}.
 *
 * <p>
 * If the membership cannot be created or started after the locator is already running, the
 * locator is stopped before the failure is propagated, so a failed launch does not leave a
 * locator behind.
 *
 * @param executorServiceSupplier supplier of executors handed to the locator builder
 * @param locatorPort the port passed to the locator builder
 * @param config configuration used for both the locator and the membership
 * @return a future that completes with the started membership; any checked or unchecked
 *         exception raised during the launch is rethrown wrapped in a {@link RuntimeException}
 */
private CompletableFuture<Membership<MemberIdentifier>> launchLocator(
    Supplier<ExecutorService> executorServiceSupplier, int locatorPort, MembershipConfig config) {
  return executorServiceRule.supplyAsync(() -> {
    try {
      final Path locatorDirectory = temporaryFolder.newFolder().toPath();
      final MembershipLocator<MemberIdentifier> locator =
          MembershipLocatorBuilder.<MemberIdentifier>newLocatorBuilder(
              socketCreator,
              dsfidSerializer,
              locatorDirectory,
              executorServiceSupplier)
              .setConfig(config)
              .setPort(locatorPort)
              .create();
      locator.start();
      try {
        final Membership<MemberIdentifier> membership = createMembership(config, locator);
        membership.start();
        membership.startEventProcessing();
        return membership;
      } catch (Exception startupFailure) {
        // the locator is running but nobody will receive a handle to it: stop it here
        try {
          locator.stop();
        } catch (RuntimeException stopFailure) {
          // keep the original failure as the primary one
          startupFailure.addSuppressed(stopFailure);
        }
        throw startupFailure;
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  });
}
/**
 * Starts the given membership and then starts its event processing.
 *
 * @param membership the membership to bring up
 * @throws MemberStartupException propagated from the membership start-up calls
 */
private void start(final Membership<MemberIdentifier> membership) throws MemberStartupException {
  membership.start();
  membership.startEventProcessing();
}
/**
 * Creates (but does not start) a membership that uses the default locator wait time.
 *
 * @param embeddedLocator locator to attach to the membership, or {@code null} for none; a
 *        non-null value also makes the generated configuration report a locator VM kind
 * @param locatorPorts ports used to build the configuration's locator string
 * @return the newly created membership
 */
private Membership<MemberIdentifier> createMembership(
    final MembershipLocator<MemberIdentifier> embeddedLocator,
    final int... locatorPorts)
    throws MembershipConfigurationException {
  return createMembership(
      createMembershipConfig(embeddedLocator != null, DEFAULT_LOCATOR_WAIT_TIME, locatorPorts),
      embeddedLocator);
}
/**
 * Creates (but does not start) a membership from an explicit configuration.
 *
 * @param config configuration handed to the membership builder
 * @param embeddedLocator locator handed to the membership builder; callers in this class may
 *        pass {@code null}
 * @return the newly created membership
 */
private Membership<MemberIdentifier> createMembership(
    final MembershipConfig config,
    final MembershipLocator<MemberIdentifier> embeddedLocator)
    throws MembershipConfigurationException {
  // client built from the shared socket creator and the serializer's object (de)serializers
  final TcpClient tcpClient = new TcpClient(
      socketCreator,
      dsfidSerializer.getObjectSerializer(),
      dsfidSerializer.getObjectDeserializer(),
      TcpSocketFactory.DEFAULT);
  final MemberIdentifierFactoryImpl idFactory = new MemberIdentifierFactoryImpl();

  return MembershipBuilder.<MemberIdentifier>newMembershipBuilder(
      socketCreator, tcpClient, dsfidSerializer, idFactory)
      .setMembershipLocator(embeddedLocator)
      .setConfig(config)
      .create();
}
/**
 * Builds a {@link MembershipConfig} that overrides only the locator string, the VM kind and the
 * locator wait time; every other setting keeps whatever {@link MembershipConfig} provides.
 *
 * @param isALocator whether the configuration should report the locator VM kind
 * @param locatorWaitTime value returned by {@code getLocatorWaitTime()}
 * @param locatorPorts ports turned into the locator string via {@code getLocatorString}
 * @return the anonymous configuration instance
 */
private MembershipConfig createMembershipConfig(
    final boolean isALocator,
    final int locatorWaitTime,
    final int... locatorPorts) {
  return new MembershipConfig() {
    @Override
    public String getLocators() {
      return getLocatorString(locatorPorts);
    }

    // TODO - the Membership system starting in the locator *MUST* be told that it
    // is a locator through this flag. Ideally it should be able to infer this from
    // being associated with a locator
    @Override
    public int getVmKind() {
      return isALocator ? MemberIdentifier.LOCATOR_DM_TYPE : MemberIdentifier.NORMAL_DM_TYPE;
    }

    @Override
    public int getLocatorWaitTime() {
      return locatorWaitTime;
    }
  };
}
/**
 * Formats the given ports as a comma-separated locator string of the form
 * {@code host[port],host[port]}, using the local host's name for every entry.
 *
 * @param locatorPorts ports to include; an empty array yields an empty string
 * @return the locator string
 */
private String getLocatorString(
    final int... locatorPorts) {
  final String host = localHost.getHostName();
  final StringBuilder locators = new StringBuilder();
  for (final int port : locatorPorts) {
    if (locators.length() > 0) {
      locators.append(',');
    }
    locators.append(host).append('[').append(port).append(']');
  }
  return locators.toString();
}
/**
 * Creates (but does not start) a locator that works out of a fresh temporary folder and uses the
 * default locator wait time.
 *
 * @param localPort port passed to the locator builder
 * @param locatorPorts ports used to build the configuration's locator string
 * @return the newly created locator
 */
private MembershipLocator<MemberIdentifier> createLocator(
    final int localPort,
    final int... locatorPorts)
    throws MembershipConfigurationException,
    IOException {
  final MembershipConfig locatorConfig =
      createMembershipConfig(true, DEFAULT_LOCATOR_WAIT_TIME, locatorPorts);
  final Path workingDirectory = temporaryFolder.newFolder().toPath();
  final Supplier<ExecutorService> threadPools =
      () -> LoggingExecutors.newCachedThreadPool("membership", false);

  return MembershipLocatorBuilder.<MemberIdentifier>newLocatorBuilder(
      socketCreator,
      dsfidSerializer,
      workingDirectory,
      threadPools)
      .setConfig(locatorConfig)
      .setPort(localPort)
      .create();
}
/**
 * Disconnects each of the given memberships, in argument order.
 *
 * <p>
 * The varargs array is only read, never stored or written, which is what makes
 * {@code @SafeVarargs} valid here. The method is {@code static} because the annotation is only
 * permitted on static, final or (Java 9+) private methods, and the method uses no instance state.
 *
 * @param memberships the memberships to disconnect
 */
@SafeVarargs
private static void stop(final Membership<MemberIdentifier>... memberships) {
  for (final Membership<MemberIdentifier> membership : memberships) {
    membership.disconnect(false);
  }
}
/**
 * Stops each of the given locators, in argument order.
 *
 * <p>
 * The varargs array is only read, never stored or written, which is what makes
 * {@code @SafeVarargs} valid here. The method is {@code static} because the annotation is only
 * permitted on static, final or (Java 9+) private methods, and the method uses no instance state.
 *
 * @param locators the locators to stop
 */
@SafeVarargs
private static void stop(final MembershipLocator<MemberIdentifier>... locators) {
  for (final MembershipLocator<MemberIdentifier> locator : locators) {
    locator.stop();
  }
}
}