blob: 892ec67a706fee1dcdcf1094b1a683635934d8d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.index.lucene;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.exceptions.sql.MalformedIndexCommandException;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.index.IndexInputSplit;
import org.apache.carbondata.core.index.IndexLevel;
import org.apache.carbondata.core.index.IndexMeta;
import org.apache.carbondata.core.index.IndexStoreManager;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.index.TableIndex;
import org.apache.carbondata.core.index.dev.Index;
import org.apache.carbondata.core.index.dev.IndexBuilder;
import org.apache.carbondata.core.index.dev.IndexFactory;
import org.apache.carbondata.core.index.dev.IndexWriter;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.IndexSchema;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.events.Event;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
/**
* Base implementation for CG and FG lucene index factory.
*/
@InterfaceAudience.Internal
abstract class LuceneIndexFactoryBase<T extends Index> extends IndexFactory<T> {
/**
 * Index property key for the size of the cache to maintain in the Lucene writer. If specified,
 * the writer tries to aggregate the unique data till the cache limit and then flush to Lucene.
 * It is best suited for low cardinality dimensions.
 */
static final String FLUSH_CACHE = "flush_cache";
/**
 * Default value used when {@link #FLUSH_CACHE} is not set: by default no cache is used.
 */
static final String FLUSH_CACHE_DEFAULT_SIZE = "-1";
/**
 * Index property key: when set to true the data is stored blocklet wise in Lucene, which means
 * a new folder is created for each blocklet and the blocklet id does not have to be stored in
 * Lucene. It also keeps the Lucene data in small chunks.
 */
static final String SPLIT_BLOCKLET = "split_blocklet";
/**
 * Default value used when {@link #SPLIT_BLOCKLET} is not set: by default it is true.
 */
static final String SPLIT_BLOCKLET_DEFAULT = "true";
/**
 * Logger, named after the runtime class of this instance.
 */
final Logger LOGGER = LogServiceFactory.getLogService(this.getClass().getName());
/**
 * Index metadata, built in the constructor from the indexed columns and the list of
 * optimized expression types.
 */
IndexMeta indexMeta = null;
/**
 * Analyzer for Lucene; the constructor assigns a StandardAnalyzer.
 */
Analyzer analyzer = null;
/**
 * Index name, taken from the index schema.
 */
String indexName = null;
/**
 * Table identifier, taken from the carbon table.
 */
AbsoluteTableIdentifier tableIdentifier = null;
// Columns of the table that the index schema declares as index columns.
List<CarbonColumn> indexedCarbonColumns = null;
// Result of validateAndGetWriteCacheSize; handed to the writers and builders created here.
int flushCacheSize;
// Result of validateAndGetStoreBlockletWise; handed to the writers and builders created here.
boolean storeBlockletWise;
/**
 * Initialises the state shared by the lucene index factories: table identifier, index name,
 * indexed columns, writer options, index metadata and the analyzer.
 *
 * @param carbonTable table the index belongs to; its absolute table identifier must not be null
 * @param indexSchema schema of the index; must not be null
 * @throws MalformedIndexCommandException declared for callers; not thrown directly in this body
 */
public LuceneIndexFactoryBase(CarbonTable carbonTable, IndexSchema indexSchema)
    throws MalformedIndexCommandException {
  super(carbonTable, indexSchema);
  AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
  Objects.requireNonNull(identifier);
  Objects.requireNonNull(indexSchema);
  this.tableIdentifier = identifier;
  this.indexName = indexSchema.getIndexName();
  this.indexedCarbonColumns = carbonTable.getIndexedColumns(indexSchema.getIndexColumns());
  this.flushCacheSize = validateAndGetWriteCacheSize(indexSchema);
  this.storeBlockletWise = validateAndGetStoreBlockletWise(indexSchema);

  // TEXT_MATCH is the only expression type registered as optimized; EQUALS, the
  // GREATERTHAN/LESSTHAN variants and NOT are not registered.
  List<ExpressionType> supportedOperations = new ArrayList<>();
  supportedOperations.add(ExpressionType.TEXT_MATCH);
  this.indexMeta = new IndexMeta(this.indexedCarbonColumns, supportedOperations);

  // TODO: how to get analyzer ? For now a StandardAnalyzer is always used.
  this.analyzer = new StandardAnalyzer();
}
/**
 * Resolves the writer flush cache size from the {@link #FLUSH_CACHE} index property.
 *
 * @param schema index schema whose properties are consulted
 * @return the configured size parsed as an int; the parsed
 *         {@link #FLUSH_CACHE_DEFAULT_SIZE} when the property is absent; -1 when the
 *         value is not a valid integer
 */
public static int validateAndGetWriteCacheSize(IndexSchema schema) {
  String configured = schema.getProperties().get(FLUSH_CACHE);
  String effective = (configured != null) ? configured : FLUSH_CACHE_DEFAULT_SIZE;
  try {
    return Integer.parseInt(effective);
  } catch (NumberFormatException e) {
    // An unparsable value falls back to "no cache".
    return -1;
  }
}
/**
 * Resolves whether index data is stored blocklet wise from the {@link #SPLIT_BLOCKLET}
 * index property.
 *
 * @param schema index schema whose properties are consulted
 * @return the parsed property value, or the parsed {@link #SPLIT_BLOCKLET_DEFAULT} when the
 *         property is absent. The value is parsed with {@link Boolean#parseBoolean(String)},
 *         so only "true" (ignoring case) yields true; any other text yields false.
 */
public static boolean validateAndGetStoreBlockletWise(IndexSchema schema) {
  String splitBlockletStr = schema.getProperties().get(SPLIT_BLOCKLET);
  if (splitBlockletStr == null) {
    splitBlockletStr = SPLIT_BLOCKLET_DEFAULT;
  }
  // Boolean.parseBoolean never throws, so there is no failure case to handle here.
  return Boolean.parseBoolean(splitBlockletStr);
}
/**
 * Deletes the index folders of every valid segment; used during drop index.
 *
 * @throws MalformedIndexCommandException when reading the segment status or deleting a
 *         segment's index data fails; the original failure is attached as the cause
 */
private void deleteIndex() throws MalformedIndexCommandException {
  SegmentStatusManager ssm = new SegmentStatusManager(tableIdentifier);
  try {
    List<Segment> validSegments =
        ssm.getValidAndInvalidSegments(getCarbonTable().isMV()).getValidSegments();
    for (Segment segment : validSegments) {
      deleteIndexData(segment);
    }
  } catch (IOException | RuntimeException ex) {
    // Keep the underlying failure so that callers and logs can see why the delete failed.
    throw new MalformedIndexCommandException(
        "drop index failed, failed to delete index directory", ex);
  }
}
/**
 * Returns a new writer for this index.
 *
 * @param segment segment the writer works on
 * @param shardName name of the shard being written; also reported in the log
 * @param segmentProperties not used by this implementation
 * @return a new LuceneIndexWriter configured with this factory's cache and blocklet options
 */
@Override
public IndexWriter createWriter(Segment segment, String shardName,
    SegmentProperties segmentProperties) {
  LOGGER.info("lucene data write to " + shardName);
  String tablePath = getCarbonTable().getTablePath();
  return new LuceneIndexWriter(
      tablePath,
      indexName,
      indexMeta.getIndexedColumns(),
      segment,
      shardName,
      flushCacheSize,
      storeBlockletWise);
}
/**
 * Returns a new builder for this index.
 *
 * @param segment segment the builder works on
 * @param shardName name of the shard being built
 * @param segmentProperties not used by this implementation
 * @return a new LuceneIndexBuilder configured with this factory's cache and blocklet options
 */
@Override
public IndexBuilder createBuilder(Segment segment, String shardName,
    SegmentProperties segmentProperties) {
  String tablePath = getCarbonTable().getTablePath();
  return new LuceneIndexBuilder(
      tablePath,
      indexName,
      segment,
      shardName,
      indexMeta.getIndexedColumns(),
      flushCacheSize,
      storeBlockletWise);
}
/**
 * Get all distributable objects of a segment: one input split per index directory.
 *
 * When the segment carries no filtered shard names, every index directory yields a split.
 * Otherwise, for any index level other than FG, only the directories whose name is among
 * the filtered shard names are kept.
 *
 * @param segment segment whose index directories are turned into splits
 * @return the splits, each tagged with the segment and this factory's index schema
 */
@Override
public List<IndexInputSplit> toDistributable(Segment segment) {
  List<IndexInputSplit> result = new ArrayList<>();
  CarbonFile[] shardDirs =
      getAllIndexDirs(tableIdentifier.getTablePath(), segment.getSegmentNo());
  boolean noShardFilter = segment.getFilteredIndexShardNames().size() == 0;
  for (CarbonFile shardDir : shardDirs) {
    String basePath;
    if (noShardFilter) {
      basePath = tableIdentifier.getTablePath();
    } else {
      // Filter out the tasks which are filtered through CG index.
      boolean keep = getIndexLevel() == IndexLevel.FG
          || segment.getFilteredIndexShardNames().contains(shardDir.getName());
      if (!keep) {
        continue;
      }
      // NOTE(review): this branch passes the segment path while the unfiltered branch passes
      // the table path as the first constructor argument - confirm this is intended.
      basePath =
          CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segment.getSegmentNo());
    }
    IndexInputSplit split = new LuceneIndexInputSplit(basePath, shardDir.getAbsolutePath());
    split.setSegment(segment);
    split.setIndexSchema(getIndexSchema());
    result.add(split);
  }
  return result;
}
/**
 * Event callback. The body is empty: this base class does not react to any event.
 *
 * @param event the fired event; ignored here
 */
