blob: 9fbd8fcfcefb282f3f26155e590df3be0b4c4fc8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.concurrent.ConcurrentMap;
import org.junit.Test;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.event.BulkOperationHolder;
import org.apache.geode.internal.cache.event.EventTracker;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.versions.VMVersionTag;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.statistics.StatisticsClock;
public class DistributedRegionJUnitTest
extends AbstractDistributedRegionJUnitTest {
@Override
protected void setInternalRegionArguments(InternalRegionArguments ira) {}
@Override
protected DistributedRegion createAndDefineRegion(boolean isConcurrencyChecksEnabled,
RegionAttributes ra, InternalRegionArguments ira, GemFireCacheImpl cache,
StatisticsClock statisticsClock) {
DistributedRegion region = new DistributedRegion("testRegion", ra, null,
cache, ira, disabledClock());
if (isConcurrencyChecksEnabled) {
region.enableConcurrencyChecks();
}
// since it is a real region object, we need to tell mockito to monitor it
region = spy(region);
doNothing().when(region)
.distributeUpdate(any(), anyLong(), anyBoolean(), anyBoolean(), any(),
anyBoolean());
doNothing().when(region).distributeDestroy(any(), any());
doNothing().when(region).distributeInvalidate(any());
doNothing().when(region).distributeUpdateEntryVersion(any());
return region;
}
@Override
protected void verifyDistributeUpdate(DistributedRegion region,
EntryEventImpl event, int cnt) {
region.virtualPut(event, false, false, null, false, 12345L, false);
// verify the result
if (cnt > 0) {
verify(region, times(cnt))
.distributeUpdate(eq(event), eq(12345L), anyBoolean(), anyBoolean(),
any(), anyBoolean());
} else {
verify(region, never())
.distributeUpdate(eq(event), eq(12345L), anyBoolean(), anyBoolean(),
any(), anyBoolean());
}
}
@Override
protected void verifyDistributeDestroy(DistributedRegion region,
EntryEventImpl event, int cnt) {
region.basicDestroy(event, false, null);
// verify the result
if (cnt > 0) {
verify(region, times(cnt)).distributeDestroy(eq(event), any());
} else {
verify(region, never()).distributeDestroy(eq(event), any());
}
}
@Override
protected void verifyDistributeInvalidate(DistributedRegion region,
EntryEventImpl event,
int cnt) {
region.basicInvalidate(event);
// verify the result
if (cnt > 0) {
verify(region, times(cnt)).distributeInvalidate(eq(event));
} else {
verify(region, never()).distributeInvalidate(eq(event));
}
}
@Override
protected void verifyDistributeUpdateEntryVersion(DistributedRegion region,
EntryEventImpl event,
int cnt) {
region.basicUpdateEntryVersion(event);
// verify the result
if (cnt > 0) {
verify(region, times(cnt)).distributeUpdateEntryVersion(eq(event));
} else {
verify(region, never()).distributeUpdateEntryVersion(eq(event));
}
}
@Test
public void retriedBulkOpGetsSavedVersionTag() {
DistributedRegion region = prepare(true, true);
DistributedMember member = mock(DistributedMember.class);
ClientProxyMembershipID memberId = mock(ClientProxyMembershipID.class);
byte[] memId = {1, 2, 3};
long threadId = 1;
long retrySeqId = 1;
ThreadIdentifier tid = new ThreadIdentifier(memId, threadId);
EventID retryEventID = new EventID(memId, threadId, retrySeqId);
boolean skipCallbacks = true;
int size = 2;
recordPutAllEvents(region, memId, threadId, skipCallbacks, member, memberId,
size);
EventTracker eventTracker = region.getEventTracker();
ConcurrentMap<ThreadIdentifier, BulkOperationHolder> map =
eventTracker.getRecordedBulkOpVersionTags();
BulkOperationHolder holder = map.get(tid);
EntryEventImpl retryEvent = EntryEventImpl
.create(region, Operation.PUTALL_CREATE, "key1",
"value1", null, false, member, !skipCallbacks, retryEventID);
retryEvent.setContext(memberId);
retryEvent.setPutAllOperation(mock(DistributedPutAllOperation.class));
region.hasSeenEvent(retryEvent);
assertTrue(retryEvent.getVersionTag()
.equals(holder.getEntryVersionTags().get(retryEventID)));
}
protected void recordPutAllEvents(DistributedRegion region, byte[] memId,
long threadId,
boolean skipCallbacks, DistributedMember member,
ClientProxyMembershipID memberId, int size) {
EntryEventImpl[] events = new EntryEventImpl[size];
EventTracker eventTracker = region.getEventTracker();
for (int i = 0; i < size; i++) {
events[i] = EntryEventImpl
.create(region, Operation.PUTALL_CREATE, "key" + i, "value" + i,
null, false, member, !skipCallbacks,
new EventID(memId, threadId, i + 1));
events[i].setContext(memberId);
events[i].setVersionTag(mock(VersionTag.class));
eventTracker.recordEvent(events[i]);
}
}
@Test
public void testThatMemoryThresholdInfoRelectsStateOfRegion() {
InternalDistributedMember internalDM = mock(
InternalDistributedMember.class);
DistributedRegion distRegion = prepare(true, false);
distRegion.addCriticalMember(internalDM);
MemoryThresholdInfo info = distRegion.getAtomicThresholdInfo();
assertThat(distRegion.isMemoryThresholdReached()).isTrue();
assertThat(
distRegion.getAtomicThresholdInfo().getMembersThatReachedThreshold())
.containsExactly(internalDM);
assertThat(info.isMemoryThresholdReached()).isTrue();
assertThat(info.getMembersThatReachedThreshold())
.containsExactly(internalDM);
}
@Test
public void testThatMemoryThresholdInfoDoesNotChangeWhenRegionChanges() {
InternalDistributedMember internalDM = mock(
InternalDistributedMember.class);
DistributedRegion distRegion = prepare(true, false);
MemoryThresholdInfo info = distRegion.getAtomicThresholdInfo();
distRegion.addCriticalMember(internalDM);
assertThat(distRegion.isMemoryThresholdReached()).isTrue();
assertThat(
distRegion.getAtomicThresholdInfo().getMembersThatReachedThreshold())
.containsExactly(internalDM);
assertThat(info.isMemoryThresholdReached()).isFalse();
assertThat(info.getMembersThatReachedThreshold()).isEmpty();
}
@Test
public void testRetriedBasicBridgeReplaceShouldNotCreateNewVersionTag() {
DistributedRegion region = prepare(true, true);
DistributedMember member = mock(DistributedMember.class);
ClientProxyMembershipID client = mock(ClientProxyMembershipID.class);
when(client.getDistributedMember()).thenReturn(member);
byte[] memId = {1, 2, 3};
long threadId = 1;
long retrySeqId = 1;
EventID retryEventID = new EventID(memId, threadId, retrySeqId);
final EventIDHolder clientEvent = new EventIDHolder(retryEventID);
clientEvent.setOperation(Operation.UPDATE);
clientEvent.setPossibleDuplicate(true);
VMVersionTag tag = new VMVersionTag();
tag.setEntryVersion(1);
tag.setRegionVersion(1);
clientEvent.setVersionTag(tag);
doReturn(true).when(region)
.basicPut(any(), anyBoolean(), anyBoolean(), any(), anyBoolean());
assertFalse(region.shouldGenerateVersionTag(mock(RegionEntry.class), clientEvent));
region.basicBridgeReplace("key1", null, "value1", false, null, client, true,
clientEvent);
assertThat(clientEvent.getVersionTag().equals(tag));
region.basicBridgeReplace("key1", "value1", false, null, client, true, clientEvent);
assertThat(clientEvent.getVersionTag().equals(tag));
}
}