blob: ceda46a867a1ae2f6c71a9a8b1ea3a489bb10579 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.partitioned.rebalance;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.control.PartitionRebalanceDetailsImpl;
import org.apache.geode.internal.cache.control.ResourceManagerStats;
import org.apache.geode.internal.cache.partitioned.rebalance.BucketOperator.Completion;
/**
 * Unit tests for {@link BucketOperatorWrapper}.
 *
 * <p>
 * Every test drives the wrapper against a mocked {@link BucketOperatorImpl} delegate and then
 * checks some combination of: the call being forwarded to the delegate, the mocked
 * {@link ResourceManagerStats} being notified, and the spied {@link PartitionRebalanceDetailsImpl}
 * entries being updated (or left untouched when the delegate reports failure).
 *
 * <p>
 * Note that {@link #setUp()} registers rebalance details for the leader region only, so the
 * colocated-region branches in the per-region verification loops are not exercised by the current
 * fixture; they are kept to document the expected behaviour for colocated regions.
 */
public class BucketOperatorWrapperTest {

  private static final String PR_LEADER_REGION_NAME = "leadregion1";
  private static final String PR_COLOCATED_REGION_NAME = "coloregion1";

  private ResourceManagerStats stats;
  private PartitionedRegion leaderRegion;
  private PartitionedRegion colocatedRegion;
  private Set<PartitionRebalanceDetailsImpl> rebalanceDetails;
  private BucketOperatorWrapper wrapper;
  private BucketOperatorImpl delegate;
  private Map<String, Long> colocatedRegionBytes;
  private int bucketId = 1;
  private InternalDistributedMember sourceMember;
  private InternalDistributedMember targetMember;

  /**
   * Builds the fixture: per-region byte counts, source/target members, mocked stats and regions,
   * a spied details object for the leader region, and the wrapper under test around a mocked
   * delegate.
   *
   * @throws UnknownHostException if the loopback addresses used for the members cannot be resolved
   */
  @Before
  public void setUp() throws UnknownHostException {
    colocatedRegionBytes = new HashMap<String, Long>();
    colocatedRegionBytes.put(PR_LEADER_REGION_NAME, 100L);
    colocatedRegionBytes.put(PR_COLOCATED_REGION_NAME, 50L);

    sourceMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
    targetMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.2"), 1);

    stats = mock(ResourceManagerStats.class);
    doNothing().when(stats).startBucketCreate(anyInt());
    doNothing().when(stats).endBucketCreate(anyInt(), anyBoolean(), anyLong(), anyLong());

    leaderRegion = mock(PartitionedRegion.class);
    doReturn(PR_LEADER_REGION_NAME).when(leaderRegion).getFullPath();
    colocatedRegion = mock(PartitionedRegion.class);
    doReturn(PR_COLOCATED_REGION_NAME).when(colocatedRegion).getFullPath();

    // Spy (rather than mock) so the real details object is used while calls can still be verified.
    rebalanceDetails = new HashSet<PartitionRebalanceDetailsImpl>();
    PartitionRebalanceDetailsImpl details = spy(new PartitionRebalanceDetailsImpl(leaderRegion));
    rebalanceDetails.add(details);

    delegate = mock(BucketOperatorImpl.class);
    wrapper = new BucketOperatorWrapper(delegate, rebalanceDetails, stats, leaderRegion);
  }

  /**
   * The wrapper must forward createRedundantBucket to the delegate exactly once. The verification
   * accepts any {@link Completion}, so it does not require the caller's instance to be forwarded
   * unchanged.
   */
  @Test
  public void bucketWrapperShouldDelegateCreateBucketToEnclosedOperator() {
    Completion completionSentToWrapper = mock(Completion.class);
    doNothing().when(delegate).createRedundantBucket(targetMember, bucketId, colocatedRegionBytes,
        completionSentToWrapper);

    wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes,
        completionSentToWrapper);

