| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.cache.client.internal; |
| |
| import static org.apache.geode.internal.logging.LogWriterLevel.FINE; |
| import static org.assertj.core.api.Assertions.assertThatThrownBy; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.fail; |
| import static org.mockito.Mockito.doThrow; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.net.Socket; |
| import java.net.SocketTimeoutException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ScheduledExecutorService; |
| |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.CancelCriterion; |
| import org.apache.geode.LogWriter; |
| import org.apache.geode.cache.client.NoAvailableServersException; |
| import org.apache.geode.cache.client.ServerConnectivityException; |
| import org.apache.geode.cache.client.ServerOperationException; |
| import org.apache.geode.cache.client.internal.pooling.ConnectionManager; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.distributed.internal.ServerLocation; |
| import org.apache.geode.internal.cache.tier.sockets.Message; |
| import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus; |
| import org.apache.geode.internal.logging.InternalLogWriter; |
| import org.apache.geode.internal.logging.LocalLogWriter; |
| import org.apache.geode.test.junit.categories.ClientServerTest; |
| |
| @Category({ClientServerTest.class}) |
| public class OpExecutorImplJUnitTest { |
| |
| DummyManager manager; |
| private LogWriter logger; |
| private DummyEndpointManager endpointManager; |
| private DummyQueueManager queueManager; |
| private RegisterInterestTracker riTracker; |
| |
| protected int borrows; |
| protected int returns; |
| protected int invalidateConnections; |
| protected int exchanges; |
| protected int serverCrashes; |
| protected int getPrimary; |
| protected int getBackups; |
| private CancelCriterion cancelCriterion; |
| |
| @Before |
| public void setUp() { |
| this.logger = new LocalLogWriter(FINE.intLevel(), System.out); |
| this.endpointManager = new DummyEndpointManager(); |
| this.queueManager = new DummyQueueManager(); |
| this.manager = new DummyManager(); |
| riTracker = new RegisterInterestTracker(); |
| cancelCriterion = new CancelCriterion() { |
| |
| @Override |
| public String cancelInProgress() { |
| return null; |
| } |
| |
| @Override |
| public RuntimeException generateCancelledException(Throwable e) { |
| return null; |
| } |
| }; |
| } |
| |
| @Test |
| public void testExecute() throws Exception { |
| OpExecutorImpl exec = new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, 3, |
| 10, cancelCriterion, null); |
| Object result = exec.execute(new Op() { |
| @Override |
| public Object attempt(Connection cnx) throws Exception { |
| return "hello"; |
| } |
| }); |
| assertEquals("hello", result); |
| assertEquals(1, borrows); |
| assertEquals(1, returns); |
| assertEquals(0, invalidateConnections); |
| assertEquals(0, serverCrashes); |
| |
| reset(); |
| |
| try { |
| result = exec.execute(new Op() { |
| @Override |
| public Object attempt(Connection cnx) throws Exception { |
| throw new SocketTimeoutException(); |
| } |
| }); |
| fail("Should have got an exception"); |
| } catch (ServerConnectivityException expected) { |
| // do nothing |
| } |
| assertEquals(1, borrows); |
| assertEquals(3, exchanges); |
| assertEquals(1, returns); |
| assertEquals(4, invalidateConnections); |
| assertEquals(0, serverCrashes); |
| |
| reset(); |
| |
| try { |
| result = exec.execute(new Op() { |
| @Override |
| public Object attempt(Connection cnx) throws Exception { |
| throw new ServerOperationException("Something didn't work"); |
| } |
| }); |
| fail("Should have got an exception"); |
| } catch (ServerOperationException expected) { |
| // do nothing |
| } |
| assertEquals(1, borrows); |
| assertEquals(1, returns); |
| assertEquals(0, invalidateConnections); |
| assertEquals(0, serverCrashes); |
| |
| reset(); |
| |
| try { |
| result = exec.execute(new Op() { |
| @Override |
| public Object attempt(Connection cnx) throws Exception { |
| throw new IOException("Something didn't work"); |
| } |
| }); |
| fail("Should have got an exception"); |
| } catch (ServerConnectivityException expected) { |
| // do nothing |
| } |
| assertEquals(1, borrows); |
| assertEquals(3, exchanges); |
| assertEquals(1, returns); |
| assertEquals(4, invalidateConnections); |
| assertEquals(4, serverCrashes); |
| } |
| |
| private void reset() { |
| borrows = 0; |
| returns = 0; |
| invalidateConnections = 0; |
| exchanges = 0; |
| serverCrashes = 0; |
| getPrimary = 0; |
| getBackups = 0; |
| } |
| |
| @Test |
| public void testExecuteOncePerServer() throws Exception { |
| OpExecutorImpl exec = new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, -1, |
| 10, cancelCriterion, null); |
| |
| manager.numServers = 5; |
| try { |
| exec.execute(new Op() { |
| @Override |
| public Object attempt(Connection cnx) throws Exception { |
| throw new IOException("Something didn't work"); |
| } |
| }); |
| fail("Should have got an exception"); |
| } catch (ServerConnectivityException expected) { |
| // do nothing |
| } |
| assertEquals(1, borrows); |
| assertEquals(4, exchanges); |
| assertEquals(1, returns); |
| assertEquals(6, invalidateConnections); |
| assertEquals(6, serverCrashes); |
| } |
| |
| @Test |
| public void testRetryFailedServers() throws Exception { |
| OpExecutorImpl exec = new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, 10, |
| 10, cancelCriterion, null); |
| |
| manager.numServers = 5; |
| try { |
| exec.execute(new Op() { |
| @Override |
| public Object attempt(Connection cnx) throws Exception { |
| throw new IOException("Something didn't work"); |
| } |
| }); |
| fail("Should have got an exception"); |
| } catch (ServerConnectivityException expected) { |
| // do nothing |
| } |
| assertEquals(1, borrows); |
| assertEquals(10, exchanges); |
| assertEquals(1, returns); |
| assertEquals(11, invalidateConnections); |
| assertEquals(11, serverCrashes); |
| } |
| |
| @Test |
| public void testExecuteOn() throws Exception { |
| OpExecutorImpl exec = new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, 3, |
| 10, cancelCriterion, null); |
| ServerLocation server = new ServerLocation("localhost", -1); |
| Object result = exec.executeOn(server, new Op() { |
| @Override |
| public Object attempt(Connection cnx) throws Exception { |
| return "hello"; |
| } |
| }); |
| assertEquals("hello", result); |
| assertEquals(1, borrows); |
| assertEquals(1, returns); |
| assertEquals(0, invalidateConnections); |
| assertEquals(0, serverCrashes); |
| |
| reset(); |
| |
| try { |
| result = exec.executeOn(server, new Op() { |
| @Override |
| public Object attempt(Connection cnx) throws Exception { |
| throw new SocketTimeoutException(); |
| } |
| }); |
| fail("Should have got an exception"); |
| } catch (ServerConnectivityException expected) { |
| // do nothing |
| } |
| assertEquals(1, borrows); |
| assertEquals(1, returns); |
| assertEquals(1, invalidateConnections); |
| assertEquals(0, serverCrashes); |
| |
| reset(); |
| |
| try { |
| result = exec.executeOn(server, new Op() { |
| @Override |
| public Object attempt(Connection cnx) throws Exception { |
| throw new ServerOperationException("Something didn't work"); |
| } |
| }); |
| fail("Should have got an exception"); |
| } catch (ServerOperationException expected) { |
| // do nothing |
| } |
| assertEquals(1, borrows); |
| assertEquals(1, returns); |
| assertEquals(0, invalidateConnections); |
| assertEquals(0, serverCrashes); |
| |
| reset(); |
| |
| { |
| final String expectedEx = "java.lang.Exception"; |
| final String addExpected = |
| "<ExpectedException action=add>" + expectedEx + "</ExpectedException>"; |
| final String removeExpected = |
| "<ExpectedException action=remove>" + expectedEx + "</ExpectedException>"; |
| logger.info(addExpected); |
| try { |
| result = exec.executeOn(server, new Op() { |
| @Override |
| public Object attempt(Connection cnx) throws Exception { |
| throw new Exception("Something didn't work"); |
| } |
| }); |
| fail("Should have got an exception"); |
| } catch (ServerConnectivityException expected) { |
| // do nothing |
| } finally { |
| logger.info(removeExpected); |
| } |
| } |
| assertEquals(1, borrows); |
| assertEquals(1, returns); |
| assertEquals(1, invalidateConnections); |
| assertEquals(1, serverCrashes); |
| } |
| |
| @Test |
| public void testExecuteOnAllQueueServers() { |
| OpExecutorImpl exec = new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, 3, |
| 10, cancelCriterion, null); |
| exec.executeOnAllQueueServers(new Op() { |
| @Override |
| public Object attempt(Connection cnx) throws Exception { |
| return "hello"; |
| } |
| }); |
| assertEquals(0, invalidateConnections); |
| assertEquals(0, serverCrashes); |
| assertEquals(1, getPrimary); |
| assertEquals(1, getBackups); |
| |
| reset(); |
| |
| queueManager.backups = 3; |
| exec.executeOnAllQueueServers(new Op() { |
| @Override |
| public Object attempt(Connection cnx) throws Exception { |
| throw new SocketTimeoutException(); |
| } |
| }); |
| |
| assertEquals(4, invalidateConnections); |
| assertEquals(0, serverCrashes); |
| assertEquals(1, getPrimary); |
| assertEquals(1, getBackups); |
| |
| reset(); |
| |
| queueManager.backups = 3; |
| Object result = exec.executeOnQueuesAndReturnPrimaryResult(new Op() { |
| int i = 0; |
| |
| @Override |
| public Object attempt(Connection cnx) throws Exception { |
| i++; |
| if (i < 15) { |
| throw new IOException(); |
| } |
| return "hello"; |
| } |
| }); |
| |
| assertEquals("hello", result); |
| assertEquals(14, serverCrashes); |
| assertEquals(14, invalidateConnections); |
| assertEquals(12, getPrimary); |
| assertEquals(1, getBackups); |
| |
| } |
| |
| @Test |
| public void executeWithServerAffinityDoesNotChangeInitialRetryCountOfZero() { |
| OpExecutorImpl opExecutor = |
| new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, -1, |
| 10, cancelCriterion, mock(PoolImpl.class)); |
| Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class); |
| ServerLocation serverLocation = mock(ServerLocation.class); |
| opExecutor.setAffinityRetryCount(0); |
| |
| opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp); |
| |
| assertEquals(0, opExecutor.getAffinityRetryCount()); |
| } |
| |
| @Test |
| public void executeWithServerAffinityWithNonZeroAffinityRetryCountWillNotSetToZero() { |
| OpExecutorImpl opExecutor = |
| new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, -1, |
| 10, cancelCriterion, mock(PoolImpl.class)); |
| |
| Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class); |
| ServerLocation serverLocation = mock(ServerLocation.class); |
| opExecutor.setAffinityRetryCount(1); |
| |
| opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp); |
| |
| assertNotEquals(0, opExecutor.getAffinityRetryCount()); |
| } |
| |
| @Test |
| public void executeWithServerAffinityWithServerConnectivityExceptionIncrementsRetryCountAndResetsToZero() { |
| OpExecutorImpl opExecutor = |
| spy(new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, -1, |
| 10, cancelCriterion, mock(PoolImpl.class))); |
| |
| Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class); |
| ServerLocation serverLocation = mock(ServerLocation.class); |
| ServerConnectivityException serverConnectivityException = new ServerConnectivityException(); |
| |
| doThrow(serverConnectivityException).when(opExecutor).executeOnServer(serverLocation, |
| txSynchronizationOp, true, false); |
| opExecutor.setupServerAffinity(true); |
| when(((AbstractOp) txSynchronizationOp).getMessage()).thenReturn(mock(Message.class)); |
| opExecutor.setAffinityRetryCount(0); |
| |
| opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp); |
| |
| verify(opExecutor, times(1)).setAffinityRetryCount(1); |
| assertEquals(0, opExecutor.getAffinityRetryCount()); |
| } |
| |
| @Test |
| public void executeWithServerAffinityAndRetryCountGreaterThansTxRetryAttemptThrowsServerConnectivityException() { |
| OpExecutorImpl opExecutor = |
| spy(new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, -1, |
| 10, cancelCriterion, mock(PoolImpl.class))); |
| |
| Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class); |
| ServerLocation serverLocation = mock(ServerLocation.class); |
| ServerConnectivityException serverConnectivityException = new ServerConnectivityException(); |
| |
| doThrow(serverConnectivityException).when(opExecutor).executeOnServer(serverLocation, |
| txSynchronizationOp, true, false); |
| opExecutor.setupServerAffinity(true); |
| when(((AbstractOp) txSynchronizationOp).getMessage()).thenReturn(mock(Message.class)); |
| opExecutor.setAffinityRetryCount(opExecutor.TX_RETRY_ATTEMPT + 1); |
| |
| assertThatThrownBy( |
| () -> opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp)) |
| .isSameAs(serverConnectivityException); |
| } |
| |
| |
| private class DummyManager implements ConnectionManager { |
| |
| protected int numServers = Integer.MAX_VALUE; |
| private int currentServer = 0; |
| |
| public DummyManager() {} |
| |
| @Override |
| public void emergencyClose() {} |
| |
| @Override |
| public Connection borrowConnection(long aquireTimeout) { |
| borrows++; |
| return new DummyConnection(new ServerLocation("localhost", currentServer++ % numServers)); |
| } |
| |
| @Override |
| public Connection borrowConnection(ServerLocation server, |
| boolean onlyUseExistingCnx) { |
| borrows++; |
| return new DummyConnection(server); |
| } |
| |
| @Override |
| public void close(boolean keepAlive) {} |
| |
| @Override |
| public void returnConnection(Connection connection) { |
| returns++; |
| |
| } |
| |
| @Override |
| public void returnConnection(Connection connection, boolean accessed) { |
| returns++; |
| |
| } |
| |
| @Override |
| public void start(ScheduledExecutorService backgroundProcessor) {} |
| |
| @Override |
| public Connection exchangeConnection(Connection conn, Set<ServerLocation> excludedServers) { |
| if (excludedServers.size() >= numServers) { |
| throw new NoAvailableServersException(); |
| } |
| exchanges++; |
| return new DummyConnection(new ServerLocation("localhost", currentServer++ % numServers)); |
| } |
| |
| @Override |
| public int getConnectionCount() { |
| return 0; |
| } |
| } |
| |
| private class DummyConnection implements Connection { |
| |
| private ServerLocation server; |
| |
| public DummyConnection(ServerLocation serverLocation) { |
| this.server = serverLocation; |
| } |
| |
| @Override |
| public void close(boolean keepAlive) throws Exception {} |
| |
| @Override |
| public void destroy() { |
| invalidateConnections++; |
| } |
| |
| @Override |
| public boolean isDestroyed() { |
| return false; |
| } |
| |
| @Override |
| public ByteBuffer getCommBuffer() { |
| return null; |
| } |
| |
| @Override |
| public ServerLocation getServer() { |
| return server; |
| } |
| |
| @Override |
| public Socket getSocket() { |
| return null; |
| } |
| |
| @Override |
| public ConnectionStats getStats() { |
| return null; |
| } |
| |
| @Override |
| public int getDistributedSystemId() { |
| return 0; |
| } |
| |
| @Override |
| public Endpoint getEndpoint() { |
| return new Endpoint(null, null, null, null, null); |
| } |
| |
| @Override |
| public ServerQueueStatus getQueueStatus() { |
| return null; |
| } |
| |
| @Override |
| public Object execute(Op op) throws Exception { |
| return op.attempt(this); |
| } |
| |
| @Override |
| public void emergencyClose() {} |
| |
| @Override |
| public short getWanSiteVersion() { |
| return -1; |
| } |
| |
| @Override |
| public void setWanSiteVersion(short wanSiteVersion) {} |
| |
| @Override |
| public InputStream getInputStream() { |
| return null; |
| } |
| |
| @Override |
| public OutputStream getOutputStream() { |
| return null; |
| } |
| |
| @Override |
| public void setConnectionID(long id) {} |
| |
| @Override |
| public long getConnectionID() { |
| return 0; |
| } |
| } |
| |
| private class DummyEndpointManager implements EndpointManager { |
| |
| @Override |
| public void addListener(EndpointListener listener) {} |
| |
| @Override |
| public void close() {} |
| |
| @Override |
| public Endpoint referenceEndpoint(ServerLocation server, DistributedMember memberId) { |
| return null; |
| } |
| |
| @Override |
| public Map getEndpointMap() { |
| return null; |
| } |
| |
| @Override |
| public void removeListener(EndpointListener listener) {} |
| |
| @Override |
| public void serverCrashed(Endpoint endpoint) { |
| serverCrashes++; |
| } |
| |
| @Override |
| public int getConnectedServerCount() { |
| return 0; |
| } |
| |
| @Override |
| public Map getAllStats() { |
| return null; |
| } |
| |
| @Override |
| public String getPoolName() { |
| return null; |
| } |
| } |
| |
| private class DummyQueueManager implements QueueManager { |
| |
| int backups = 0; |
| int currentServer = 0; |
| |
| @Override |
| public QueueConnections getAllConnectionsNoWait() { |
| return getAllConnections(); |
| } |
| |
| @Override |
| public void emergencyClose() {} |
| |
| @Override |
| public QueueConnections getAllConnections() { |
| return new QueueConnections() { |
| @Override |
| public List getBackups() { |
| getBackups++; |
| ArrayList result = new ArrayList(backups); |
| for (int i = 0; i < backups; i++) { |
| result.add(new DummyConnection(new ServerLocation("localhost", currentServer++))); |
| } |
| return result; |
| } |
| |
| @Override |
| public Connection getPrimary() { |
| getPrimary++; |
| return new DummyConnection(new ServerLocation("localhost", currentServer++)); |
| } |
| |
| @Override |
| public QueueConnectionImpl getConnection(Endpoint ep) { |
| return null; |
| } |
| }; |
| } |
| |
| @Override |
| public void close(boolean keepAlive) {} |
| |
| @Override |
| public void start(ScheduledExecutorService background) {} |
| |
| @Override |
| public QueueState getState() { |
| return null; |
| } |
| |
| @Override |
| public InternalPool getPool() { |
| return null; |
| } |
| |
| @Override |
| public void readyForEvents(InternalDistributedSystem system) {} |
| |
| @Override |
| public InternalLogWriter getSecurityLogger() { |
| return null; |
| } |
| |
| @Override |
| public void checkEndpoint(ClientUpdater qc, Endpoint endpoint) {} |
| } |
| |
| } |