blob: 43950240e293cfdb01833255b0dd4412457f0520 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.h2.opt;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.QueryIndexType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.DateTools;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.queryParser.MultiFieldQueryParser;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TermRangeFilter;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.util.Version;
import org.h2.util.JdbcUtils;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.KEY_FIELD_NAME;
import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.VAL_FIELD_NAME;
/**
* Lucene fulltext index.
*/
public class GridLuceneIndex implements AutoCloseable {
    /** Field name for string representation of value. */
    public static final String VAL_STR_FIELD_NAME = "_gg_val_str__";

    /** Field name for value version. */
    public static final String VER_FIELD_NAME = "_gg_ver__";

    /** Field name for value expiration time. */
    public static final String EXPIRATION_TIME_FIELD_NAME = "_gg_expires__";

    /** Space name, also used to look up the internal cache; may be {@code null}. */
    private final String spaceName;

    /** Descriptor of the indexed type. */
    private final GridQueryTypeDescriptor type;

    /** Lucene writer through which all index modifications are made. */
    private final IndexWriter writer;

    /** Names of the fields queries are parsed against; the last element is always {@link #VAL_STR_FIELD_NAME}. */
    private final String[] idxdFields;

    /** Number of store/remove calls made since the last commit performed by {@code query}. */
    private final AtomicLong updateCntr = new GridAtomicLong();

    /** Lucene directory backing this index. */
    private final GridLuceneDirectory dir;

    /** Kernal context; a {@code null} context is tolerated (see {@link #objectContext()}). */
    private final GridKernalContext ctx;

    /**
     * Constructor.
     *
     * @param ctx Kernal context.
     * @param mem Unsafe memory; when {@code null} a new {@code GridUnsafeMemory(0)} is created.
     * @param spaceName Space name.
     * @param type Type descriptor.
     * @throws IgniteCheckedException If the Lucene index writer could not be created.
     */
    public GridLuceneIndex(GridKernalContext ctx, @Nullable GridUnsafeMemory mem,
        @Nullable String spaceName, GridQueryTypeDescriptor type) throws IgniteCheckedException {
        this.ctx = ctx;
        this.spaceName = spaceName;
        this.type = type;

        dir = new GridLuceneDirectory(mem == null ? new GridUnsafeMemory(0) : mem);

        try {
            writer = new IndexWriter(dir, new IndexWriterConfig(Version.LUCENE_30, new StandardAnalyzer(
                Version.LUCENE_30)));
        }
        catch (IOException e) {
            // The instance is never handed out on failure, so nobody else could release the directory.
            U.closeQuiet(dir);

            throw new IgniteCheckedException(e);
        }

        GridQueryIndexDescriptor idx = type.textIndex();

        if (idx != null) {
            Collection<String> fields = idx.fields();

            // One extra slot is reserved for the value string field appended below.
            idxdFields = new String[fields.size() + 1];

            fields.toArray(idxdFields);
        }
        else {
            assert type.valueTextIndex() || type.valueClass() == String.class;

            idxdFields = new String[1];
        }

        idxdFields[idxdFields.length - 1] = VAL_STR_FIELD_NAME;
    }

    /**
     * @return Cache object context, or {@code null} if this index was created without a kernal context.
     */
    private CacheObjectContext objectContext() {
        if (ctx == null)
            return null;

        return ctx.cache().internalCache(spaceName).context().cacheObjectContext();
    }

    /**
     * Builds the string under which a key is stored in {@code KEY_FIELD_NAME}.
     *
     * @param key Key.
     * @param coctx Cache object context.
     * @return Base64 representation of the key's value bytes.
     * @throws IgniteCheckedException If key bytes could not be obtained.
     */
    private static String encodeKey(CacheObject key, CacheObjectContext coctx) throws IgniteCheckedException {
        return org.apache.commons.codec.binary.Base64.encodeBase64String(key.valueBytes(coctx));
    }

