blob: b3b38d85865a79d0b6c2dc877a5f61a8fd3171bf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.execute;
import static org.apache.geode.test.dunit.Wait.pause;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.RegionFunctionContext;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.VM;
public abstract class FunctionServiceClientAccessorPRBase extends FunctionServiceClientBase {
public static final String REGION = "region";
protected transient Region<Object, Object> region;
@Before
public void createRegions() {
ClientCache cache = createServersAndClient(numberOfExecutions());
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
for (int i = 0; i < numberOfExecutions(); i++) {
VM vm = host.getVM(i);
createRegion(vm);
}
vm0.invoke(() -> {
Region region = getCache().getRegion(REGION);
PartitionRegionHelper.assignBucketsToPartitions(region);
});
region = cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(REGION);
}
private void createRegion(final VM vm) {
vm.invoke(() -> {
getCache().createRegionFactory(RegionShortcut.PARTITION).create(REGION);
});
}
@Override
public Execution getExecution() {
return FunctionService.onRegion(region);
}
/**
* Test that a custom result collector will still receive all partial results from other members
* when one source fails
*/
@Test
public void nonHAFunctionResultCollectorIsPassedPartialResultsAfterBucketMove() {
List<InternalDistributedMember> members = getAllMembers();
// Only run this test if there is more than two members
Assume.assumeTrue(members.size() >= 2);
final Iterator<InternalDistributedMember> iterator = members.iterator();
InternalDistributedMember firstMember = iterator.next();
InternalDistributedMember secondMember = iterator.next();
// Execute a function which will close the cache on one source.
try {
ResultCollector rc = getExecution().withCollector(customCollector)
.execute(new BucketMovingNonHAFunction(firstMember, secondMember));
rc.getResult();
fail("Should have thrown an exception");
} catch (Exception expected) {
// do nothing
}
assertEquals(new HashSet(members), new HashSet(customCollector.getResult()));
assertEquals(numberOfExecutions(), customCollector.getResult().size());
}
/**
* A function which will close the cache if the given source matches the source executing this
* function
*/
private class BucketMovingNonHAFunction implements Function {
private final InternalDistributedMember source;
private final InternalDistributedMember destination;
public BucketMovingNonHAFunction(final InternalDistributedMember source,
final InternalDistributedMember destination) {
this.source = source;
this.destination = destination;
}
@Override
public void execute(FunctionContext context) {
RegionFunctionContext regionFunctionContext = (RegionFunctionContext) context;
final InternalDistributedMember myId =
InternalDistributedSystem.getAnyInstance().getDistributedMember();
// Move all buckets to the destination
if (myId.equals(source)) {
PartitionRegionHelper.moveData(regionFunctionContext.getDataSet(), source, destination,
100);
}
pause(1000);
context.getResultSender().lastResult(myId);
}
@Override
public boolean isHA() {
return false;
}
}
}