blob: 30dba7be71d89f60b22b7e50fc9129d2cdebd1a4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gora.elasticsearch.mapping;
import com.google.inject.ConfigurationException;
import org.apache.commons.io.IOUtils;
import org.apache.gora.elasticsearch.store.ElasticsearchStore;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.util.GoraException;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.input.SAXBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
import javax.xml.XMLConstants;
import javax.xml.transform.Source;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Locale;
/**
* Builder for Mapping definitions of Elasticsearch.
*/
public class ElasticsearchMappingBuilder<K, T extends PersistentBase> {
private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchMappingBuilder.class);
/**
* XSD validation file for the XML mapping.
*/
private static final String XSD_MAPPING_FILE = "gora-elasticsearch.xsd";
// Index description
static final String ATT_NAME = "name";
static final String ATT_TYPE = "type";
// Class description
static final String TAG_CLASS = "class";
static final String ATT_KEYCLASS = "keyClass";
static final String ATT_INDEX = "index";
static final String TAG_FIELD = "field";
static final String ATT_DOCFIELD = "docfield";
static final String ATT_SCALINGFACTOR = "scalingFactor";
/**
* Mapping instance being built.
*/
private ElasticsearchMapping elasticsearchMapping;
private final ElasticsearchStore<K, T> dataStore;
/**
* Constructor for ElasticsearchMappingBuilder.
*
* @param store ElasticsearchStore instance
*/
public ElasticsearchMappingBuilder(final ElasticsearchStore<K, T> store) {
this.elasticsearchMapping = new ElasticsearchMapping();
this.dataStore = store;
}
/**
* Returns the Elasticsearch Mapping being built.
*
* @return Elasticsearch Mapping instance
*/
public ElasticsearchMapping getElasticsearchMapping() {
return elasticsearchMapping;
}
/**
* Sets the Elasticsearch Mapping.
*
* @param elasticsearchMapping Elasticsearch Mapping instance
*/
public void setElasticsearchMapping(ElasticsearchMapping elasticsearchMapping) {
this.elasticsearchMapping = elasticsearchMapping;
}
/**
* Reads Elasticsearch mappings from file.
*
* @param inputStream Mapping input stream
* @param xsdValidation Parameter for enabling XSD validation
*/
public void readMappingFile(InputStream inputStream, boolean xsdValidation) {
try {
SAXBuilder saxBuilder = new SAXBuilder();
if (inputStream == null) {
LOG.error("The mapping input stream is null!");
throw new GoraException("The mapping input stream is null!");
}
// Convert input stream to a string to use it a few times
String mappingStream = IOUtils.toString(inputStream, Charset.defaultCharset());
// XSD validation for XML file
if (xsdValidation) {
Source xmlSource = new StreamSource(IOUtils.toInputStream(mappingStream, Charset.defaultCharset()));
Schema schema = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
.newSchema(new StreamSource(getClass().getClassLoader().getResourceAsStream(XSD_MAPPING_FILE)));
schema.newValidator().validate(xmlSource);
LOG.info("Mapping file is valid.");
}
Document document = saxBuilder.build(IOUtils.toInputStream(mappingStream, Charset.defaultCharset()));
if (document == null) {
LOG.error("The mapping document is null!");
throw new GoraException("The mapping document is null!");
}
Element root = document.getRootElement();
// Extract class descriptions
@SuppressWarnings("unchecked")
List<Element> classElements = root.getChildren(TAG_CLASS);
for (Element classElement : classElements) {
final Class<T> persistentClass = dataStore.getPersistentClass();
final Class<K> keyClass = dataStore.getKeyClass();
if (haveKeyClass(keyClass, classElement)
&& havePersistentClass(persistentClass, classElement)) {
loadPersistentClass(classElement, persistentClass);
break;
}
}
} catch (IOException | JDOMException | ConfigurationException | SAXException ex) {
throw new RuntimeException(ex);
}
LOG.info("Gora Elasticsearch mapping file was read successfully.");
}
private boolean haveKeyClass(final Class<K> keyClass,
final Element classElement) {
return classElement.getAttributeValue(ATT_KEYCLASS).equals(
keyClass.getName());
}
private boolean havePersistentClass(final Class<T> persistentClass,
final Element classElement) {
return classElement.getAttributeValue(ATT_NAME).equals(
persistentClass.getName());
}
/**
* Handle the XML parsing of the class definition.
*
* @param classElement the XML node containing the class definition
*/
protected void loadPersistentClass(Element classElement,
Class<T> pPersistentClass) {
String indexNameFromMapping = classElement.getAttributeValue(ATT_INDEX);
String indexName = dataStore.getSchemaName(indexNameFromMapping, pPersistentClass);
elasticsearchMapping.setIndexName(indexName);
// docNameFromMapping could be null here
if (!indexName.equals(indexNameFromMapping)) {
ElasticsearchStore.LOG
.info("Keyclass and nameclass match, but mismatching index names. "
+ "Mappingfile schema is '{}' vs actual schema '{}', assuming they are the same.",
indexNameFromMapping, indexName);
if (indexNameFromMapping != null) {
elasticsearchMapping.setIndexName(indexName);
}
}
// Process fields declaration
@SuppressWarnings("unchecked")
List<Element> fields = classElement.getChildren(TAG_FIELD);
for (Element fieldElement : fields) {
String fieldTypeName = fieldElement.getAttributeValue(ATT_TYPE).toUpperCase(Locale.getDefault());
Field.FieldType fieldType = new Field.FieldType(Field.DataType.valueOf(fieldTypeName));
Field field;
if (fieldType.getType() == Field.DataType.SCALED_FLOAT) {
int scalingFactor = Integer.parseInt(fieldElement.getAttributeValue(ATT_SCALINGFACTOR));
field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), new Field.FieldType(scalingFactor));
} else {
field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), fieldType);
}
elasticsearchMapping.addField(fieldElement.getAttributeValue(ATT_NAME), field);
}
}
}