    /**
     * Stores given data in this fulltext index. Any document previously stored for the same key is
     * deleted first; if no indexable string is found for the entry, nothing is stored in its place.
     *
     * @param k Key.
     * @param v Value.
     * @param ver Version.
     * @param expires Expiration time in milliseconds. {@code query} only returns entries whose expiration
     *      time is strictly greater than the current time.
     * @throws IgniteCheckedException If failed.
     */
    public void store(CacheObject k, CacheObject v, GridCacheVersion ver, long expires) throws IgniteCheckedException {
        CacheObjectContext coctx = objectContext();

        Object key = k.isPlatformType() ? k.value(coctx, false) : k;
        Object val = v.isPlatformType() ? v.value(coctx, false) : v;

        Document doc = new Document();

        boolean stringsFound = false;

        if (type.valueTextIndex() || type.valueClass() == String.class) {
            doc.add(new Field(VAL_STR_FIELD_NAME, val.toString(), Field.Store.YES, Field.Index.ANALYZED));

            stringsFound = true;
        }

        // The last element of idxdFields is VAL_STR_FIELD_NAME, which is handled above.
        for (int i = 0, last = idxdFields.length - 1; i < last; i++) {
            Object fieldVal = type.value(idxdFields[i], key, val);

            if (fieldVal != null) {
                doc.add(new Field(idxdFields[i], fieldVal.toString(), Field.Store.YES, Field.Index.ANALYZED));

                stringsFound = true;
            }
        }

        String keyStr = encodeKey(k, coctx);

        try {
            // Delete first to avoid duplicates.
            writer.deleteDocuments(new Term(KEY_FIELD_NAME, keyStr));

            if (!stringsFound)
                return; // We did not find any strings to be indexed, will not store data at all.

            doc.add(new Field(KEY_FIELD_NAME, keyStr, Field.Store.YES, Field.Index.NOT_ANALYZED));

            // String values are restored from VAL_STR_FIELD_NAME, so binary form is kept for other types only.
            if (type.valueClass() != String.class)
                doc.add(new Field(VAL_FIELD_NAME, v.valueBytes(coctx)));

            // Explicit charset keeps the stored bytes independent of the platform default encoding.
            doc.add(new Field(VER_FIELD_NAME, ver.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8)));

            doc.add(new Field(EXPIRATION_TIME_FIELD_NAME, DateTools.timeToString(expires,
                DateTools.Resolution.MILLISECOND), Field.Store.YES, Field.Index.NOT_ANALYZED));

            writer.addDocument(doc);
        }
        catch (IOException e) {
            throw new IgniteCheckedException(e);
        }
        finally {
            // Counted even on failure or early return: the delete above may already have changed the index.
            updateCntr.incrementAndGet();
        }
    }

    /**
     * Removes entry for given key from this index.
     *
     * @param key Key.
     * @throws IgniteCheckedException If failed.
     */
    public void remove(CacheObject key) throws IgniteCheckedException {
        try {
            writer.deleteDocuments(new Term(KEY_FIELD_NAME, encodeKey(key, objectContext())));
        }
        catch (IOException e) {
            throw new IgniteCheckedException(e);
        }
        finally {
            updateCntr.incrementAndGet();
        }
    }

    /**
     * Runs lucene fulltext query over this index. Pending updates are committed before searching.
     * The returned iterator owns the underlying Lucene reader and searcher and releases them when closed;
     * if this method fails, they are released before the exception is propagated.
     *
     * @param qry Query.
     * @param filters Filters over result.
     * @param <K> Key type.
     * @param <V> Value type.
     * @return Query result.
     * @throws IgniteCheckedException If failed.
     */
    public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(String qry,
        IndexingQueryFilter filters) throws IgniteCheckedException {
        IndexReader reader;

        try {
            long updates = updateCntr.get();

            if (updates != 0) {
                writer.commit();

                // Subtract instead of resetting to keep updates made concurrently with the commit.
                updateCntr.addAndGet(-updates);
            }

            reader = IndexReader.open(writer, true);
        }
        catch (IOException e) {
            throw new IgniteCheckedException(e);
        }

        IndexSearcher searcher = null;

        boolean success = false;

        try {
            searcher = new IndexSearcher(reader);

            MultiFieldQueryParser parser = new MultiFieldQueryParser(Version.LUCENE_30, idxdFields,
                writer.getAnalyzer());

            // Filter expired items.
            Filter f = new TermRangeFilter(EXPIRATION_TIME_FIELD_NAME, DateTools.timeToString(U.currentTimeMillis(),
                DateTools.Resolution.MILLISECOND), null, false, false);

            TopDocs docs;

            try {
                docs = searcher.search(parser.parse(qry), f, Integer.MAX_VALUE);
            }
            catch (Exception e) {
                throw new IgniteCheckedException(e);
            }

            IgniteBiPredicate<K, V> fltr = null;

            if (filters != null)
                fltr = filters.forSpace(spaceName);

            // The iterator constructor eagerly looks up the first element and therefore may fail as well.
            It<K, V> it = new It<>(reader, searcher, docs.scoreDocs, fltr);

            success = true;

            return it;
        }
        finally {
            // On any failure nobody else holds the reader and searcher, release them here.
            if (!success) {
                U.closeQuiet(searcher);
                U.closeQuiet(reader);
            }
        }
    }

    /** {@inheritDoc} */
    @Override public void close() {
        U.closeQuiet(writer);
        U.closeQuiet(dir);
    }

    /**
     * Key-value iterator over fulltext search result. The next matching element is always looked up
     * in advance and kept in {@link #curr}.
     */
    private class It<K, V> extends GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> {
        /** */
        private static final long serialVersionUID = 0L;

        /** Reader the searcher works on; closed together with the iterator. */
        private final IndexReader reader;

        /** Searcher used to load matched documents; closed together with the iterator. */
        private final IndexSearcher searcher;

        /** Search hits. */
        private final ScoreDoc[] docs;

        /** Optional predicate over results; {@code null} accepts everything. */
        private final IgniteBiPredicate<K, V> filters;

        /** Index of the next hit to examine. */
        private int idx;

        /** Element to be returned by the next {@code onNext()} call, {@code null} if exhausted. */
        private IgniteBiTuple<K, V> curr;

        /** Cache object context; {@code null} when the index has no kernal context. */
        private final CacheObjectContext coctx;

        /**
         * Constructor.
         *
         * @param reader Reader.
         * @param searcher Searcher.
         * @param docs Docs.
         * @param filters Filters over result.
         * @throws IgniteCheckedException if failed.
         */
        private It(IndexReader reader, IndexSearcher searcher, ScoreDoc[] docs, IgniteBiPredicate<K, V> filters)
            throws IgniteCheckedException {
            this.reader = reader;
            this.searcher = searcher;
            this.docs = docs;
            this.filters = filters;

            coctx = objectContext();

            findNext();
        }

        /**
         * Filters key using predicates.
         *
         * @param key Key.
         * @param val Value.
         * @return {@code True} if key passes filter.
         */
        private boolean filter(K key, V val) {
            return filters == null || filters.apply(key, val);
        }

        /**
         * @param bytes Bytes.
         * @param ldr Class loader.
         * @return Object.
         * @throws IgniteCheckedException If failed.
         */
        @SuppressWarnings("unchecked")
        private <Z> Z unmarshall(byte[] bytes, ClassLoader ldr) throws IgniteCheckedException {
            if (coctx == null) // For tests.
                return (Z)JdbcUtils.deserialize(bytes, null);

            return (Z)coctx.processor().unmarshal(coctx, bytes, ldr);
        }

        /**
         * Finds next element: advances through the hits until one passes the filter and stores it
         * in {@link #curr}, or leaves {@link #curr} {@code null} when no hits remain.
         *
         * @throws IgniteCheckedException If failed.
         */
        @SuppressWarnings("unchecked")
        private void findNext() throws IgniteCheckedException {
            curr = null;

            while (idx < docs.length) {
                Document doc;

                try {
                    doc = searcher.doc(docs[idx++].doc);
                }
                catch (IOException e) {
                    throw new IgniteCheckedException(e);
                }

                ClassLoader ldr = null;

                if (ctx != null && ctx.deploy().enabled())
                    ldr = ctx.cache().internalCache(spaceName).context().deploy().globalLoader();

                K k = unmarshall(org.apache.commons.codec.binary.Base64.decodeBase64(doc.get(KEY_FIELD_NAME)), ldr);

                // String values are stored only as text, all other values are stored in binary form.
                V v = type.valueClass() == String.class ?
                    (V)doc.get(VAL_STR_FIELD_NAME) :
                    this.<V>unmarshall(doc.getBinaryValue(VAL_FIELD_NAME), ldr);

                assert v != null;

                if (!filter(k, v))
                    continue;

                curr = new IgniteBiTuple<>(k, v);

                break;
            }
        }

        /** {@inheritDoc} */
        @Override protected IgniteBiTuple<K, V> onNext() throws IgniteCheckedException {
            IgniteBiTuple<K, V> res = curr;

            findNext();

            return res;
        }

        /** {@inheritDoc} */
        @Override protected boolean onHasNext() throws IgniteCheckedException {
            return curr != null;
        }

        /** {@inheritDoc} */
        @Override protected void onClose() throws IgniteCheckedException {
            U.closeQuiet(searcher);
            U.closeQuiet(reader);
        }
    }
}