blob: 4e6dcef227d015b5e7ebc5544db3815d1bed1bf9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.client.internal;
import static org.apache.geode.internal.logging.LogWriterLevel.FINE;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.CancelCriterion;
import org.apache.geode.LogWriter;
import org.apache.geode.cache.client.NoAvailableServersException;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.ServerOperationException;
import org.apache.geode.cache.client.internal.pooling.ConnectionManager;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
import org.apache.geode.internal.logging.InternalLogWriter;
import org.apache.geode.internal.logging.LocalLogWriter;
import org.apache.geode.test.junit.categories.ClientServerTest;
@Category({ClientServerTest.class})
public class OpExecutorImplJUnitTest {
DummyManager manager;
private LogWriter logger;
private DummyEndpointManager endpointManager;
private DummyQueueManager queueManager;
private RegisterInterestTracker riTracker;
protected int borrows;
protected int returns;
protected int invalidateConnections;
protected int exchanges;
protected int serverCrashes;
protected int getPrimary;
protected int getBackups;
private CancelCriterion cancelCriterion;
@Before
public void setUp() {
this.logger = new LocalLogWriter(FINE.intLevel(), System.out);
this.endpointManager = new DummyEndpointManager();
this.queueManager = new DummyQueueManager();
this.manager = new DummyManager();
riTracker = new RegisterInterestTracker();
cancelCriterion = new CancelCriterion() {
@Override
public String cancelInProgress() {
return null;
}
@Override
public RuntimeException generateCancelledException(Throwable e) {
return null;
}
};
}
@Test
public void testExecute() throws Exception {
OpExecutorImpl exec = new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, 3,
10, cancelCriterion, null);
Object result = exec.execute(new Op() {
@Override
public Object attempt(Connection cnx) throws Exception {
return "hello";
}
});
assertEquals("hello", result);
assertEquals(1, borrows);
assertEquals(1, returns);
assertEquals(0, invalidateConnections);
assertEquals(0, serverCrashes);
reset();
try {
result = exec.execute(new Op() {
@Override
public Object attempt(Connection cnx) throws Exception {
throw new SocketTimeoutException();
}
});
fail("Should have got an exception");
} catch (ServerConnectivityException expected) {
// do nothing
}
assertEquals(1, borrows);
assertEquals(3, exchanges);
assertEquals(1, returns);
assertEquals(4, invalidateConnections);
assertEquals(0, serverCrashes);
reset();
try {
result = exec.execute(new Op() {
@Override
public Object attempt(Connection cnx) throws Exception {
throw new ServerOperationException("Something didn't work");
}
});
fail("Should have got an exception");
} catch (ServerOperationException expected) {
// do nothing
}
assertEquals(1, borrows);
assertEquals(1, returns);
assertEquals(0, invalidateConnections);
assertEquals(0, serverCrashes);
reset();
try {
result = exec.execute(new Op() {
@Override
public Object attempt(Connection cnx) throws Exception {
throw new IOException("Something didn't work");
}
});
fail("Should have got an exception");
} catch (ServerConnectivityException expected) {
// do nothing
}
assertEquals(1, borrows);
assertEquals(3, exchanges);
assertEquals(1, returns);
assertEquals(4, invalidateConnections);
assertEquals(4, serverCrashes);
}
private void reset() {
borrows = 0;
returns = 0;
invalidateConnections = 0;
exchanges = 0;
serverCrashes = 0;
getPrimary = 0;
getBackups = 0;
}
@Test
public void testExecuteOncePerServer() throws Exception {
OpExecutorImpl exec = new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, -1,
10, cancelCriterion, null);
manager.numServers = 5;
try {
exec.execute(new Op() {
@Override
public Object attempt(Connection cnx) throws Exception {
throw new IOException("Something didn't work");
}
});
fail("Should have got an exception");
} catch (ServerConnectivityException expected) {
// do nothing
}
assertEquals(1, borrows);
assertEquals(4, exchanges);
assertEquals(1, returns);
assertEquals(6, invalidateConnections);
assertEquals(6, serverCrashes);
}
@Test
public void testRetryFailedServers() throws Exception {
OpExecutorImpl exec = new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, 10,
10, cancelCriterion, null);
manager.numServers = 5;
try {
exec.execute(new Op() {
@Override
public Object attempt(Connection cnx) throws Exception {
throw new IOException("Something didn't work");
}
});
fail("Should have got an exception");
} catch (ServerConnectivityException expected) {
// do nothing
}
assertEquals(1, borrows);
assertEquals(10, exchanges);
assertEquals(1, returns);
assertEquals(11, invalidateConnections);
assertEquals(11, serverCrashes);
}
@Test
public void testExecuteOn() throws Exception {
OpExecutorImpl exec = new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, 3,
10, cancelCriterion, null);
ServerLocation server = new ServerLocation("localhost", -1);
Object result = exec.executeOn(server, new Op() {
@Override
public Object attempt(Connection cnx) throws Exception {
return "hello";
}
});
assertEquals("hello", result);
assertEquals(1, borrows);
assertEquals(1, returns);
assertEquals(0, invalidateConnections);
assertEquals(0, serverCrashes);
reset();
try {
result = exec.executeOn(server, new Op() {
@Override
public Object attempt(Connection cnx) throws Exception {
throw new SocketTimeoutException();
}
});
fail("Should have got an exception");
} catch (ServerConnectivityException expected) {
// do nothing
}
assertEquals(1, borrows);
assertEquals(1, returns);
assertEquals(1, invalidateConnections);
assertEquals(0, serverCrashes);
reset();
try {
result = exec.executeOn(server, new Op() {
@Override
public Object attempt(Connection cnx) throws Exception {
throw new ServerOperationException("Something didn't work");
}
});
fail("Should have got an exception");
} catch (ServerOperationException expected) {
// do nothing
}
assertEquals(1, borrows);
assertEquals(1, returns);
assertEquals(0, invalidateConnections);
assertEquals(0, serverCrashes);
reset();
{
final String expectedEx = "java.lang.Exception";
final String addExpected =
"<ExpectedException action=add>" + expectedEx + "</ExpectedException>";
final String removeExpected =
"<ExpectedException action=remove>" + expectedEx + "</ExpectedException>";
logger.info(addExpected);
try {
result = exec.executeOn(server, new Op() {
@Override
public Object attempt(Connection cnx) throws Exception {
throw new Exception("Something didn't work");
}
});
fail("Should have got an exception");
} catch (ServerConnectivityException expected) {
// do nothing
} finally {
logger.info(removeExpected);
}
}
assertEquals(1, borrows);
assertEquals(1, returns);
assertEquals(1, invalidateConnections);
assertEquals(1, serverCrashes);
}
@Test
public void testExecuteOnAllQueueServers() {
OpExecutorImpl exec = new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, 3,
10, cancelCriterion, null);
exec.executeOnAllQueueServers(new Op() {
@Override
public Object attempt(Connection cnx) throws Exception {
return "hello";
}
});
assertEquals(0, invalidateConnections);
assertEquals(0, serverCrashes);
assertEquals(1, getPrimary);
assertEquals(1, getBackups);
reset();
queueManager.backups = 3;
exec.executeOnAllQueueServers(new Op() {
@Override
public Object attempt(Connection cnx) throws Exception {
throw new SocketTimeoutException();
}
});
assertEquals(4, invalidateConnections);
assertEquals(0, serverCrashes);
assertEquals(1, getPrimary);
assertEquals(1, getBackups);
reset();
queueManager.backups = 3;
Object result = exec.executeOnQueuesAndReturnPrimaryResult(new Op() {
int i = 0;
@Override
public Object attempt(Connection cnx) throws Exception {
i++;
if (i < 15) {
throw new IOException();
}
return "hello";
}
});
assertEquals("hello", result);
assertEquals(14, serverCrashes);
assertEquals(14, invalidateConnections);
assertEquals(12, getPrimary);
assertEquals(1, getBackups);
}
@Test
public void executeWithServerAffinityDoesNotChangeInitialRetryCountOfZero() {
OpExecutorImpl opExecutor =
new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, -1,
10, cancelCriterion, mock(PoolImpl.class));
Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class);
ServerLocation serverLocation = mock(ServerLocation.class);
opExecutor.setAffinityRetryCount(0);
opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp);
assertEquals(0, opExecutor.getAffinityRetryCount());
}
@Test
public void executeWithServerAffinityWithNonZeroAffinityRetryCountWillNotSetToZero() {
OpExecutorImpl opExecutor =
new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, -1,
10, cancelCriterion, mock(PoolImpl.class));
Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class);
ServerLocation serverLocation = mock(ServerLocation.class);
opExecutor.setAffinityRetryCount(1);
opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp);
assertNotEquals(0, opExecutor.getAffinityRetryCount());
}
@Test
public void executeWithServerAffinityWithServerConnectivityExceptionIncrementsRetryCountAndResetsToZero() {
OpExecutorImpl opExecutor =
spy(new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, -1,
10, cancelCriterion, mock(PoolImpl.class)));
Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class);
ServerLocation serverLocation = mock(ServerLocation.class);
ServerConnectivityException serverConnectivityException = new ServerConnectivityException();
doThrow(serverConnectivityException).when(opExecutor).executeOnServer(serverLocation,
txSynchronizationOp, true, false);
opExecutor.setupServerAffinity(true);
when(((AbstractOp) txSynchronizationOp).getMessage()).thenReturn(mock(Message.class));
opExecutor.setAffinityRetryCount(0);
opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp);
verify(opExecutor, times(1)).setAffinityRetryCount(1);
assertEquals(0, opExecutor.getAffinityRetryCount());
}
@Test
public void executeWithServerAffinityAndRetryCountGreaterThansTxRetryAttemptThrowsServerConnectivityException() {
OpExecutorImpl opExecutor =
spy(new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, -1,
10, cancelCriterion, mock(PoolImpl.class)));
Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class);
ServerLocation serverLocation = mock(ServerLocation.class);
ServerConnectivityException serverConnectivityException = new ServerConnectivityException();
doThrow(serverConnectivityException).when(opExecutor).executeOnServer(serverLocation,
txSynchronizationOp, true, false);
opExecutor.setupServerAffinity(true);
when(((AbstractOp) txSynchronizationOp).getMessage()).thenReturn(mock(Message.class));
opExecutor.setAffinityRetryCount(opExecutor.TX_RETRY_ATTEMPT + 1);
assertThatThrownBy(
() -> opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp))
.isSameAs(serverConnectivityException);
}
private class DummyManager implements ConnectionManager {
protected int numServers = Integer.MAX_VALUE;
private int currentServer = 0;
public DummyManager() {}
@Override
public void emergencyClose() {}
@Override
public Connection borrowConnection(long aquireTimeout) {
borrows++;
return new DummyConnection(new ServerLocation("localhost", currentServer++ % numServers));
}
@Override
public Connection borrowConnection(ServerLocation server,
boolean onlyUseExistingCnx) {
borrows++;
return new DummyConnection(server);
}
@Override
public void close(boolean keepAlive) {}
@Override
public void returnConnection(Connection connection) {
returns++;
}
@Override
public void returnConnection(Connection connection, boolean accessed) {
returns++;
}
@Override
public void start(ScheduledExecutorService backgroundProcessor) {}
@Override
public Connection exchangeConnection(Connection conn, Set<ServerLocation> excludedServers) {
if (excludedServers.size() >= numServers) {
throw new NoAvailableServersException();
}
exchanges++;
return new DummyConnection(new ServerLocation("localhost", currentServer++ % numServers));
}
@Override
public int getConnectionCount() {
return 0;
}
}
private class DummyConnection implements Connection {
private ServerLocation server;
public DummyConnection(ServerLocation serverLocation) {
this.server = serverLocation;
}
@Override
public void close(boolean keepAlive) throws Exception {}
@Override
public void destroy() {
invalidateConnections++;
}
@Override
public boolean isDestroyed() {
return false;
}
@Override
public ByteBuffer getCommBuffer() {
return null;
}
@Override
public ServerLocation getServer() {
return server;
}
@Override
public Socket getSocket() {
return null;
}
@Override
public ConnectionStats getStats() {
return null;
}
@Override
public int getDistributedSystemId() {
return 0;
}
@Override
public Endpoint getEndpoint() {
return new Endpoint(null, null, null, null, null);
}
@Override
public ServerQueueStatus getQueueStatus() {
return null;
}
@Override
public Object execute(Op op) throws Exception {
return op.attempt(this);
}
@Override
public void emergencyClose() {}
@Override
public short getWanSiteVersion() {
return -1;
}
@Override
public void setWanSiteVersion(short wanSiteVersion) {}
@Override
public InputStream getInputStream() {
return null;
}
@Override
public OutputStream getOutputStream() {
return null;
}
@Override
public void setConnectionID(long id) {}
@Override
public long getConnectionID() {
return 0;
}
}
private class DummyEndpointManager implements EndpointManager {
@Override
public void addListener(EndpointListener listener) {}
@Override
public void close() {}
@Override
public Endpoint referenceEndpoint(ServerLocation server, DistributedMember memberId) {
return null;
}
@Override
public Map getEndpointMap() {
return null;
}
@Override
public void removeListener(EndpointListener listener) {}
@Override
public void serverCrashed(Endpoint endpoint) {
serverCrashes++;
}
@Override
public int getConnectedServerCount() {
return 0;
}
@Override
public Map getAllStats() {
return null;
}
@Override
public String getPoolName() {
return null;
}
}
private class DummyQueueManager implements QueueManager {
int backups = 0;
int currentServer = 0;
@Override
public QueueConnections getAllConnectionsNoWait() {
return getAllConnections();
}
@Override
public void emergencyClose() {}
@Override
public QueueConnections getAllConnections() {
return new QueueConnections() {
@Override
public List getBackups() {
getBackups++;
ArrayList result = new ArrayList(backups);
for (int i = 0; i < backups; i++) {
result.add(new DummyConnection(new ServerLocation("localhost", currentServer++)));
}
return result;
}
@Override
public Connection getPrimary() {
getPrimary++;
return new DummyConnection(new ServerLocation("localhost", currentServer++));
}
@Override
public QueueConnectionImpl getConnection(Endpoint ep) {
return null;
}
};
}
@Override
public void close(boolean keepAlive) {}
@Override
public void start(ScheduledExecutorService background) {}
@Override
public QueueState getState() {
return null;
}
@Override
public InternalPool getPool() {
return null;
}
@Override
public void readyForEvents(InternalDistributedSystem system) {}
@Override
public InternalLogWriter getSecurityLogger() {
return null;
}
@Override
public void checkEndpoint(ClientUpdater qc, Endpoint endpoint) {}
}
}