| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.distributed.internal.membership.adapter; |
| |
| import static org.apache.geode.distributed.ConfigurationProperties.ACK_SEVERE_ALERT_THRESHOLD; |
| import static org.apache.geode.distributed.ConfigurationProperties.ACK_WAIT_THRESHOLD; |
| import static org.apache.geode.distributed.ConfigurationProperties.DISABLE_TCP; |
| import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; |
| import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE; |
| import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL; |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_TTL; |
| import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Matchers.anyInt; |
| import static org.mockito.Matchers.isA; |
| import static org.mockito.Mockito.anyLong; |
| import static org.mockito.Mockito.atLeastOnce; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.never; |
| import static org.mockito.Mockito.reset; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| import java.time.Instant; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Properties; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.Timer; |
| import java.util.stream.Collectors; |
| |
| import org.assertj.core.api.Assertions; |
| import org.jgroups.util.UUID; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.distributed.DistributedSystemDisconnectedException; |
| import org.apache.geode.distributed.internal.ClusterDistributionManager; |
| import org.apache.geode.distributed.internal.DMStats; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.distributed.internal.DistributionConfigImpl; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.DistributionMessage; |
| import org.apache.geode.distributed.internal.HighPriorityAckedMessage; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.distributed.internal.ReplyProcessor21; |
| import org.apache.geode.distributed.internal.direct.DirectChannel; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.distributed.internal.membership.MembershipView; |
| import org.apache.geode.distributed.internal.membership.adapter.GMSMembershipManager.StartupEvent; |
| import org.apache.geode.distributed.internal.membership.gms.GMSMemberData; |
| import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView; |
| import org.apache.geode.distributed.internal.membership.gms.Services; |
| import org.apache.geode.distributed.internal.membership.gms.Services.Stopper; |
| import org.apache.geode.distributed.internal.membership.gms.SuspectMember; |
| import org.apache.geode.distributed.internal.membership.gms.api.Authenticator; |
| import org.apache.geode.distributed.internal.membership.gms.api.MemberIdentifier; |
| import org.apache.geode.distributed.internal.membership.gms.api.MembershipConfig; |
| import org.apache.geode.distributed.internal.membership.gms.api.MembershipListener; |
| import org.apache.geode.distributed.internal.membership.gms.api.MessageListener; |
| import org.apache.geode.distributed.internal.membership.gms.interfaces.GMSMessage; |
| import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor; |
| import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave; |
| import org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger; |
| import org.apache.geode.internal.admin.remote.AlertListenerMessage; |
| import org.apache.geode.internal.admin.remote.RemoteTransportConfig; |
| import org.apache.geode.internal.statistics.DummyStatisticsRegistry; |
| import org.apache.geode.internal.tcp.ConnectExceptions; |
| import org.apache.geode.test.junit.categories.MembershipTest; |
| |
| @Category({MembershipTest.class}) |
| public class GMSMembershipManagerJUnitTest { |
| |
| private static final long WAIT_FOR_REPLIES_MILLIS = 2000; |
| |
| private Services services; |
| private MembershipConfig mockConfig; |
| private DistributionConfig distConfig; |
| private Properties distProperties; |
| private Authenticator authenticator; |
| private HealthMonitor healthMonitor; |
| private InternalDistributedMember myMemberId; |
| private InternalDistributedMember[] mockMembers; |
| private Messenger messenger; |
| private JoinLeave joinLeave; |
| private Stopper stopper; |
| private MembershipListener listener; |
| private GMSMembershipManager manager; |
| private List<InternalDistributedMember> members; |
| private DirectChannel dc; |
| private MessageListener messageListener; |
| |
| @Before |
| public void initMocks() throws Exception { |
| Properties nonDefault = new Properties(); |
| nonDefault.put(ACK_WAIT_THRESHOLD, "1"); |
| nonDefault.put(ACK_SEVERE_ALERT_THRESHOLD, "10"); |
| nonDefault.put(DISABLE_TCP, "true"); |
| nonDefault.put(MCAST_PORT, "0"); |
| nonDefault.put(MCAST_TTL, "0"); |
| nonDefault.put(LOG_FILE, ""); |
| nonDefault.put(LOG_LEVEL, "fine"); |
| nonDefault.put(MEMBER_TIMEOUT, "2000"); |
| nonDefault.put(LOCATORS, "localhost[10344]"); |
| distConfig = new DistributionConfigImpl(nonDefault); |
| distProperties = nonDefault; |
| RemoteTransportConfig tconfig = |
| new RemoteTransportConfig(distConfig, ClusterDistributionManager.NORMAL_DM_TYPE); |
| |
| mockConfig = new ServiceConfig(tconfig, distConfig); |
| |
| authenticator = mock(Authenticator.class); |
| myMemberId = new InternalDistributedMember("localhost", 8887); |
| GMSMemberData m = (GMSMemberData) myMemberId.getMemberData(); |
| UUID uuid = new UUID(12345, 12345); |
| m.setUUID(uuid); |
| |
| messenger = mock(Messenger.class); |
| when(messenger.getMemberID()).thenReturn(myMemberId); |
| |
| stopper = mock(Stopper.class); |
| when(stopper.isCancelInProgress()).thenReturn(false); |
| |
| healthMonitor = mock(HealthMonitor.class); |
| when(healthMonitor.getFailureDetectionPort()).thenReturn(Integer.valueOf(-1)); |
| |
| joinLeave = mock(JoinLeave.class); |
| |
| services = mock(Services.class); |
| when(services.getAuthenticator()).thenReturn(authenticator); |
| when(services.getConfig()).thenReturn(mockConfig); |
| when(services.getMessenger()).thenReturn(messenger); |
| when(services.getCancelCriterion()).thenReturn(stopper); |
| when(services.getHealthMonitor()).thenReturn(healthMonitor); |
| when(services.getJoinLeave()).thenReturn(joinLeave); |
| |
| Timer t = new Timer(true); |
| when(services.getTimer()).thenReturn(t); |
| |
| Random r = new Random(); |
| mockMembers = new InternalDistributedMember[5]; |
| for (int i = 0; i < mockMembers.length; i++) { |
| mockMembers[i] = new InternalDistributedMember("localhost", 8888 + i); |
| m = (GMSMemberData) mockMembers[i].getMemberData(); |
| uuid = new UUID(r.nextLong(), r.nextLong()); |
| m.setUUID(uuid); |
| } |
| members = new ArrayList<>(Arrays.asList(mockMembers)); |
| |
| listener = mock(MembershipListener.class); |
| messageListener = mock(MessageListener.class); |
| manager = new GMSMembershipManager(listener, messageListener, null); |
| manager.getGMSManager().init(services); |
| when(services.getManager()).thenReturn(manager.getGMSManager()); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| if (manager != null) { |
| manager.getGMSManager().stop(); |
| manager.getGMSManager().stopped(); |
| } |
| } |
| |
| @Test |
| public void testSendMessage() throws Exception { |
| HighPriorityAckedMessage m = new HighPriorityAckedMessage(); |
| m.setRecipient(mockMembers[0]); |
| manager.getGMSManager().start(); |
| manager.getGMSManager().started(); |
| MemberIdentifier myGMSMemberId = myMemberId; |
| List<MemberIdentifier> gmsMembers = |
| members.stream().map(x -> ((MemberIdentifier) x)).collect(Collectors.toList()); |
| manager.getGMSManager().installView(new GMSMembershipView(myGMSMemberId, 1, gmsMembers)); |
| Set<InternalDistributedMember> failures = |
| manager.send(m.getRecipients(), m); |
| verify(messenger).send(isA(GMSMessageAdapter.class)); |
| if (failures != null) { |
| assertEquals(0, failures.size()); |
| } |
| } |
| |
| |
| |
| private GMSMembershipView createView(InternalDistributedMember creator, int viewId, |
| List<InternalDistributedMember> members) { |
| List<MemberIdentifier> gmsMembers = new ArrayList<>(members); |
| return new GMSMembershipView(creator, viewId, gmsMembers); |
| } |
| |
| @Test |
| public void testSendAdminMessageFailsDuringShutdown() throws Exception { |
| AlertListenerMessage m = AlertListenerMessage.create(mockMembers[0], 1, |
| Instant.now(), "thread", "", 1L, "", ""); |
| manager.getGMSManager().start(); |
| manager.getGMSManager().started(); |
| manager.getGMSManager().installView(createView(myMemberId, 1, members)); |
| manager.setShutdown(); |
| Set<InternalDistributedMember> failures = |
| manager.send(new InternalDistributedMember[] {mockMembers[0]}, m); |
| verify(messenger, never()).send(isA(GMSMessage.class)); |
| assertEquals(1, failures.size()); |
| assertEquals(mockMembers[0], failures.iterator().next()); |
| } |
| |
| @Test |
| public void testSendToEmptyListIsRejected() throws Exception { |
| InternalDistributedMember[] emptyList = new InternalDistributedMember[0]; |
| HighPriorityAckedMessage m = new HighPriorityAckedMessage(); |
| m.setRecipient(mockMembers[0]); |
| manager.getGMSManager().start(); |
| manager.getGMSManager().started(); |
| manager.getGMSManager().installView(createView(myMemberId, 1, members)); |
| Set<InternalDistributedMember> failures = manager.send(null, m); |
| verify(messenger, never()).send(isA(GMSMessage.class)); |
| reset(messenger); |
| failures = manager.send(emptyList, m); |
| verify(messenger, never()).send(isA(GMSMessage.class)); |
| } |
| |
| @Test |
| public void testStartupEvents() throws Exception { |
| manager.getGMSManager().start(); |
| manager.getGMSManager().started(); |
| manager.isJoining = true; |
| |
| List<InternalDistributedMember> viewmembers = |
| Arrays.asList(new InternalDistributedMember[] {mockMembers[0], myMemberId}); |
| manager.getGMSManager().installView(createView(myMemberId, 2, viewmembers)); |
| |
| // add a surprise member that will be shunned due to it's having |
| // an old view ID |
| InternalDistributedMember surpriseMember = mockMembers[2]; |
| surpriseMember.setVmViewId(1); |
| manager.handleOrDeferSurpriseConnect(surpriseMember); |
| assertEquals(1, manager.getStartupEvents().size()); |
| |
| // add a surprise member that will be accepted |
| InternalDistributedMember surpriseMember2 = mockMembers[3]; |
| surpriseMember2.setVmViewId(3); |
| manager.handleOrDeferSurpriseConnect(surpriseMember2); |
| assertEquals(2, manager.getStartupEvents().size()); |
| |
| // suspect a member |
| InternalDistributedMember suspectMember = mockMembers[1]; |
| manager.handleOrDeferSuspect( |
| new SuspectMember(mockMembers[0], suspectMember, "testing")); |
| // suspect messages aren't queued - they're ignored before joining the system |
| assertEquals(2, manager.getStartupEvents().size()); |
| verify(listener, never()).memberSuspect(suspectMember, mockMembers[0], "testing"); |
| |
| HighPriorityAckedMessage m = new HighPriorityAckedMessage(); |
| mockMembers[0].setVmViewId(1); |
| m.setRecipient(mockMembers[0]); |
| m.setSender(mockMembers[1]); |
| manager.handleOrDeferMessage(m); |
| assertEquals(3, manager.getStartupEvents().size()); |
| |
| // this view officially adds surpriseMember2 |
| viewmembers = Arrays |
| .asList(new InternalDistributedMember[] {mockMembers[0], myMemberId, surpriseMember2}); |
| manager.handleOrDeferViewEvent(new MembershipView(myMemberId, 3, viewmembers)); |
| assertEquals(4, manager.getStartupEvents().size()); |
| |
| // add a surprise member that will be shunned due to it's having |
| // an old view ID |
| InternalDistributedMember surpriseMember3 = mockMembers[4]; |
| surpriseMember.setVmViewId(1); |
| manager.handleOrDeferSurpriseConnect(surpriseMember); |
| assertEquals(5, manager.getStartupEvents().size()); |
| |
| // process a new view after we finish joining but before event processing has started |
| manager.isJoining = false; |
| mockMembers[4].setVmViewId(4); |
| viewmembers = Arrays.asList(new InternalDistributedMember[] {mockMembers[0], myMemberId, |
| surpriseMember2, mockMembers[4]}); |
| manager.handleOrDeferViewEvent(new MembershipView(myMemberId, 4, viewmembers)); |
| assertEquals(6, manager.getStartupEvents().size()); |
| |
| // exercise the toString methods for code coverage |
| for (StartupEvent ev : manager.getStartupEvents()) { |
| ev.toString(); |
| } |
| |
| manager.startEventProcessing(); |
| |
| // all startup events should have been processed |
| assertEquals(0, manager.getStartupEvents().size()); |
| // the new view should have been installed |
| assertEquals(4, manager.getView().getViewId()); |
| // supriseMember2 should have been announced |
| verify(listener).newMemberConnected(surpriseMember2); |
| // supriseMember should have been rejected (old view ID) |
| verify(listener, never()).newMemberConnected(surpriseMember); |
| |
| // for code coverage also install a view after we finish joining but before |
| // event processing has started. This should notify the distribution manager |
| // with a LocalViewMessage to process the view |
| reset(listener); |
| manager.handleOrDeferViewEvent(new MembershipView(myMemberId, 5, viewmembers)); |
| assertEquals(0, manager.getStartupEvents().size()); |
| verify(messageListener).messageReceived(isA(LocalViewMessage.class)); |
| |
| // process a suspect now - it will be passed to the listener |
| reset(listener); |
| suspectMember = mockMembers[1]; |
| manager.handleOrDeferSuspect( |
| new SuspectMember(mockMembers[0], suspectMember, "testing")); |
| verify(listener).memberSuspect(suspectMember, mockMembers[0], "testing"); |
| } |
| |
| @Test |
| public void testDirectChannelSend() throws Exception { |
| setUpDirectChannelMock(); |
| HighPriorityAckedMessage m = new HighPriorityAckedMessage(); |
| InternalDistributedMember[] recipients = |
| new InternalDistributedMember[] {mockMembers[2], mockMembers[3]}; |
| m.setRecipients(Arrays.asList(recipients)); |
| Set<InternalDistributedMember> failures = manager.directChannelSend(recipients, m); |
| assertTrue(failures == null); |
| verify(dc).send(isA(GMSMembershipManager.class), isA(mockMembers.getClass()), |
| isA(DistributionMessage.class), anyLong(), anyLong()); |
| } |
| |
| @Test |
| public void testDirectChannelSendFailureToOneRecipient() throws Exception { |
| setUpDirectChannelMock(); |
| HighPriorityAckedMessage m = new HighPriorityAckedMessage(); |
| InternalDistributedMember[] recipients = |
| new InternalDistributedMember[] {mockMembers[2], mockMembers[3]}; |
| m.setRecipients(Arrays.asList(recipients)); |
| Set<InternalDistributedMember> failures = manager.directChannelSend(recipients, m); |
| |
| ConnectExceptions exception = new ConnectExceptions(); |
| exception.addFailure(recipients[0], new Exception("testing")); |
| when(dc.send(any(GMSMembershipManager.class), any(mockMembers.getClass()), |
| any(DistributionMessage.class), anyLong(), anyLong())).thenThrow(exception); |
| failures = manager.directChannelSend(recipients, m); |
| assertTrue(failures != null); |
| assertEquals(1, failures.size()); |
| assertEquals(recipients[0], failures.iterator().next()); |
| } |
| |
| @Test |
| public void testDirectChannelSendFailureToAll() throws Exception { |
| setUpDirectChannelMock(); |
| HighPriorityAckedMessage m = new HighPriorityAckedMessage(); |
| InternalDistributedMember[] recipients = |
| new InternalDistributedMember[] {mockMembers[2], mockMembers[3]}; |
| m.setRecipients(Arrays.asList(recipients)); |
| Set<InternalDistributedMember> failures = manager.directChannelSend(recipients, m); |
| when(dc.send(any(GMSMembershipManager.class), any(mockMembers.getClass()), |
| any(DistributionMessage.class), anyInt(), anyInt())).thenReturn(0); |
| when(stopper.isCancelInProgress()).thenReturn(Boolean.TRUE); |
| try { |
| manager.directChannelSend(recipients, m); |
| fail("expected directChannelSend to throw an exception"); |
| } catch (DistributedSystemDisconnectedException expected) { |
| } |
| } |
| |
| @Test |
| public void testDirectChannelSendAllRecipients() throws Exception { |
| setUpDirectChannelMock(); |
| HighPriorityAckedMessage m = new HighPriorityAckedMessage(); |
| m.setRecipient(DistributionMessage.ALL_RECIPIENTS); |
| assertTrue(m.forAll()); |
| Set<InternalDistributedMember> failures = manager.directChannelSend(null, m); |
| assertTrue(failures == null); |
| verify(dc).send(isA(GMSMembershipManager.class), isA(mockMembers.getClass()), |
| isA(DistributionMessage.class), anyLong(), anyLong()); |
| } |
| |
| @Test |
| public void testDirectChannelSendFailureDueToForcedDisconnect() throws Exception { |
| setUpDirectChannelMock(); |
| HighPriorityAckedMessage m = new HighPriorityAckedMessage(); |
| InternalDistributedMember[] recipients = |
| new InternalDistributedMember[] {mockMembers[2], mockMembers[3]}; |
| m.setRecipients(Arrays.asList(recipients)); |
| Set<InternalDistributedMember> failures = manager.directChannelSend(recipients, m); |
| manager.setShutdown(); |
| ConnectExceptions exception = new ConnectExceptions(); |
| exception.addFailure(recipients[0], new Exception("testing")); |
| when(dc.send(any(GMSMembershipManager.class), any(mockMembers.getClass()), |
| any(DistributionMessage.class), anyLong(), anyLong())).thenThrow(exception); |
| Assertions.assertThatThrownBy(() -> { |
| manager.directChannelSend(recipients, m); |
| }).isInstanceOf(DistributedSystemDisconnectedException.class); |
| } |
| |
| /** |
| * This test ensures that the membership manager can accept an ID that does not have a UUID and |
| * replace it with one that does have a UUID from the current membership view. |
| */ |
| @Test |
| public void testAddressesWithoutUUIDs() throws Exception { |
| manager.getGMSManager().start(); |
| manager.getGMSManager().started(); |
| manager.isJoining = true; |
| |
| List<InternalDistributedMember> viewmembers = |
| Arrays.asList(new InternalDistributedMember[] {mockMembers[0], mockMembers[1], myMemberId}); |
| GMSMembershipView view = createView(myMemberId, 2, viewmembers); |
| manager.getGMSManager().installView(view); |
| when(services.getJoinLeave().getView()).thenReturn(view); |
| |
| InternalDistributedMember[] destinations = new InternalDistributedMember[viewmembers.size()]; |
| for (int i = 0; i < destinations.length; i++) { |
| InternalDistributedMember id = viewmembers.get(i); |
| destinations[i] = new InternalDistributedMember(id.getHost(), id.getMembershipPort()); |
| } |
| manager.checkAddressesForUUIDs(destinations); |
| // each destination w/o a UUID should have been replaced with the corresponding |
| // ID from the membership view |
| for (int i = 0; i < destinations.length; i++) { |
| assertTrue(((GMSMemberData) destinations[i].getMemberData()).hasUUID()); |
| } |
| } |
| |
| @Test |
| public void testReplyProcessorInitiatesSuspicion() throws Exception { |
| DistributionManager dm = mock(DistributionManager.class); |
| DMStats stats = mock(DMStats.class); |
| |
| InternalDistributedSystem system = |
| new InternalDistributedSystem.BuilderForTesting(distProperties) |
| .setDistributionManager(dm) |
| .setStatisticsManagerFactory( |
| (name, startTime, statsDisabled) -> new DummyStatisticsRegistry(name, startTime)) |
| .build(); |
| |
| when(dm.getStats()).thenReturn(stats); |
| when(dm.getSystem()).thenReturn(system); |
| when(dm.getCancelCriterion()).thenReturn(stopper); |
| when(dm.getMembershipManager()).thenReturn(manager); |
| when(dm.getViewMembers()).thenReturn(members); |
| when(dm.getDistributionManagerIds()).thenReturn(new HashSet<>(members)); |
| when(dm.addMembershipListenerAndGetDistributionManagerIds(any( |
| org.apache.geode.distributed.internal.MembershipListener.class))) |
| .thenReturn(new HashSet(members)); |
| |
| manager.getGMSManager().start(); |
| manager.getGMSManager().started(); |
| manager.isJoining = true; |
| |
| List<InternalDistributedMember> viewmembers = |
| Arrays.asList(new InternalDistributedMember[] {mockMembers[0], mockMembers[1], myMemberId}); |
| manager.getGMSManager().installView(createView(myMemberId, 2, viewmembers)); |
| |
| List<InternalDistributedMember> mbrs = new ArrayList<>(1); |
| mbrs.add(mockMembers[0]); |
| ReplyProcessor21 rp = new ReplyProcessor21(dm, mbrs); |
| rp.enableSevereAlertProcessing(); |
| boolean result = rp.waitForReplies(WAIT_FOR_REPLIES_MILLIS); |
| assertFalse(result); // the wait should have timed out |
| verify(healthMonitor, atLeastOnce()).checkIfAvailable(isA(MemberIdentifier.class), |
| isA(String.class), isA(Boolean.class)); |
| } |
| |
| /** |
| * Some tests require a DirectChannel mock |
| */ |
| private void setUpDirectChannelMock() throws Exception { |
| dc = mock(DirectChannel.class); |
| when(dc.send(any(GMSMembershipManager.class), any(mockMembers.getClass()), |
| any(DistributionMessage.class), anyInt(), anyInt())).thenReturn(100); |
| |
| manager.getGMSManager().start(); |
| manager.getGMSManager().started(); |
| |
| manager.setDirectChannel(dc); |
| |
| GMSMembershipView view = createView(myMemberId, 1, members); |
| manager.getGMSManager().installView(view); |
| when(joinLeave.getView()).thenReturn(view); |
| |
| manager.startEventProcessing(); |
| } |
| |
| } |