| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.cache.lucene.internal; |
| |
| import java.util.Collections; |
| import java.util.Map; |
| |
| import org.apache.logging.log4j.Logger; |
| import org.apache.lucene.analysis.Analyzer; |
| import org.apache.lucene.analysis.standard.StandardAnalyzer; |
| |
| import org.apache.geode.InternalGemFireError; |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionAttributes; |
| import org.apache.geode.cache.asyncqueue.AsyncEventQueue; |
| import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl; |
| import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; |
| import org.apache.geode.cache.lucene.LuceneSerializer; |
| import org.apache.geode.cache.lucene.internal.repository.RepositoryManager; |
| import org.apache.geode.cache.lucene.internal.xml.LuceneIndexCreation; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.InternalRegionArguments; |
| import org.apache.geode.internal.cache.LocalRegion; |
| import org.apache.geode.internal.cache.RegionListener; |
| import org.apache.geode.internal.cache.extension.Extension; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| public abstract class LuceneIndexImpl implements InternalLuceneIndex { |
| protected static final Logger logger = LogService.getLogger(); |
| |
| protected final String indexName; |
| protected final String regionPath; |
| protected final InternalCache cache; |
| protected final LuceneIndexStats indexStats; |
| |
| protected Map<String, Analyzer> fieldAnalyzers; |
| protected String[] searchableFieldNames; |
| protected RepositoryManager repositoryManager; |
| protected Analyzer analyzer; |
| protected LuceneSerializer luceneSerializer; |
| protected LocalRegion dataRegion; |
| |
| protected LuceneIndexImpl(String indexName, String regionPath, InternalCache cache) { |
| this.indexName = indexName; |
| this.regionPath = regionPath; |
| this.cache = cache; |
| |
| final String statsName = indexName + "-" + regionPath; |
| this.indexStats = new LuceneIndexStats(cache.getDistributedSystem(), statsName); |
| } |
| |
| @Override |
| public String getName() { |
| return this.indexName; |
| } |
| |
| @Override |
| public String getRegionPath() { |
| return this.regionPath; |
| } |
| |
| protected LocalRegion assignDataRegion() { |
| return (LocalRegion) cache.getRegion(regionPath); |
| } |
| |
| protected LocalRegion getDataRegion() { |
| return dataRegion; |
| } |
| |
| protected boolean withPersistence() { |
| RegionAttributes ra = dataRegion.getAttributes(); |
| DataPolicy dp = ra.getDataPolicy(); |
| final boolean withPersistence = dp.withPersistence(); |
| return withPersistence; |
| } |
| |
| protected void setSearchableFields(String[] fields) { |
| searchableFieldNames = fields; |
| } |
| |
| @Override |
| public String[] getFieldNames() { |
| return searchableFieldNames; |
| } |
| |
| @Override |
| public Map<String, Analyzer> getFieldAnalyzers() { |
| return this.fieldAnalyzers; |
| } |
| |
| @Override |
| public RepositoryManager getRepositoryManager() { |
| return this.repositoryManager; |
| } |
| |
| public void setAnalyzer(Analyzer analyzer) { |
| if (analyzer == null) { |
| this.analyzer = new StandardAnalyzer(); |
| } else { |
| this.analyzer = analyzer; |
| } |
| } |
| |
| public Analyzer getAnalyzer() { |
| return this.analyzer; |
| } |
| |
| @Override |
| public LuceneSerializer getLuceneSerializer() { |
| return this.luceneSerializer; |
| } |
| |
| public void setLuceneSerializer(LuceneSerializer serializer) { |
| this.luceneSerializer = serializer; |
| } |
| |
| @Override |
| public Cache getCache() { |
| return this.cache; |
| } |
| |
| public void setFieldAnalyzers(Map<String, Analyzer> fieldAnalyzers) { |
| this.fieldAnalyzers = |
| fieldAnalyzers == null ? null : Collections.unmodifiableMap(fieldAnalyzers); |
| } |
| |
| @Override |
| public LuceneIndexStats getIndexStats() { |
| return indexStats; |
| } |
| |
| @Override |
| public void initialize() { |
| /* create index region */ |
| dataRegion = assignDataRegion(); |
| createLuceneListenersAndFileChunkRegions((PartitionedRepositoryManager) repositoryManager); |
| addExtension(dataRegion); |
| } |
| |
| protected void setupRepositoryManager(LuceneSerializer luceneSerializer) { |
| repositoryManager = createRepositoryManager(luceneSerializer); |
| } |
| |
| protected abstract RepositoryManager createRepositoryManager(LuceneSerializer luceneSerializer); |
| |
| protected abstract void createLuceneListenersAndFileChunkRegions( |
| PartitionedRepositoryManager partitionedRepositoryManager); |
| |
| protected AsyncEventQueue createAEQ(Region dataRegion) { |
| String aeqId = LuceneServiceImpl.getUniqueIndexName(getName(), regionPath); |
| return createAEQ(createAEQFactory(dataRegion.getAttributes()), aeqId); |
| } |
| |
| protected AsyncEventQueue createAEQ(RegionAttributes attributes, String aeqId) { |
| if (attributes.getPartitionAttributes() != null) { |
| if (attributes.getPartitionAttributes().getLocalMaxMemory() == 0) { |
| // accessor will not create AEQ |
| return null; |
| } |
| } |
| return createAEQ(createAEQFactory(attributes), aeqId); |
| } |
| |
| private AsyncEventQueue createAEQ(AsyncEventQueueFactoryImpl factory, String aeqId) { |
| LuceneEventListener listener = new LuceneEventListener(cache, repositoryManager); |
| factory.setGatewayEventSubstitutionListener(new LuceneEventSubstitutionFilter()); |
| AsyncEventQueue indexQueue = factory.create(aeqId, listener); |
| return indexQueue; |
| } |
| |
| private AsyncEventQueueFactoryImpl createAEQFactory(final RegionAttributes attributes) { |
| AsyncEventQueueFactoryImpl factory = |
| (AsyncEventQueueFactoryImpl) cache.createAsyncEventQueueFactory(); |
| if (attributes.getPartitionAttributes() != null) { |
| factory.setParallel(true); // parallel AEQ for PR |
| } else { |
| factory.setParallel(false); // TODO: not sure if serial AEQ working or not |
| } |
| factory.setMaximumQueueMemory(1000); |
| factory.setDispatcherThreads(10); |
| factory.setBatchSize(1000); |
| factory.setIsMetaQueue(true); |
| if (attributes.getDataPolicy().withPersistence()) { |
| factory.setPersistent(true); |
| } |
| factory.setDiskStoreName(attributes.getDiskStoreName()); |
| factory.setDiskSynchronous(true); |
| factory.setForwardExpirationDestroy(true); |
| return factory; |
| } |
| |
| /** |
| * Register an extension with the region so that xml will be generated for this index. |
| */ |
| protected void addExtension(LocalRegion dataRegion) { |
| LuceneIndexCreation creation = new LuceneIndexCreation(); |
| creation.setName(this.getName()); |
| creation.addFieldNames(this.getFieldNames()); |
| creation.setRegion(dataRegion); |
| creation.setFieldAnalyzers(this.getFieldAnalyzers()); |
| creation.setLuceneSerializer(this.getLuceneSerializer()); |
| dataRegion.getExtensionPoint().addExtension(creation); |
| } |
| |
| @Override |
| public void destroy(boolean initiator) { |
| // Find and delete the appropriate extension |
| Extension extensionToDelete = null; |
| for (Extension extension : getDataRegion().getExtensionPoint().getExtensions()) { |
| LuceneIndexCreation index = (LuceneIndexCreation) extension; |
| if (index.getName().equals(indexName)) { |
| extensionToDelete = extension; |
| break; |
| } |
| } |
| if (extensionToDelete != null) { |
| getDataRegion().getExtensionPoint().removeExtension(extensionToDelete); |
| } |
| |
| // Destroy the async event queue |
| destroyAsyncEventQueue(initiator); |
| |
| // Close the repository manager |
| repositoryManager.close(); |
| |
| RegionListener listenerToRemove = getRegionListener(); |
| if (listenerToRemove != null) { |
| cache.removeRegionListener(listenerToRemove); |
| } |
| |
| // Remove cache service profile |
| dataRegion |
| .removeCacheServiceProfile(LuceneIndexCreationProfile.generateId(indexName, regionPath)); |
| } |
| |
| private RegionListener getRegionListener() { |
| RegionListener rl = null; |
| for (RegionListener listener : cache.getRegionListeners()) { |
| if (listener instanceof LuceneRegionListener) { |
| LuceneRegionListener lrl = (LuceneRegionListener) listener; |
| if (lrl.getRegionPath().equals(regionPath) && lrl.getIndexName().equals(indexName)) { |
| rl = lrl; |
| break; |
| } |
| } |
| } |
| return rl; |
| } |
| |
| protected <K, V> Region<K, V> createRegion(final String regionName, |
| final RegionAttributes<K, V> attributes) { |
| // Create InternalRegionArguments to set isUsedForMetaRegion true to suppress xml generation |
| // (among other things) |
| InternalRegionArguments ira = |
| new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false) |
| .setSnapshotInputStream(null).setImageTarget(null).setIsUsedForMetaRegion(true); |
| |
| // Create the region |
| try { |
| return this.cache.createVMRegion(regionName, attributes, ira); |
| } catch (Exception e) { |
| InternalGemFireError ige = new InternalGemFireError( |
| "unexpected exception"); |
| ige.initCause(e); |
| throw ige; |
| } |
| } |
| |
| private void destroyAsyncEventQueue(boolean initiator) { |
| String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath); |
| |
| // Get the AsyncEventQueue |
| AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(aeqId); |
| |
| // Stop the AsyncEventQueue (this stops the AsyncEventQueue's underlying GatewaySender) |
| // The AsyncEventQueue can be null in an accessor member |
| if (aeq != null) { |
| aeq.stop(); |
| } |
| |
| // Remove the id from the dataRegion's AsyncEventQueue ids |
| // Note: The region may already have been destroyed by a remote member |
| Region region = getDataRegion(); |
| if (!region.isDestroyed()) { |
| region.getAttributesMutator().removeAsyncEventQueueId(aeqId); |
| } |
| |
| // Destroy the aeq (this also removes it from the GemFireCacheImpl) |
| // The AsyncEventQueue can be null in an accessor member |
| if (aeq != null) { |
| aeq.destroy(initiator); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("Destroyed aeqId=" + aeqId); |
| } |
| } |
| } |