blob: 5cd858dd86367dda37bd62bc31240ee8829b66ed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.execute;
import static org.apache.geode.test.dunit.Wait.pause;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.apache.geode.DataSerializable;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
/**
 * Base class for tests of FunctionService that are agnostic to the type of Execution that they are
 * running on. The goal is to completely cover all common behavior of sending results and sending
 * exceptions here and have them run with all topologies in child classes.
 */
public abstract class FunctionServiceBase extends JUnit4CacheTestCase {
// JUnit rule through which the tests below declare the exception type (and, in some cases, the
// cause) they expect to be thrown.
// NOTE(review): the fields are transient, presumably because the test instance can be serialized
// and these objects should not travel with it - confirm against the base test case.
@Rule
public transient ExpectedException thrown = ExpectedException.none();
// Collector passed to withCollector(...) by the customCollector* tests; createCollector() assigns
// a new instance before each test.
protected transient CustomCollector customCollector;
/**
 * Gives each test its own, initially empty {@link CustomCollector}.
 */
@Before
public void createCollector() {
  customCollector = new CustomCollector();
}
/**
 * Return the execution used to execute functions for this test. Subclasses should override this
 * to provide a specific execution, for example onMember.
 *
 * @return the {@link Execution} on which the tests in this class execute their functions
 */
public abstract Execution getExecution();
/**
 * Return the number of members the function is expected to execute on.
 *
 * @return the expected execution count; the tests in this class compare it against the number
 *         of results they collect
 */
public abstract int numberOfExecutions();
/**
 * Takes the properties supplied by the superclass and sets the serializable-object filter to
 * patterns covering the function execution package and the dunit test package.
 *
 * @return the superclass properties with the filter entry added
 */
@Override
public Properties getDistributedSystemProperties() {
  final String acceptedPackages =
      "org.apache.geode.internal.cache.execute.**;org.apache.geode.test.dunit.**";
  final Properties props = super.getDistributedSystemProperties();
  props.put(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER, acceptedPackages);
  return props;
}
/**
 * Executes a function that asserts the cache obtained from its context is non-null and not
 * closed, then checks that every execution reported "done".
 */
@Test
public void functionContextGetCacheIsNotNullAndOpen() {
  Function cacheCheckingFunction = context -> {
    Cache contextCache = context.getCache();
    assertNotNull(contextCache);
    assertFalse(contextCache.isClosed());
    context.getResultSender().lastResult("done");
  };
  ResultCollector collector = getExecution().execute(cacheCheckingFunction);

  List<String> answers = (List<String>) collector.getResult();
  assertEquals(numberOfExecutions(), answers.size());
  for (String answer : answers) {
    assertEquals("done", answer);
  }
}
/**
 * A function that only sends a last result yields exactly one "done" per execution from the
 * default collector.
 */
@Test
public void defaultCollectorReturnsSingleResult() {
  Function doneFunction = context -> context.getResultSender().lastResult("done");
  ResultCollector collector = getExecution().execute(doneFunction);

  List<String> answers = (List<String>) collector.getResult();
  assertEquals(numberOfExecutions(), answers.size());
  for (String answer : answers) {
    assertEquals("done", answer);
  }
}
/**
 * A function that sends "one" as an intermediate result and "two" as its last result must
 * contribute one of each per execution to the default collector.
 */
@Test
public void defaultCollectorReturnsAllIntermediateResults() {
  ResultCollector collector = getExecution().execute(context -> {
    context.getResultSender().sendResult("one");
    context.getResultSender().lastResult("two");
  });

  final List<String> collected = (List<String>) collector.getResult();
  long ones = 0;
  long twos = 0;
  for (String value : collected) {
    if (value.equals("one")) {
      ones++;
    } else if (value.equals("two")) {
      twos++;
    }
  }
  assertEquals(numberOfExecutions(), ones);
  assertEquals(numberOfExecutions(), twos);
}
/**
 * A function that throws IllegalStateException must reach the caller as a FunctionException.
 */
@Test
public void defaultCollectorThrowsExceptionAfterFunctionThrowsIllegalState() {
  IgnoredException.addIgnoredException("java.lang.IllegalStateException");

  // GEODE-1762 - clients throw from execute, but peers throw from rc.getResult, so the
  // expectation is registered before either call is made.
  thrown.expect(FunctionException.class);
  // GEODE-1762 - clients wrap cause in a ServerOperationException, which is why the cause is
  // not additionally required to be an IllegalStateException.

  Function throwingFunction = context -> {
    throw new IllegalStateException();
  };
  getExecution().execute(throwingFunction).getResult();
}
/**
 * A function that throws FunctionException must reach the caller as a FunctionException.
 */
@Test
public void defaultCollectorThrowsExceptionAfterFunctionThrowsFunctionException() {
  // GEODE-1762 - clients throw from execute, but peers throw from rc.getResult
  thrown.expect(FunctionException.class);

  Function throwingFunction = context -> {
    throw new FunctionException();
  };
  getExecution().execute(throwingFunction).getResult();
}
/**
 * Tests what happens if a function returns an exception as a result. This is kind of weird, but
 * it seems that the default collector will just throw it as an exception.
 */
@Test
public void defaultCollectorThrowsExceptionAfterFunctionReturnsIllegalStateException() {
  // GEODE-1762 - clients throw from execute, but peers throw from rc.getResult
  // GEODE-1762 - clients throw a ServerOperationException, so only the broad Exception type is
  // expected here instead of a FunctionException caused by an IllegalStateException.
  thrown.expect(Exception.class);

  Function exceptionReturningFunction =
      context -> context.getResultSender().lastResult(new IllegalStateException());
  getExecution().execute(exceptionReturningFunction).getResult();
}
/**
 * A FunctionException sent as the last result is thrown to the caller as a FunctionException
 * that has no cause.
 */
@Test
public void defaultCollectorThrowsExceptionAfterFunctionReturnsFunctionException() {
  // GEODE-1762 - clients throw from execute, but peers throw from rc.getResult
  thrown.expect(FunctionException.class);
  thrown.expectCause(is((Throwable) null));

  Function exceptionReturningFunction =
      context -> context.getResultSender().lastResult(new FunctionException());
  getExecution().execute(exceptionReturningFunction).getResult();
}
/**
 * An IllegalStateException sent as an intermediate result, followed by a regular last result,
 * still causes an exception to be thrown to the caller.
 */
@Test
public void defaultCollectorThrowsExceptionAfterFunctionReturnsIllegalStateExceptionAsIntermediateResult() {
  // GEODE-1762 - clients throw from execute, but peers throw from rc.getResult
  // GEODE-1762 - client throws a ServerOperationException, so only the broad Exception type is
  // expected here instead of a FunctionException caused by an IllegalStateException.
  thrown.expect(Exception.class);

  Function exceptionThenDoneFunction = context -> {
    context.getResultSender().sendResult(new IllegalStateException());
    context.getResultSender().lastResult("done");
  };
  getExecution().execute(exceptionThenDoneFunction).getResult();
}
/**
 * A FunctionException sent as an intermediate result, followed by a regular last result, is
 * thrown to the caller as a FunctionException that has no cause.
 */
@Test
public void defaultCollectorThrowsExceptionAfterFunctionReturnsFunctionExceptionAsIntermediateResult() {
  // GEODE-1762 - clients throw from execute, but peers throw from rc.getResult
  thrown.expect(FunctionException.class);
  thrown.expectCause(is((Throwable) null));

  Function exceptionThenDoneFunction = context -> {
    context.getResultSender().sendResult(new FunctionException());
    context.getResultSender().lastResult("done");
  };
  getExecution().execute(exceptionThenDoneFunction).getResult();
}
/**
 * An IllegalStateException passed to sendException is returned by the default collector as an
 * ordinary result, one per execution.
 */
@Test
public void defaultCollectorReturnsResultOfSendException() {
  Function sendExceptionFunction =
      context -> context.getResultSender().sendException(new IllegalStateException());
  ResultCollector collector = getExecution().execute(sendExceptionFunction);

  final List<Object> collected = (List<Object>) collector.getResult();
  assertEquals(numberOfExecutions(), collected.size());
  for (Object entry : collected) {
    assertEquals(IllegalStateException.class, entry.getClass());
  }
}
/**
 * A FunctionException passed to sendException is returned by the default collector as an
 * ordinary result, one per execution.
 */
@Test
public void defaultCollectorReturnsResultOfSendFunctionException() {
  Function sendExceptionFunction =
      context -> context.getResultSender().sendException(new FunctionException());
  ResultCollector collector = getExecution().execute(sendExceptionFunction);

  final List<Object> collected = (List<Object>) collector.getResult();
  assertEquals(numberOfExecutions(), collected.size());
  for (Object entry : collected) {
    assertEquals(FunctionException.class, entry.getClass());
  }
}
/**
 * When the function throws IllegalStateException the caller gets a FunctionException and the
 * custom collector is left without any results.
 */
@Test
public void customCollectorDoesNotSeeExceptionFunctionThrowsIllegalState() {
  // GEODE-1762 - clients throw from execute, but peers throw from rc.getResult
  IgnoredException.addIgnoredException("java.lang.IllegalStateException");
  final Function throwingFunction = context -> {
    throw new IllegalStateException();
  };

  try {
    getExecution().withCollector(customCollector).execute(throwingFunction).getResult();
    fail("should have received an exception");
  } catch (FunctionException expected) {
    // expected
  }

  assertEquals(0, customCollector.getResult().size());
}
/**
 * When the function throws FunctionException the caller gets a FunctionException and the custom
 * collector is left without any results.
 */
@Test
public void customCollectorDoesNotSeeExceptionFunctionThrowsFunctionException() {
  // GEODE-1762 - clients throw from execute, but peers throw from rc.getResult
  final Function throwingFunction = context -> {
    throw new FunctionException();
  };

  try {
    getExecution().withCollector(customCollector).execute(throwingFunction).getResult();
    fail("should have received an exception");
  } catch (FunctionException expected) {
    // expected
  }

  assertEquals(0, customCollector.getResult().size());
}
/**
 * When the function sends an IllegalStateException as its last result, the caller must get an
 * exception and the custom collector must not have been handed anything.
 */
@Test
public void customCollectorDoesNotSeeExceptionAfterFunctionReturnsIllegalStateException() {
  // GEODE-1762 - clients throw from execute, but peers throw from rc.getResult
  try {
    // The custom collector has to be attached to the execution; without withCollector(...) the
    // assertion at the end would inspect a collector that never took part in the execution.
    ResultCollector rc = getExecution().withCollector(customCollector).execute((context) -> {
      context.getResultSender().lastResult(new IllegalStateException());
    });
    rc.getResult();
    fail("should have received an exception");
    // GEODE-1762 - clients throw a ServerOperationException
  } catch (Exception expected) {
    // expected
  }
  Assert.assertEquals(0, customCollector.getResult().size());
}
/**
 * When the function sends an IllegalStateException as an intermediate result (followed by a
 * regular last result), the caller must get an exception and the custom collector must not have
 * been handed anything.
 */
@Test
public void customCollectorDoesNotSeeExceptionAfterFunctionReturnsIllegalStateExceptionAsIntermediateResult() {
  // GEODE-1762 - clients throw from execute, but peers throw from rc.getResult
  try {
    // The custom collector has to be attached to the execution; without withCollector(...) the
    // assertion at the end would inspect a collector that never took part in the execution.
    ResultCollector rc = getExecution().withCollector(customCollector).execute((context) -> {
      context.getResultSender().sendResult(new IllegalStateException());
      context.getResultSender().lastResult("done");
    });
    rc.getResult();
    fail("should have received an exception");
    // GEODE-1762 - clients throw a ServerOperationException
  } catch (Exception expected) {
    // expected
  }
  Assert.assertEquals(0, customCollector.getResult().size());
}
/**
 * An IllegalStateException passed to sendException is delivered to the custom collector as an
 * ordinary result, one per execution, and the returned result equals the collector's contents.
 */
@Test
public void customCollectorReturnsResultOfSendException() {
  Function sendExceptionFunction =
      context -> context.getResultSender().sendException(new IllegalStateException());
  ResultCollector collector =
      getExecution().withCollector(customCollector).execute(sendExceptionFunction);

  final List<Object> collected = (List<Object>) collector.getResult();
  assertEquals(numberOfExecutions(), collected.size());
  for (Object entry : collected) {
    assertEquals(IllegalStateException.class, entry.getClass());
  }
  assertEquals(collected, customCollector.getResult());
}
/**
 * A FunctionException passed to sendException is delivered to the custom collector as an
 * ordinary result, one per execution, and the returned result equals the collector's contents.
 */
@Test
public void customCollectorReturnsResultOfSendFunctionException() {
  Function sendExceptionFunction =
      context -> context.getResultSender().sendException(new FunctionException());
  ResultCollector collector =
      getExecution().withCollector(customCollector).execute(sendExceptionFunction);

  final List<Object> collected = (List<Object>) collector.getResult();
  assertEquals(numberOfExecutions(), collected.size());
  for (Object entry : collected) {
    assertEquals(FunctionException.class, entry.getClass());
  }
  assertEquals(collected, customCollector.getResult());
}
/**
 * Test that a custom result collector will still receive all partial results from other members
 * when one member fails.
 */
@Test
public void nonHAFunctionResultCollectorIsPassedPartialResultsAfterCloseCache() {
  List<InternalDistributedMember> members = getAllMembers();
  InternalDistributedMember firstMember = members.iterator().next();
  // Execute a function which will close the cache on one member.
  try {
    ResultCollector rc = getExecution().withCollector(customCollector)
        .execute(new CacheClosingNonHAFunction(firstMember));
    rc.getResult();
    fail("Should have thrown an exception");
  } catch (Exception expected) {
    // expected: the execution on firstMember throws instead of sending a result
  }
  members.remove(firstMember);

  List<Object> partialResults = customCollector.getResult();
  assertEquals(numberOfExecutions() - 1, partialResults.size());
  assertEquals(members.size(), partialResults.size());
  // Nothing in this test fixes the order in which the remaining members report back, so compare
  // the contents of the two lists rather than their element positions.
  Assert.assertTrue(
      "expected results from " + members + " but collected " + partialResults,
      partialResults.containsAll(members));
}
/**
 * Test that a result collector will time out using the timeout provided.
 *
 * @throws InterruptedException if the wait in {@code getResult(timeout, unit)} is interrupted
 */
@Test
public void resultCollectorHonorsFunctionTimeout() throws InterruptedException {
  Function sleepingFunction = context -> {
    try {
      // Elapsed time is measured by subtracting nanoTime values: System.nanoTime() may overflow,
      // so comparing "now < deadline" directly is not reliable.
      final long startTime = System.nanoTime();
      final long maxWaitNanos = TimeUnit.SECONDS.toNanos(30);
      while (!context.getCache().isClosed() && System.nanoTime() - startTime < maxWaitNanos) {
        Thread.sleep(10);
      }
    } catch (InterruptedException e) {
      // exit
    }
    context.getResultSender().sendResult("FAILED");
  };
  ResultCollector collector = getExecution().execute(sleepingFunction);
  // Registered only after execute(): the exception is expected from the timed getResult call.
  thrown.expect(FunctionException.class);
  collector.getResult(1, TimeUnit.SECONDS);
}
/**
 * Executes a function through {@link #getExecution()} in which every execution reports the
 * distributed member it ran on, and returns the collected members.
 *
 * @return the members reported by the executions
 */
protected List<InternalDistributedMember> getAllMembers() {
  // Get a list of all of the members
  Function reportOwnMember = context -> context.getResultSender()
      .lastResult(InternalDistributedSystem.getAnyInstance().getDistributedMember());
  ResultCollector collector = getExecution().execute(reportOwnMember);
  return (List<InternalDistributedMember>) collector.getResult();
}
/**
 * Simple {@link ResultCollector} that appends every result it is given to a list. Both
 * {@code getResult} variants return that list directly without waiting; the timeout arguments
 * are ignored.
 */
public static class CustomCollector implements ResultCollector<Object, List<Object>> {
  private final List<Object> results = new ArrayList<>();

