blob: 5fa489fa9cd890a29d4d2cab2f600af36994bf34 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.lucene.internal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.ArgumentMatchers.startsWith;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.store.AlreadyClosedException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.BucketNotFoundException;
import org.apache.geode.internal.cache.EntrySnapshot;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.test.fake.Fakes;
import org.apache.geode.test.junit.categories.LuceneTest;
/**
* Unit test that async event listener dispatched the events to the appropriate repository.
*/
@Category({LuceneTest.class})
public class LuceneEventListenerJUnitTest {
private RepositoryManager manager;
private LuceneEventListener listener;
private InternalCache cache;
@Before
public void setup() {
cache = Fakes.cache();
manager = Mockito.mock(RepositoryManager.class);
listener = new LuceneEventListener(cache, manager);
listener = spy(listener);
}
@After
public void clearExceptionListener() {
LuceneEventListener.setExceptionObserver(null);
}
@Test
public void pdxReadSerializedFlagShouldBeResetBackToOriginalValueAfterProcessingEvents() {
ArgumentCaptor valueCapture = ArgumentCaptor.forClass(Boolean.class);
doNothing().when(cache).setPdxReadSerializedOverride((Boolean) valueCapture.capture());
boolean originalPdxReadSerialized = cache.getPdxReadSerializedOverride();
try {
cache.setPdxReadSerializedOverride(true);
Assert.assertTrue((Boolean) valueCapture.getValue());
listener.process(new LinkedList<>());
Assert.assertTrue(!(Boolean) valueCapture.getValue());
} finally {
cache.setPdxReadSerializedOverride(originalPdxReadSerialized);
}
}
@Test
public void testProcessBatch() throws Exception {
IndexRepository repo1 = Mockito.mock(IndexRepository.class);
IndexRepository repo2 = Mockito.mock(IndexRepository.class);
Region region1 = Mockito.mock(Region.class);
Region region2 = Mockito.mock(Region.class);
Object callback1 = new Object();
Mockito.when(manager.getRepository(eq(region1), any(), eq(callback1))).thenReturn(repo1);
Mockito.when(manager.getRepository(eq(region2), any(), eq(null))).thenReturn(repo2);
List<AsyncEvent> events = new ArrayList<AsyncEvent>();
int numEntries = 100;
for (int i = 0; i < numEntries; i++) {
AsyncEvent event = Mockito.mock(AsyncEvent.class);
Region region = i % 2 == 0 ? region1 : region2;
Object callback = i % 2 == 0 ? callback1 : null;
Mockito.when(event.getRegion()).thenReturn(region);
Mockito.when(event.getKey()).thenReturn(i);
Mockito.when(event.getCallbackArgument()).thenReturn(callback);
switch (i % 4) {
case 0:
case 1:
final EntrySnapshot entry = mock(EntrySnapshot.class);
when(entry.getRawValue(true)).thenReturn(i);
when(region.getEntry(eq(i))).thenReturn(entry);
break;
case 2:
case 3:
// Do nothing, get value will return a destroy
break;
}
events.add(event);
}
listener.processEvents(events);
verify(repo1, atLeast(numEntries / 4)).delete(any());
verify(repo1, atLeast(numEntries / 4)).update(any(), any());
verify(repo2, atLeast(numEntries / 4)).delete(any());
verify(repo2, atLeast(numEntries / 4)).update(any(), any());
verify(repo1, times(1)).commit();
verify(repo2, times(1)).commit();
}
@Test
public void shouldHandleBucketNotFoundException()
throws BucketNotFoundException {
Mockito.when(manager.getRepository(any(), any(), any()))
.thenThrow(BucketNotFoundException.class);
AsyncEvent event = Mockito.mock(AsyncEvent.class);
boolean result = listener.processEvents(Arrays.asList(event));
assertFalse(result);
verify(listener, times(1))
.logDebugMessage(startsWith("Bucket not found"), any(BucketNotFoundException.class));
}
@Test
public void shouldHandleCacheClosedException()
throws BucketNotFoundException {
Mockito.when(manager.getRepository(any(), any(), any()))
.thenThrow(CacheClosedException.class);
AsyncEvent event = Mockito.mock(AsyncEvent.class);
boolean result = listener.processEvents(Arrays.asList(event));
assertFalse(result);
verify(listener, times(1))
.logDebugMessage(contains("cache has been closed"), any(CacheClosedException.class));
}
@Test
public void shouldHandleAlreadyClosedException()
throws BucketNotFoundException {
Mockito.when(manager.getRepository(any(), any(), any()))
.thenThrow(AlreadyClosedException.class);
AsyncEvent event = Mockito.mock(AsyncEvent.class);
boolean result = listener.processEvents(Arrays.asList(event));
assertFalse(result);
verify(listener, times(1))
.logDebugMessage(contains("the lucene index is already closed"),
any(AlreadyClosedException.class));
}
@Test
public void shouldThrowAndCaptureIOException() throws BucketNotFoundException {
doAnswer((m) -> {
throw new IOException();
}).when(manager).getRepository(any(), any(), any());
AtomicReference<Throwable> lastException = new AtomicReference<>();
LuceneEventListener.setExceptionObserver(lastException::set);
AsyncEvent event = Mockito.mock(AsyncEvent.class);
try {
listener.processEvents(Arrays.asList(event));
fail("should have thrown an exception");
} catch (InternalGemFireError expected) {
assertEquals(expected, lastException.get());
}
}
}