blob: 8525e88e2a9a8d2fccc8076622b66f3391f14550 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.lucene.internal;
import java.util.Collections;
import java.util.Map;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.cache.lucene.LuceneSerializer;
import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
import org.apache.geode.cache.lucene.internal.xml.LuceneIndexCreation;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegionArguments;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.RegionListener;
import org.apache.geode.internal.cache.extension.Extension;
import org.apache.geode.logging.internal.log4j.api.LogService;
public abstract class LuceneIndexImpl implements InternalLuceneIndex {
protected static final Logger logger = LogService.getLogger();
protected final String indexName;
protected final String regionPath;
protected final InternalCache cache;
protected final LuceneIndexStats indexStats;
protected Map<String, Analyzer> fieldAnalyzers;
protected String[] searchableFieldNames;
protected RepositoryManager repositoryManager;
protected Analyzer analyzer;
protected LuceneSerializer luceneSerializer;
protected LocalRegion dataRegion;
protected LuceneIndexImpl(String indexName, String regionPath, InternalCache cache) {
this.indexName = indexName;
this.regionPath = regionPath;
this.cache = cache;
final String statsName = indexName + "-" + regionPath;
this.indexStats = new LuceneIndexStats(cache.getDistributedSystem(), statsName);
}
@Override
public String getName() {
return this.indexName;
}
@Override
public String getRegionPath() {
return this.regionPath;
}
protected LocalRegion assignDataRegion() {
return (LocalRegion) cache.getRegion(regionPath);
}
protected LocalRegion getDataRegion() {
return dataRegion;
}
protected boolean withPersistence() {
RegionAttributes ra = dataRegion.getAttributes();
DataPolicy dp = ra.getDataPolicy();
final boolean withPersistence = dp.withPersistence();
return withPersistence;
}
protected void setSearchableFields(String[] fields) {
searchableFieldNames = fields;
}
@Override
public String[] getFieldNames() {
return searchableFieldNames;
}
@Override
public Map<String, Analyzer> getFieldAnalyzers() {
return this.fieldAnalyzers;
}
@Override
public RepositoryManager getRepositoryManager() {
return this.repositoryManager;
}
public void setAnalyzer(Analyzer analyzer) {
if (analyzer == null) {
this.analyzer = new StandardAnalyzer();
} else {
this.analyzer = analyzer;
}
}
public Analyzer getAnalyzer() {
return this.analyzer;
}
@Override
public LuceneSerializer getLuceneSerializer() {
return this.luceneSerializer;
}
public void setLuceneSerializer(LuceneSerializer serializer) {
this.luceneSerializer = serializer;
}
@Override
public Cache getCache() {
return this.cache;
}
public void setFieldAnalyzers(Map<String, Analyzer> fieldAnalyzers) {
this.fieldAnalyzers =
fieldAnalyzers == null ? null : Collections.unmodifiableMap(fieldAnalyzers);
}
@Override
public LuceneIndexStats getIndexStats() {
return indexStats;
}
@Override
public void initialize() {
/* create index region */
dataRegion = assignDataRegion();
createLuceneListenersAndFileChunkRegions((PartitionedRepositoryManager) repositoryManager);
addExtension(dataRegion);
}
protected void setupRepositoryManager(LuceneSerializer luceneSerializer) {
repositoryManager = createRepositoryManager(luceneSerializer);
}
protected abstract RepositoryManager createRepositoryManager(LuceneSerializer luceneSerializer);
protected abstract void createLuceneListenersAndFileChunkRegions(
PartitionedRepositoryManager partitionedRepositoryManager);
protected AsyncEventQueue createAEQ(Region dataRegion) {
String aeqId = LuceneServiceImpl.getUniqueIndexName(getName(), regionPath);
return createAEQ(createAEQFactory(dataRegion.getAttributes()), aeqId);
}
protected AsyncEventQueue createAEQ(RegionAttributes attributes, String aeqId) {
if (attributes.getPartitionAttributes() != null) {
if (attributes.getPartitionAttributes().getLocalMaxMemory() == 0) {
// accessor will not create AEQ
return null;
}
}
return createAEQ(createAEQFactory(attributes), aeqId);
}
private AsyncEventQueue createAEQ(AsyncEventQueueFactoryImpl factory, String aeqId) {
LuceneEventListener listener = new LuceneEventListener(cache, repositoryManager);
factory.setGatewayEventSubstitutionListener(new LuceneEventSubstitutionFilter());
AsyncEventQueue indexQueue = factory.create(aeqId, listener);
return indexQueue;
}
private AsyncEventQueueFactoryImpl createAEQFactory(final RegionAttributes attributes) {
AsyncEventQueueFactoryImpl factory =
(AsyncEventQueueFactoryImpl) cache.createAsyncEventQueueFactory();
if (attributes.getPartitionAttributes() != null) {
factory.setParallel(true); // parallel AEQ for PR
} else {
factory.setParallel(false); // TODO: not sure if serial AEQ working or not
}
factory.setMaximumQueueMemory(1000);
factory.setDispatcherThreads(10);
factory.setBatchSize(1000);
factory.setIsMetaQueue(true);
if (attributes.getDataPolicy().withPersistence()) {
factory.setPersistent(true);
}
factory.setDiskStoreName(attributes.getDiskStoreName());
factory.setDiskSynchronous(true);
factory.setForwardExpirationDestroy(true);
return factory;
}
/**
* Register an extension with the region so that xml will be generated for this index.
*/
protected void addExtension(LocalRegion dataRegion) {
LuceneIndexCreation creation = new LuceneIndexCreation();
creation.setName(this.getName());
creation.addFieldNames(this.getFieldNames());
creation.setRegion(dataRegion);
creation.setFieldAnalyzers(this.getFieldAnalyzers());
creation.setLuceneSerializer(this.getLuceneSerializer());
dataRegion.getExtensionPoint().addExtension(creation);
}
@Override
public void destroy(boolean initiator) {
// Find and delete the appropriate extension
Extension extensionToDelete = null;
for (Extension extension : getDataRegion().getExtensionPoint().getExtensions()) {
LuceneIndexCreation index = (LuceneIndexCreation) extension;
if (index.getName().equals(indexName)) {
extensionToDelete = extension;
break;
}
}
if (extensionToDelete != null) {
getDataRegion().getExtensionPoint().removeExtension(extensionToDelete);
}
// Destroy the async event queue
destroyAsyncEventQueue(initiator);
// Close the repository manager
repositoryManager.close();
RegionListener listenerToRemove = getRegionListener();
if (listenerToRemove != null) {
cache.removeRegionListener(listenerToRemove);
}
// Remove cache service profile
dataRegion
.removeCacheServiceProfile(LuceneIndexCreationProfile.generateId(indexName, regionPath));
}
private RegionListener getRegionListener() {
RegionListener rl = null;
for (RegionListener listener : cache.getRegionListeners()) {
if (listener instanceof LuceneRegionListener) {
LuceneRegionListener lrl = (LuceneRegionListener) listener;
if (lrl.getRegionPath().equals(regionPath) && lrl.getIndexName().equals(indexName)) {
rl = lrl;
break;
}
}
}
return rl;
}
protected <K, V> Region<K, V> createRegion(final String regionName,
final RegionAttributes<K, V> attributes) {
// Create InternalRegionArguments to set isUsedForMetaRegion true to suppress xml generation
// (among other things)
InternalRegionArguments ira =
new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false)
.setSnapshotInputStream(null).setImageTarget(null).setIsUsedForMetaRegion(true);
// Create the region
try {
return this.cache.createVMRegion(regionName, attributes, ira);
} catch (Exception e) {
InternalGemFireError ige = new InternalGemFireError(
"unexpected exception");
ige.initCause(e);
throw ige;
}
}
private void destroyAsyncEventQueue(boolean initiator) {
String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath);
// Get the AsyncEventQueue
AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(aeqId);
// Stop the AsyncEventQueue (this stops the AsyncEventQueue's underlying GatewaySender)
// The AsyncEventQueue can be null in an accessor member
if (aeq != null) {
aeq.stop();
}
// Remove the id from the dataRegion's AsyncEventQueue ids
// Note: The region may already have been destroyed by a remote member
Region region = getDataRegion();
if (!region.isDestroyed()) {
region.getAttributesMutator().removeAsyncEventQueueId(aeqId);
}
// Destroy the aeq (this also removes it from the GemFireCacheImpl)
// The AsyncEventQueue can be null in an accessor member
if (aeq != null) {
aeq.destroy(initiator);
}
if (logger.isDebugEnabled()) {
logger.debug("Destroyed aeqId=" + aeqId);
}
}
}