blob: 23849835e3f8a041fae4809424f72248f7d98f2f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.entries;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.cache.query.internal.index.IndexManager;
import org.apache.geode.cache.query.internal.index.IndexProtocol;
import org.apache.geode.cache.util.GatewayConflictResolver;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.RegionClearedException;
import org.apache.geode.internal.cache.RegionEntryContext;
import org.apache.geode.internal.cache.TimestampedEntryEventImpl;
import org.apache.geode.internal.cache.Token;
import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.versions.VersionStamp;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.offheap.MemoryAllocatorImpl;
import org.apache.geode.internal.offheap.OffHeapMemoryStats;
import org.apache.geode.internal.offheap.OutOfOffHeapMemoryListener;
import org.apache.geode.internal.offheap.SlabImpl;
import org.apache.geode.internal.offheap.StoredObject;
import org.apache.geode.internal.offheap.annotations.Unretained;
import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap.HashEntry;
public class AbstractRegionEntryTest {
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void whenMakeTombstoneHasSetValueThatThrowsExceptionDoesNotChangeValueToTombstone()
throws RegionClearedException {
LocalRegion lr = mock(LocalRegion.class);
RegionVersionVector<?> rvv = mock(RegionVersionVector.class);
when(lr.getVersionVector()).thenReturn(rvv);
VersionTag<?> vt = mock(VersionTag.class);
Object value = "value";
AbstractRegionEntry re = new TestableRegionEntry(lr, value);
assertEquals(value, re.getValueField());
Assertions.assertThatThrownBy(() -> re.makeTombstone(lr, vt))
.isInstanceOf(RuntimeException.class).hasMessage("throw exception on setValue(TOMBSTONE)");
Assert.assertEquals(Token.REMOVED_PHASE2, re.getValueField());
}
@Test
public void whenRegionEntryIsTombstoneThenIndexShouldNotBeUpdated() throws QueryException {
InternalRegion region = mock(InternalRegion.class);
IndexManager indexManager = mock(IndexManager.class);
when(region.getIndexManager()).thenReturn(indexManager);
AbstractRegionEntry abstractRegionEntry = mock(AbstractRegionEntry.class);
when(abstractRegionEntry.isTombstone()).thenReturn(true);
abstractRegionEntry.updateIndexOnDestroyOperation(region);
verify(indexManager, never()).updateIndexes(any(AbstractRegionEntry.class),
eq(IndexManager.REMOVE_ENTRY), eq(IndexProtocol.OTHER_OP));
}
@Test
public void whenPrepareValueForCacheCalledWithOffHeapEntryHasNewCachedSerializedValue()
throws RegionClearedException, IOException, ClassNotFoundException {
LocalRegion lr = mock(LocalRegion.class);
RegionEntryContext regionEntryContext = mock(RegionEntryContext.class);
OutOfOffHeapMemoryListener ooohml = mock(OutOfOffHeapMemoryListener.class);
OffHeapMemoryStats stats = mock(OffHeapMemoryStats.class);
SlabImpl slab = new SlabImpl(1024); // 1k
MemoryAllocatorImpl ma =
MemoryAllocatorImpl.createForUnitTest(ooohml, stats, new SlabImpl[] {slab});
try {
when(regionEntryContext.getOffHeap()).thenReturn(true);
String value = "value";
AbstractRegionEntry re = new TestableRegionEntry(lr, value);
assertEquals(value, re.getValueField());
EntryEventImpl entryEvent = new EntryEventImpl();
StoredObject valueForCache =
(StoredObject) re.prepareValueForCache(regionEntryContext, value, entryEvent, true);
final byte[] cachedSerializedNewValue = entryEvent.getCachedSerializedNewValue();
assertNotNull(cachedSerializedNewValue);
valueForCache.checkDataEquals(cachedSerializedNewValue);
DataInputStream dataInputStream =
new DataInputStream(new ByteArrayInputStream(cachedSerializedNewValue));
Object o = DataSerializer.readObject(dataInputStream);
assertEquals(o, value);
} finally {
MemoryAllocatorImpl.freeOffHeapMemory();
}
}
@Test
public void gatewayEventsFromSameDSShouldThrowCMEIfMisordered() {
// create 2 gateway events with the same dsid, but different timestamp
// apply them in misorder, it should throw CME before calling resolver
GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
LocalRegion lr = mock(LocalRegion.class);
String value = "value";
AbstractRegionEntry re = new TestableRegionEntry(lr, value);
InternalDistributedMember member1 = mock(InternalDistributedMember.class);
EntryEventImpl entryEvent1 = new EntryEventImpl();
entryEvent1.setRegion(lr);
when(lr.getCache()).thenReturn(cache);
GatewayConflictResolver resolver = mock(GatewayConflictResolver.class);
when(cache.getGatewayConflictResolver()).thenReturn(resolver);
VersionTag tag1 = VersionTag.create(member1);
tag1.setVersionTimeStamp(1);
tag1.setDistributedSystemId(3);
tag1.setIsGatewayTag(true);
VersionTag tag2 = VersionTag.create(member1);
tag2.setVersionTimeStamp(2);
tag2.setDistributedSystemId(3);
tag2.setIsGatewayTag(true);
((TestableRegionEntry) re).setVersions(tag2);
assertEquals(tag2.getVersionTimeStamp(),
re.getVersionStamp().asVersionTag().getVersionTimeStamp());
assertEquals(3, ((TestableRegionEntry) re).getDistributedSystemId());
// apply tag1 with smaller timestamp should throw CME
entryEvent1.setVersionTag(tag1);
expectedException.expect(ConcurrentCacheModificationException.class);
expectedException.expectMessage("conflicting WAN event detected");
re.processVersionTag(entryEvent1);
verify(resolver, never()).onEvent(any(), any());
}
@Test
public void gatewayEventsFromSameDSInCorrectOrderOfTimestampShouldPass() {
// create 2 gateway events with the same dsid, but different timestamp
// apply them in correct order, it should pass
GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
LocalRegion lr = mock(LocalRegion.class);
String value = "value";
AbstractRegionEntry re = new TestableRegionEntry(lr, value);
InternalDistributedMember member1 = mock(InternalDistributedMember.class);
EntryEventImpl entryEvent1 = new EntryEventImpl();
entryEvent1.setRegion(lr);
when(lr.getCache()).thenReturn(cache);
when(cache.getGatewayConflictResolver()).thenReturn(null);
VersionTag tag1 = VersionTag.create(member1);
tag1.setVersionTimeStamp(1);
tag1.setDistributedSystemId(3);
tag1.setIsGatewayTag(true);
VersionTag tag2 = VersionTag.create(member1);
tag2.setVersionTimeStamp(2);
tag2.setDistributedSystemId(3);
tag2.setIsGatewayTag(true);
((TestableRegionEntry) re).setVersions(tag1);
assertEquals(tag1.getVersionTimeStamp(),
re.getVersionStamp().asVersionTag().getVersionTimeStamp());
assertEquals(3, ((TestableRegionEntry) re).getDistributedSystemId());
// apply tag2 should be accepted
entryEvent1.setVersionTag(tag2);
re.processVersionTag(entryEvent1);
}
@Test
public void stampWithoutDSIDShouldAcceptAnyTag() {
GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
LocalRegion lr = mock(LocalRegion.class);
String value = "value";
AbstractRegionEntry re = new TestableRegionEntry(lr, value);
InternalDistributedMember member1 = mock(InternalDistributedMember.class);
EntryEventImpl entryEvent1 = new EntryEventImpl();
entryEvent1.setRegion(lr);
when(lr.getCache()).thenReturn(cache);
when(cache.getGatewayConflictResolver()).thenReturn(null);
VersionTag tag1 = VersionTag.create(member1);
tag1.setVersionTimeStamp(1);
tag1.setDistributedSystemId(-1);
tag1.setIsGatewayTag(true);
VersionTag tag2 = VersionTag.create(member1);
tag2.setVersionTimeStamp(2);
tag2.setDistributedSystemId(2);
tag2.setIsGatewayTag(true);
((TestableRegionEntry) re).setVersions(tag1);
assertEquals(tag1.getVersionTimeStamp(),
re.getVersionStamp().asVersionTag().getVersionTimeStamp());
assertEquals(-1, ((TestableRegionEntry) re).getDistributedSystemId());
// apply tag2 should be accepted
entryEvent1.setVersionTag(tag2);
re.processVersionTag(entryEvent1);
}
@Test
public void applyingGatewayEventsFromDifferentDSShouldAcceptBiggerTimestamp() {
// create 2 gateway events:
// tag1 with smaller distributed system ids (DSIDs) and bigger timestamp
// tag2 with bigger DSID and smaller timestamp
// set tag2 into stamp. Apply event with tag1 should pass
// i.e. We compare timestamp first, then DSID
GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
LocalRegion lr = mock(LocalRegion.class);
String value = "value";
AbstractRegionEntry re = new TestableRegionEntry(lr, value);
InternalDistributedMember member1 = mock(InternalDistributedMember.class);
EntryEventImpl entryEvent1 = new EntryEventImpl();
entryEvent1.setRegion(lr);
when(lr.getCache()).thenReturn(cache);
when(cache.getGatewayConflictResolver()).thenReturn(null);
VersionTag tag1 = VersionTag.create(member1);
tag1.setVersionTimeStamp(2);
tag1.setDistributedSystemId(1);
tag1.setIsGatewayTag(true);
VersionTag tag2 = VersionTag.create(member1);
tag2.setVersionTimeStamp(1);
tag2.setDistributedSystemId(2);
tag2.setIsGatewayTag(true);
((TestableRegionEntry) re).setVersions(tag2);
assertEquals(2, ((TestableRegionEntry) re).getDistributedSystemId());
// apply tag1 with bigger timestamp should pass
entryEvent1.setVersionTag(tag1);
re.processVersionTag(entryEvent1);
}
@Test
public void applyingGatewayEventsFromSmallerDSWithSameTimestampShouldThrowCMEIfNoResolver() {
// create 2 gateway events with different distributed system ids (DSIDs), with same timestamp
// set the one with bigger DSID into stamp.
// Apply the one with smaller DSID show throw CME
GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
LocalRegion lr = mock(LocalRegion.class);
String value = "value";
AbstractRegionEntry re = new TestableRegionEntry(lr, value);
InternalDistributedMember member1 = mock(InternalDistributedMember.class);
EntryEventImpl entryEvent1 = new EntryEventImpl();
entryEvent1.setRegion(lr);
when(lr.getCache()).thenReturn(cache);
when(cache.getGatewayConflictResolver()).thenReturn(null);
VersionTag tag1 = VersionTag.create(member1);
tag1.setVersionTimeStamp(1);
tag1.setDistributedSystemId(1);
tag1.setIsGatewayTag(true);
VersionTag tag2 = VersionTag.create(member1);
tag2.setVersionTimeStamp(1);
tag2.setDistributedSystemId(2);
tag2.setIsGatewayTag(true);
((TestableRegionEntry) re).setVersions(tag2);
assertEquals(2, ((TestableRegionEntry) re).getDistributedSystemId());
// apply tag1 with smaller timestamp should throw CME
entryEvent1.setVersionTag(tag1);
expectedException.expect(ConcurrentCacheModificationException.class);
expectedException.expectMessage("conflicting WAN event detected");
re.processVersionTag(entryEvent1);
}
@Test
public void resolverShouldHandleConflictEventsFromDifferentDS() {
// create 2 gateway events with different distributed system ids (DSIDs), with same timestamp
// set the one with bigger DSID into stamp.
// Usually, apply the one with smaller DSID should throw CME, but since there's resolver
// resolver will accept the event
GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
LocalRegion lr = mock(LocalRegion.class);
String value = "value";
AbstractRegionEntry re = new TestableRegionEntry(lr, value);
InternalDistributedMember member1 = mock(InternalDistributedMember.class);
EntryEventImpl entryEvent1 = new EntryEventImpl();
entryEvent1 = Mockito.spy(entryEvent1);
entryEvent1.setRegion(lr);
when(lr.getCache()).thenReturn(cache);
GatewayConflictResolver resolver = mock(GatewayConflictResolver.class);
when(cache.getGatewayConflictResolver()).thenReturn(resolver);
doNothing().when(resolver).onEvent(any(), any());
TimestampedEntryEventImpl timestampedEvent = mock(TimestampedEntryEventImpl.class);
// when(entryEvent1.getTimestampedEvent(anyInt(), anyInt(), anyLong(), anyLong()));
doReturn(timestampedEvent).when(entryEvent1).getTimestampedEvent(anyInt(), anyInt(), anyLong(),
anyLong());
when(timestampedEvent.hasOldValue()).thenReturn(true);
VersionTag tag1 = VersionTag.create(member1);
tag1.setVersionTimeStamp(1);
tag1.setDistributedSystemId(1);
tag1.setIsGatewayTag(true);
VersionTag tag2 = VersionTag.create(member1);
tag2.setVersionTimeStamp(1);
tag2.setDistributedSystemId(2);
tag2.setIsGatewayTag(true);
((TestableRegionEntry) re).setVersions(tag2);
assertEquals(2, ((TestableRegionEntry) re).getDistributedSystemId());
entryEvent1.setVersionTag(tag1);
re.processVersionTag(entryEvent1);
verify(resolver, Mockito.times(1)).onEvent(any(), any());
}
public static class TestableRegionEntry extends AbstractRegionEntry
implements OffHeapRegionEntry, VersionStamp {
private Object value;
private VersionTag tag;
private long timeStamp = 0;
private int dsId;
protected TestableRegionEntry(RegionEntryContext context, Object value) {
super(context, value);
}
@Override
public void setVersionTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
@Override
public void setVersions(VersionTag tag) {
this.tag = tag;
timeStamp = tag.getVersionTimeStamp();
dsId = tag.getDistributedSystemId();
}
@Override
public void setMemberID(VersionSource memberID) {
}
@Override
public VersionTag asVersionTag() {
return tag;
}
@Override
public void processVersionTag(InternalRegion region, VersionTag tag, boolean isTombstoneFromGII,
boolean hasDelta, VersionSource versionSource,
InternalDistributedMember sender, boolean checkConflicts) {
}
@Override
public VersionStamp getVersionStamp() {
return this;
}
@Override
protected Object getValueField() {
return value;
}
@Override
protected void setValueField(Object v) {
value = v;
}
@Override
public void setValue(RegionEntryContext context, @Unretained Object value)
throws RegionClearedException {
super.setValue(context, value);
if (value == Token.TOMBSTONE) {
throw new RuntimeException("throw exception on setValue(TOMBSTONE)");
}
}
@Override
public int getEntryHash() {
return 0;
}
@Override
public HashEntry<Object, Object> getNextEntry() {
return null;
}
@Override
public void setNextEntry(HashEntry<Object, Object> n) {}
@Override
public Object getKey() {
return null;
}
@Override
protected long getLastModifiedField() {
return 0;
}
@Override
protected boolean compareAndSetLastModifiedField(long expectedValue, long newValue) {
return false;
}
@Override
protected void setEntryHash(int v) {}
@Override
public void release() {
}
@Override
public long getAddress() {
return 0;
}
@Override
public boolean setAddress(long expectedAddr, long newAddr) {
return false;
}
@Override
public int getEntryVersion() {
return 0;
}
@Override
public long getRegionVersion() {
return 0;
}
@Override
public long getVersionTimeStamp() {
return timeStamp;
}
@Override
public VersionSource getMemberID() {
return null;
}
@Override
public int getDistributedSystemId() {
return dsId;
}
@Override
public short getRegionVersionHighBytes() {
return 0;
}
@Override
public int getRegionVersionLowBytes() {
return 0;
}
}
}