blob: 2714fff0309d95644d40ed32d425fc1c9d23fd82 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.stanbol.enhancer.engines.lucenefstlinking;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.clerezza.rdf.core.Language;
import org.apache.clerezza.rdf.core.Literal;
import org.apache.clerezza.rdf.core.UriRef;
import org.apache.clerezza.rdf.core.impl.PlainLiteralImpl;
import org.apache.commons.lang.StringUtils;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexableField;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.util.RefCounted;
import org.apache.stanbol.enhancer.engines.lucenefstlinking.Match.FieldLoader;
import org.apache.stanbol.enhancer.engines.lucenefstlinking.Match.FieldType;
import org.apache.stanbol.enhancer.engines.lucenefstlinking.cache.EntityCache;
import org.apache.stanbol.enhancer.nlp.ner.NerTag;
import org.opensextant.solrtexttagger.TaggerFstCorpus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Session created based on the {@link IndexConfiguration} for processing a
 * single parsed ContentItem in a given language.<p>
 * A session holds a reference to the current {@link SolrIndexSearcher} and,
 * if an EntityCache manager is configured, to the {@link EntityCache}. Both
 * references are only released by {@link #close()}, so every successfully
 * created session needs to be closed after use.<p>
 * NOTE: no synchronization is used for the session statistics and the
 * session level document cache.
 *
 * @author Rupert Westenthaler
 *
 */
public class TaggingSession implements Closeable {

    private static final Logger log = LoggerFactory.getLogger(TaggingSession.class);

    /**
     * The language of this session (typically the language of the document)
     */
    private final String language;
    /**
     * The FST corpus for {@link #language} or <code>null</code> if no
     * language specific corpus is configured
     */
    private final Corpus langCorpus;
    /**
     * The FST corpus for the default matching language or <code>null</code>
     * if none is configured or it is the same as the language specific one
     */
    private final Corpus defaultCorpus;
    /**
     * The Solr document id field holding the URI of the Entity.
     */
    protected final String idField;
    /**
     * The Solr field holding the labels in the language of the current Document
     * or <code>null</code> if no language specific corpus is configured.
     */
    protected final String labelField;
    /**
     * The language of labels read from {@link #labelField} or <code>null</code>
     * if the corpus has no (or a blank) language.
     */
    protected final Language labelLang;
    /**
     * The Solr field holding the labels in the default matching language or
     * <code>null</code> if the same as {@link #labelField}
     */
    protected final String defaultLabelField;
    /**
     * The language of labels read from {@link #defaultLabelField} or
     * <code>null</code> if the default corpus has no (or a blank) language.
     */
    protected final Language defaultLabelLang;
    /**
     * The names of all Solr fields that need to be loaded for Entities
     * matched in this session
     */
    protected final Set<String> solrDocfields = new HashSet<String>();
    protected final IndexConfiguration config;
    protected final String typeField;
    protected final String redirectField;
    protected final String rankingField;
    /**
     * Used in the {@link LinkingModeEnum#NER} to store the {@link NerTag#getTag()}
     * and {@link NerTag#getType()} values for the span of the Named Entity.<p>
     * This information is collected by the {@link NamedEntityTokenFilter} while
     * iterating over the parsed text and is used in the processing of
     * {@link Tag}s to filter Entities based on their types. <p>
     * Not used in any linking mode other than <code>NER</code>
     */
    protected final NavigableMap<int[],Set<String>> entityMentionTypes =
            new TreeMap<int[],Set<String>>(Tag.SPAN_COMPARATOR);

    /**
     * Reference to the Solr index searcher. Released by {@link #close()}.
     */
    private final RefCounted<SolrIndexSearcher> searcherRef;
    /**
     * Document Cache reference (may be <code>null</code>). Released by
     * {@link #close()}.
     */
    private RefCounted<EntityCache> documentCacheRef;
    /* session statistics for the document cache */
    private int docLoaded = 0;
    private int docCached = 0;
    private int docAppended = 0;
    /**
     * Guards against releasing the references more than once
     */
    private boolean closed = false;

    private final FieldLoaderImpl fieldLoader;
    /**
     * The current version of the SolrIndex (as reported by
     * {@link DirectoryReader#getVersion()}) of the
     * {@link IndexConfiguration#getIndex()}
     */
    private final Long indexVersion;

    /**
     * Creates a session for the parsed language.
     * @param language the language of the processed document
     * @param config the index configuration
     * @throws CorpusException if no FST corpus for the language (nor for the
     * default language) is currently available
     * @throws IllegalStateException if neither a language specific nor a
     * default FST corpus is configured
     */
    TaggingSession(String language, IndexConfiguration config) throws CorpusException {
        this.language = language;
        this.config = config;
        //init the SolrIndexSearcher. The reference MUST be released if the
        //initialisation fails, as close() is never called for a session that
        //could not be created.
        searcherRef = config.getIndex().getSearcher();
        boolean initialised = false;
        try {
            SolrIndexSearcher searcher = searcherRef.get();
            DirectoryReader indexReader = searcher.getIndexReader();
            indexVersion = Long.valueOf(indexReader.getVersion());
            //check if the IndexConfiguration is up to date with the version of the index
            long confVersion = config.getVersion();
            if(confVersion != indexVersion.longValue()){
                log.debug("> update IndexConfiguration (from: {} | to: {})", confVersion, indexVersion);
                config.update(indexVersion, searcher);
            }
            //get the corpusInfo
            CorpusInfo langCorpusInfo = config.getCorpus(language);
            log.debug("> language Corpus: {}", langCorpusInfo);
            CorpusInfo defaultCorpusInfo = config.getDefaultCorpus();
            log.debug("> default Corpus: {}", defaultCorpusInfo);
            //obtain the Solr Document Id field
            SchemaField idSchemaField = config.getIndex().getLatestSchema().getUniqueKeyField();
            idField = idSchemaField.getName();
            solrDocfields.add(idField);
            //obtain the language specific fields for the session
            if(langCorpusInfo == null && defaultCorpusInfo == null){
                //this should not happen, because the canEnhance method of the
                //engine should already reject such calls
                throw new IllegalStateException("No FST Corpus configured for language '"
                        +language+"' and also no default FST Corpus is present.!");
            }
            if(langCorpusInfo != null){
                this.langCorpus = new Corpus(langCorpusInfo,
                        obtainFstCorpus(indexVersion,langCorpusInfo));
                this.labelField = langCorpusInfo.storedField;
                solrDocfields.add(labelField);
                this.labelLang = toLanguage(langCorpusInfo.language);
            } else {
                this.langCorpus = null;
                this.labelField = null;
                this.labelLang = null;
            }
            if(defaultCorpusInfo != null && !defaultCorpusInfo.equals(langCorpusInfo)){
                this.defaultCorpus = new Corpus(defaultCorpusInfo,
                        obtainFstCorpus(indexVersion,defaultCorpusInfo));
                this.defaultLabelField = defaultCorpusInfo.storedField;
                solrDocfields.add(defaultLabelField);
                this.defaultLabelLang = toLanguage(defaultCorpusInfo.language);
            } else {
                this.defaultCorpus = null;
                this.defaultLabelField = null;
                this.defaultLabelLang = null;
            }
            if(this.defaultCorpus == null && this.langCorpus == null){
                //NOTE: the conditionals need to be in parentheses, as '+' binds
                //      stronger as '!=' and '?:'
                throw new CorpusException("Unable to initialise a FST corpus for language '"
                        + language + "'. Neither the language specific Corpus (field: "
                        + (langCorpusInfo != null ? langCorpusInfo.indexedField : "<undefined>")
                        + ") nor the Corpus for the default language (field: "
                        + (defaultCorpusInfo != null ? defaultCorpusInfo.indexedField : "<undefined>")
                        + ") is currently available!", null);
            }
            if(config.getEncodedTypeField() != null){
                this.typeField = config.getEncodedTypeField();
                solrDocfields.add(typeField);
            } else {
                this.typeField = null;
            }
            if(config.getEncodedRedirectField() != null){
                this.redirectField = config.getEncodedRedirectField();
                solrDocfields.add(redirectField);
            } else {
                this.redirectField = null;
            }
            if(config.getEncodedRankingField() != null){
                this.rankingField = config.getEncodedRankingField();
                solrDocfields.add(rankingField);
            } else {
                this.rankingField = null;
            }
            if(config.getEntityCacheManager() != null){
                documentCacheRef = config.getEntityCacheManager().getCache(indexVersion);
            }
            //NOTE: needs to be created after all solrDocfields are known
            fieldLoader = new FieldLoaderImpl(searcher.getIndexReader());
            initialised = true;
        } finally {
            if(!initialised){ //release the references on any failure
                releaseReferences();
            }
        }
    }

    /**
     * Converts the language of a {@link CorpusInfo} to a {@link Language}
     * @param lang the language (may be <code>null</code>)
     * @return the Language or <code>null</code> if the parsed language is
     * <code>null</code> or blank
     */
    private static Language toLanguage(String lang){
        return StringUtils.isBlank(lang) ? null : new Language(lang);
    }

    /**
     * Used to instantiate {@link Match}es
     * @param docId the Lucene document Id as returned by the FST corpus
     * @return the Match instance
     */
    public Match createMatch(int docId){
        return new Match(docId,fieldLoader);
    }

    /**
     * Releases the Solr index searcher and the EntityCache reference held by
     * this session. Calling this method more than once has no effect.
     */
    @Override
    public void close(){
        if(closed){
            return; //already closed: do not decrement the references twice
        }
        closed = true;
        releaseReferences();
    }

    /**
     * Decrements the Solr index searcher and (if present) the EntityCache
     * references. The cache reference is released even if releasing the
     * searcher fails.
     */
    private void releaseReferences(){
        try {
            searcherRef.decref(); //clean up the Solr index searcher reference
        } finally {
            if(documentCacheRef != null){
                documentCacheRef.decref(); //clean up the DocumentCache reference
            }
        }
    }

    /**
     * The language of this Session. This is typically the language detected for
     * the document.
     * @return the language of this Session
     */
    public String getLanguage() {
        return language;
    }

    /**
     * @return the Corpus for the default language or <code>null</code> if not
     * configured or the same as {@link #getLanguageCorpus()}
     */
    public Corpus getDefaultCorpus() {
        return defaultCorpus;
    }

    /**
     * @return the Corpus for the language of this session or <code>null</code>
     * if no language specific corpus is configured
     */
    public Corpus getLanguageCorpus() {
        return langCorpus;
    }

    /**
     * @return the Solr index searcher used by this session
     */
    public SolrIndexSearcher getSearcher() {
        return searcherRef.get();
    }

    /**
     * Creates a new session. Callers need to {@link #close()} the returned
     * session.
     * @param indexConfig the index configuration
     * @param language the language of the processed document
     * @return the created session
     * @throws CorpusException if no FST corpus is currently available
     */
    public static TaggingSession createSession(IndexConfiguration indexConfig,
            String language) throws CorpusException {
        return new TaggingSession(language, indexConfig);
    }

    /**
     * Getter for the EntityCache
     * @return the cache or <code>null</code> if no one is configured
     */
    public EntityCache getDocumentCache(){
        return documentCacheRef != null ? documentCacheRef.get() : null;
    }

    /**
     * The number of Lucene Documents loaded from disk in this session so far
     * @return the number of loaded documents
     */
    public int getSessionDocLoaded(){
        return docLoaded;
    }

    /**
     * The number of Lucene Documents retrieved from the {@link #getDocumentCache()}
     * in this session so far
     * @return the number of documents served by the cache
     */
    public int getSessionDocCached(){
        return docCached;
    }

    /**
     * The number of Lucene Documents retrieved from the {@link #getDocumentCache()},
     * but with fields missing in the cached version. For such documents the
     * additional fields (typically labels of different languages) were read
     * from disk and added to the cached document.
     * @return the number of cached documents that needed to be extended
     */
    public int getSessionDocAppended(){
        return docAppended;
    }

    /**
     * Obtains the FST corpus for the parsed CorpusInfo. If the corpus is not
     * yet available or outdated and runtime creation is enabled, its
     * (re-)creation is enqueued and this method waits for a limited time
     * for the result.
     * @param indexVersion the current version of the index
     * @param fstInfo the info about the corpus
     * @return the TaggerFstCorpus (never <code>null</code>)
     * @throws CorpusException if the requested corpus is currently not available
     */
    private TaggerFstCorpus obtainFstCorpus(Long indexVersion, CorpusInfo fstInfo) throws CorpusException {
        TaggerFstCorpus fstCorpus = fstInfo.getCorpus();
        Future<TaggerFstCorpus> enqueuedCorpus;
        if (fstCorpus == null) {
            if (!fstInfo.allowCreation && fstInfo.isFstCreationError()) {
                throw new CorpusException(fstInfo.getErrorMessage(), null);
            }
            enqueuedCorpus = getEnqueued(fstInfo);
            if(enqueuedCorpus == null && canCreate(fstInfo)){ //not enqueued
                log.debug(" - enqueue creation of {}", fstInfo);
                enqueuedCorpus = enqueue(fstInfo);
            }
            if(enqueuedCorpus == null){
                throw new CorpusException("Unable to obtain Fst Corpus for " + fstInfo
                        + " (message: " + fstInfo.getErrorMessage() + ")!", null);
            }
        } else if(indexVersion != null && indexVersion.longValue() != fstCorpus.getIndexVersion()){
            //the current FST corpus is outdated with the Solr index
            log.debug(" - FST corpus for language '{}' is outdated", fstInfo.language);
            enqueuedCorpus = getEnqueued(fstInfo);
            if(enqueuedCorpus != null){
                log.debug(" - re-creation of {} is already enqueued", fstInfo);
            } else if(canCreate(fstInfo)){
                log.debug(" - enqueue creation of {}", fstInfo);
                enqueuedCorpus = enqueue(fstInfo);
            } else {
                log.warn("Unable to update outdated FST corpus for language '{}' "
                        + "because runtimeCreation is {} and ExecutorService "
                        + "is {}available!", new Object[]{fstInfo.language,
                        fstInfo.allowCreation ? "enabled" : "disabled" ,
                        config.getExecutorService() == null ? "not " : ""});
                log.warn(" ... please adapt the Engine configuration for up "
                        + "to date FST corpora!");
            }
        } else { //FST corpus is up to date with the current Solr index version
            log.debug("FST corpus for language '{}' is up to date", fstInfo.language);
            enqueuedCorpus = null;
        }
        if(enqueuedCorpus == null){
            return fstCorpus; //up to date, or outdated but no update possible
        }
        return awaitFstCorpus(fstInfo, fstCorpus, enqueuedCorpus);
    }

    /**
     * Checks if runtime creation of the FST corpus is possible (enabled and
     * an ExecutorService is available).
     */
    private boolean canCreate(CorpusInfo fstInfo){
        return fstInfo.allowCreation && config.getExecutorService() != null;
    }

    /**
     * Reads the currently enqueued creation task of the parsed corpus
     * under the read lock of the corpus.
     * @return the enqueued task or <code>null</code> if none
     */
    private Future<TaggerFstCorpus> getEnqueued(CorpusInfo fstInfo){
        fstInfo.corpusLock.readLock().lock();
        try {
            return fstInfo.getEnqueued();
        } finally {
            fstInfo.corpusLock.readLock().unlock();
        }
    }

    /**
     * Waits (for a limited time) for the enqueued creation of a FST corpus.
     * @param fstInfo the info about the corpus
     * @param previous the previous (outdated) version or <code>null</code> if none
     * @param enqueuedCorpus the enqueued creation task
     * @return the newly created corpus or - if creation did not succeed in
     * time - the previous version
     * @throws CorpusException if no corpus could be created and no previous
     * version is available
     */
    private TaggerFstCorpus awaitFstCorpus(CorpusInfo fstInfo, TaggerFstCorpus previous,
            Future<TaggerFstCorpus> enqueuedCorpus) throws CorpusException {
        //TODO: maybe make this configurable
        int waitTime = previous == null ? 30 : 10;
        TaggerFstCorpus created;
        try {
            log.debug(" - will wait max {}sec for creation of {}", waitTime, fstInfo);
            created = enqueuedCorpus.get(waitTime, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); //recover interrupted state
            if(previous == null){
                throw new CorpusException("Interrupted while waiting for the creation "
                        + "of Fst Corpus " + fstInfo + "!", e);
            }
            log.debug("interrupted while waiting for FST corpus {}. Will use "
                    + "previous version.", fstInfo);
            return previous;
        } catch (ExecutionException e) {
            if(previous == null){
                throw new CorpusException("Unable to build Fst Corpus " + fstInfo
                        + " (message: " + fstInfo.getErrorMessage() + ")!", e);
            }
            log.warn("Unable to update outdated FST corpus " + fstInfo
                    + " (message: " + fstInfo.getErrorMessage() + ")",e);
            return previous;
        } catch (TimeoutException e) {
            if(previous == null){
                throw new CorpusException("Unable to build Fst Corpus for " + fstInfo
                        + " within " + waitTime+ "sec! Try again later.", null);
            }
            log.debug("unable to build FST corpus for {} in time ({}sec). Will use "
                    + "previous version ",fstInfo, waitTime);
            return previous;
        } catch (CancellationException e) {
            if(previous == null){
                throw new CorpusException("Building of FST Corpus " + fstInfo
                        + " was cancelled!", null);
            }
            log.debug("building of FST corpus for {} was cancelled. Will use "
                    + "previous version.",fstInfo);
            return previous;
        }
        if(created == null){ //the creation task did not provide a corpus
            if(previous == null){
                throw new CorpusException("Unable to build Fst Corpus " + fstInfo
                        + " (message: " + fstInfo.getErrorMessage() + ")!", null);
            }
            return previous;
        }
        return created;
    }

    /**
     * Enqueues the (re-)creation of the FST corpus unless a creation task is
     * already enqueued. Requires {@link IndexConfiguration#getExecutorService()}
     * to be available.
     * @param fstInfo the info about the corpus
     * @return the enqueued (or already present) creation task
     */
    private Future<TaggerFstCorpus> enqueue(CorpusInfo fstInfo) {
        Future<TaggerFstCorpus> enqueuedCorpus;
        fstInfo.corpusLock.writeLock().lock();
        try {
            enqueuedCorpus = fstInfo.getEnqueued(); //check again in write lock
            if(enqueuedCorpus == null){
                //enqueue for re-creation
                enqueuedCorpus = config.getExecutorService().submit(
                        new CorpusCreationTask(config, fstInfo));
                fstInfo.enqueued(enqueuedCorpus);
            }
        } finally {
            fstInfo.corpusLock.writeLock().unlock();
        }
        return enqueuedCorpus;
    }

    /**
     * The current version of the SolrIndex as reported by the {@link IndexReader}
     * used by this TaggingSession.
     * @return the current version of the SolrIndex.
     */
    public Long getIndexVersion() {
        return indexVersion;
    }

    /**
     * {@link FieldLoader} implementation used to create {@link Match} instances
     */
    private class FieldLoaderImpl implements FieldLoader {

        /**
         * Name of an in-memory only field added to loaded Documents that
         * lists the names of the Solr fields read from the index. Used to
         * check if a Document retrieved from the EntityCache provides all
         * fields required by this session.
         */
        private static final String LOADED_FIELDS_FIELD_NAME = "__loadedFields__";
        private final List<Field> loadedFieldsFields;
        private final IndexReader reader;
        /**
         * Cache similar to the {@link EntityCache}, but with a scope bound to
         * life cycle of this FieldLoaderImpl instance (a single TaggingSession).
         * This cache ensures the Lucene Documents are not loaded twice while
         * processing the same document (even if no EntityCache is configured or
         * the size of the EntityCache is too small).
         */
        private final Map<Integer,Document> sessionCache = new HashMap<Integer,Document>();
        /**
         * The EntityCache instance that caches entity data over multiple sessions
         * (<code>null</code> if none is configured)
         */
        private final EntityCache cache;

        public FieldLoaderImpl(IndexReader reader) {
            this.reader = reader;
            loadedFieldsFields = new ArrayList<Field>(solrDocfields.size());
            for(String loadedFieldName : solrDocfields){
                loadedFieldsFields.add(new StringField(LOADED_FIELDS_FIELD_NAME,
                        loadedFieldName, Store.NO));
            }
            this.cache = documentCacheRef != null ? documentCacheRef.get() : null;
        }

        @Override
        public Map<FieldType,Object> load(int id) throws IOException {
            Document doc = getDocument(id);
            if(doc == null){ //defensive check
                throw new IOException("No document found for Lucene doc id '"+id+"'!");
            }
            Map<FieldType,Object> values =
                    new EnumMap<FieldType,Object>(FieldType.class);
            values.put(FieldType.id, doc.get(idField));
            values.put(FieldType.label, readLabels(doc));
            if(typeField != null){
                values.put(FieldType.type, readUriRefs(doc, typeField));
            }
            if(redirectField != null){
                values.put(FieldType.redirect, readUriRefs(doc, redirectField));
            }
            if(rankingField != null){
                Double ranking = readRanking(doc.getField(rankingField));
                if(ranking != null){
                    values.put(FieldType.ranking, ranking);
                }
            }
            return values;
        }

        /**
         * Gets the Lucene Document for the parsed id by using (in that order)
         * the session cache, the EntityCache and finally the index. Cached
         * documents that miss some of the required fields are extended with
         * the missing fields read from the index.
         */
        private Document getDocument(int id) throws IOException {
            Integer key = Integer.valueOf(id);
            Document doc = sessionCache.get(key);
            if(doc != null){
                //NOTE: The session cache has a minor side effect on the
                // EntityCache. Because multiple occurrences of an Entity
                // within the Document are not requested on the EntityCache
                // LRU based implementations will get slightly different
                // statistics. Assuming that the maximum size of the EntityCache
                // is >> as the number of Documents matching for the current Text
                // this effect can be considered as negligible.
                return doc;
            }
            doc = cache != null ? cache.get(key) : null;
            if(doc == null){
                doc = reader.document(id, solrDocfields);
                //if we read a doc from the index we need to add information about
                //the fields we loaded (especially the languages of labels loaded).
                //NOTE that this information is never stored in the index. It
                //is only kept in-memory when caching this document.
                for(Field loadedFieldsField : loadedFieldsFields){
                    doc.add(loadedFieldsField);
                }
                docLoaded++;
                if(cache != null){
                    cache.cache(key, doc);
                }
            } else {
                //check if the fields of the cached doc are sufficient
                //for the requested Solr Document fields
                Set<String> missing = new HashSet<String>(solrDocfields);
                String[] loaded = doc.getValues(LOADED_FIELDS_FIELD_NAME);
                for(int i=0;i < loaded.length && !missing.isEmpty(); i++){
                    missing.remove(loaded[i]);
                }
                if(missing.isEmpty()){
                    docCached++;
                } else { //load the missing fields from the index
                    Document tmp = reader.document(id, missing);
                    for(IndexableField field : tmp.getFields()){
                        doc.add(field);
                    }
                    //also update the loaded fields
                    for(String loadedField : missing){
                        doc.add(new StringField(LOADED_FIELDS_FIELD_NAME,
                                loadedField, Store.NO));
                    }
                    //NOTE: no need to update the cache, as we have updated the
                    //cached value.
                    docAppended++;
                }
            }
            sessionCache.put(key, doc);
            return doc;
        }

        /**
         * Reads the labels of the language specific and the default label field
         */
        private Set<Literal> readLabels(Document doc){
            Set<Literal> labels = new HashSet<Literal>();
            if(labelField != null){
                for(String label : doc.getValues(labelField)){
                    labels.add(new PlainLiteralImpl(label, labelLang));
                }
            }
            if(defaultLabelField != null){
                for(String label : doc.getValues(defaultLabelField)){
                    labels.add(new PlainLiteralImpl(label, defaultLabelLang));
                }
            }
            return labels;
        }

        /**
         * Reads all values of the parsed field as {@link UriRef}s
         */
        private Set<UriRef> readUriRefs(Document doc, String field){
            Set<UriRef> refs = new HashSet<UriRef>();
            for(String value : doc.getValues(field)){
                refs.add(new UriRef(value));
            }
            return refs;
        }

        /**
         * Reads the ranking from the numeric value of the parsed field or - if
         * not numeric - by parsing its string value.
         * @return the ranking or <code>null</code> if the field is not present
         * or its value is not a number
         */
        private Double readRanking(IndexableField field){
            if(field == null){
                return null;
            }
            Number num = field.numericValue();
            if(num instanceof Double){
                return (Double)num;
            }
            if(num != null){
                return Double.valueOf(num.doubleValue());
            }
            String value = field.stringValue();
            if(value == null){
                return null;
            }
            try {
                return Double.valueOf(value);
            } catch (NumberFormatException e) {
                return null; //ignore values that are not numbers
            }
        }
    }

    /**
     * A FST Corpus together with the {@link CorpusInfo} it was created for
     */
    public class Corpus {
        private final CorpusInfo corpusInfo;
        private final TaggerFstCorpus fst;

        Corpus(CorpusInfo corpusInfo, TaggerFstCorpus fst){
            this.corpusInfo = corpusInfo;
            this.fst = fst;
        }

        /**
         * @return the language of this corpus
         */
        public String getLanguage(){
            return corpusInfo.language;
        }

        /**
         * @return the language of this corpus
         * @deprecated misspelled. Use {@link #getLanguage()} instead
         */
        @Deprecated
        public String getLanugage(){
            return getLanguage();
        }

        public Analyzer getAnalyzer(){
            return corpusInfo.analyzer;
        }

        public Analyzer getTaggingAnalyzer(){
            return corpusInfo.taggingAnalyzer;
        }

        public TaggerFstCorpus getFst(){
            return fst;
        }

        public String getIndexedField() {
            return corpusInfo.indexedField;
        }

        public String getStoredField(){
            return corpusInfo.storedField;
        }
    }

    /**
     * @return the skip alternative tokens setting of the {@link IndexConfiguration}
     */
    public boolean isSkipAltTokens() {
        return config.isSkipAltTokens();
    }
}