blob: 1d2a00bad150e61f0193d016cfc3bb0ceef6895c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal.membership.gms;
import static org.apache.geode.distributed.internal.membership.gms.util.MemberIdentifierUtil.createMemberID;
import static org.apache.geode.internal.serialization.DataSerializableFixedID.HIGH_PRIORITY_ACKED_MESSAGE;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.stream.Collectors;
import org.jgroups.util.UUID;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.apache.geode.distributed.internal.membership.api.Authenticator;
import org.apache.geode.distributed.internal.membership.api.LifecycleListener;
import org.apache.geode.distributed.internal.membership.api.MemberDisconnectedException;
import org.apache.geode.distributed.internal.membership.api.MemberIdentifier;
import org.apache.geode.distributed.internal.membership.api.MemberShunnedException;
import org.apache.geode.distributed.internal.membership.api.MemberStartupException;
import org.apache.geode.distributed.internal.membership.api.MembershipConfig;
import org.apache.geode.distributed.internal.membership.api.MembershipListener;
import org.apache.geode.distributed.internal.membership.api.MembershipView;
import org.apache.geode.distributed.internal.membership.api.Message;
import org.apache.geode.distributed.internal.membership.api.MessageListener;
import org.apache.geode.distributed.internal.membership.gms.GMSMembership.StartupEvent;
import org.apache.geode.distributed.internal.membership.gms.Services.Stopper;
import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave;
import org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger;
import org.apache.geode.internal.serialization.DSFIDSerializer;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.internal.serialization.Versioning;
import org.apache.geode.internal.serialization.internal.DSFIDSerializerImpl;
import org.apache.geode.test.junit.categories.MembershipTest;
@Category({MembershipTest.class})
public class GMSMembershipJUnitTest {
/** Version obtained for the ordinal one below {@code KnownVersion.CURRENT_ORDINAL}. */
private static final Version OLDER_THAN_CURRENT_VERSION =
    Versioning.getVersion((short) (KnownVersion.CURRENT_ORDINAL - 1));
/** Version obtained for the ordinal one above {@code KnownVersion.CURRENT_ORDINAL}. */
private static final Version NEWER_THAN_CURRENT_VERSION =
    Versioning.getVersion((short) (KnownVersion.CURRENT_ORDINAL + 1));
/** Base membership port used when creating the member identifiers for these tests. */
private static final int DEFAULT_PORT = 8888;

// Collaborators below are (re)created for every test in initMocks().
/** Mocked service container handed to the manager under test. */
private Services services;
/** Hand-written configuration stub returned by {@code services.getConfig()}. */
private MembershipConfig mockConfig;
private Authenticator authenticator;
private HealthMonitor healthMonitor;
/** Identifier reported by the mocked messenger as the local member. */
private MemberIdentifier myMemberId;
/** Five identifiers on ports DEFAULT_PORT .. DEFAULT_PORT + 4, each with a random UUID. */
private MemberIdentifier[] mockMembers;
private Messenger messenger;
private JoinLeave joinLeave;
private Stopper stopper;
private MembershipListener listener;
/** The object under test. */
private GMSMembership<MemberIdentifier> manager;
/** Mutable list copy of {@link #mockMembers}. */
private List<MemberIdentifier> members;
private MessageListener messageListener;
private LifecycleListener directChannelCallback;
/**
 * Builds a fresh set of mocked services, five peer identifiers with random UUIDs, and the
 * {@link GMSMembership} under test before each test method runs.
 */
@Before
public void initMocks() throws Exception {
  // Only these configuration getters are overridden for the tests.
  mockConfig = new MembershipConfig() {
    @Override
    public String getLocators() {
      return "localhost[10344]";
    }

    @Override
    public boolean getDisableTcp() {
      return true;
    }

    @Override
    public long getMemberTimeout() {
      return 2000;
    }

    @Override
    public long getAckWaitThreshold() {
      return 1;
    }

    @Override
    public long getAckSevereAlertThreshold() {
      return 10;
    }
  };

  authenticator = mock(Authenticator.class);

  // The local member gets a fixed UUID so it is distinguishable from the random peers.
  myMemberId = createMemberID(8887);
  myMemberId.setUUID(new UUID(12345, 12345));

  messenger = mock(Messenger.class);
  when(messenger.getMemberID()).thenReturn(myMemberId);

  stopper = mock(Stopper.class);
  when(stopper.isCancelInProgress()).thenReturn(false);

  healthMonitor = mock(HealthMonitor.class);
  when(healthMonitor.getFailureDetectionPort()).thenReturn(Integer.valueOf(-1));

  joinLeave = mock(JoinLeave.class);

  // Wire every collaborator into the mocked service container.
  services = mock(Services.class);
  when(services.getAuthenticator()).thenReturn(authenticator);
  when(services.getConfig()).thenReturn(mockConfig);
  when(services.getMessenger()).thenReturn(messenger);
  when(services.getCancelCriterion()).thenReturn(stopper);
  when(services.getHealthMonitor()).thenReturn(healthMonitor);
  when(services.getJoinLeave()).thenReturn(joinLeave);
  final Timer daemonTimer = new Timer(true);
  when(services.getTimer()).thenReturn(daemonTimer);

  // Peers on consecutive ports, each tagged with a random UUID.
  final Random uuidSource = new Random();
  mockMembers = new MemberIdentifier[5];
  for (int index = 0; index < mockMembers.length; index++) {
    final MemberIdentifier peer = createMemberID(DEFAULT_PORT + index);
    peer.setUUID(new UUID(uuidSource.nextLong(), uuidSource.nextLong()));
    mockMembers[index] = peer;
  }
  members = new ArrayList<>(Arrays.asList(mockMembers));

  listener = mock(MembershipListener.class);
  messageListener = mock(MessageListener.class);
  directChannelCallback = mock(LifecycleListener.class);

  // The manager is initialised before getManager()/getSerializer() are stubbed; keep this order.
  manager = new GMSMembership(listener, messageListener, directChannelCallback);
  manager.getGMSManager().init(services);
  when(services.getManager()).thenReturn(manager.getGMSManager());

  final DSFIDSerializer dsfidSerializer = new DSFIDSerializerImpl();
  when(services.getSerializer()).thenReturn(dsfidSerializer);
  Services.registerSerializables(dsfidSerializer);
}
/** Stops the manager created in {@code initMocks()}, if there is one. */
@After
public void tearDown() throws Exception {
  if (manager == null) {
    return;
  }
  manager.getGMSManager().stop();
  manager.getGMSManager().stopped();
}
/**
 * Sends a {@code TestMessage} to a single member of an installed view and checks that the
 * messenger was asked to send it and that no failures were reported.
 */
@Test
public void testSendMessage() throws Exception {
  services.getSerializer().registerDSFID(HIGH_PRIORITY_ACKED_MESSAGE, TestMessage.class);
  TestMessage outgoing = new TestMessage();
  outgoing.setRecipient(mockMembers[0]);

  manager.getGMSManager().start();
  manager.getGMSManager().started();

  // Install view #1, created by the local member and containing all mock members.
  List<MemberIdentifier> viewMembers = new ArrayList<>(members);
  manager.getGMSManager().installView(new GMSMembershipView<>(myMemberId, 1, viewMembers));

  MemberIdentifier[] recipients = {mockMembers[0]};
  Set<MemberIdentifier> failed = manager.send(recipients, outgoing);

  verify(messenger).send(isA(Message.class));
  // A null result is accepted; a non-null result must be empty.
  if (failed != null) {
    assertEquals(0, failed.size());
  }
}
/**
 * Builds a {@link GMSMembershipView} for the given creator and view id.
 *
 * <p>
 * The view is handed a fresh copy of {@code members}, so the list passed to the view is
 * independent of the caller's list.
 *
 * @param creator the member recorded as the creator of the view
 * @param viewId the id of the new view
 * @param members the members to place in the view
 * @return the newly constructed view
 */
private GMSMembershipView<MemberIdentifier> createView(MemberIdentifier creator, int viewId,
    List<MemberIdentifier> members) {
  List<MemberIdentifier> gmsMembers = new ArrayList<>(members);
  return new GMSMembershipView<>(creator, viewId, gmsMembers);
}
/**
 * Exercises the startup-event queue: while {@code isJoining} is set, surprise connects, messages
 * and view events are expected to be queued (suspect events are not), and
 * {@code startEventProcessing()} is expected to drain the queue and install the latest view.
 */
@Test
public void testStartupEvents() throws Exception {
  manager.getGMSManager().start();
  manager.getGMSManager().started();
  manager.isJoining = true;

  List<MemberIdentifier> viewMembers = Arrays.asList(mockMembers[0], myMemberId);
  manager.getGMSManager().installView(createView(myMemberId, 2, viewMembers));

  // add a surprise member that will be shunned due to its having
  // an old view ID
  MemberIdentifier surpriseMember = mockMembers[2];
  surpriseMember.setVmViewId(1);
  manager.handleOrDeferSurpriseConnect(surpriseMember);
  assertEquals(1, manager.getStartupEvents().size());

  // add a surprise member that will be accepted
  MemberIdentifier surpriseMember2 = mockMembers[3];
  surpriseMember2.setVmViewId(3);
  manager.handleOrDeferSurpriseConnect(surpriseMember2);
  assertEquals(2, manager.getStartupEvents().size());

  // suspect a member
  MemberIdentifier suspectMember = mockMembers[1];
  manager.handleOrDeferSuspect(
      new SuspectMember<>(mockMembers[0], suspectMember, "testing"));
  // suspect messages aren't queued - they're ignored before joining the system
  assertEquals(2, manager.getStartupEvents().size());
  verify(listener, never()).memberSuspect(suspectMember, mockMembers[0], "testing");

  // a message received while joining is queued
  TestMessage message = new TestMessage();
  mockMembers[0].setVmViewId(1);
  message.setRecipient(mockMembers[0]);
  message.setSender(mockMembers[1]);
  manager.handleOrDeferMessage(message);
  assertEquals(3, manager.getStartupEvents().size());

  // this view officially adds surpriseMember2
  viewMembers = Arrays.asList(mockMembers[0], myMemberId, surpriseMember2);
  manager.handleOrDeferViewEvent(new MembershipView<>(myMemberId, 3, viewMembers));
  assertEquals(4, manager.getStartupEvents().size());

  // re-submit the surprise member that will be shunned due to its having
  // an old view ID
  // NOTE(review): the original declared an unused local for mockMembers[4] at this point but
  // still submitted mockMembers[2]; that behavior is kept - confirm which member was intended.
  surpriseMember.setVmViewId(1);
  manager.handleOrDeferSurpriseConnect(surpriseMember);
  assertEquals(5, manager.getStartupEvents().size());

  // process a new view after we finish joining but before event processing has started
  manager.isJoining = false;
  mockMembers[4].setVmViewId(4);
  viewMembers = Arrays.asList(mockMembers[0], myMemberId, surpriseMember2, mockMembers[4]);
  manager.handleOrDeferViewEvent(new MembershipView<>(myMemberId, 4, viewMembers));
  assertEquals(6, manager.getStartupEvents().size());

  // exercise the toString methods for code coverage
  for (StartupEvent<MemberIdentifier> ev : manager.getStartupEvents()) {
    ev.toString();
  }

  manager.startEventProcessing();

  // all startup events should have been processed
  assertEquals(0, manager.getStartupEvents().size());
  // the new view should have been installed
  assertEquals(4, manager.getView().getViewId());
  // surpriseMember2 should have been announced
  verify(listener).newMemberConnected(surpriseMember2);
  // surpriseMember should have been rejected (old view ID)
  verify(listener, never()).newMemberConnected(surpriseMember);

  // for code coverage also deliver a view now that startEventProcessing() has been called.
  // Per the original note this should notify the distribution manager with a
  // LocalViewMessage to process the view; the test only waits for the view id to change.
  manager.handleOrDeferViewEvent(new MembershipView<>(myMemberId, 5, viewMembers));
  // expected value first, so a failure reports expected/actual the right way round
  await().untilAsserted(() -> assertEquals(5, manager.getView().getViewId()));

  // process a suspect now - it will be passed to the listener
  reset(listener);
  manager.handleOrDeferSuspect(
      new SuspectMember<>(mockMembers[0], suspectMember, "testing"));
  verify(listener).memberSuspect(suspectMember, mockMembers[0], "testing");
}
/**
 * Checks that {@code checkAddressesForUUIDs} swaps identifiers lacking a UUID for identifiers
 * that carry one, given a current membership view whose members all have UUIDs.
 */
@Test
public void testAddressesWithoutUUIDs() throws Exception {
  manager.getGMSManager().start();
  manager.getGMSManager().started();
  manager.isJoining = true;

  List<MemberIdentifier> viewMembers =
      Arrays.asList(mockMembers[0], mockMembers[1], myMemberId);
  GMSMembershipView view = createView(myMemberId, 2, viewMembers);
  manager.getGMSManager().installView(view);
  when(services.getJoinLeave().getView()).thenReturn(view);

  // Build look-alike identifiers from the ports only.
  MemberIdentifier[] destinations = new MemberIdentifier[viewMembers.size()];
  int slot = 0;
  for (MemberIdentifier viewMember : viewMembers) {
    destinations[slot++] = createMemberID(viewMember.getMembershipPort());
  }

  manager.checkAddressesForUUIDs(destinations);

  // every entry of the array is now expected to carry a UUID
  for (MemberIdentifier destination : destinations) {
    assertTrue(destination.hasUUID());
  }
}
/**
 * After {@code beSick()}, a message that reports it should be dropped while membership is
 * playing dead must neither be dispatched nor queued as a startup event.
 */
@Test
public void noDispatchWhenSick() throws MemberShunnedException, MemberStartupException {
  final GMSMembership sickMembership = Mockito.spy(manager);
  final Message droppable = mock(Message.class);
  when(droppable.dropMessageWhenMembershipIsPlayingDead()).thenReturn(true);

  sickMembership.beSick();
  sickMembership.getGMSManager().start();
  sickMembership.getGMSManager().started();
  sickMembership.handleOrDeferMessage(droppable);

  verify(sickMembership, never()).dispatchMessage(any(Message.class));
  assertThat(sickMembership.getStartupEvents()).isEmpty();
}
/** A surprise member with an older-than-current version must turn multicast off. */
@Test
public void testIsMulticastAllowedWithOldVersionSurpriseMember() {
  MembershipView<MemberIdentifier> newView = createMembershipView();
  MemberIdentifier olderSurpriseMember = createSurpriseMember(OLDER_THAN_CURRENT_VERSION);

  manager.addSurpriseMember(olderSurpriseMember);
  manager.processView(newView);

  boolean multicastAllowed = manager.getGMSManager().isMulticastAllowed();
  assertThat(multicastAllowed).isFalse();
}
/** A surprise member at the current version must leave multicast allowed. */
@Test
public void testIsMulticastAllowedWithCurrentVersionSurpriseMember() {
  MembershipView<MemberIdentifier> newView = createMembershipView();
  MemberIdentifier currentSurpriseMember = createSurpriseMember(KnownVersion.CURRENT);

  manager.addSurpriseMember(currentSurpriseMember);
  manager.processView(newView);

  boolean multicastAllowed = manager.getGMSManager().isMulticastAllowed();
  assertThat(multicastAllowed).isTrue();
}
/** A surprise member with a newer-than-current version must leave multicast allowed. */
@Test
public void testIsMulticastAllowedWithNewVersionSurpriseMember() {
  MembershipView<MemberIdentifier> newView = createMembershipView();
  MemberIdentifier newerSurpriseMember = createSurpriseMember(NEWER_THAN_CURRENT_VERSION);

  manager.addSurpriseMember(newerSurpriseMember);
  manager.processView(newView);

  boolean multicastAllowed = manager.getGMSManager().isMulticastAllowed();
  assertThat(multicastAllowed).isTrue();
}
/** A view member with an older-than-current version must turn multicast off. */
@Test
public void testIsMulticastAllowedWithOldVersionViewMember() {
  MembershipView<MemberIdentifier> newView = createMembershipView();
  MemberIdentifier firstMember = newView.getMembers().get(0);
  firstMember.setVersionForTest(OLDER_THAN_CURRENT_VERSION);

  manager.processView(newView);

  boolean multicastAllowed = manager.getGMSManager().isMulticastAllowed();
  assertThat(multicastAllowed).isFalse();
}
/** A view whose members have no version override must leave multicast allowed. */
@Test
public void testMulticastAllowedWithCurrentVersionViewMember() {
  manager.processView(createMembershipView());

  boolean multicastAllowed = manager.getGMSManager().isMulticastAllowed();
  assertThat(multicastAllowed).isTrue();
}
/** A view member with a newer-than-current version must leave multicast allowed. */
@Test
public void testMulticastAllowedWithNewVersionViewMember() {
  MembershipView<MemberIdentifier> newView = createMembershipView();
  MemberIdentifier firstMember = newView.getMembers().get(0);
  firstMember.setVersionForTest(NEWER_THAN_CURRENT_VERSION);

  manager.processView(newView);

  boolean multicastAllowed = manager.getGMSManager().isMulticastAllowed();
  assertThat(multicastAllowed).isTrue();
}
/**
 * Makes {@code services.emergencyClose()} throw during an unclean shutdown and checks that the
 * exception reaches the caller and that the membership listener is still told about the
 * membership failure.
 */
@Test
public void membershipInvokesUpstreamListenerDuringForcedDisconnect() {
  // interrupt the shutdown process with an exception thrown from emergencyClose()
  final IllegalStateException closeFailure = new IllegalStateException();
  doThrow(closeFailure).when(services).emergencyClose();
  final MemberDisconnectedException disconnectCause =
      new MemberDisconnectedException("For Testing");

  assertThatThrownBy(() -> manager.uncleanShutdown("For testing", disconnectCause))
      .isEqualTo(closeFailure);

  // the upstream listener must have been informed despite the exception
  verify(listener).membershipFailure(isA(String.class), isA(Throwable.class));
}
/**
 * Creates an identifier on port {@code DEFAULT_PORT + 5} with view id 3 and the given version.
 *
 * @param version the version to assign to the new identifier
 * @return the configured identifier
 */
private MemberIdentifier createSurpriseMember(Version version) {
  final MemberIdentifier created = createMemberID(DEFAULT_PORT + 5);
  created.setVmViewId(3);
  created.setVersionForTest(version);
  return created;
}
/**
 * Creates view #2, with the local member as creator, over freshly created member identifiers.
 *
 * @return the new membership view
 */
private MembershipView<MemberIdentifier> createMembershipView() {
  final List<MemberIdentifier> freshMembers = createMemberIdentifiers();
  final MembershipView<MemberIdentifier> view =
      new MembershipView<>(myMemberId, 2, freshMembers);
  return view;
}
/**
 * Creates two member identifiers on ports {@code DEFAULT_PORT + 6} and {@code DEFAULT_PORT + 7}.
 *
 * @return a mutable list holding the two identifiers in port order
 */
private List<MemberIdentifier> createMemberIdentifiers() {
  final int firstPort = DEFAULT_PORT + 6;
  final int memberCount = 2;
  final List<MemberIdentifier> created = new ArrayList<>();
  for (int port = firstPort; port < firstPort + memberCount; port++) {
    created.add(createMemberID(port));
  }
  return created;
}
}