/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.index.lucene;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.index.dev.IndexModel;
import org.apache.carbondata.core.index.dev.fgindex.FineGrainBlocklet;
import org.apache.carbondata.core.index.dev.fgindex.FineGrainIndex;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.IndexSchema;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.expression.MatchExpression;
import org.apache.carbondata.core.scan.filter.executer.FilterExecutor;
import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.queryparser.classic.MultiFieldQueryParser;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.store.hdfs.HdfsDirectory;
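/**
* Fine grain index implementation backed by lucene. It reads the lucene index files written for
* a segment through solr's HdfsDirectory, executes the TEXT_MATCH query on them and returns the
* matching blocklets together with their page and row ids.
*/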
@InterfaceAudience.Internal
public class LuceneFineGrainIndex extends FineGrainIndex {
/**
* log information
*/
private static final Logger LOGGER =
LogServiceFactory.getLogService(LuceneFineGrainIndex.class.getName());
/**
* The search limit decides the size of the priority queue used by lucene to hold matched
* documents on the heap. The default of 100 means that at most 100 documents are kept on the
* heap per search call, which helps in reducing GC.
* Note: removing this limit or increasing its value beyond a reasonable bound can, in worst
* case scenarios, cause almost 90% of the query time to be spent in GC.
*/
private static final int SEARCH_LIMIT = 100;
/**
* map of index searchers for this index, keyed by blocklet directory name,
* or "-1" when the index is not stored blocklet-wise
*/
private Map<String, IndexSearcher> indexSearcherMap = null;
/**
* analyzer for lucene index
*/
private Analyzer analyzer;
private String filePath;
private int writeCacheSize;
private boolean storeBlockletWise;
private IndexReader indexReader;
LuceneFineGrainIndex(Analyzer analyzer, IndexSchema schema) {
this.analyzer = analyzer;
writeCacheSize = LuceneIndexFactoryBase.validateAndGetWriteCacheSize(schema);
storeBlockletWise = LuceneIndexFactoryBase.validateAndGetStoreBlockletWise(schema);
}
/**
* It is called to load the index to memory or to initialize it.
*/
public void init(IndexModel indexModel) throws IOException {
long startTime = System.currentTimeMillis();
// resolve the index path from the file path in the index model
Path indexPath = FileFactory.getPath(indexModel.getFilePath());
LOGGER.info("Lucene index read path " + indexPath.toString());
this.filePath = indexPath.getName();
this.indexSearcherMap = new HashMap<>();
// get the index directory as a CarbonFile; HDFS access itself is provided by solr's HdfsDirectory
CarbonFile indexFilePath = FileFactory.getCarbonFile(indexPath.toString());
// check that the path exists and is a directory
if (!indexFilePath.exists()) {
String errorMessage = String.format("index directory %s does not exist.", indexPath);
LOGGER.error(errorMessage);
throw new IOException(errorMessage);
}
if (!indexFilePath.isDirectory()) {
String errorMessage = String.format("error index path %s, must be directory", indexPath);
LOGGER.error(errorMessage);
throw new IOException(errorMessage);
}
if (storeBlockletWise) {
CarbonFile[] blockletDirs = indexFilePath.listFiles();
for (CarbonFile blockletDir : blockletDirs) {
IndexSearcher indexSearcher = createIndexSearcher(new Path(blockletDir.getAbsolutePath()));
indexSearcherMap.put(blockletDir.getName(), indexSearcher);
}
} else {
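// a single searcher covers the whole index when it is not stored blocklet-wise;
// "-1" is used as the map key since there is no blocklet directory name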
IndexSearcher indexSearcher = createIndexSearcher(indexPath);
indexSearcherMap.put("-1", indexSearcher);
}
LOGGER.info(
"Time taken to initialize lucene searcher: " + (System.currentTimeMillis() - startTime));
}
private IndexSearcher createIndexSearcher(Path indexPath) throws IOException {
// open the index path on HDFS using the default configuration
Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration());
this.indexReader = DirectoryReader.open(indexDir);
if (indexReader == null) {
throw new RuntimeException("failed to create index reader object");
}
// create an index searcher object
return new IndexSearcher(indexReader);
}
/**
* Return the query string in the first TEXT_MATCH expression in the expression tree
*/
private String getQueryString(Expression expression) {
if (expression.getFilterExpressionType() == ExpressionType.TEXT_MATCH) {
return expression.getString();
}
for (Expression child : expression.getChildren()) {
String queryString = getQueryString(child);
if (queryString != null) {
return queryString;
}
}
return null;
}
/**
* Return the maximum number of documents to fetch, taken from the first TEXT_MATCH
* expression in the expression tree, or Integer.MAX_VALUE when no limit is set.
*/
private int getMaxDoc(Expression expression) {
if (expression.getFilterExpressionType() == ExpressionType.TEXT_MATCH) {
int maxDoc = ((MatchExpression) expression).getMaxDoc();
if (maxDoc < 0) {
maxDoc = Integer.MAX_VALUE;
}
return maxDoc;
}
for (Expression child : expression.getChildren()) {
return getMaxDoc(child);
}
return Integer.MAX_VALUE;
}
/**
* Prune the index with filter expression. It returns the list of
* blocklets where these filters can exist.
*/
@Override
public List<FineGrainBlocklet> prune(FilterResolverIntf filterExp,
SegmentProperties segmentProperties, FilterExecutor filterExecutor,
CarbonTable carbonTable) throws IOException {
// convert the filter expression into a lucene query
List<String> fields = new ArrayList<String>();
// extract the query string from the first TEXT_MATCH expression
String strQuery = getQueryString(filterExp.getFilterExpression());
int maxDocs;
try {
maxDocs = getMaxDoc(filterExp.getFilterExpression());
} catch (NumberFormatException e) {
maxDocs = Integer.MAX_VALUE;
}
if (null == strQuery) {
return null;
}
String[] sFields = new String[fields.size()];
fields.toArray(sFields);
// get analyzer
if (analyzer == null) {
analyzer = new StandardAnalyzer();
}
// use MultiFieldQueryParser to parse the query
QueryParser queryParser = new MultiFieldQueryParser(sFields, analyzer);
queryParser.setAllowLeadingWildcard(true);
Query query;
try {
query = queryParser.parse(strQuery);
} catch (ParseException e) {
String errorMessage = String.format(
"failed to filter block with query %s, detail is %s", strQuery, e.getMessage());
LOGGER.error(errorMessage, e);
return null;
}
// temporary structure used to collect and deduplicate the matched row ids
// Map<BlockletId, Map<PageId, List<RowId>>>
Map<String, Map<Integer, List<Short>>> mapBlocks = new HashMap<>();
long luceneSearchStartTime = System.currentTimeMillis();
for (Map.Entry<String, IndexSearcher> searcherEntry : indexSearcherMap.entrySet()) {
IndexSearcher indexSearcher = searcherEntry.getValue();
// take the min of total documents available in the reader and limit if set by the user
maxDocs = Math.min(maxDocs, indexSearcher.getIndexReader().maxDoc());
// execute index search
TopDocs result = null;
// the number of documents fetched in one search call. It is always the minimum of
// SEARCH_LIMIT and the documents still left to fetch
int numberOfDocumentsToBeQueried = 0;
// counter for maintaining the total number of documents finished querying
int documentHitCounter = 0;
try {
numberOfDocumentsToBeQueried = Math.min(maxDocs, SEARCH_LIMIT);
result = indexSearcher.search(query, numberOfDocumentsToBeQueried);
documentHitCounter += numberOfDocumentsToBeQueried;
} catch (IOException e) {
String errorMessage =
String.format("failed to search lucene data, detail is %s", e.getMessage());
LOGGER.error(errorMessage, e);
throw new IOException(errorMessage, e);
}
ByteBuffer intBuffer = ByteBuffer.allocate(4);
// last scoreDoc in a result to be used in searchAfter API
ScoreDoc lastScoreDoc = null;
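// page through the hits in batches of at most SEARCH_LIMIT documents using searchAfter,
// until either all hits are consumed or maxDocs documents have been fetched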
while (true) {
for (ScoreDoc scoreDoc : result.scoreDocs) {
// get a document
Document doc = indexSearcher.doc(scoreDoc.doc);
// get all fields
List<IndexableField> fieldsInDoc = doc.getFields();
if (writeCacheSize > 0) {
// fill row ids into the map; each document value combines multiple rows
fillMapForCombineRows(intBuffer, mapBlocks, fieldsInDoc, searcherEntry.getKey());
} else {
// Fill rowids to the map
fillMap(intBuffer, mapBlocks, fieldsInDoc, searcherEntry.getKey());
}
lastScoreDoc = scoreDoc;
}
// result carries the total number of hits, therefore we always need to query the
// leftover documents
int remainingHits = result.totalHits - documentHitCounter;
// break the loop if count reaches maxDocs to be searched or remaining hits become <=0
if (remainingHits <= 0 || documentHitCounter >= maxDocs) {
break;
}
numberOfDocumentsToBeQueried = Math.min(remainingHits, SEARCH_LIMIT);
result = indexSearcher.searchAfter(lastScoreDoc, query, numberOfDocumentsToBeQueried);
documentHitCounter += numberOfDocumentsToBeQueried;
}
}
LOGGER.info(
"Time taken for lucene search: " + (System.currentTimeMillis() - luceneSearchStartTime)
+ " ms");
// result blocklets
List<FineGrainBlocklet> blocklets = new ArrayList<>();
// transform the collected map into result blocklets
// Map<BlockletId, Map<PageId, List<RowId>>>
for (Map.Entry<String, Map<Integer, List<Short>>> mapBlocklet :
mapBlocks.entrySet()) {
String blockletId = mapBlocklet.getKey();
Map<Integer, List<Short>> mapPageIds = mapBlocklet.getValue();
List<FineGrainBlocklet.Page> pages = new ArrayList<FineGrainBlocklet.Page>();
// for each page in this blocklet: Map<PageId, List<RowId>>
for (Map.Entry<Integer, List<Short>> mapPageId : mapPageIds.entrySet()) {
// construct the row id array
int[] rowIds = new int[mapPageId.getValue().size()];
int i = 0;
// copy the row ids of this page
for (Short rowid : mapPageId.getValue()) {
rowIds[i++] = rowid;
}
// construct one page
FineGrainBlocklet.Page page = new FineGrainBlocklet.Page();
page.setPageId(mapPageId.getKey());
page.setRowId(rowIds);
// add this page into list pages
pages.add(page);
}
// add a FineGrainBlocklet
blocklets.add(new FineGrainBlocklet(filePath, blockletId, pages));
}
return blocklets;
}
/**
* Fill row ids into the map. Each document value combines multiple row ids, because rows are
* grouped and combined according to their uniqueness when a write cache is used.
*/
private void fillMapForCombineRows(ByteBuffer intBuffer,
Map<String, Map<Integer, List<Short>>> mapBlocks, List<IndexableField> fieldsInDoc,
String blockletId) {
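// each field holds one binary value: when stored blocklet-wise it is laid out as
// [pageId:short][rowId:short]..., otherwise as [blockletId:short][pageId:short][rowId:short]...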
for (int i = 0; i < fieldsInDoc.size(); i++) {
BytesRef bytesRef = fieldsInDoc.get(i).binaryValue();
ByteBuffer buffer = ByteBuffer.wrap(bytesRef.bytes);
int pageId;
if (storeBlockletWise) {
// when stored blocklet-wise only the page id is read here; the blocklet id is not stored
pageId = buffer.getShort();
} else {
int combineKey = buffer.getInt();
intBuffer.clear();
intBuffer.putInt(combineKey);
intBuffer.rewind();
blockletId = String.valueOf(intBuffer.getShort());
pageId = intBuffer.getShort();
}
Map<Integer, List<Short>> mapPageIds = mapBlocks.get(blockletId);
if (mapPageIds == null) {
mapPageIds = new HashMap<>();
mapBlocks.put(blockletId, mapPageIds);
}
List<Short> setRowId = mapPageIds.get(pageId);
if (setRowId == null) {
setRowId = new ArrayList<>();
mapPageIds.put(pageId, setRowId);
}
while (buffer.hasRemaining()) {
setRowId.add(buffer.getShort());
}
}
}
/**
* Fill the map with rowids from documents
*/
private void fillMap(ByteBuffer intBuffer, Map<String, Map<Integer, List<Short>>> mapBlocks,
List<IndexableField> fieldsInDoc, String blockletId) {
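// the first numeric field packs two shorts into one int: (pageId, rowId) when stored
// blocklet-wise, or (blockletId, pageId) otherwise; it is decoded by writing the int into
// intBuffer and reading it back as two shorts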
int combineKey = fieldsInDoc.get(0).numericValue().intValue();
intBuffer.clear();
intBuffer.putInt(combineKey);
intBuffer.rewind();
short rowId;
int pageId;
if (storeBlockletWise) {
// when stored blocklet-wise only the page id and row id are read;
// the blocklet id is not stored
pageId = intBuffer.getShort();
rowId = intBuffer.getShort();
} else {
blockletId = String.valueOf(intBuffer.getShort());
pageId = intBuffer.getShort();
rowId = fieldsInDoc.get(1).numericValue().shortValue();
}
Map<Integer, List<Short>> mapPageIds = mapBlocks.get(blockletId);
if (mapPageIds == null) {
mapPageIds = new HashMap<>();
mapBlocks.put(blockletId, mapPageIds);
}
List<Short> setRowId = mapPageIds.get(pageId);
if (setRowId == null) {
setRowId = new ArrayList<>();
mapPageIds.put(pageId, setRowId);
}
setRowId.add(rowId);
}
@Override
public boolean isScanRequired(FilterResolverIntf filterExp) {
return true;
}
/**
* Clear complete index table and release memory.
*/
@Override
public void clear() {
}
@Override
public void finish() {
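// decrement the reader's reference count instead of closing it directly;
// lucene closes the underlying reader once the count drops to zero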
if (null != indexReader) {
try {
int referenceCount = indexReader.getRefCount();
if (referenceCount > 0) {
indexReader.decRef();
if (null != indexSearcherMap) {
indexSearcherMap.clear();
}
}
} catch (IOException e) {
LOGGER.error("Ignoring the exception, Error while closing the lucene index reader", e);
}
}
}
}