blob: c91816fa3988d09cfa677a3d4716bb912e12c587 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.client.internal;
import static org.apache.geode.cache.client.internal.ExecuteFunctionTestSupport.HAStatus.HA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import org.mockito.ArgumentMatchers;
import org.mockito.stubbing.OngoingStubbing;
import org.apache.geode.cache.client.NoAvailableServersException;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.ServerOperationException;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
import org.apache.geode.internal.cache.execute.ServerRegionFunctionExecutor;
class ExecuteFunctionTestSupport {
static final boolean OPTIMIZE_FOR_WRITE_SETTING = false;
static final byte FUNCTION_HAS_RESULT = (byte) 1;
static final int NUMBER_OF_SERVERS = 2;
static final String REGION_NAME = "REGION1";
static final String FUNCTION_NAME = "FUNCTION1";
static final boolean ALL_BUCKETS_SETTING = true;
public enum HAStatus {
NOT_HA, HA
}
enum FunctionIdentifierType {
STRING, OBJECT_REFERENCE
}
enum FailureMode {
NO_FAILURE,
THROW_SERVER_CONNECTIVITY_EXCEPTION,
THROW_SERVER_OPERATION_EXCEPTION,
THROW_NO_AVAILABLE_SERVERS_EXCEPTION,
THROW_INTERNAL_FUNCTION_INVOCATION_TARGET_EXCEPTION
}
private final Function<Integer> function;
private final ServerRegionFunctionExecutor executor;
private final ResultCollector<Integer, Collection<Integer>> resultCollector;
/*
* It would be nice to make this variable have type Cache (an interface) but that
* won't work because the methods we are testing cast the reference to a InternalCache.
*/
private final InternalCache cache;
/*
* It would be nice to make this variable have type Region (an interface) but that
* won't work because the methods we are testing cast the reference to a LocalRegion.
*/
private final LocalRegion region;
/*
* It would be nice to make this variable have type ExecutablePool (an interface) but that
* won't work because the methods we are testing cast the reference to a PoolImpl.
*/
private final PoolImpl executablePool;
/**
* Add a {@link OngoingStubbing#thenThrow(Throwable...)} behavior to
* {@param whenPoolExecute}
*
* This method has to be {@code static} because it is called before
* {@link ExecuteFunctionTestSupport} is constructed.
*
* @param whenPoolExecute is the {@link OngoingStubbing} for (one of the ) {@code execute()}
* methods on {@link PoolImpl}
* @param failureMode is the {@link FailureMode} that determines the kind of exception
* to {@code throw}
*/
static void thenThrow(final OngoingStubbing<Object> whenPoolExecute,
final FailureMode failureMode) {
/*
* We know execute() handles three kinds of exception from the pool:
*
* InternalFunctionInvocationTargetException
*
* keep trying without regard to retry attempt limit
*
* ServerOperationException | NoAvailableServersException
*
* re-throw
*
* ServerConnectivityException
*
* keep trying up to retry attempt limit
*/
switch (failureMode) {
case NO_FAILURE:
whenPoolExecute.thenReturn(null);
break;
case THROW_SERVER_CONNECTIVITY_EXCEPTION:
whenPoolExecute.thenThrow(new ServerConnectivityException("testing"));
break;
case THROW_SERVER_OPERATION_EXCEPTION:
whenPoolExecute.thenThrow(new ServerOperationException("testing"));
break;
case THROW_NO_AVAILABLE_SERVERS_EXCEPTION:
whenPoolExecute.thenThrow(new NoAvailableServersException("testing"));
break;
case THROW_INTERNAL_FUNCTION_INVOCATION_TARGET_EXCEPTION:
/*
* The product assumes that InternalFunctionInvocationTargetException will only be
* encountered
* when the target cache is going down. That condition is transient so the product
* assume
* it's
* ok to keep trying forever. In order to test this situation (and for the test to not
* hang)
* we throw this exception first, then we throw ServerConnectivityException
*/
whenPoolExecute.thenThrow(new InternalFunctionInvocationTargetException("testing"))
.thenThrow(new ServerConnectivityException("testing"));
break;
default:
throw new AssertionError("unknown FailureMode type: " + failureMode);
}
}
@SuppressWarnings("unchecked")
ExecuteFunctionTestSupport(
final HAStatus haStatus,
final FailureMode failureMode,
final BiConsumer<PoolImpl, FailureMode> addPoolMockBehavior) {
final List<ServerLocation> servers = (List<ServerLocation>) mock(List.class);
when(servers.size()).thenReturn(ExecuteFunctionTestSupport.NUMBER_OF_SERVERS);
final ConnectionSource connectionSource = mock(ConnectionSource.class);
when(connectionSource.getAllServers()).thenReturn(servers);
function = (Function<Integer>) mock(Function.class);
when(function.isHA()).thenReturn(toBoolean(haStatus));
when(function.hasResult()).thenReturn(true);
executor = mock(ServerRegionFunctionExecutor.class);
when(executor.withFilter(ArgumentMatchers.<Set<Integer>>any())).thenReturn(executor);
resultCollector = (ResultCollector<Integer, Collection<Integer>>) mock(ResultCollector.class);
cache = mock(InternalCache.class);
when(cache.getClientMetadataService()).thenReturn(mock(ClientMetadataService.class));
region = mock(LocalRegion.class);
when(region.getCache()).thenReturn(cache);
executablePool = mock(PoolImpl.class);
when(executablePool.getConnectionSource()).thenReturn(connectionSource);
addPoolMockBehavior.accept(executablePool, failureMode);
}
Function<Integer> getFunction() {
return function;
}
ServerRegionFunctionExecutor getExecutor() {
return executor;
}
ResultCollector<Integer, Collection<Integer>> getResultCollector() {
return resultCollector;
}
InternalCache getCache() {
return cache;
}
LocalRegion getRegion() {
return region;
}
PoolImpl getExecutablePool() {
return executablePool;
}
boolean toBoolean(final HAStatus haStatus) {
return haStatus == HA;
}
}