@Override
public void fireEvent(Event event) {
}
/**
 * Clear all indexes from memory.
 * The body is empty: this base class performs no work when it is called.
 */
@Override
public void clear() {
}
/**
 * Deletes the index data of the given segment by delegating to
 * {@link #deleteSegmentIndexData(String)} with the segment number.
 *
 * @param segment segment whose index data is removed
 * @throws IOException when the delegate fails
 */
@Override
public void deleteIndexData(Segment segment) throws IOException {
  String segmentNo = segment.getSegmentNo();
  deleteSegmentIndexData(segmentNo);
}
/**
 * Deletes the store directory of this index inside the given segment, if it exists.
 *
 * @param segmentId id of the segment whose index directory is removed
 * @throws IOException when the deletion fails, or when the thread is interrupted while
 *         deleting (the interruption is attached as the cause and the interrupt flag is
 *         restored)
 */
@Override
public void deleteSegmentIndexData(String segmentId) throws IOException {
  try {
    String indexPath = CarbonTablePath
        .getIndexesStorePath(tableIdentifier.getTablePath(), segmentId, indexName);
    if (FileFactory.isFileExist(indexPath)) {
      CarbonFile file = FileFactory.getCarbonFile(indexPath);
      CarbonUtil.deleteFoldersAndFilesSilent(file);
    }
  } catch (InterruptedException ex) {
    // Re-assert the interrupt so code further up the stack can still observe it.
    Thread.currentThread().interrupt();
    throw new IOException("drop index failed, failed to delete index directory", ex);
  }
}
/**
 * Deletes the index data through {@code deleteIndex()}. This is best effort: a failure is
 * logged as an error and not propagated to the caller.
 */
