blob: e6abad59ffea44be2c96ade4b0f58c29f531ca46 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.wan.parallel;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.apache.geode.cache.Operation;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderStats;
import org.apache.geode.test.fake.Fakes;
/**
 * Unit tests for the batch conflation performed by
 * {@link AbstractGatewaySenderEventProcessor#conflate}, exercised through the processor that
 * {@link ParallelGatewaySenderHelper} creates for a mocked gateway sender with batch conflation
 * enabled.
 */
public class ParallelGatewaySenderEventProcessorJUnitTest {

  /** Full path reported by the mocked region that every test event is created against. */
  private static final String REGION_PATH = "/dataStoreRegion";

  private GemFireCacheImpl cache;

  private AbstractGatewaySender sender;

  /** Builds the fake cache first, because the gateway sender is created from it. */
  @Before
  public void setUpGemFire() {
    createCache();
    createGatewaySender();
  }

  /** Creates the fake cache and stubs its distributed system with a plain mock. */
  private void createCache() {
    this.cache = Fakes.cache();
    InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
    when(this.cache.getDistributedSystem()).thenReturn(ids);
  }

  /** Creates the gateway sender and stubs it to enable batch conflation and return mock stats. */
  private void createGatewaySender() {
    this.sender = ParallelGatewaySenderHelper.createGatewaySender(this.cache);
    when(this.sender.isBatchConflationEnabled()).thenReturn(true);
    when(this.sender.getStatistics()).thenReturn(mock(GatewaySenderStats.class));
  }

  /**
   * Creates the mocked region shared by the events of a single test.
   *
   * @return a {@link LocalRegion} mock whose full path is {@link #REGION_PATH} and whose cache is
   *         the test's fake cache
   */
  private LocalRegion createMockRegion() {
    LocalRegion region = mock(LocalRegion.class);
    when(region.getFullPath()).thenReturn(REGION_PATH);
    when(region.getCache()).thenReturn(this.cache);
    return region;
  }

  /**
   * A CREATE, four UPDATEs and a DESTROY on the same key must conflate to CREATE, the last UPDATE
   * and DESTROY, in that order.
   */
  @Test
  public void validateBatchConflationWithBatchContainingDuplicateConflatableEvents()
      throws Exception {
    // Create a ParallelGatewaySenderEventProcessor
    AbstractGatewaySenderEventProcessor processor =
        ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);

