blob: 392a5993c0b24043e7af67435404d5f2082692b6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geode.internal.tcp;
import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.same;
import static org.mockito.Mockito.when;
import static org.mockito.quality.Strictness.STRICT_STUBS;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Properties;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.apache.geode.alerting.internal.spi.AlertingAction;
import org.apache.geode.alerting.internal.spi.AlertingIOException;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.direct.DirectChannel;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.api.Membership;
import org.apache.geode.internal.inet.LocalHostUtil;
import org.apache.geode.internal.net.BufferPool;
import org.apache.geode.internal.net.SSLConfig;
import org.apache.geode.internal.net.SocketCreator;
public class TCPConduitTest {
private Membership<InternalDistributedMember> membership;
private DirectChannel directChannel;
private InetAddress localHost;
private ConnectionTable connectionTable;
private SocketCreator socketCreator;
@Rule
public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(STRICT_STUBS);
@Before
public void setUp() throws Exception {
membership = uncheckedCast(mock(Membership.class));
directChannel = mock(DirectChannel.class);
connectionTable = mock(ConnectionTable.class);
socketCreator = new SocketCreator(new SSLConfig.Builder().build());
localHost = LocalHostUtil.getLocalHost();
when(directChannel.getDM())
.thenReturn(mock(DistributionManager.class));
}
@Test
public void closedConduitDoesNotThrowNPEWhenAskedForBufferPool() {
directChannel.getDM(); // Mockito demands that this mock be used in this test
TCPConduit tcpConduit =
new TCPConduit(membership, 0, localHost, false, directChannel, mock(BufferPool.class),
new Properties(),
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
tcpConduit.stop(null);
assertThat(tcpConduit.getBufferPool()).isNotNull();
}
@Test
public void getConnectionThrowsAlertingIOException_ifCaughtIOException_whileAlerting()
throws Exception {
TCPConduit tcpConduit =
new TCPConduit(membership, 0, localHost, false, directChannel, mock(BufferPool.class),
new Properties(),
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))
.when(connectionTable).get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong());
when(membership.memberExists(eq(member)))
.thenReturn(true);
when(membership.isShunned(same(member)))
.thenReturn(false);
AlertingAction.execute(() -> {
Throwable thrown = catchThrowable(() -> {
tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
});
assertThat(thrown)
.isInstanceOf(AlertingIOException.class);
});
}
@Test
public void getConnectionRethrows_ifCaughtIOException_whileNotAlerting() throws Exception {
TCPConduit tcpConduit =
new TCPConduit(membership, 0, localHost, false, directChannel, mock(BufferPool.class),
new Properties(),
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
Connection connection = mock(Connection.class);
when(connection.getRemoteAddress())
.thenReturn(member);
doThrow(new IOException("Cannot form connection to alert listener"))
// getConnection will loop indefinitely until connectionTable returns connection
.doReturn(connection)
.when(connectionTable).get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong());
when(membership.memberExists(eq(member)))
.thenReturn(true);
when(membership.isShunned(same(member)))
.thenReturn(false);
Connection value = tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
assertThat(value)
.isSameAs(connection);
}
@Test
public void getConnectionRethrows_ifCaughtIOException_whenMemberDoesNotExist() throws Exception {
TCPConduit tcpConduit =
new TCPConduit(membership, 0, localHost, false, directChannel, mock(BufferPool.class),
new Properties(),
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))
.when(connectionTable).get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong());
when(membership.memberExists(eq(member)))
.thenReturn(false);
Throwable thrown = catchThrowable(() -> {
tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
});
assertThat(thrown)
.isInstanceOf(IOException.class)
.isNotInstanceOf(AlertingIOException.class);
}
@Test
public void getConnectionRethrows_ifCaughtIOException_whenMemberIsShunned() throws Exception {
TCPConduit tcpConduit =
new TCPConduit(membership, 0, localHost, false, directChannel, mock(BufferPool.class),
new Properties(),
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))
.when(connectionTable).get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong());
when(membership.memberExists(same(member)))
.thenReturn(true);
when(membership.isShunned(same(member)))
.thenReturn(true);
Throwable thrown = catchThrowable(() -> {
tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
});
assertThat(thrown)
.isInstanceOf(IOException.class)
.isNotInstanceOf(AlertingIOException.class);
}
@Test
public void getConnectionThrowsDistributedSystemDisconnectedException_ifCaughtIOException_whenShutdownIsInProgress()
throws Exception {
TCPConduit tcpConduit =
new TCPConduit(membership, 0, localHost, false, directChannel, mock(BufferPool.class),
new Properties(),
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))
.when(connectionTable).get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong());
when(membership.memberExists(same(member)))
.thenReturn(true);
when(membership.isShunned(same(member)))
.thenReturn(false);
when(membership.shutdownInProgress())
.thenReturn(true);
Throwable thrown = catchThrowable(() -> {
tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
});
assertThat(thrown)
.isInstanceOf(DistributedSystemDisconnectedException.class)
.hasMessage("Abandoned because shutdown is in progress");
}
@Test
public void getConnectionThrowsDistributedSystemDisconnectedException_ifCaughtIOException_whenShutdownIsInProgress_andCancelIsInProgress()
throws Exception {
TCPConduit tcpConduit =
new TCPConduit(membership, 0, localHost, false, directChannel, mock(BufferPool.class),
new Properties(),
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))
.when(connectionTable).get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong());
when(membership.memberExists(same(member)))
.thenReturn(true);
when(membership.isShunned(same(member)))
.thenReturn(false);
when(membership.shutdownInProgress())
.thenReturn(true);
Throwable thrown = catchThrowable(() -> {
tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
});
assertThat(thrown)
.isInstanceOf(DistributedSystemDisconnectedException.class)
.hasMessage("Abandoned because shutdown is in progress");
}
private Runnable doNothing() {
return () -> {
// nothing
};
}
}