Revert "GEODE-8536: Allow limited retries when creating Lucene IndexWriter (#5589)" (#5656)
This reverts commit d2c1d67fd0010c208c320e41a1ef8d6e44710bd1.
diff --git a/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryDistributedTest.java b/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryDistributedTest.java
index bfe921f..a3f245d 100644
--- a/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryDistributedTest.java
+++ b/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryDistributedTest.java
@@ -25,6 +25,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
+import junitparams.Parameters;
import org.apache.commons.lang3.RandomStringUtils;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.Before;
@@ -110,6 +111,7 @@
}
@Test
+ @Parameters()
public void lockedBucketShouldPreventPrimaryFromMoving() {
dataStore1.invoke(this::initDataStoreAndLuceneIndex);
dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache()));
diff --git a/geode-lucene/src/integrationTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryIntegrationTest.java b/geode-lucene/src/integrationTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryIntegrationTest.java
deleted file mode 100644
index f5b41e7..0000000
--- a/geode-lucene/src/integrationTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryIntegrationTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.cache.lucene.internal;
-
-import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import org.apache.geode.InternalGemFireError;
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.PartitionAttributesFactory;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.execute.FunctionException;
-import org.apache.geode.cache.lucene.LuceneQuery;
-import org.apache.geode.cache.lucene.LuceneQueryException;
-import org.apache.geode.cache.lucene.LuceneServiceProvider;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.PartitionedRegion;
-
-public class IndexRepositoryFactoryIntegrationTest {
- private Cache cache;
- public static final String INDEX_NAME = "testIndex";
- public static final String REGION_NAME = "testRegion";
- private IndexRepositoryFactory spyFactory;
- private LuceneQuery<Object, Object> luceneQuery;
-
- @Before
- public void setUp() {
- cache = new CacheFactory().create();
- LuceneServiceProvider.get(cache).createIndexFactory().setFields("field1", "field2")
- .create(INDEX_NAME, REGION_NAME);
-
- cache.createRegionFactory(RegionShortcut.PARTITION)
- .setPartitionAttributes(new PartitionAttributesFactory<>().setTotalNumBuckets(10).create())
- .create(REGION_NAME);
-
- spyFactory = spy(new IndexRepositoryFactory());
- PartitionedRepositoryManager.indexRepositoryFactory = spyFactory;
-
- luceneQuery = LuceneServiceProvider.get(cache).createLuceneQueryFactory()
- .create(INDEX_NAME, REGION_NAME, "hello", "field1");
- }
-
- @After
- public void tearDown() {
- ExecutorService lonerDistributionThreads =
- ((InternalCache) cache).getDistributionManager().getExecutors().getThreadPool();
- PartitionedRepositoryManager.indexRepositoryFactory = new IndexRepositoryFactory();
- if (cache != null) {
- cache.close();
- }
- // Wait until the thread pool that uses the modified IndexRepositoryFactory behaviour has
- // terminated before allowing further tests, to prevent mocking exceptions
- await().until(lonerDistributionThreads::isTerminated);
- }
-
- @Test
- public void shouldRetryWhenIOExceptionEncounteredOnceDuringComputingRepository()
- throws IOException, LuceneQueryException {
- // To ensure that the specific bucket used in the query throws the IOException to trigger the
- // retry, throw once for every bucket in the region
- int numberOfBuckets =
- ((PartitionedRegion) cache.getRegion(REGION_NAME)).getTotalNumberOfBuckets();
-
- doAnswer(new Answer<Object>() {
- private int times = 0;
-
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- if (times < numberOfBuckets) {
- times++;
- throw new IOException();
- }
- return invocation.callRealMethod();
- }
- }).when(spyFactory).getIndexWriter(any(), any());
-
- luceneQuery.findKeys();
-
- // The invocation should throw once for each bucket, then retry once for each bucket
- verify(spyFactory, times(numberOfBuckets * 2)).getIndexWriter(any(), any());
- }
-
- @Test
- public void shouldThrowInternalGemfireErrorWhenIOExceptionEncounteredConsistentlyDuringComputingRepository()
- throws IOException {
- doThrow(new IOException()).when(spyFactory).getIndexWriter(any(), any());
-
- assertThatThrownBy(luceneQuery::findKeys).isInstanceOf(FunctionException.class)
- .hasCauseInstanceOf(InternalGemFireError.class);
- }
-}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
index 7db8b96..7674a45 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
@@ -44,7 +44,6 @@
private static final Logger logger = LogService.getLogger();
public static final String FILE_REGION_LOCK_FOR_BUCKET_ID = "FileRegionLockForBucketId:";
public static final String APACHE_GEODE_INDEX_COMPLETE = "APACHE_GEODE_INDEX_COMPLETE";
- protected static final int GET_INDEX_WRITER_MAX_ATTEMPTS = 200;
public IndexRepositoryFactory() {}
@@ -75,8 +74,7 @@
* This is a util function just to not let computeIndexRepository be a huge chunk of code.
*/
protected IndexRepository finishComputingRepository(Integer bucketId, LuceneSerializer serializer,
- PartitionedRegion userRegion, IndexRepository oldRepository, InternalLuceneIndex index)
- throws IOException {
+ PartitionedRegion userRegion, IndexRepository oldRepository, InternalLuceneIndex index) {
LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index;
final PartitionedRegion fileRegion = indexForPR.getFileAndChunkRegion();
BucketRegion fileAndChunkBucket = getMatchingBucket(fileRegion, bucketId);
@@ -131,7 +129,7 @@
} catch (IOException e) {
logger.warn("Exception thrown while constructing Lucene Index for bucket:" + bucketId
+ " for file region:" + fileAndChunkBucket.getFullPath(), e);
- throw e;
+ return null;
} catch (CacheClosedException e) {
logger.info("CacheClosedException thrown while constructing Lucene Index for bucket:"
+ bucketId + " for file region:" + fileAndChunkBucket.getFullPath());
@@ -146,34 +144,11 @@
protected IndexWriter buildIndexWriter(int bucketId, BucketRegion fileAndChunkBucket,
LuceneIndexForPartitionedRegion indexForPR) throws IOException {
- int attempts = 0;
- // IOExceptions can occur if the fileAndChunk region is being modified while the IndexWriter is
- // being initialized, so allow limited retries here to account for that timing window
- while (true) {
- // bucketTargetingMap handles partition resolver (via bucketId as callbackArg)
- Map<Object, Object> bucketTargetingMap = getBucketTargetingMap(fileAndChunkBucket, bucketId);
- RegionDirectory dir =
- new RegionDirectory(bucketTargetingMap, indexForPR.getFileSystemStats());
- IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer());
- try {
- attempts++;
- return getIndexWriter(dir, config);
- } catch (IOException e) {
- if (attempts >= GET_INDEX_WRITER_MAX_ATTEMPTS) {
- throw e;
- }
- logger.info("Encountered {} while attempting to get IndexWriter for index {}. Retrying...",
- e, indexForPR.getName());
- try {
- Thread.sleep(5);
- } catch (InterruptedException ignore) {
- }
- }
- }
- }
+ // bucketTargetingMap handles partition resolver (via bucketId as callbackArg)
+ Map bucketTargetingMap = getBucketTargetingMap(fileAndChunkBucket, bucketId);
+ RegionDirectory dir = new RegionDirectory(bucketTargetingMap, indexForPR.getFileSystemStats());
+ IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer());
- protected IndexWriter getIndexWriter(RegionDirectory dir, IndexWriterConfig config)
- throws IOException {
return new IndexWriter(dir, config);
}
@@ -211,8 +186,8 @@
return value;
}
- protected Map<Object, Object> getBucketTargetingMap(BucketRegion region, int bucketId) {
- return new BucketTargetingMap<>(region, bucketId);
+ protected Map getBucketTargetingMap(BucketRegion region, int bucketId) {
+ return new BucketTargetingMap(region, bucketId);
}
protected String getLockName(final BucketRegion fileAndChunkBucket) {
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryTest.java
index 38e6355..e301dcf 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryTest.java
@@ -14,7 +14,6 @@
*/
package org.apache.geode.cache.lucene.internal;
-import static org.apache.geode.cache.lucene.internal.IndexRepositoryFactory.GET_INDEX_WRITER_MAX_ATTEMPTS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
@@ -23,13 +22,11 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
-import org.apache.lucene.index.IndexWriter;
import org.junit.Before;
import org.junit.Test;
@@ -80,8 +77,7 @@
}
@Test
- public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNull()
- throws IOException {
+ public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNull() {
doReturn(null).when(indexRepositoryFactory).getMatchingBucket(fileRegion, bucketId);
IndexRepository indexRepository = indexRepositoryFactory.finishComputingRepository(0,
@@ -91,8 +87,7 @@
}
@Test
- public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNotPrimary()
- throws IOException {
+ public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNotPrimary() {
when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(false);
IndexRepository indexRepository = indexRepositoryFactory.finishComputingRepository(0,
@@ -102,8 +97,7 @@
}
@Test
- public void finishComputingRepositoryShouldReturnOldRepositoryWhenNotNullAndNotClosed()
- throws IOException {
+ public void finishComputingRepositoryShouldReturnOldRepositoryWhenNotNullAndNotClosed() {
when(oldRepository.isClosed()).thenReturn(false);
when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(true);
@@ -114,8 +108,7 @@
}
@Test
- public void finishComputingRepositoryShouldReturnNullWhenLockCanNotBeAcquiredAndFileAndChunkBucketIsNotPrimary()
- throws IOException {
+ public void finishComputingRepositoryShouldReturnNullWhenLockCanNotBeAcquiredAndFileAndChunkBucketIsNotPrimary() {
when(oldRepository.isClosed()).thenReturn(true);
when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(true).thenReturn(false);
when(distributedLockService.lock(any(), anyLong(), anyLong())).thenReturn(false);
@@ -126,7 +119,7 @@
}
@Test
- public void finishComputingRepositoryShouldThrowExceptionAndReleaseLockWhenIOExceptionIsThrownWhileBuildingTheIndex()
+ public void finishComputingRepositoryShouldReturnNullAndReleaseLockWhenIOExceptionIsThrownWhileBuildingTheIndex()
throws IOException {
when(oldRepository.isClosed()).thenReturn(true);
when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(true);
@@ -134,8 +127,9 @@
doThrow(new IOException("Test Exception")).when(indexRepositoryFactory)
.buildIndexWriter(bucketId, fileAndChunkBucket, luceneIndex);
- assertThatThrownBy(() -> indexRepositoryFactory.finishComputingRepository(0,
- serializer, userRegion, oldRepository, luceneIndex)).isInstanceOf(IOException.class);
+ IndexRepository indexRepository = indexRepositoryFactory.finishComputingRepository(0,
+ serializer, userRegion, oldRepository, luceneIndex);
+ assertThat(indexRepository).isNull();
verify(distributedLockService).unlock(any());
}
@@ -152,27 +146,4 @@
userRegion, oldRepository, luceneIndex)).isInstanceOf(CacheClosedException.class);
verify(distributedLockService).unlock(any());
}
-
- @Test
- public void buildIndexWriterRetriesCreatingIndexWriterWhenIOExceptionEncountered()
- throws IOException {
- IndexWriter writer = mock(IndexWriter.class);
- doThrow(new IOException()).doReturn(writer).when(indexRepositoryFactory).getIndexWriter(any(),
- any());
- assertThat(indexRepositoryFactory.buildIndexWriter(bucketId, fileAndChunkBucket, luceneIndex))
- .isEqualTo(writer);
- verify(indexRepositoryFactory, times(2)).getIndexWriter(any(), any());
- }
-
- @Test
- public void buildIndexWriterThrowsExceptionWhenIOExceptionConsistentlyEncountered()
- throws IOException {
- IOException testException = new IOException("Test exception");
- doThrow(testException).when(indexRepositoryFactory).getIndexWriter(any(), any());
- assertThatThrownBy(
- () -> indexRepositoryFactory.buildIndexWriter(bucketId, fileAndChunkBucket, luceneIndex))
- .isEqualTo(testException);
- verify(indexRepositoryFactory, times(GET_INDEX_WRITER_MAX_ATTEMPTS)).getIndexWriter(any(),
- any());
- }
}