blob: 17de988df7bc82ff9f013c396ffbf7e051dbe318 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.lucene.internal;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.geode.CancelException;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.FixedPartitionResolver;
import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.PartitionResolver;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.lucene.LuceneSerializer;
import org.apache.geode.cache.lucene.internal.directory.DumpDirectoryFiles;
import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats;
import org.apache.geode.cache.lucene.internal.partition.BucketTargetingFixedResolver;
import org.apache.geode.cache.lucene.internal.partition.BucketTargetingResolver;
import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
import org.apache.geode.cache.lucene.internal.repository.serializer.HeterogeneousLuceneSerializer;
import org.apache.geode.cache.partition.PartitionListener;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegion;
public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
protected Region fileAndChunkRegion;
protected final FileSystemStats fileSystemStats;
public static final String FILES_REGION_SUFFIX = ".files";
private final ExecutorService waitingThreadPoolFromDM;
public LuceneIndexForPartitionedRegion(String indexName, String regionPath, InternalCache cache) {
super(indexName, regionPath, cache);
this.waitingThreadPoolFromDM =
cache.getDistributionManager().getExecutors().getWaitingThreadPool();
final String statsName = indexName + "-" + regionPath;
this.fileSystemStats = new FileSystemStats(cache.getDistributedSystem(), statsName);
}
@Override
protected RepositoryManager createRepositoryManager(LuceneSerializer luceneSerializer) {
LuceneSerializer mapper = luceneSerializer;
if (mapper == null) {
mapper = new HeterogeneousLuceneSerializer();
}
PartitionedRepositoryManager partitionedRepositoryManager =
new PartitionedRepositoryManager(this, mapper, this.waitingThreadPoolFromDM);
return partitionedRepositoryManager;
}
@Override
public boolean isIndexingInProgress() {
PartitionedRegion userRegion = (PartitionedRegion) cache.getRegion(this.getRegionPath());
Set<Integer> fileRegionPrimaryBucketIds =
this.getFileAndChunkRegion().getDataStore().getAllLocalPrimaryBucketIds();
for (Integer bucketId : fileRegionPrimaryBucketIds) {
BucketRegion userBucket = userRegion.getDataStore().getLocalBucketById(bucketId);
if (!userBucket.isEmpty() && !this.isIndexAvailable(bucketId)) {
return true;
}
}
return false;
}
@Override
protected void createLuceneListenersAndFileChunkRegions(
PartitionedRepositoryManager partitionedRepositoryManager) {
partitionedRepositoryManager.setUserRegionForRepositoryManager((PartitionedRegion) dataRegion);
RegionShortcut regionShortCut;
final boolean withPersistence = withPersistence();
RegionAttributes regionAttributes = dataRegion.getAttributes();
final boolean withStorage = regionAttributes.getPartitionAttributes().getLocalMaxMemory() > 0;
// TODO: 1) dataRegion should be withStorage
// 2) Persistence to Persistence
// 3) Replicate to Replicate, Partition To Partition
// 4) Offheap to Offheap
if (!withStorage) {
regionShortCut = RegionShortcut.PARTITION_PROXY;
} else if (withPersistence) {
// TODO: add PartitionedRegionAttributes instead
regionShortCut = RegionShortcut.PARTITION_PERSISTENT;
} else {
regionShortCut = RegionShortcut.PARTITION;
}
// create PR fileAndChunkRegion, but not to create its buckets for now
final String fileRegionName = createFileRegionName();
PartitionAttributes partitionAttributes = dataRegion.getPartitionAttributes();
DistributionManager dm = this.cache.getInternalDistributedSystem().getDistributionManager();
LuceneBucketListener lucenePrimaryBucketListener =
new LuceneBucketListener(partitionedRepositoryManager, dm);
if (!fileRegionExists(fileRegionName)) {
fileAndChunkRegion = createRegion(fileRegionName, regionShortCut, this.regionPath,
partitionAttributes, regionAttributes, lucenePrimaryBucketListener);
}
fileSystemStats
.setBytesSupplier(() -> getFileAndChunkRegion().getPrStats().getDataStoreBytesInUse());
}
public PartitionedRegion getFileAndChunkRegion() {
return (PartitionedRegion) fileAndChunkRegion;
}
public FileSystemStats getFileSystemStats() {
return fileSystemStats;
}
boolean fileRegionExists(String fileRegionName) {
return cache.getRegion(fileRegionName) != null;
}
public String createFileRegionName() {
return LuceneServiceImpl.getUniqueIndexRegionName(indexName, regionPath, FILES_REGION_SUFFIX);
}
private PartitionAttributesFactory configureLuceneRegionAttributesFactory(
PartitionAttributesFactory attributesFactory,
PartitionAttributes<?, ?> dataRegionAttributes) {
attributesFactory.setTotalNumBuckets(dataRegionAttributes.getTotalNumBuckets());
attributesFactory.setRedundantCopies(dataRegionAttributes.getRedundantCopies());
attributesFactory.setPartitionResolver(getPartitionResolver(dataRegionAttributes));
attributesFactory.setRecoveryDelay(dataRegionAttributes.getRecoveryDelay());
attributesFactory.setStartupRecoveryDelay(dataRegionAttributes.getStartupRecoveryDelay());
return attributesFactory;
}
private PartitionResolver getPartitionResolver(PartitionAttributes dataRegionAttributes) {
if (dataRegionAttributes.getPartitionResolver() instanceof FixedPartitionResolver) {
return new BucketTargetingFixedResolver();
} else {
return new BucketTargetingResolver();
}
}
protected <K, V> Region<K, V> createRegion(final String regionName,
final RegionShortcut regionShortCut, final String colocatedWithRegionName,
final PartitionAttributes partitionAttributes, final RegionAttributes regionAttributes,
PartitionListener lucenePrimaryBucketListener) {
PartitionAttributesFactory partitionAttributesFactory = new PartitionAttributesFactory();
if (lucenePrimaryBucketListener != null) {
partitionAttributesFactory.addPartitionListener(lucenePrimaryBucketListener);
}
partitionAttributesFactory.setColocatedWith(colocatedWithRegionName);
configureLuceneRegionAttributesFactory(partitionAttributesFactory, partitionAttributes);
// Create AttributesFactory based on input RegionShortcut
RegionAttributes baseAttributes = this.cache.getRegionAttributes(regionShortCut.toString());
AttributesFactory factory = new AttributesFactory(baseAttributes);
factory.setPartitionAttributes(partitionAttributesFactory.create());
if (regionAttributes.getDataPolicy().withPersistence()) {
factory.setDiskStoreName(regionAttributes.getDiskStoreName());
}
RegionAttributes<K, V> attributes = factory.create();
return createRegion(regionName, attributes);
}
public void close() {}
@Override
public void dumpFiles(final String directory) {
ResultCollector results = FunctionService.onRegion(getDataRegion())
.setArguments(new String[] {directory, indexName}).execute(DumpDirectoryFiles.ID);
results.getResult();
}
@Override
public void destroy(boolean initiator) {
if (logger.isDebugEnabled()) {
logger.debug("Destroying index regionPath=" + regionPath + "; indexName=" + indexName
+ "; initiator=" + initiator);
}
// Invoke super destroy to remove the extension and async event queue
super.destroy(initiator);
// Destroy index on remote members if necessary
if (initiator) {
destroyOnRemoteMembers();
}
// Destroy the file region (colocated with the application region) if necessary
// localDestroyRegion can't be used because locally destroying regions is not supported on
// colocated regions
if (initiator) {
try {
fileAndChunkRegion.destroyRegion();
if (logger.isDebugEnabled()) {
logger.debug("Destroyed fileAndChunkRegion=" + fileAndChunkRegion.getName());
}
} catch (RegionDestroyedException e) {
if (logger.isDebugEnabled()) {
logger.debug("Already destroyed fileAndChunkRegion=" + fileAndChunkRegion.getName());
}
}
}
if (logger.isDebugEnabled()) {
logger.debug("Destroyed index regionPath=" + regionPath + "; indexName=" + indexName
+ "; initiator=" + initiator);
}
}
@Override
public boolean isIndexAvailable(int id) {
PartitionedRegion fileAndChunkRegion = getFileAndChunkRegion();
return (fileAndChunkRegion.get(IndexRepositoryFactory.APACHE_GEODE_INDEX_COMPLETE, id) != null
|| !LuceneServiceImpl.LUCENE_REINDEX);
}
private void destroyOnRemoteMembers() {
DistributionManager dm = getDataRegion().getDistributionManager();
Set<InternalDistributedMember> recipients = dm.getOtherNormalDistributionManagerIds();
if (!recipients.isEmpty()) {
if (logger.isDebugEnabled()) {
logger.debug("LuceneIndexForPartitionedRegion: About to send destroy message recipients="
+ recipients);
}
ReplyProcessor21 processor = new ReplyProcessor21(dm, recipients);
DestroyLuceneIndexMessage message = new DestroyLuceneIndexMessage(recipients,
processor.getProcessorId(), regionPath, indexName);
dm.putOutgoing(message);
if (logger.isDebugEnabled()) {
logger.debug("LuceneIndexForPartitionedRegion: Sent message recipients=" + recipients);
}
try {
processor.waitForReplies();
} catch (ReplyException e) {
Throwable cause = e.getCause();
if (cause instanceof IllegalArgumentException) {
// If the IllegalArgumentException is index not found, then its ok; otherwise rethrow it.
String fullRegionPath =
regionPath.startsWith(Region.SEPARATOR) ? regionPath : Region.SEPARATOR + regionPath;
String indexNotFoundMessage = String.format("Lucene index %s was not found in region %s",
indexName, fullRegionPath);
if (!cause.getLocalizedMessage().equals(indexNotFoundMessage)) {
throw e;
}
} else if (!(cause instanceof CancelException)) {
throw e;
}
} catch (InterruptedException e) {
dm.getCancelCriterion().checkCancelInProgress(e);
Thread.currentThread().interrupt();
}
}
}
}