blob: def45941d79c49d4dc0ef77d13780796c8fc6051 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.event;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.junit.Before;
import org.junit.Test;
import org.apache.geode.CancelCriterion;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.CachePerfStats;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalCacheEvent;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.versions.VersionTag;
public class DistributedEventTrackerTest {
private DistributedRegion region;
private DistributedEventTracker eventTracker;
private ClientProxyMembershipID memberId;
private DistributedMember member;
@Before
public void setup() {
region = mock(DistributedRegion.class);
RegionAttributes<?, ?> regionAttributes = mock(RegionAttributes.class);
memberId = mock(ClientProxyMembershipID.class);
when(region.getAttributes()).thenReturn(regionAttributes);
when(regionAttributes.getDataPolicy()).thenReturn(mock(DataPolicy.class));
when(region.getConcurrencyChecksEnabled()).thenReturn(true);
when(region.getCancelCriterion()).thenReturn(mock(CancelCriterion.class));
when(region.getCachePerfStats()).thenReturn(mock(CachePerfStats.class));
InternalCache cache = mock(InternalCache.class);
InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
when(region.getCache()).thenReturn(cache);
when(cache.getDistributedSystem()).thenReturn(ids);
when(ids.getOffHeapStore()).thenReturn(null);
member = mock(DistributedMember.class);
eventTracker = new DistributedEventTracker(region);
}
@Test
public void retriedBulkOpDoesNotRemoveRecordedBulkOpVersionTags() {
byte[] memId = {1, 2, 3};
long threadId = 1;
long retrySeqId = 1;
ThreadIdentifier tid = new ThreadIdentifier(memId, threadId);
EventID retryEventID = new EventID(memId, threadId, retrySeqId);
boolean skipCallbacks = true;
int size = 5;
recordPutAllEvents(memId, threadId, skipCallbacks, size);
ConcurrentMap<ThreadIdentifier, BulkOperationHolder> map =
eventTracker.getRecordedBulkOpVersionTags();
BulkOperationHolder holder = map.get(tid);
int beforeSize = holder.getEntryVersionTags().size();
eventTracker.recordBulkOpStart(retryEventID, tid);
map = eventTracker.getRecordedBulkOpVersionTags();
holder = map.get(tid);
// Retried bulk op should not remove exiting BulkOpVersionTags
assertTrue(holder.getEntryVersionTags().size() == beforeSize);
}
private void recordPutAllEvents(byte[] memId, long threadId, boolean skipCallbacks, int size) {
for (int i = 0; i < size; i++) {
putEvent("key" + i, "value" + i, memId, threadId, skipCallbacks, i + 1);
EntryEventImpl event = EntryEventImpl.create(region, Operation.PUTALL_CREATE, "key" + i,
"value" + i, null, false, member, !skipCallbacks, new EventID(memId, threadId, i + 1));
event.setContext(memberId);
event.setVersionTag(mock(VersionTag.class));
eventTracker.recordEvent(event);
}
}
private void putEvent(String key, String value, byte[] memId, long threadId,
boolean skipCallbacks, int sequenceId) {
EntryEventImpl event = EntryEventImpl.create(region, Operation.PUTALL_CREATE, key, value, null,
false, member, !skipCallbacks, new EventID(memId, threadId, sequenceId));
event.setContext(memberId);
event.setVersionTag(mock(VersionTag.class));
eventTracker.recordEvent(event);
}
private void putEvent(String key, String value, byte[] memId, long threadId,
boolean skipCallbacks, int sequenceId, VersionTag tag) {
EntryEventImpl event = EntryEventImpl.create(region, Operation.PUTALL_CREATE, key, value, null,
false, member, !skipCallbacks, new EventID(memId, threadId, sequenceId));
event.setContext(memberId);
event.setVersionTag(tag);
eventTracker.recordEvent(event);
}
@Test
public void returnsCorrectNameOfCache() {
String testName = "testing";
when(region.getName()).thenReturn(testName);
eventTracker = new DistributedEventTracker(region);
assertEquals("Event Tracker for " + testName, eventTracker.getName());
}
@Test
public void initializationCorrectlyReadiesTheTracker() throws InterruptedException {
assertFalse(eventTracker.isInitialized());
eventTracker.setInitialized();
assertTrue(eventTracker.isInitialized());
eventTracker.waitOnInitialization();
}
@Test
public void startAndStopAddAndRemoveTrackerFromExpiryTask() {
EventTrackerExpiryTask task = mock(EventTrackerExpiryTask.class);
InternalCache cache = mock(InternalCache.class);
when(region.getCache()).thenReturn(cache);
when(cache.getEventTrackerTask()).thenReturn(task);
eventTracker = new DistributedEventTracker(region);
eventTracker.start();
verify(task, times(1)).addTracker(eventTracker);
eventTracker.stop();
verify(task, times(1)).removeTracker(eventTracker);
}
@Test
public void returnsEmptyMapIfRecordedEventsAreEmpty() {
assertEquals(0, eventTracker.getState().size());
}
@Test
public void returnsMapContainingSequenceIdHoldersCurrentlyPresent() {
EventSequenceNumberHolder sequenceIdHolder = new EventSequenceNumberHolder(0L, null);
ThreadIdentifier threadId = new ThreadIdentifier(new byte[0], 0L);
eventTracker.recordSequenceNumber(threadId, sequenceIdHolder);
Map<ThreadIdentifier, EventSequenceNumberHolder> state = eventTracker.getState();
assertEquals(1, state.size());
EventSequenceNumberHolder returnedHolder = state.get(threadId);
assertNotNull(returnedHolder);
// the version tag is stripped out on purpose, so passed in object and returned one are not
// equal to each other
assertNull(returnedHolder.getVersionTag());
assertEquals(sequenceIdHolder.getLastSequenceNumber(), returnedHolder.getLastSequenceNumber());
}
@Test
public void setToInitializedWhenStateRecorded() {
eventTracker.recordState(null, Collections.emptyMap());
assertTrue(eventTracker.isInitialized());
}
@Test
public void setsInitialImageProvidedWhenStateRecorded() {
InternalDistributedMember distributedMember = mock(InternalDistributedMember.class);
eventTracker.recordState(distributedMember, Collections.emptyMap());
assertTrue(eventTracker.isInitialImageProvider(distributedMember));
}
@Test
public void entryInRecordedStateStoredWhenNotInCurrentState() {
EventSequenceNumberHolder sequenceIdHolder = new EventSequenceNumberHolder(0L, null);
ThreadIdentifier threadId = new ThreadIdentifier(new byte[0], 0L);
Map<ThreadIdentifier, EventSequenceNumberHolder> state =
Collections.singletonMap(threadId, sequenceIdHolder);
eventTracker.recordState(null, state);
Map<ThreadIdentifier, EventSequenceNumberHolder> storedState = eventTracker.getState();
assertEquals(storedState.get(threadId).getLastSequenceNumber(),
sequenceIdHolder.getLastSequenceNumber());
}
@Test
public void entryInRecordedStateNotStoredIfAlreadyInCurrentState() {
EventSequenceNumberHolder originalSequenceIdHolder = new EventSequenceNumberHolder(0L, null);
ThreadIdentifier threadId = new ThreadIdentifier(new byte[0], 0L);
Map<ThreadIdentifier, EventSequenceNumberHolder> state =
Collections.singletonMap(threadId, originalSequenceIdHolder);
eventTracker.recordState(null, state);
EventSequenceNumberHolder newSequenceIdHolder = new EventSequenceNumberHolder(1L, null);
Map<ThreadIdentifier, EventSequenceNumberHolder> newState =
Collections.singletonMap(threadId, newSequenceIdHolder);
eventTracker.recordState(null, newState);
Map<ThreadIdentifier, EventSequenceNumberHolder> storedState = eventTracker.getState();
assertEquals(storedState.get(threadId).getLastSequenceNumber(),
originalSequenceIdHolder.getLastSequenceNumber());
}
@Test
public void hasSeenEventReturnsFalseForEventWithNoID() {
InternalCacheEvent event = mock(InternalCacheEvent.class);
when(event.getEventId()).thenReturn(null);
assertFalse(eventTracker.hasSeenEvent(event));
}
@Test
public void hasSeenEventReturnsFalseForNullEventID() {
assertFalse(eventTracker.hasSeenEvent((EventID) null));
assertFalse(eventTracker.hasSeenEvent(null, null));
}
@Test
public void hasNotSeenEventIDThatIsNotInRecordedEvents() {
EventID eventID = new EventID(new byte[0], 0L, 0L);
assertFalse(eventTracker.hasSeenEvent(eventID));
}
@Test
public void hasSeenEventIDThatIsInRecordedEvents() {
EventID eventID = new EventID(new byte[0], 0L, 0L);
recordSequence(eventID);
assertTrue(eventTracker.hasSeenEvent(eventID));
}
@Test
public void hasNotSeenEventIDWhosSequenceIDIsMarkedRemoved() {
EventID eventID = new EventID(new byte[0], 0L, 0L);
EventSequenceNumberHolder sequenceIdHolder =
new EventSequenceNumberHolder(eventID.getSequenceID(), null);
sequenceIdHolder.setRemoved(true);
ThreadIdentifier threadId = new ThreadIdentifier(new byte[0], 0L);
eventTracker.recordSequenceNumber(threadId, sequenceIdHolder);
assertFalse(eventTracker.hasSeenEvent(eventID));
}
@Test
public void hasNotSeeEventIDWhosSequenceIDIsLargerThanSeen() {
EventID eventID = new EventID(new byte[0], 0L, 0L);
recordSequence(eventID);
EventID higherSequenceID = new EventID(new byte[0], 0L, 1);
assertFalse(eventTracker.hasSeenEvent(higherSequenceID));
}
@Test
public void returnsNoTagIfNoSequenceForEvent() {
EventID eventID = new EventID(new byte[0], 0L, 1L);
assertNull(eventTracker.findVersionTagForSequence(eventID));
}
@Test
public void returnsNoTagIfSequencesDoNotMatchForEvent() {
EventID eventID = new EventID(new byte[0], 0L, 1);
recordSequence(eventID);
assertNull(eventTracker.findVersionTagForSequence(eventID));
}
@Test
public void returnsCorrectTagForEvent() {
EventID eventID = new EventID(new byte[0], 0L, 0L);
EventSequenceNumberHolder sequenceIdHolder = recordSequence(eventID);
assertEquals(sequenceIdHolder.getVersionTag(), eventTracker.findVersionTagForSequence(eventID));
}
@Test
public void returnsNoTagIfNoBulkOpWhenNoEventGiven() {
assertNull(eventTracker.findVersionTagForBulkOp(null));
}
@Test
public void returnsNoTagIfNoBulkOpForEventWithSequence() {
EventID eventID = new EventID(new byte[0], 0L, 1L);
assertNull(eventTracker.findVersionTagForBulkOp(eventID));
}
@Test
public void returnsNoTagIfBulkOpsDoNotMatchForEvent() {
putEvent("key", "value", new byte[0], 0, false, 0);
EventID eventIDWithoutBulkOp = new EventID(new byte[0], 0L, 1);
assertNull(eventTracker.findVersionTagForBulkOp(eventIDWithoutBulkOp));
}
@Test
public void returnsCorrectTagForEventWithBulkOp() {
EventID eventID = new EventID(new byte[0], 0L, 0L);
VersionTag tag = mock(VersionTag.class);
putEvent("key", "value", new byte[0], 0, false, 0, tag);
assertEquals(tag, eventTracker.findVersionTagForBulkOp(eventID));
}
@Test
public void executesABulkOperations() {
EventID eventID = new EventID(new byte[0], 0L, 1L);
Runnable bulkOperation = mock(Runnable.class);
eventTracker.syncBulkOp(bulkOperation, eventID, false);
verify(bulkOperation, times(1)).run();
}
@Test
public void executesRunnableIfNotPartOfATransaction() {
EventID eventID = new EventID(new byte[0], 0L, 1L);
Runnable bulkOperation = mock(Runnable.class);
eventTracker.syncBulkOp(bulkOperation, eventID, true);
verify(bulkOperation, times(1)).run();
}
private EventSequenceNumberHolder recordSequence(EventID eventID) {
EventSequenceNumberHolder sequenceIdHolder =
new EventSequenceNumberHolder(eventID.getSequenceID(), null);
ThreadIdentifier threadIdentifier = new ThreadIdentifier(new byte[0], eventID.getThreadID());
eventTracker.recordSequenceNumber(threadIdentifier, sequenceIdHolder);
return sequenceIdHolder;
}
}