blob: 433defa5b4688cdd453efc753352fe8a9281f41e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gora.lucene.store;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileSystems;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.Set;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.SchemaFactory;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
import org.apache.gora.lucene.query.LuceneQuery;
import org.apache.gora.lucene.query.LuceneResult;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.query.impl.FileSplitPartitionQuery;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.store.impl.FileBackedDataStoreBase;
import org.apache.gora.util.AvroUtils;
import org.apache.gora.util.GoraException;
import org.apache.gora.util.IOUtils;
import org.apache.gora.util.OperationNotSupportedException;
import org.apache.hadoop.conf.Configurable;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.DoublePoint;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.FloatPoint;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.BytesRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
/**
* {@link org.apache.gora.lucene.store.LuceneStore} is the primary class
* responsible for GORA CRUD operations on Lucene.
*/
public class LuceneStore<K, T extends PersistentBase>
extends FileBackedDataStoreBase<K, T> implements Configurable {
private static final Logger LOG = LoggerFactory.getLogger(LuceneStore.class);
private static final String DEFAULT_MAPPING_FILE = "gora-lucene-mapping.xml";
private static final String XSD_MAPPING_FILE = "gora-lucene.xsd";
private static final String LUCENE_VERSION_KEY = "gora.lucene.index.version";
private static final String DEFAULT_LUCENE_VERSION = "LATEST";
private static final String LUCENE_RAM_BUFFER_KEY = "gora.lucene.index.writer.rambuffer";
private static final String DEFAULT_LUCENE_RAMBUFFER = "16";
private LuceneMapping mapping;
private IndexWriter writer;
private SearcherManager searcherManager;
private Directory dir;
@Override
public void initialize(Class<K> keyClass, Class<T> persistentClass,
Properties properties) throws GoraException {
try {
super.initialize(keyClass, persistentClass, properties);
} catch (GoraException ge) {
LOG.error(ge.getMessage(), ge);
throw new GoraException(ge);
}
String mappingFile = null;
try {
mappingFile = DataStoreFactory.getMappingFile(
properties, (DataStore<?, ?>) this, DEFAULT_MAPPING_FILE);
} catch (IOException ioe) {
LOG.error(ioe.getMessage(), ioe);
throw new GoraException(ioe);
}
String luceneVersion = properties.getProperty(
LUCENE_VERSION_KEY, DEFAULT_LUCENE_VERSION);
String ramBuffer = properties.getProperty(
LUCENE_RAM_BUFFER_KEY, DEFAULT_LUCENE_RAMBUFFER);
LOG.debug("Lucene index version: {}", luceneVersion);
LOG.debug("Lucene index writer RAM buffer size: {}", ramBuffer);
try {
mapping = readMapping(mappingFile);
} catch (IOException ioe) {
LOG.error(ioe.getMessage(), ioe);
throw new GoraException(ioe);
}
String persistentClassObject = persistentClass.getCanonicalName();
String dataStoreOutputPath = outputPath + "_" + persistentClassObject
.substring(persistentClassObject.lastIndexOf('.') + 1).toLowerCase(Locale.getDefault());
try {
dir = FSDirectory.open(FileSystems.getDefault().getPath(dataStoreOutputPath));
Analyzer analyzer = new StandardAnalyzer();
IndexWriterConfig iwc = new IndexWriterConfig(analyzer);
iwc.setOpenMode(OpenMode.CREATE_OR_APPEND);
iwc.setRAMBufferSizeMB(Double.parseDouble(ramBuffer));
writer = new IndexWriter(dir, iwc);
//TODO do we definately want all past deletions to be applied.
searcherManager = new SearcherManager(writer, true, true, new SearcherFactory());
} catch (IOException e) {
LOG.error("Error opening {} with Lucene FSDirectory.", outputPath, e);
}
}
private LuceneMapping readMapping(String filename) throws IOException {
try {
LuceneMapping mapping = new LuceneMapping();
javax.xml.validation.Schema newSchema = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
.newSchema(new StreamSource(getClass().getClassLoader().getResourceAsStream(XSD_MAPPING_FILE)));
newSchema.newValidator().validate(new StreamSource(getClass().getClassLoader().getResourceAsStream(filename)));
DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder();
org.w3c.dom.Document dom = db.parse(getClass().getClassLoader().getResourceAsStream(filename));
Element root = dom.getDocumentElement();
NodeList nl = root.getElementsByTagName("class");
for (int i = 0; i < nl.getLength(); i++) {
Element classElement = (Element) nl.item(i);
if (classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName())
&& classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) {
NodeList fields;
Element fe;
fields = classElement.getElementsByTagName("primarykey");
for (int j = 0; j < fields.getLength(); j++) {
fe = (Element) fields.item(j);
mapping.setPrimaryKey(fe.getAttribute("column"));
}
fields = classElement.getElementsByTagName("field");
for (int j = 0; j < fields.getLength(); j++) {
fe = (Element) fields.item(j);
String name = fe.getAttribute("name");
String column = fe.getAttribute("column");
mapping.addField(name, column);
}
}
}
return mapping;
} catch (Exception ex) {
throw new IOException("Unable to read " + filename, ex);
}
}
@Override
public boolean delete(K key) {
try {
writer.deleteDocuments(new Term(mapping.getPrimaryKey(), key.toString()));
searcherManager.maybeRefresh();
return true;
} catch (IOException e) {
LOG.error("Unable to delete key: {}", key.toString(), e);
}
return false;
}
private boolean isPrimaryKeyIncluded(String[] fields) {
HashSet<String> luceneFields = new HashSet<>();
if (fields.length > 0) {
for (String field : fields) {
luceneFields.add(getMapping().getLuceneField(field));
}
}
return luceneFields.contains(getMapping().getPrimaryKey());
}
@Override
public long deleteByQuery(Query<K, T> query) {
try {
// Figure out how many were there before
LuceneQuery<K, T> q = (LuceneQuery<K, T>) query;
LuceneResult<K, T> r = (LuceneResult<K, T>) q.execute();
int before = r.getScoreDocs().length;
if (query.getFields() == null || (query.getFields().length == getFields().length)
|| isPrimaryKeyIncluded(query.getFields())) {
// Delete them
writer.deleteDocuments(q.toLuceneQuery());
searcherManager.maybeRefresh();
} else {
Query<K, T> selectQuery = this.newQuery();
selectQuery.setStartKey(q.getStartKey());
selectQuery.setEndKey(q.getEndKey());
LuceneResult<K, T> selectResult = (LuceneResult<K, T>) selectQuery.execute();
ScoreDoc[] scoreDocs = selectResult.getScoreDocs();
HashSet<String> fields = new HashSet<>();
fields.addAll(mapping.getLuceneFields());
IndexSearcher searcher = selectResult.getSearcher();
if (scoreDocs.length > 0) {
for (ScoreDoc scoreDoc : scoreDocs) {
Document doc = searcher.doc(scoreDoc.doc, fields);
for (String avroField : query.getFields()) {
String docField = mapping.getLuceneField(avroField);
if (doc.getField(docField) != null) {
doc.removeField(docField);
}
}
String key = doc.get(getMapping().getPrimaryKey());
doc.add(new StringField(mapping.getPrimaryKey(), key, Store.YES));
writer.updateDocument(new Term(mapping.getPrimaryKey(), key), doc);
searcherManager.maybeRefresh();
}
}
selectResult.close();
}
// Figure out how many there are after
r = (LuceneResult<K, T>) q.execute();
int after = r.getScoreDocs().length;
return before - after;
} catch (IOException e) {
LOG.error("Unable to deleteByQuery: {}", query.toString(), e);
}
return 0;
}
@Override
public void deleteSchema() {
try {
writer.deleteAll();
searcherManager.maybeRefresh();
} catch (IOException e) {
LOG.error("Unable to deleteAll: {}", e);
}
}
@Override
public boolean exists(K key) throws GoraException {
boolean resp = false;
try {
final IndexSearcher s = searcherManager.acquire();
TermQuery q = new TermQuery(new Term(mapping.getPrimaryKey(), key.toString()));
if (s.count(q) > 0) {
resp = true;
}
searcherManager.release(s);
} catch (IOException e) {
LOG.error("Error in exists: {}", e);
}
return resp;
}
@Override
public T get(K key, String[] fieldsToLoad) {
Set<String> fields;
if (fieldsToLoad != null) {
fields = new HashSet<>(fieldsToLoad.length);
fields.addAll(Arrays.asList(fieldsToLoad));
} else {
fields = new HashSet<>();
fields.addAll(mapping.getLuceneFields());
}
try {
final IndexSearcher s = searcherManager.acquire();
TermQuery q = new TermQuery(new Term(mapping.getPrimaryKey(), key.toString()));
ScoreDoc[] hits = s.search(q, 2).scoreDocs;
if (hits.length > 0) {
Document doc = s.doc(hits[0].doc, fields);
LOG.debug("get:Document: {}", doc.toString());
String[] a = {};
return newInstance(doc, fields.toArray(a));
}
searcherManager.release(s);
} catch (IOException e) {
LOG.error("Error in get: {}", e);
}
return null;
}
private Object convertDocFieldToAvroUnion(final Schema fieldSchema,
final Schema.Field field,
final String sf,
final Document doc) throws IOException {
Object result;
Schema.Type type0 = fieldSchema.getTypes().get(0).getType();
Schema.Type type1 = fieldSchema.getTypes().get(1).getType();
if (!type0.equals(type1)
&& (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL))) {
Schema innerSchema = null;
if (type0.equals(Schema.Type.NULL)) {
innerSchema = fieldSchema.getTypes().get(1);
} else {
innerSchema = fieldSchema.getTypes().get(0);
}
result = convertToIndexableFieldToAvroField(doc, field, innerSchema, sf);
} else {
throw new GoraException("LuceneStore only supports Union of two types field.");
}
return result;
}
private SpecificDatumReader getDatumReader(Schema fieldSchema) {
// reuse
return new SpecificDatumReader(fieldSchema);
}
private Object convertToIndexableFieldToAvroField(final Document doc,
final Schema.Field field,
final Schema fieldSchema,
final String sf) throws IOException {
Object result = null;
T persistent = newPersistent();
Object sv;
switch (fieldSchema.getType()) {
case MAP:
case ARRAY:
case RECORD:
sv = doc.getBinaryValue(sf);
if (sv == null) {
break;
}
BytesRef b = (BytesRef) sv;
SpecificDatumReader reader = getDatumReader(fieldSchema);
result = IOUtils.deserialize(b.bytes, reader, persistent.get(field.pos()));
break;
case UNION:
result = convertDocFieldToAvroUnion(fieldSchema, field, sf, doc);
break;
case ENUM:
sv = doc.get(sf);
if (sv == null) {
break;
}
result = AvroUtils.getEnumValue(fieldSchema, (String) sv);
break;
case BYTES:
sv = doc.getBinaryValue(sf);
if (sv == null) {
break;
}
result = ByteBuffer.wrap(((BytesRef) sv).bytes);
break;
default:
sv = doc.get(sf);
if (sv == null) {
break;
}
result = convertLuceneFieldToAvroField(fieldSchema.getType(), sv);
}
return result;
}
public T newInstance(Document doc, String[] fields) throws IOException {
T persistent = newPersistent();
if (fields == null) {
fields = fieldMap.keySet().toArray(new String[fieldMap.size()]);
}
String pk = mapping.getPrimaryKey();
for (String f : fields) {
org.apache.avro.Schema.Field field = fieldMap.get(f);
String sf;
if (pk.equals(f)) {
sf = f;
} else {
sf = mapping.getLuceneField(f);
}
Schema fieldSchema = field.schema();
Object fieldValue = convertToIndexableFieldToAvroField(doc, field, fieldSchema, sf);
if (fieldValue == null) {
continue;
}
persistent.put(field.pos(), fieldValue);
persistent.setDirty(field.pos());
}
persistent.clearDirty();
return persistent;
}
private Object convertLuceneFieldToAvroField(Type t, Object o) {
Object result = null;
switch (t) {
case FIXED:
// Could we combine this with the BYTES section below and
// either fix the size of the array or not depending on Type?
// This might be a buffer copy. Do we need to pad if the
// fixed sized data is smaller than the type? Do we truncate
// if the data is larger than the type?
LOG.error("Fixed-sized fields are not supported yet");
break;
case BOOLEAN:
result = Boolean.parseBoolean((String) o);
break;
case DOUBLE:
result = Double.parseDouble((String) o);
break;
case FLOAT:
result = Float.parseFloat((String) o);
break;
case INT:
result = Integer.parseInt((String) o);
break;
case LONG:
result = Long.parseLong((String) o);
break;
case STRING:
result = new Utf8(o.toString());
break;
default:
LOG.error("Unknown field type: {}", t);
}
return result;
}
@Override
public String getSchemaName() {
return "default";
}
@Override
public Query<K, T> newQuery() {
return new LuceneQuery<>(this);
}
private IndexableField convertAvroUnionToDocumentField(final String sf,
final Schema fieldSchema,
final Object value) {
IndexableField result;
Schema.Type type0 = fieldSchema.getTypes().get(0).getType();
Schema.Type type1 = fieldSchema.getTypes().get(1).getType();
if (!type0.equals(type1)
&& (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL))) {
Schema innerSchema = null;
if (type0.equals(Schema.Type.NULL)) {
innerSchema = fieldSchema.getTypes().get(1);
} else {
innerSchema = fieldSchema.getTypes().get(0);
}
result = convertToIndexableField(sf, innerSchema, value);
} else {
throw new IllegalStateException("LuceneStore only supports Union of two types field.");
}
return result;
}
private SpecificDatumWriter getDatumWriter(Schema fieldSchema) {
return new SpecificDatumWriter(fieldSchema);
}
private IndexableField convertToIndexableField(String sf, Schema fieldSchema, Object o) {
IndexableField result = null;
switch (fieldSchema.getType()) {
case MAP: //TODO: These should be handled better
case ARRAY:
case RECORD:
// For now we'll just store the bytes
byte[] data = new byte[0];
try {
SpecificDatumWriter writer = getDatumWriter(fieldSchema);
data = IOUtils.serialize(writer, o);
} catch (IOException e) {
LOG.error("Error occurred while serializing record", e);
}
result = new StoredField(sf, data);
break;
case UNION:
result = convertAvroUnionToDocumentField(sf, fieldSchema, o);
break;
case BYTES:
result = new StoredField(sf, ((ByteBuffer) o).array());
break;
case ENUM:
case STRING:
//TODO make this Text based on a mapping.xml attribute
result = new StringField(sf, o.toString(), Store.YES);
break;
case BOOLEAN:
result = new StringField(sf, o.toString(), Store.YES);
break;
case DOUBLE:
result = new StoredField(sf, (Double) o);
break;
case FLOAT:
result = new StoredField(sf, (Float) o);
break;
case INT:
result = new StoredField(sf, (Integer) o);
break;
case LONG:
result = new StoredField(sf, (Long) o);
break;
default:
LOG.error("Unknown field type: {}", fieldSchema.getType());
}
return result;
}
@Override
public void put(K key, T persistent) {
Schema schema = persistent.getSchema();
Document doc = new Document();
// populate the doc
List<org.apache.avro.Schema.Field> fields = schema.getFields();
for (org.apache.avro.Schema.Field field : fields) {
if (!persistent.isDirty(field.name())) {
continue;
}
String sf = mapping.getLuceneField(field.name());
if (sf == null) {
continue;
}
Schema fieldSchema = field.schema();
Object o = persistent.get(field.pos());
if (o == null) {
continue;
}
doc.add(convertToIndexableField(sf, fieldSchema, o));
}
LOG.info("DOCUMENT: {}", doc);
try {
if (key instanceof Integer) {
doc.add(new IntPoint(mapping.getPrimaryKey(), (Integer) key));
} else if (key instanceof Long) {
doc.add(new LongPoint(mapping.getPrimaryKey(), (Long) key));
} else if (key instanceof Float) {
doc.add(new FloatPoint(mapping.getPrimaryKey(), (Float) key));
} else if (key instanceof Double) {
doc.add(new DoublePoint(mapping.getPrimaryKey(), (Double) key));
} else {
doc.add(new StringField(mapping.getPrimaryKey(), key.toString(), Store.YES));
}
LOG.info("DOCUMENT: {}", doc);
if (get(key, null) == null) {
writer.addDocument(doc);
} else {
writer.updateDocument(
new Term(mapping.getPrimaryKey(), key.toString()),
doc);
}
searcherManager.maybeRefresh();
} catch (IOException e) {
LOG.error("Error updating document: {}", e);
}
}
@Override
protected Result<K, T> executePartial(FileSplitPartitionQuery<K, T> arg0)
throws IOException {
throw new OperationNotSupportedException("executePartial is not supported for LuceneStore");
}
@Override
protected Result<K, T> executeQuery(Query<K, T> query) throws IOException {
try {
return new LuceneResult<>(this, query, searcherManager);
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
return null;
}
@Override
public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) {
throw new OperationNotSupportedException("getPartitions is not supported for LuceneStore");
}
@Override
public void flush() {
try {
writer.commit();
searcherManager.maybeRefreshBlocking();
} catch (IOException e) {
LOG.error("Error in commit: {}", e);
}
}
@Override
public void close() {
try {
searcherManager.close();
writer.close();
dir.close();
} catch (IOException e) {
LOG.error("Error in close: {}", e);
}
super.close();
}
public LuceneMapping getMapping() {
return mapping;
}
}