    verify(delegate, times(1)).createRedundantBucket(eq(targetMember), eq(bucketId),
        eq(colocatedRegionBytes), any(Completion.class));
  }

  /** A successful create must be recorded as a create on the per-region details. */
  @Test
  public void bucketWrapperShouldRecordNumberOfBucketsCreatedIfCreateBucketSucceeds() {
    stubCreateRedundantBucketOutcome(true);
    Completion completionSentToWrapper = mock(Completion.class);

    wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes,
        completionSentToWrapper);

    // verify create buckets is recorded
    for (PartitionRebalanceDetailsImpl details : rebalanceDetails) {
      if (details.getRegionPath().equals(PR_LEADER_REGION_NAME)) {
        verify(details, times(1)).incCreates(eq(colocatedRegionBytes.get(PR_LEADER_REGION_NAME)),
            anyLong());
      } else if (details.getRegionPath().equals(PR_COLOCATED_REGION_NAME)) {
        // elapsed is recorded only if its leader
        verify(details, times(1)).incCreates(colocatedRegionBytes.get(PR_COLOCATED_REGION_NAME),
            0);
      }
    }
  }

  /** A failed create must not be recorded as a create on any per-region details. */
  @Test
  public void bucketWrapperShouldNotRecordNumberOfBucketsCreatedIfCreateBucketFails() {
    stubCreateRedundantBucketOutcome(false);
    Completion completionSentToWrapper = mock(Completion.class);

    wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes,
        completionSentToWrapper);

    // verify create buckets is not recorded
    for (PartitionRebalanceDetailsImpl details : rebalanceDetails) {
      verify(details, times(0)).incCreates(anyLong(), anyLong());
    }
  }

  /** A failure reported by the delegate must reach the caller's Completion. */
  @Test
  public void bucketWrapperShouldInvokeOnFailureWhenCreateBucketFails() {
    stubCreateRedundantBucketOutcome(false);
    Completion completionSentToWrapper = mock(Completion.class);

    wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes,
        completionSentToWrapper);

    // verify onFailure is invoked
    verify(completionSentToWrapper, times(1)).onFailure();
  }

  /** A success reported by the delegate must reach the caller's Completion. */
  @Test
  public void bucketWrapperShouldInvokeOnSuccessWhenCreateBucketSucceeds() {
    stubCreateRedundantBucketOutcome(true);
    Completion completionSentToWrapper = mock(Completion.class);

    wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes,
        completionSentToWrapper);

    // verify onSuccess is invoked
    verify(completionSentToWrapper, times(1)).onSuccess();
  }

  /** moveBucket must be forwarded to the delegate and bracketed by transfer stats. */
  @Test
  public void bucketWrapperShouldDelegateMoveBucketToEnclosedOperator() {
    doReturn(true).when(delegate).moveBucket(sourceMember, targetMember, bucketId,
        colocatedRegionBytes);

    wrapper.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);

    // verify the delegate is invoked
    verify(delegate, times(1)).moveBucket(sourceMember, targetMember, bucketId,
        colocatedRegionBytes);
    verifyBucketTransferStatsRecorded();
  }

  /** A successful move must record the transferred bytes on the per-region details. */
  @Test
  public void bucketWrapperShouldRecordBytesTransferredPerRegionAfterMoveBucketIsSuccessful() {
    doReturn(true).when(delegate).moveBucket(sourceMember, targetMember, bucketId,
        colocatedRegionBytes);

    wrapper.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);

    // verify the details is updated with bytes transferred
    for (PartitionRebalanceDetailsImpl details : rebalanceDetails) {
      if (details.getRegionPath().equals(PR_LEADER_REGION_NAME)) {
        verify(details, times(1)).incTransfers(eq(colocatedRegionBytes.get(PR_LEADER_REGION_NAME)),
            anyLong());
      } else if (details.getRegionPath().equals(PR_COLOCATED_REGION_NAME)) {
        // elapsed is recorded only if its leader
        verify(details, times(1)).incTransfers(colocatedRegionBytes.get(PR_COLOCATED_REGION_NAME),
            0);
      }
    }
    verifyBucketTransferStatsRecorded();
  }

  /** A failed move must not record any transfer, but stats must still be started and ended. */
  @Test
  public void bucketWrapperShouldDoNotRecordBytesTransferedIfMoveBucketFails() {
    doReturn(false).when(delegate).moveBucket(sourceMember, targetMember, bucketId,
        colocatedRegionBytes);

    wrapper.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);

    // verify the details is not updated with bytes transferred
    for (PartitionRebalanceDetailsImpl details : rebalanceDetails) {
      verify(details, times(0)).incTransfers(anyLong(), anyLong());
    }
    verifyBucketTransferStatsRecorded();
  }

  /** removeBucket must be forwarded to the delegate and bracketed by remove stats. */
  @Test
  public void bucketWrapperShouldDelegateRemoveBucketToEnclosedOperator() {
    wrapper.removeBucket(targetMember, bucketId, colocatedRegionBytes);

    // verify the delegate is invoked
    verify(delegate, times(1)).removeBucket(targetMember, bucketId, colocatedRegionBytes);
    verifyBucketRemoveStatsRecorded();
  }

  /** A successful remove must be recorded as a remove on the per-region details. */
  @Test
  public void bucketWrapperShouldRecordBucketRemovesPerRegionAfterRemoveBucketIsSuccessful() {
    doReturn(true).when(delegate).removeBucket(targetMember, bucketId, colocatedRegionBytes);

    wrapper.removeBucket(targetMember, bucketId, colocatedRegionBytes);

    // verify the details is updated with the bucket remove
    for (PartitionRebalanceDetailsImpl details : rebalanceDetails) {
      if (details.getRegionPath().equals(PR_LEADER_REGION_NAME)) {
        verify(details, times(1)).incRemoves(eq(colocatedRegionBytes.get(PR_LEADER_REGION_NAME)),
            anyLong());
      } else if (details.getRegionPath().equals(PR_COLOCATED_REGION_NAME)) {
        // elapsed is recorded only if its leader
        verify(details, times(1)).incRemoves(colocatedRegionBytes.get(PR_COLOCATED_REGION_NAME),
            0);
      }
    }
    verifyBucketRemoveStatsRecorded();
  }

  /**
   * A failed remove must not be recorded as a remove. Despite "MoveBucket" in the method name,
   * this test exercises removeBucket.
   */
  @Test
  public void bucketWrapperShouldDoNotRecordBucketRemovesIfMoveBucketFails() {
    doReturn(false).when(delegate).removeBucket(targetMember, bucketId, colocatedRegionBytes);

    wrapper.removeBucket(targetMember, bucketId, colocatedRegionBytes);

    // verify the details is not updated with the bucket remove
    for (PartitionRebalanceDetailsImpl details : rebalanceDetails) {
      verify(details, times(0)).incRemoves(anyLong(), anyLong());
    }
    verifyBucketRemoveStatsRecorded();
  }

  /** movePrimary must be forwarded to the delegate and bracketed by primary-transfer stats. */
  @Test
  public void bucketWrapperShouldDelegateMovePrimaryToEnclosedOperator() {
    wrapper.movePrimary(sourceMember, targetMember, bucketId);

    // verify the delegate is invoked
    verify(delegate, times(1)).movePrimary(sourceMember, targetMember, bucketId);
    verifyPrimaryTransferStatsRecorded();
  }

  /** A successful primary move must be recorded on the per-region details. */
  @Test
  public void bucketWrapperShouldRecordPrimaryTransfersPerRegionAfterMovePrimaryIsSuccessful() {
    doReturn(true).when(delegate).movePrimary(sourceMember, targetMember, bucketId);

    wrapper.movePrimary(sourceMember, targetMember, bucketId);

    // verify the details is updated with the primary transfer
    for (PartitionRebalanceDetailsImpl details : rebalanceDetails) {
      if (details.getRegionPath().equals(PR_LEADER_REGION_NAME)) {
        verify(details, times(1)).incPrimaryTransfers(anyLong());
      } else if (details.getRegionPath().equals(PR_COLOCATED_REGION_NAME)) {
        // elapsed is recorded only if its leader
        verify(details, times(1)).incPrimaryTransfers(0);
      }
    }
    verifyPrimaryTransferStatsRecorded();
  }

  /** A failed primary move must not be recorded on any per-region details. */
  @Test
  public void bucketWrapperShouldNotRecordPrimaryTransfersPerRegionAfterMovePrimaryFails() {
    doReturn(false).when(delegate).movePrimary(sourceMember, targetMember, bucketId);

    wrapper.movePrimary(sourceMember, targetMember, bucketId);

    // verify the details is not updated with the primary transfer
    for (PartitionRebalanceDetailsImpl details : rebalanceDetails) {
      verify(details, times(0)).incPrimaryTransfers(anyLong());
    }
    verifyPrimaryTransferStatsRecorded();
  }

  /**
   * Stubs the delegate's createRedundantBucket so that, when invoked with the fixture's target
   * member, bucket id and byte map, it immediately reports the requested outcome to the
   * {@link Completion} it was handed.
   *
   * @param succeed {@code true} to call {@code onSuccess()}, {@code false} to call
   *        {@code onFailure()}
   */
  private void stubCreateRedundantBucketOutcome(final boolean succeed) {
    doAnswer(new Answer<Object>() {
      @Override
      public Object answer(InvocationOnMock invocation) {
        // The Completion is the 4th argument (index 3) of createRedundantBucket.
        Completion completion = (Completion) invocation.getArguments()[3];
        if (succeed) {
          completion.onSuccess();
        } else {
          completion.onFailure();
        }
        return null;
      }
    }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId),
        eq(colocatedRegionBytes), any(Completion.class));
  }

  /** Verifies that a bucket transfer was started and ended exactly once on the stats. */
  private void verifyBucketTransferStatsRecorded() {
    verify(stats, times(1)).startBucketTransfer(anyInt());
    verify(stats, times(1)).endBucketTransfer(anyInt(), anyBoolean(), anyLong(), anyLong());
  }

  /** Verifies that a bucket remove was started and ended exactly once on the stats. */
  private void verifyBucketRemoveStatsRecorded() {
    verify(stats, times(1)).startBucketRemove(anyInt());
    verify(stats, times(1)).endBucketRemove(anyInt(), anyBoolean(), anyLong(), anyLong());
  }

  /** Verifies that a primary transfer was started and ended exactly once on the stats. */
  private void verifyPrimaryTransferStatsRecorded() {
    verify(stats, times(1)).startPrimaryTransfer(anyInt());
    verify(stats, times(1)).endPrimaryTransfer(anyInt(), anyBoolean(), anyLong());
  }
}