@Override
public void deleteIndexData() {
  try {
    this.deleteIndex();
  } catch (MalformedIndexCommandException dropFailure) {
    LOGGER.error("failed to delete index directory ", dropFailure);
  }
}
/**
 * Return metadata of this index.
 *
 * @return the index metadata created in the constructor
 */
public IndexMeta getMeta() {
  return this.indexMeta;
}
/**
 * Returns all the directories of lucene index files of this index for query.
 *
 * The table's CG and FG indexes are looked up, and for each one whose name equals this
 * factory's index name the sub-directories of its store path in the segment are collected.
 * If the lookup fails, the error is logged and an empty array is returned.
 *
 * @param tablePath path of the table
 * @param segmentId id of the segment to look into
 * @return the index directories found; empty when there are none
 */
private CarbonFile[] getAllIndexDirs(String tablePath, String segmentId) {
  List<CarbonFile> indexDirs = new ArrayList<>();
  List<TableIndex> indexes = new ArrayList<>();
  try {
    // there can be multiple lucene index present on a table, so get all indexes and form
    // the path till the index file directories in all index folders present in each segment
    indexes = IndexStoreManager.getInstance().getAllCGAndFGIndexes(getCarbonTable());
  } catch (IOException ex) {
    // Best effort: keep going with no indexes, but record why the lookup failed.
    LOGGER.error("failed to get indexes", ex);
  }
  for (TableIndex index : indexes) {
    String candidateName = index.getIndexSchema().getIndexName();
    if (!candidateName.equals(this.indexName)) {
      continue;
    }
    String dmPath = CarbonTablePath.getIndexesStorePath(tablePath, segmentId, candidateName);
    final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath);
    CarbonFile[] subDirs = dirPath.listFiles(new CarbonFileFilter() {
      @Override
      public boolean accept(CarbonFile file) {
        return file.isDirectory();
      }
    });
    // Guard against a listing that yields no array at all.
    if (subDirs != null) {
      indexDirs.addAll(Arrays.asList(subDirs));
    }
  }
  return indexDirs.toArray(new CarbonFile[0]);
}
}