    // Create a batch of conflatable events with duplicates
    List<GatewaySenderEventImpl> originalEvents = new ArrayList<>();
    LocalRegion lr = createMockRegion();
    Object lastUpdateValue = "Object_13964_5";
    long lastUpdateSequenceId = 104;
    long lastUpdateShadowKey = 28161;
    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
        "Object_13964", "Object_13964_1", 100, 27709));
    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.UPDATE,
        "Object_13964", "Object_13964_2", 101, 27822));
    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.UPDATE,
        "Object_13964", "Object_13964_3", 102, 27935));
    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.UPDATE,
        "Object_13964", "Object_13964_4", 103, 28048));
    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.UPDATE,
        "Object_13964", lastUpdateValue, lastUpdateSequenceId, lastUpdateShadowKey));
    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
        "Object_13964", null, 104, 28274));

    // Conflate the batch of events
    List<GatewaySenderEventImpl> conflatedEvents = processor.conflate(originalEvents);

    // Verify:
    // - the batch contains 3 events after conflation
    // - they are CREATE, UPDATE, and DESTROY
    // - the UPDATE event is the correct one
    assertThat(conflatedEvents).hasSize(3);
    GatewaySenderEventImpl gsei1 = conflatedEvents.get(0);
    assertThat(gsei1.getOperation()).isEqualTo(Operation.CREATE);
    GatewaySenderEventImpl gsei2 = conflatedEvents.get(1);
    assertThat(gsei2.getOperation()).isEqualTo(Operation.UPDATE);
    GatewaySenderEventImpl gsei3 = conflatedEvents.get(2);
    assertThat(gsei3.getOperation()).isEqualTo(Operation.DESTROY);
    assertThat(gsei2.getDeserializedValue()).isEqualTo(lastUpdateValue);
    assertThat(gsei2.getEventId().getSequenceID()).isEqualTo(lastUpdateSequenceId);
    assertThat(gsei2.getShadowKey()).isEqualTo(lastUpdateShadowKey);
  }

  /**
   * See GEODE-7079: a NullPointerException was thrown whenever the queue was recovered from disk
   * and the processor started dispatching events before the actual region was available.
   *
   * <p>
   * "DoesNow" in the method name is a typo for "DoesNot"; the name is kept as-is so that test
   * reports and filters that refer to it keep working.
   */
  @Test
  public void verifyBatchConflationWithNullEventRegionDoesNowThrowException()
      throws Exception {
    AbstractGatewaySenderEventProcessor processor =
        ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
    List<GatewaySenderEventImpl> events = new ArrayList<>();
    LocalRegion lr = createMockRegion();

    // Create two events for the same key, so that conflation will be needed. Stub getRegion() on
    // each spied event to return null, so that conflation hits the NPE if it dereferences the
    // region.
    GatewaySenderEventImpl gsei1 =
        spy(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
            "Object_13964", "Object_13964_1", 100, 27709));
    doReturn(null).when(gsei1).getRegion();
    GatewaySenderEventImpl gsei2 =
        spy(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.UPDATE,
            "Object_13964", "Object_13964_2", 101, 27822));
    doReturn(null).when(gsei2).getRegion();
    events.add(gsei1);
    events.add(gsei2);

    assertThatCode(() -> processor.conflate(events)).doesNotThrowAnyException();
  }

  /**
   * A batch with two CREATE events on the same region and key (differing in sequence id and
   * shadow key) must come out of conflation unchanged.
   */
  @Test
  public void validateBatchConflationWithBatchContainingDuplicateNonConflatableEvents()
      throws Exception {
    // In certain cases, a batch could contain multiple CREATE events on the same region and key.
    // These events should not be conflated such that the order is incorrect.
    // Here is an example batch that contains 2 CREATE events on key=Object_14024:
    // SenderEventImpl[id=EventIDid=57bytes;threadID=0x10018|112;sequenceID=100;bucketId=24];operation=CREATE;region=/dataStoreRegion;key=Object_13964;shadowKey=27709]
    // SenderEventImpl[id=EventIDid=57bytes;threadID=0x10018|112;sequenceID=101;bucketId=24];operation=CREATE;region=/dataStoreRegion;key=Object_14024;shadowKey=27822]
    // SenderEventImpl[id=EventIDid=57bytes;threadID=0x10018|112;sequenceID=102;bucketId=24];operation=DESTROY;region=/dataStoreRegion;key=Object_13964;shadowKey=27935]
    // SenderEventImpl[id=EventIDid=57bytes;threadID=0x10018|112;sequenceID=104;bucketId=24];operation=CREATE;region=/dataStoreRegion;key=Object_14024;shadowKey=28161]

    // Create a ParallelGatewaySenderEventProcessor
    AbstractGatewaySenderEventProcessor processor =
        ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);

    // Create a batch of non-conflatable events with one duplicate (not including the shadowKey)
    List<GatewaySenderEventImpl> originalEvents = new ArrayList<>();
    LocalRegion lr = createMockRegion();
    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
        "Object_13964", "Object_13964", 100, 27709));
    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
        "Object_14024", "Object_14024", 101, 27822));
    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
        "Object_13964", null, 102, 27935));
    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
        "Object_14024", "Object_14024", 104, 28161));

    // Conflate the batch of events
    List<GatewaySenderEventImpl> conflatedEvents = processor.conflate(originalEvents);

    // Assert no events were conflated incorrectly (actual first, expected second, so that a
    // failure message labels the two lists correctly)
    assertThat(conflatedEvents).isEqualTo(originalEvents);
  }

  /**
   * A batch whose last two events are exact repeats of its second and third events (one DESTROY,
   * one PUTALL_UPDATE, same event id and shadow key) must conflate to its first four events, in
   * their original order.
   */
  @Test
  public void validateBatchConflationWithBatchContainingDuplicateConflatableAndNonConflatableEvents()
      throws Exception {
    // In certain cases, a batch could contain multiple equal conflatable events. These events
    // should not be conflated such that the order is incorrect.
    // Here is an example batch with multiple equal events on shadowKey=16813 and shadowKey=16700:
    // SenderEventImpl[id=EventID[threadID=104;sequenceID=2;bucketId=89];action=1;operation=UPDATE;region=/dataStoreRegion;key=Object_6079;shadowKey=16587]
    // SenderEventImpl[id=EventID[threadID=104;sequenceID=3;bucketId=89];action=2;operation=DESTROY;region=/dataStoreRegion;key=Object_6079;shadowKey=16700]
    // SenderEventImpl[id=EventID[threadID=112;sequenceID=9;bucketId=89];action=1;operation=PUTALL_UPDATE;region=/dataStoreRegion;key=Object_7731;shadowKey=16813]
    // SenderEventImpl[id=EventID[threadID=112;sequenceID=12;bucketId=89];action=1;operation=PUTALL_UPDATE;region=/dataStoreRegion;key=Object_6591;shadowKey=16926]
    // SenderEventImpl[id=EventID[threadID=104;sequenceID=3;bucketId=89];action=2;operation=DESTROY;region=/dataStoreRegion;key=Object_6079;shadowKey=16700]
    // SenderEventImpl[id=EventID[threadID=112;sequenceID=9;bucketId=89];action=1;operation=PUTALL_UPDATE;region=/dataStoreRegion;key=Object_7731;shadowKey=16813]

    // Create a ParallelGatewaySenderEventProcessor
    AbstractGatewaySenderEventProcessor processor =
        ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);

    // Create mock region
    LocalRegion lr = createMockRegion();

    // Create a batch of conflatable and non-conflatable events with one duplicate conflatable event
    // and one duplicate non-conflatable event (including the shadowKey)
    List<GatewaySenderEventImpl> originalEvents = new ArrayList<>();
    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.UPDATE,
        "Object_6079", "Object_6079", 104, 2, 89, 16587));
    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
        "Object_6079", null, 104, 3, 89, 16700));
    originalEvents
        .add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.PUTALL_UPDATE,
            "Object_7731", "Object_7731", 112, 9, 89, 16813));
    originalEvents
        .add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.PUTALL_UPDATE,
            "Object_6591", "Object_6591", 112, 12, 89, 16926));
    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
        "Object_6079", null, 104, 3, 89, 16700));
    originalEvents
        .add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.PUTALL_UPDATE,
            "Object_7731", "Object_7731", 112, 9, 89, 16813));
    logEvents("original", originalEvents);

    // Conflate the batch of events
    List<GatewaySenderEventImpl> conflatedEvents = processor.conflate(originalEvents);
    logEvents("conflated", conflatedEvents);

    // The two trailing repeats must be dropped: exactly the first four original events remain,
    // in their original order.
    assertThat(conflatedEvents).containsExactlyElementsOf(originalEvents.subList(0, 4));
  }

  /**
   * Prints a header line followed by one tab-indented line per event to standard output.
   *
   * @param message label inserted into the header line (e.g. "original" or "conflated")
   * @param events the events to print, each rendered with {@code toSmallString()}
   */
  private void logEvents(String message, List<GatewaySenderEventImpl> events) {
    StringBuilder builder = new StringBuilder();
    builder.append("The list contains the following ").append(events.size()).append(" ")
        .append(message).append(" events:");
    for (GatewaySenderEventImpl event : events) {
      // Newline first, then the tab, so each event starts on its own indented line.
      builder.append("\n\t").append(event.toSmallString());
    }
    System.out.println(builder);
  }
}