  @Override
  public void addResult(final DistributedMember memberID, final Object resultOfSingleExecution) {
    // the sending member is not recorded, only the result itself
    results.add(resultOfSingleExecution);
  }

  @Override
  public void endResults() {
    // nothing to do
  }

  @Override
  public void clearResults() {
    results.clear();
  }

  @Override
  public List<Object> getResult() throws FunctionException {
    return results;
  }

  @Override
  public List<Object> getResult(final long timeout, final TimeUnit unit)
      throws FunctionException, InterruptedException {
    return results;
  }
}
/**
 * A non-HA function which closes the cache and throws {@link CacheClosedException} when the
 * member executing it equals the member it was constructed with. On any other member it pauses
 * and then sends the executing member as its last result.
 */
public static class CacheClosingNonHAFunction implements Function, DataSerializable {
  private InternalDistributedMember member;

  /** No-arg constructor, for serialization. */
  public CacheClosingNonHAFunction() {}

  /**
   * @param member the member on which the cache should be closed
   */
  public CacheClosingNonHAFunction(final InternalDistributedMember member) {
    this.member = member;
  }

  @Override
  public boolean isHA() {
    return false;
  }

  @Override
  public void execute(FunctionContext context) {
    final InternalDistributedMember localMember =
        InternalDistributedSystem.getAnyInstance().getDistributedMember();
    if (!localMember.equals(member)) {
      // not the targeted member: wait a little, then report this member as the result
      pause(1000);
      context.getResultSender().lastResult(localMember);
      return;
    }
    CacheFactory.getAnyInstance().close();
    throw new CacheClosedException();
  }

  @Override
  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
    member = DataSerializer.readObject(in);
  }

  @Override
  public void toData(DataOutput out) throws IOException {
    DataSerializer.writeObject(member, out);
  }
}
}