blob: a4ac4286103ecb9c21fd731e302019ef3502d9eb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal.membership.gms.messenger;
import static org.apache.geode.distributed.ConfigurationProperties.ACK_WAIT_THRESHOLD;
import static org.apache.geode.distributed.ConfigurationProperties.DISABLE_TCP;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_TTL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.lang3.SerializationException;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.util.Digest;
import org.jgroups.util.UUID;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.ForcedDisconnectException;
import org.apache.geode.GemFireIOException;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionConfigImpl;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.DistributionStats;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.SerialAckedMessage;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.NetView;
import org.apache.geode.distributed.internal.membership.gms.GMSMember;
import org.apache.geode.distributed.internal.membership.gms.ServiceConfig;
import org.apache.geode.distributed.internal.membership.gms.Services;
import org.apache.geode.distributed.internal.membership.gms.Services.Stopper;
import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave;
import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager;
import org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler;
import org.apache.geode.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
import org.apache.geode.distributed.internal.membership.gms.locator.FindCoordinatorResponse;
import org.apache.geode.distributed.internal.membership.gms.messages.InstallViewMessage;
import org.apache.geode.distributed.internal.membership.gms.messages.JoinRequestMessage;
import org.apache.geode.distributed.internal.membership.gms.messages.JoinResponseMessage;
import org.apache.geode.distributed.internal.membership.gms.messages.LeaveRequestMessage;
import org.apache.geode.distributed.internal.membership.gms.messenger.JGroupsMessenger.JGroupsReceiver;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.DataSerializableFixedID;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.admin.remote.RemoteTransportConfig;
import org.apache.geode.internal.alerting.AlertingAction;
import org.apache.geode.internal.cache.DistributedCacheOperation;
import org.apache.geode.internal.statistics.StatisticsRegistry;
import org.apache.geode.test.junit.categories.MembershipTest;
@Category({MembershipTest.class})
public class JGroupsMessengerJUnitTest {
private static final String AES_128 = "AES:128";
private Services services;
private JGroupsMessenger messenger;
private JoinLeave joinLeave;
private Manager manager;
private Stopper stopper;
private HealthMonitor healthMonitor;
private InterceptUDP interceptor;
private long statsId = 123;
private void initMocks(boolean enableMcast) throws Exception {
initMocks(enableMcast, new Properties());
}
/**
* Create stub and mock objects
*/
private void initMocks(boolean enableMcast, Properties addProp) throws Exception {
if (messenger != null) {
messenger.stop();
messenger = null;
}
Properties nonDefault = new Properties();
nonDefault.put(DISABLE_TCP, "true");
nonDefault.put(MCAST_PORT,
enableMcast ? "" + AvailablePortHelper.getRandomAvailableUDPPort() : "0");
nonDefault.put(MCAST_TTL, "0");
nonDefault.put(LOG_FILE, "");
nonDefault.put(LOG_LEVEL, "fine");
nonDefault.put(LOCATORS, "localhost[10344]");
nonDefault.put(ACK_WAIT_THRESHOLD, "1");
nonDefault.putAll(addProp);
DistributionConfigImpl config = new DistributionConfigImpl(nonDefault);
RemoteTransportConfig tconfig =
new RemoteTransportConfig(config, ClusterDistributionManager.NORMAL_DM_TYPE);
stopper = mock(Stopper.class);
when(stopper.isCancelInProgress()).thenReturn(false);
manager = mock(Manager.class);
when(manager.isMulticastAllowed()).thenReturn(enableMcast);
healthMonitor = mock(HealthMonitor.class);
joinLeave = mock(JoinLeave.class);
ServiceConfig serviceConfig = new ServiceConfig(tconfig, config);
services = mock(Services.class);
when(services.getConfig()).thenReturn(serviceConfig);
when(services.getCancelCriterion()).thenReturn(stopper);
when(services.getHealthMonitor()).thenReturn(healthMonitor);
when(services.getManager()).thenReturn(manager);
when(services.getJoinLeave()).thenReturn(joinLeave);
DistributionManager dm = mock(DistributionManager.class);
InternalDistributedSystem system =
new InternalDistributedSystem.BuilderForTesting(nonDefault)
.setDistributionManager(dm)
.setStatisticsManagerFactory(
(name, startTime, statsDisabled) -> new StatisticsRegistry(name, startTime))
.build();
when(services.getStatistics()).thenReturn(new DistributionStats(system, statsId));
messenger = new JGroupsMessenger();
messenger.init(services);
// if I do this earlier then test this return messenger as null
when(services.getMessenger()).thenReturn(messenger);
String jgroupsConfig = messenger.jgStackConfig;
int startIdx = jgroupsConfig.indexOf("<org");
int insertIdx = jgroupsConfig.indexOf('>', startIdx + 4) + 1;
jgroupsConfig = jgroupsConfig.substring(0, insertIdx) + "<" + InterceptUDP.class.getName()
+ "/>" + jgroupsConfig.substring(insertIdx);
messenger.jgStackConfig = jgroupsConfig;
// System.out.println("jgroups config: " + jgroupsConfig);
messenger.start();
messenger.started();
interceptor =
(InterceptUDP) messenger.myChannel.getProtocolStack().getTransport().getUpProtocol();
}
@After
public void stopMessenger() {
if (messenger != null && messenger.myChannel != null) {
messenger.stop();
}
}
@Test
public void ioExceptionInitiatesSuspectProcessing() throws Exception {
// see GEODE-634
initMocks(false);
NetView v = createView();
when(joinLeave.getView()).thenReturn(v);
messenger.installView(v);
messenger.handleJGroupsIOException(new IOException("je m'en fiche"),
new JGAddress(v.getMembers().get(1)));
verify(healthMonitor).suspect(isA(InternalDistributedMember.class), isA(String.class));
}
@Test
public void ioExceptionDuringShutdownAvoidsSuspectProcessing() throws Exception {
// see GEODE-634
initMocks(false);
NetView v = createView();
when(joinLeave.getView()).thenReturn(v);
when(manager.shutdownInProgress()).thenReturn(true);
messenger.installView(v);
messenger.handleJGroupsIOException(new IOException("fichez-moi le camp"),
new JGAddress(v.getMembers().get(1)));
verify(healthMonitor, never()).checkIfAvailable(isA(InternalDistributedMember.class),
isA(String.class), isA(Boolean.class));
}
private NetView createView() {
InternalDistributedMember sender = messenger.getMemberID();
List<InternalDistributedMember> mbrs = new ArrayList<>();
mbrs.add(sender);
mbrs.add(createAddress(100));
mbrs.add(createAddress(101));
NetView v = new NetView(sender, 1, mbrs);
return v;
}
@Test
public void alertMessagesBypassFlowControl() throws Exception {
initMocks(false);
Message jgmsg = new Message();
DistributionMessage dmsg = mock(DistributionMessage.class);
when(dmsg.getProcessorType()).thenReturn(ClusterDistributionManager.SERIAL_EXECUTOR);
messenger.setMessageFlags(dmsg, jgmsg);
assertFalse("expected no_fc to not be set in " + jgmsg.getFlags(),
jgmsg.isFlagSet(Message.Flag.NO_FC));
AlertingAction.execute(() -> {
messenger.setMessageFlags(dmsg, jgmsg);
assertTrue("expected no_fc to be set in " + jgmsg.getFlags(),
jgmsg.isFlagSet(Message.Flag.NO_FC));
});
}
@Test
public void testMemberWeightIsSerialized() throws Exception {
HeapDataOutputStream out = new HeapDataOutputStream(500, Version.CURRENT);
InternalDistributedMember mbr = createAddress(8888);
((GMSMember) mbr.getNetMember()).setMemberWeight((byte) 40);
mbr.toData(out);
DataInputStream in = new DataInputStream(new ByteArrayInputStream(out.toByteArray()));
mbr = new InternalDistributedMember();
mbr.fromData(in);
assertEquals(40, mbr.getNetMember().getMemberWeight());
}
@Test
public void testSerializationError() throws Exception {
for (int i = 0; i < 2; i++) {
boolean enableMcast = (i == 1);
initMocks(enableMcast);
InternalDistributedMember mbr = createAddress(8888);
DistributedCacheOperation.CacheOperationMessage msg =
mock(DistributedCacheOperation.CacheOperationMessage.class);
when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
when(msg.getMulticast()).thenReturn(enableMcast);
if (!enableMcast) {
// for non-mcast we send a message with a reply-processor
when(msg.getProcessorId()).thenReturn(1234);
} else {
// for mcast we send a direct-ack message and expect the messenger
// to register it
when(msg.isDirectAck()).thenReturn(true);
}
when(msg.getDSFID()).thenReturn((int) DataSerializableFixedID.PUT_ALL_MESSAGE);
// for code coverage we need to test with both a SerializationException and
// an IOException. The former is wrapped in a GemfireIOException while the
// latter is not
doThrow(new SerializationException()).when(msg).toData(any(DataOutput.class));
try {
messenger.send(msg);
fail("expected a failure");
} catch (GemFireIOException e) {
// success
}
if (enableMcast) {
verify(msg, atLeastOnce()).registerProcessor();
}
doThrow(new IOException()).when(msg).toData(any(DataOutput.class));
try {
messenger.send(msg);
fail("expected a failure");
} catch (GemFireIOException e) {
// success
}
}
}
@Test
public void testJChannelError() throws Exception {
for (int i = 0; i < 2; i++) {
boolean enableMcast = (i == 1);
initMocks(enableMcast);
JChannel mockChannel = mock(JChannel.class);
when(mockChannel.isConnected()).thenReturn(true);
doThrow(new RuntimeException()).when(mockChannel).send(any(Message.class));
JChannel realChannel = messenger.myChannel;
messenger.myChannel = mockChannel;
try {
InternalDistributedMember mbr = createAddress(8888);
DistributedCacheOperation.CacheOperationMessage msg =
mock(DistributedCacheOperation.CacheOperationMessage.class);
when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
when(msg.getMulticast()).thenReturn(enableMcast);
when(msg.getProcessorId()).thenReturn(1234);
when(msg.getDSFID()).thenReturn((int) DataSerializableFixedID.PUT_ALL_MESSAGE);
try {
messenger.send(msg);
fail("expected a failure");
} catch (DistributedSystemDisconnectedException e) {
// success
}
verify(mockChannel).send(isA(Message.class));
} finally {
messenger.myChannel = realChannel;
}
}
}
@Test
public void testJChannelErrorDuringDisconnect() throws Exception {
for (int i = 0; i < 4; i++) {
System.out.println("loop #" + i);
boolean enableMcast = (i % 2 == 1);
initMocks(enableMcast);
JChannel mockChannel = mock(JChannel.class);
when(mockChannel.isConnected()).thenReturn(true);
Exception ex, shutdownCause;
if (i < 2) {
ex = new RuntimeException("");
shutdownCause = new RuntimeException("shutdownCause");
} else {
shutdownCause = new ForcedDisconnectException("");
ex = new RuntimeException("", shutdownCause);
}
doThrow(ex).when(mockChannel).send(any(Message.class));
JChannel realChannel = messenger.myChannel;
messenger.myChannel = mockChannel;
when(services.getShutdownCause()).thenReturn(shutdownCause);
try {
InternalDistributedMember mbr = createAddress(8888);
DistributedCacheOperation.CacheOperationMessage msg =
mock(DistributedCacheOperation.CacheOperationMessage.class);
when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
when(msg.getMulticast()).thenReturn(enableMcast);
when(msg.getProcessorId()).thenReturn(1234);
when(msg.getDSFID()).thenReturn((int) DataSerializableFixedID.PUT_ALL_MESSAGE);
try {
messenger.send(msg);
fail("expected a failure");
} catch (DistributedSystemDisconnectedException e) {
// the ultimate cause should be the shutdownCause returned
// by Services.getShutdownCause()
Throwable cause = e;
while (cause.getCause() != null) {
cause = cause.getCause();
}
assertTrue(cause != e);
assertTrue(cause == shutdownCause);
}
verify(mockChannel).send(isA(Message.class));
} finally {
messenger.myChannel = realChannel;
}
}
}
@Test
public void testSendWhenChannelIsClosed() throws Exception {
for (int i = 0; i < 2; i++) {
initMocks(false);
JChannel mockChannel = mock(JChannel.class);
when(mockChannel.isConnected()).thenReturn(false);
doThrow(new RuntimeException()).when(mockChannel).send(any(Message.class));
JChannel realChannel = messenger.myChannel;
messenger.myChannel = mockChannel;
try {
InternalDistributedMember mbr = createAddress(8888);
DistributedCacheOperation.CacheOperationMessage msg =
mock(DistributedCacheOperation.CacheOperationMessage.class);
when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
when(msg.getMulticast()).thenReturn(false);
when(msg.getProcessorId()).thenReturn(1234);
try {
messenger.send(msg);
fail("expected a failure");
} catch (DistributedSystemDisconnectedException e) {
// success
}
verify(mockChannel, never()).send(isA(Message.class));
} finally {
messenger.myChannel = realChannel;
}
}
}
@Test
public void testSendUnreliably() throws Exception {
for (int i = 0; i < 2; i++) {
boolean enableMcast = (i == 1);
initMocks(enableMcast);
InternalDistributedMember mbr = createAddress(8888);
DistributedCacheOperation.CacheOperationMessage msg =
mock(DistributedCacheOperation.CacheOperationMessage.class);
when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
when(msg.getMulticast()).thenReturn(enableMcast);
if (!enableMcast) {
// for non-mcast we send a message with a reply-processor
when(msg.getProcessorId()).thenReturn(1234);
} else {
// for mcast we send a direct-ack message and expect the messenger
// to register it
when(msg.isDirectAck()).thenReturn(true);
}
when(msg.getDSFID()).thenReturn((int) DataSerializableFixedID.PUT_ALL_MESSAGE);
interceptor.collectMessages = true;
try {
messenger.sendUnreliably(msg);
} catch (GemFireIOException e) {
fail("expected success");
}
if (enableMcast) {
verify(msg, atLeastOnce()).registerProcessor();
}
verify(msg).toData(isA(DataOutput.class));
assertTrue("expected 1 message but found " + interceptor.collectedMessages,
interceptor.collectedMessages.size() == 1);
assertTrue(interceptor.collectedMessages.get(0).isFlagSet(Message.Flag.NO_RELIABILITY));
}
}
@Test
public void testMessageDeliveredToHandler() throws Exception {
doTestMessageDeliveredToHandler(false);
}
@Test
public void testMessageDeliveredToHandlerMcast() throws Exception {
doTestMessageDeliveredToHandler(true);
}
private void doTestMessageDeliveredToHandler(boolean mcast) throws Exception {
initMocks(mcast);
MessageHandler mh = mock(MessageHandler.class);
messenger.addHandler(JoinRequestMessage.class, mh);
InternalDistributedMember addr = messenger.getMemberID();
NetView v = new NetView(addr);
when(joinLeave.getView()).thenReturn(v);
InternalDistributedMember sender = createAddress(8888);
JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, null, -1, 0);
Message jmsg = messenger.createJGMessage(msg, messenger.jgAddress, Version.CURRENT_ORDINAL);
interceptor.up(new Event(Event.MSG, jmsg));
verify(mh, times(1)).processMessage(any(JoinRequestMessage.class));
LeaveRequestMessage lmsg = new LeaveRequestMessage(messenger.localAddress, sender, "testing");
when(joinLeave.getMemberID(any())).thenReturn(sender);
jmsg = messenger.createJGMessage(lmsg, messenger.jgAddress, Version.CURRENT_ORDINAL);
interceptor.up(new Event(Event.MSG, jmsg));
verify(manager).processMessage(any(LeaveRequestMessage.class));
}
@Test
public void testBigMessageIsFragmented() throws Exception {
doTestBigMessageIsFragmented(false, false);
}
@Test
public void testBigMessageIsFragmentedMcast() throws Exception {
doTestBigMessageIsFragmented(true, true);
}
@Test
public void testBroadcastUDPMessage() throws Exception {
doTestBigMessageIsFragmented(false, true);
}
public void doTestBigMessageIsFragmented(boolean mcastEnabled, boolean mcastMsg)
throws Exception {
initMocks(mcastEnabled);
MessageHandler mh = mock(MessageHandler.class);
messenger.addHandler(JoinRequestMessage.class, mh);
InternalDistributedMember sender = messenger.getMemberID();
NetView v = new NetView(sender);
when(joinLeave.getView()).thenReturn(v);
messenger.installView(v);
JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, null, -1, 0);
if (mcastMsg) {
msg.setMulticast(true);
}
messenger.send(msg);
int sentMessages = (mcastEnabled && mcastMsg) ? interceptor.mcastSentDataMessages
: interceptor.unicastSentDataMessages;
assertTrue("expected 1 message to be sent but found " + sentMessages, sentMessages == 1);
// send a big message and expect fragmentation
msg = new JoinRequestMessage(messenger.localAddress, sender,
new byte[(int) (services.getConfig().getDistributionConfig().getUdpFragmentSize() * (1.5))],
-1, 0);
// configure an incoming message handler for JoinRequestMessage
final JoinRequestMessage[] messageReceived = new JoinRequestMessage[1];
messenger.addHandler(JoinRequestMessage.class, message -> messageReceived[0] = message);
// configure the outgoing message interceptor
interceptor.unicastSentDataMessages = 0;
interceptor.collectMessages = true;
interceptor.collectedMessages.clear();
messenger.send(msg);
assertTrue("expected 2 messages to be sent but found " + interceptor.unicastSentDataMessages,
interceptor.unicastSentDataMessages == 2);
List<Message> messages = new ArrayList<>(interceptor.collectedMessages);
UUID fakeMember = new UUID(50, 50);
short unicastHeaderId = ClassConfigurator.getProtocolId(UNICAST3.class);
int seqno = 1;
for (Message m : messages) {
m.setSrc(fakeMember);
UNICAST3.Header oldHeader = (UNICAST3.Header) m.getHeader(unicastHeaderId);
if (oldHeader == null)
continue;
UNICAST3.Header newHeader =
UNICAST3.Header.createDataHeader(seqno, oldHeader.connId(), seqno == 1);
seqno += 1;
m.putHeader(unicastHeaderId, newHeader);
interceptor.up(new Event(Event.MSG, m));
}
Thread.sleep(5000);
System.out.println("received message = " + messageReceived[0]);
}
@Test
public void testSendToMultipleMembers() throws Exception {
initMocks(false);
InternalDistributedMember sender = messenger.getMemberID();
InternalDistributedMember other = createAddress(8888);
NetView v = new NetView(sender);
v.add(other);
when(joinLeave.getView()).thenReturn(v);
messenger.installView(v);
List<InternalDistributedMember> recipients = v.getMembers();
SerialAckedMessage msg = new SerialAckedMessage();
msg.setRecipients(recipients);
messenger.send(msg);
int sentMessages = interceptor.unicastSentDataMessages;
assertTrue("expected 2 message to be sent but found " + sentMessages, sentMessages == 2);
}
@Test
public void testChannelStillConnectedAfterEmergencyCloseAfterForcedDisconnectWithAutoReconnect()
throws Exception {
initMocks(false);
doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
doCallRealMethod().when(services).getShutdownCause();
doCallRealMethod().when(services).emergencyClose();
doCallRealMethod().when(services).isShutdownDueToForcedDisconnect();
doCallRealMethod().when(services).isAutoReconnectEnabled();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.emergencyClose();
assertTrue(messenger.myChannel.isConnected());
}
@Test
public void testChannelStillConnectedAfterStopAfterForcedDisconnectWithAutoReconnect()
throws Exception {
initMocks(false);
doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
doCallRealMethod().when(services).getShutdownCause();
doCallRealMethod().when(services).emergencyClose();
doCallRealMethod().when(services).isShutdownDueToForcedDisconnect();
doCallRealMethod().when(services).isAutoReconnectEnabled();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.stop();
assertTrue(messenger.myChannel.isConnected());
}
@Test
public void testChannelStillConnectedAfteremergencyWhileReconnectingDS() throws Exception {
initMocks(false);
doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
doCallRealMethod().when(services).getShutdownCause();
doCallRealMethod().when(services).emergencyClose();
doReturn(false).when(services).isShutdownDueToForcedDisconnect();
doReturn(false).when(services).isAutoReconnectEnabled();
doReturn(true).when(manager).isReconnectingDS();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.emergencyClose();
assertTrue(messenger.myChannel.isConnected());
}
@Test
public void testChannelStillConnectedAfterStopWhileReconnectingDS() throws Exception {
initMocks(false);
doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
doCallRealMethod().when(services).getShutdownCause();
doCallRealMethod().when(services).emergencyClose();
doReturn(false).when(services).isShutdownDueToForcedDisconnect();
doReturn(false).when(services).isAutoReconnectEnabled();
doReturn(true).when(manager).isReconnectingDS();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.stop();
assertTrue(messenger.myChannel.isConnected());
}
@Test
public void testChannelClosedOnEmergencyClose() throws Exception {
initMocks(false);
doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
doCallRealMethod().when(services).getShutdownCause();
doCallRealMethod().when(services).emergencyClose();
doReturn(false).when(services).isShutdownDueToForcedDisconnect();
doReturn(false).when(services).isAutoReconnectEnabled();
doReturn(false).when(manager).isReconnectingDS();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.emergencyClose();
assertFalse(messenger.myChannel.isConnected());
}
@Test
public void testChannelClosedOnStop() throws Exception {
initMocks(false);
doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
doCallRealMethod().when(services).getShutdownCause();
doCallRealMethod().when(services).emergencyClose();
doReturn(false).when(services).isShutdownDueToForcedDisconnect();
doReturn(false).when(services).isAutoReconnectEnabled();
doReturn(false).when(manager).isReconnectingDS();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.stop();
assertFalse(messenger.myChannel.isConnected());
}
@Test
public void testChannelClosedAfterEmergencyCloseForcedDisconnectWithoutAutoReconnect()
throws Exception {
initMocks(false);
doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
doCallRealMethod().when(services).getShutdownCause();
doCallRealMethod().when(services).emergencyClose();
doReturn(true).when(services).isShutdownDueToForcedDisconnect();
doReturn(false).when(services).isAutoReconnectEnabled();
doReturn(false).when(manager).isReconnectingDS();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.emergencyClose();
assertFalse(messenger.myChannel.isConnected());
}
@Test
public void testChannelStillConnectedStopAfterForcedDisconnectWithoutAutoReconnect()
throws Exception {
initMocks(false);
doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
doCallRealMethod().when(services).getShutdownCause();
doCallRealMethod().when(services).emergencyClose();
doReturn(true).when(services).isShutdownDueToForcedDisconnect();
doReturn(false).when(services).isAutoReconnectEnabled();
doReturn(false).when(manager).isReconnectingDS();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.stop();
assertFalse(messenger.myChannel.isConnected());
}
@Test
public void testChannelClosedAfterEmergencyCloseNotForcedDisconnectWithAutoReconnect()
throws Exception {
initMocks(false);
doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
doCallRealMethod().when(services).getShutdownCause();
doCallRealMethod().when(services).emergencyClose();
doReturn(false).when(services).isShutdownDueToForcedDisconnect();
doReturn(true).when(services).isAutoReconnectEnabled();
doReturn(false).when(manager).isReconnectingDS();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.emergencyClose();
assertFalse(messenger.myChannel.isConnected());
}
@Test
public void testChannelStillConnectedStopNotForcedDisconnectWithAutoReconnect() throws Exception {
initMocks(false);
doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
doCallRealMethod().when(services).getShutdownCause();
doCallRealMethod().when(services).emergencyClose();
doReturn(false).when(services).isShutdownDueToForcedDisconnect();
doReturn(true).when(services).isAutoReconnectEnabled();
doReturn(false).when(manager).isReconnectingDS();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.stop();
assertFalse(messenger.myChannel.isConnected());
}
@Test
public void testMessageFiltering() throws Exception {
initMocks(true);
InternalDistributedMember mbr = createAddress(8888);
NetView view = new NetView(mbr);
// the digest should be set in an outgoing join response
JoinResponseMessage joinResponse = new JoinResponseMessage(mbr, view, 0);
messenger.filterOutgoingMessage(joinResponse);
assertNotNull(joinResponse.getMessengerData());
// save the view digest for later
byte[] data = joinResponse.getMessengerData();
// the digest should be used and the message bytes nulled out in an incoming join response
messenger.filterIncomingMessage(joinResponse);
assertNull(joinResponse.getMessengerData());
// the digest shouldn't be set in an outgoing rejection message
joinResponse =
new JoinResponseMessage("you can't join my distributed system. nyah nyah nyah!", 0);
messenger.filterOutgoingMessage(joinResponse);
assertNull(joinResponse.getMessengerData());
// the digest shouldn't be installed from an incoming rejection message
joinResponse.setMessengerData(data);
messenger.filterIncomingMessage(joinResponse);
assertNotNull(joinResponse.getMessengerData());
}
@Test
public void testPingPong() throws Exception {
initMocks(false);
GMSPingPonger pinger = messenger.pingPonger;
InternalDistributedMember mbr = createAddress(8888);
JGAddress addr = new JGAddress(mbr);
Message pingMessage = pinger.createPingMessage(null, addr);
assertTrue(pinger.isPingMessage(pingMessage.getBuffer()));
assertFalse(pinger.isPongMessage(pingMessage.getBuffer()));
Message pongMessage = pinger.createPongMessage(null, addr);
assertTrue(pinger.isPongMessage(pongMessage.getBuffer()));
assertFalse(pinger.isPingMessage(pongMessage.getBuffer()));
interceptor.collectMessages = true;
pinger.sendPingMessage(messenger.myChannel, null, addr);
assertEquals("expected 1 message but found " + interceptor.collectedMessages,
interceptor.collectedMessages.size(), 1);
pingMessage = interceptor.collectedMessages.get(0);
assertTrue(pinger.isPingMessage(pingMessage.getBuffer()));
interceptor.collectedMessages.clear();
pinger.sendPongMessage(messenger.myChannel, null, addr);
assertEquals("expected 1 message but found " + interceptor.collectedMessages,
interceptor.collectedMessages.size(), 1);
pongMessage = interceptor.collectedMessages.get(0);
assertTrue(pinger.isPongMessage(pongMessage.getBuffer()));
interceptor.collectedMessages.clear();
JGroupsReceiver receiver = (JGroupsReceiver) messenger.myChannel.getReceiver();
long pongsReceived = messenger.pongsReceived.longValue();
receiver.receive(pongMessage);
assertEquals(pongsReceived + 1, messenger.pongsReceived.longValue());
receiver.receive(pingMessage);
assertEquals("expected 1 message but found " + interceptor.collectedMessages,
interceptor.collectedMessages.size(), 1);
Message m = interceptor.collectedMessages.get(0);
assertTrue(pinger.isPongMessage(m.getBuffer()));
}
@Test
public void testJGroupsIOExceptionHandler() throws Exception {
initMocks(false);
InternalDistributedMember mbr = createAddress(8888);
NetView v = new NetView(mbr);
v.add(messenger.getMemberID());
messenger.installView(v);
IOException ioe = new IOException("test exception");
messenger.handleJGroupsIOException(ioe, new JGAddress(mbr));
messenger.handleJGroupsIOException(ioe, new JGAddress(mbr)); // should be ignored
verify(healthMonitor).suspect(mbr, "Unable to send messages to this member via JGroups");
}
@Test
public void testReceiver() throws Exception {
try {
DistributionStats.enableClockStats = true;
initMocks(false);
JGroupsReceiver receiver = (JGroupsReceiver) messenger.myChannel.getReceiver();
// a zero-length message is ignored
Message msg = new Message(new JGAddress(messenger.getMemberID()));
Object result = messenger.readJGMessage(msg);
assertNull(result);
// for code coverage we need to pump this message through the receiver
receiver.receive(msg);
// for more code coverage we need to actually set a buffer in the message
msg.setBuffer(new byte[0]);
result = messenger.readJGMessage(msg);
assertNull(result);
receiver.receive(msg);
// now create a view and a real distribution-message
InternalDistributedMember myAddress = messenger.getMemberID();
InternalDistributedMember other = createAddress(8888);
NetView v = new NetView(myAddress);
v.add(other);
when(joinLeave.getView()).thenReturn(v);
messenger.installView(v);
List<InternalDistributedMember> recipients = v.getMembers();
SerialAckedMessage dmsg = new SerialAckedMessage();
dmsg.setRecipients(recipients);
// a message is ignored during manager shutdown
msg = messenger.createJGMessage(dmsg, new JGAddress(other), Version.CURRENT_ORDINAL);
when(manager.shutdownInProgress()).thenReturn(Boolean.TRUE);
receiver.receive(msg);
verify(manager, never()).processMessage(isA(DistributionMessage.class));
assertTrue("There should be UDPDispatchRequestTime stats",
services.getStatistics().getUDPDispatchRequestTime() > 0);
} finally {
DistributionStats.enableClockStats = false;
}
}
@Test
public void testUseOldJChannel() throws Exception {
initMocks(false);
JChannel channel = messenger.myChannel;
services.getConfig().getTransport().setOldDSMembershipInfo(new MembershipInformation(channel,
Collections.singleton(new InternalDistributedMember("localhost", 10000)),
new ConcurrentLinkedQueue<>()));
JGroupsMessenger newMessenger = new JGroupsMessenger();
newMessenger.init(services);
newMessenger.start();
newMessenger.started();
newMessenger.stop();
assertTrue(newMessenger.myChannel == messenger.myChannel);
}
@Test
public void testGetMessageState() throws Exception {
initMocks(true/* multicast */);
messenger.testMulticast(50); // do some multicast messaging
NAKACK2 nakack = (NAKACK2) messenger.myChannel.getProtocolStack().findProtocol("NAKACK2");
assertNotNull(nakack);
long seqno = nakack.getCurrentSeqno();
Map state = new HashMap();
messenger.getMessageState(null, state, true);
assertEquals(1, state.size());
Long stateLong = (Long) state.values().iterator().next();
assertTrue(
"expected multicast state to be at least " + seqno + " but it was " + stateLong.longValue(),
stateLong.longValue() >= seqno);
}
@Test
public void testGetMessageStateNoMulticast() throws Exception {
initMocks(false/* multicast */);
Map state = new HashMap();
messenger.getMessageState(null, state, true);
assertEquals("expected an empty map but received " + state, 0, state.size());
}
@Test
public void testWaitForMessageStateSucceeds() throws Exception {
initMocks(true/* multicast */);
JGroupsMessenger.MessageTracker tracker = mock(JGroupsMessenger.MessageTracker.class);
InternalDistributedMember mbr = createAddress(1234);
messenger.scheduledMcastSeqnos.put(mbr, tracker);
when(tracker.get()).thenReturn(0l, 2l, 49l, 50l, 80l);
Map state = new HashMap();
state.put("JGroups.mcastState", Long.valueOf(50));
messenger.waitForMessageState(mbr, state);
verify(tracker, times(4)).get();
reset(tracker);
when(tracker.get()).thenReturn(0l, 2l, 60l);
messenger.waitForMessageState(mbr, state);
verify(tracker, times(3)).get();
}
@Test
public void testWaitForMessageStateThrowsExceptionIfMessagesMissing() throws Exception {
initMocks(true/* multicast */);
NAKACK2 nakack = mock(NAKACK2.class);
Digest digest = mock(Digest.class);
when(nakack.getDigest(any(Address.class))).thenReturn(digest);
when(digest.get(any(Address.class))).thenReturn(new long[] {0, 0}, new long[] {2, 50},
new long[] {49, 50});
try {
// message 50 will never arrive
Map state = new HashMap();
state.put("JGroups.mcastState", Long.valueOf(50));
InternalDistributedMember mbr = createAddress(1234);
messenger.scheduledMcastSeqnos.put(mbr, new JGroupsMessenger.MessageTracker(30));
messenger.waitForMessageState(mbr, state);
fail("expected a GemFireIOException to be thrown");
} catch (GemFireIOException e) {
// pass
}
}
private NetView createView(InternalDistributedMember otherMbr) {
InternalDistributedMember sender = messenger.getMemberID();
List<InternalDistributedMember> mbrs = new ArrayList<>();
mbrs.add(sender);
mbrs.add(otherMbr);
NetView v = new NetView(sender, 1, mbrs);
return v;
}
@Test
public void testEncryptedFindCoordinatorRequest() throws Exception {
InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);
Properties p = new Properties();
final String udpDhalgo = "AES:128";
p.put(ConfigurationProperties.SECURITY_UDP_DHALGO, udpDhalgo);
initMocks(false, p);
NetView v = createView(otherMbr);
when(joinLeave.getMemberID(messenger.getMemberID().getNetMember()))
.thenReturn(messenger.getMemberID());
GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services, udpDhalgo);
messenger.setPublicKey(otherMbrEncrptor.getPublicKeyBytes(), otherMbr);
messenger.initClusterKey();
FindCoordinatorRequest gfmsg = new FindCoordinatorRequest(messenger.getMemberID(),
new ArrayList<InternalDistributedMember>(2), 1,
messenger.getPublicKey(messenger.getMemberID()), 1, "");
Set<InternalDistributedMember> recipients = new HashSet<>();
recipients.add(otherMbr);
gfmsg.setRecipients(recipients);
short version = Version.CURRENT_ORDINAL;
HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);
messenger.writeEncryptedMessage(gfmsg, version, out);
byte[] requestBytes = out.toByteArray();
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(requestBytes));
DistributionMessage distributionMessage =
messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);
assertEquals(gfmsg, distributionMessage);
}
@Test
public void testEncryptedFindCoordinatorResponse() throws Exception {
InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);
Properties p = new Properties();
p.put(ConfigurationProperties.SECURITY_UDP_DHALGO, AES_128);
initMocks(false, p);
NetView v = createView(otherMbr);
GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services, AES_128);
otherMbrEncrptor.setPublicKey(messenger.getPublicKey(messenger.getMemberID()),
messenger.getMemberID());
messenger.setPublicKey(otherMbrEncrptor.getPublicKeyBytes(), otherMbr);
messenger.initClusterKey();
FindCoordinatorResponse gfmsg = new FindCoordinatorResponse(messenger.getMemberID(),
messenger.getMemberID(), messenger.getClusterSecretKey(), 1);
Set<InternalDistributedMember> recipients = new HashSet<>();
recipients.add(otherMbr);
gfmsg.setRecipients(recipients);
short version = Version.CURRENT_ORDINAL;
HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);
messenger.writeEncryptedMessage(gfmsg, version, out);
byte[] requestBytes = out.toByteArray();
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(requestBytes));
messenger.addRequestId(1, messenger.getMemberID());
DistributionMessage distributionMessage =
messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);
assertEquals(gfmsg, distributionMessage);
}
@Test
public void testEncryptedJoinRequest() throws Exception {
InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);
Properties p = new Properties();
p.put(ConfigurationProperties.SECURITY_UDP_DHALGO, AES_128);
initMocks(false, p);
NetView v = createView(otherMbr);
GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services, AES_128);
messenger.setPublicKey(otherMbrEncrptor.getPublicKeyBytes(), otherMbr);
messenger.initClusterKey();
JoinRequestMessage gfmsg =
new JoinRequestMessage(otherMbr, messenger.getMemberID(), null, 9789, 1);
short version = Version.CURRENT_ORDINAL;
HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);
messenger.writeEncryptedMessage(gfmsg, version, out);
byte[] requestBytes = out.toByteArray();
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(requestBytes));
DistributionMessage distributionMessage =
messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);
assertEquals(gfmsg, distributionMessage);
}
@Test
public void testEncryptedJoinResponse() throws Exception {
InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);
Properties p = new Properties();
p.put(ConfigurationProperties.SECURITY_UDP_DHALGO, AES_128);
initMocks(false, p);
NetView v = createView(otherMbr);
GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services, AES_128);
otherMbrEncrptor.setPublicKey(messenger.getPublicKey(messenger.getMemberID()),
messenger.getMemberID());
messenger.setPublicKey(otherMbrEncrptor.getPublicKeyBytes(), otherMbr);
messenger.initClusterKey();
JoinResponseMessage gfmsg =
new JoinResponseMessage(otherMbr, messenger.getClusterSecretKey(), 1);
short version = Version.CURRENT_ORDINAL;
HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);
messenger.writeEncryptedMessage(gfmsg, version, out);
byte[] requestBytes = out.toByteArray();
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(requestBytes));
messenger.addRequestId(1, messenger.getMemberID());
DistributionMessage gfMessageAtOtherMbr =
messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);
assertEquals(gfmsg, gfMessageAtOtherMbr);
// lets send view as well..
InstallViewMessage installViewMessage = new InstallViewMessage(v, null, true);
out = new HeapDataOutputStream(Version.CURRENT);
messenger.writeEncryptedMessage(installViewMessage, version, out);
requestBytes = out.toByteArray();
otherMbrEncrptor.setClusterKey(((JoinResponseMessage) gfMessageAtOtherMbr).getSecretPk());
dis = new DataInputStream(new ByteArrayInputStream(requestBytes));
gfMessageAtOtherMbr = messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);
assertEquals(installViewMessage, gfMessageAtOtherMbr);
}
/**
* creates an InternalDistributedMember address that can be used with the doctored JGroups
* channel. This includes a logical (UUID) address and a physical (IpAddress) address.
*
* @param port the UDP port to use for the new address
*/
private InternalDistributedMember createAddress(int port) {
GMSMember gms = new GMSMember("localhost", port);
gms.setUUID(UUID.randomUUID());
gms.setVmKind(ClusterDistributionManager.NORMAL_DM_TYPE);
gms.setVersionOrdinal(Version.CURRENT_ORDINAL);
return new InternalDistributedMember(gms);
}
}