blob: d2dfe4bd54b965270519891faa559cc187f1c8f9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.metamodel.elasticsearch.nativeclient;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.metamodel.DataContext;
import org.apache.metamodel.MetaModelException;
import org.apache.metamodel.QueryPostprocessDataContext;
import org.apache.metamodel.UpdateScript;
import org.apache.metamodel.UpdateSummary;
import org.apache.metamodel.UpdateableDataContext;
import org.apache.metamodel.data.DataSet;
import org.apache.metamodel.data.DataSetHeader;
import org.apache.metamodel.data.Row;
import org.apache.metamodel.data.SimpleDataSetHeader;
import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
import org.apache.metamodel.query.FilterItem;
import org.apache.metamodel.query.LogicalOperator;
import org.apache.metamodel.query.SelectItem;
import org.apache.metamodel.schema.Column;
import org.apache.metamodel.schema.MutableColumn;
import org.apache.metamodel.schema.MutableSchema;
import org.apache.metamodel.schema.MutableTable;
import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
import org.apache.metamodel.util.SimpleTableDef;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.hppc.ObjectLookupContainer;
import org.elasticsearch.common.hppc.cursors.ObjectCursor;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* DataContext implementation for ElasticSearch analytics engine.
*
* ElasticSearch has a data storage structure hierarchy that briefly goes like
* this:
* <ul>
* <li>Index</li>
* <li>Document type (short: Type) (within an index)</li>
* <li>Documents (of a particular type)</li>
* </ul>
*
* When instantiating this DataContext, an index name is provided. Within this
* index, each document type is represented as a table.
*
* This implementation supports either automatic discovery of a schema or manual
* specification of a schema, through the {@link SimpleTableDef} class.
*/
public class ElasticSearchDataContext extends QueryPostprocessDataContext implements DataContext, UpdateableDataContext {
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContext.class);
public static final TimeValue TIMEOUT_SCROLL = TimeValue.timeValueSeconds(60);
private final Client elasticSearchClient;
private final String indexName;
// Table definitions that are set from the beginning, not supposed to be
// changed.
private final List<SimpleTableDef> staticTableDefinitions;
// Table definitions that are discovered, these can change
private final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>();
/**
* Constructs a {@link ElasticSearchDataContext}. This constructor accepts a
* custom array of {@link SimpleTableDef}s which allows the user to define
* his own view on the indexes in the engine.
*
* @param client
* the ElasticSearch client
* @param indexName
* the name of the ElasticSearch index to represent
* @param tableDefinitions
* an array of {@link SimpleTableDef}s, which define the table
* and column model of the ElasticSearch index.
*/
public ElasticSearchDataContext(Client client, String indexName, SimpleTableDef... tableDefinitions) {
super(false);
if (client == null) {
throw new IllegalArgumentException("ElasticSearch Client cannot be null");
}
if (indexName == null || indexName.trim().length() == 0) {
throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName);
}
this.elasticSearchClient = client;
this.indexName = indexName;
this.staticTableDefinitions = Arrays.asList(tableDefinitions);
this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema()));
}
/**
* Constructs a {@link ElasticSearchDataContext} and automatically detects
* the schema structure/view on all indexes (see
* {@link #detectTable(ClusterState, String, String)}).
*
* @param client
* the ElasticSearch client
* @param indexName
* the name of the ElasticSearch index to represent
*/
public ElasticSearchDataContext(Client client, String indexName) {
this(client, indexName, new SimpleTableDef[0]);
}
/**
* Performs an analysis of the available indexes in an ElasticSearch cluster
* {@link Client} instance and detects the elasticsearch types structure
* based on the metadata provided by the ElasticSearch java client.
*
* @see {@link #detectTable(ClusterState, String, String)}
* @return a mutable schema instance, useful for further fine tuning by the
* user.
*/
private SimpleTableDef[] detectSchema() {
logger.info("Detecting schema for index '{}'", indexName);
final ClusterState cs;
final ClusterStateRequestBuilder clusterStateRequestBuilder = getElasticSearchClient().admin().cluster()
.prepareState();
// different methods here to set the index name, so we have to use
// reflection :-/
try {
final byte majorVersion = Version.CURRENT.major;
final Object methodArgument = new String[] { indexName };
if (majorVersion == 0) {
final Method method = ClusterStateRequestBuilder.class.getMethod("setFilterIndices", String[].class);
method.invoke(clusterStateRequestBuilder, methodArgument);
} else {
final Method method = ClusterStateRequestBuilder.class.getMethod("setIndices", String[].class);
method.invoke(clusterStateRequestBuilder, methodArgument);
}
} catch (Exception e) {
logger.error("Failed to set index name on ClusterStateRequestBuilder, version {}", Version.CURRENT, e);
throw new MetaModelException("Failed to create request for index information needed to detect schema", e);
}
cs = clusterStateRequestBuilder.execute().actionGet().getState();
final List<SimpleTableDef> result = new ArrayList<>();
final IndexMetaData imd = cs.getMetaData().index(indexName);
if (imd == null) {
// index does not exist
logger.warn("No metadata returned for index name '{}' - no tables will be detected.");
} else {
final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings();
final ObjectLookupContainer<String> documentTypes = mappings.keys();
for (final Object documentTypeCursor : documentTypes) {
final String documentType = ((ObjectCursor<?>) documentTypeCursor).value.toString();
try {
final SimpleTableDef table = detectTable(cs, indexName, documentType);
result.add(table);
} catch (Exception e) {
logger.error("Unexpected error during detectTable for document type '{}'", documentType, e);
}
}
}
final SimpleTableDef[] tableDefArray = result.toArray(new SimpleTableDef[result.size()]);
Arrays.sort(tableDefArray, new Comparator<SimpleTableDef>() {
@Override
public int compare(SimpleTableDef o1, SimpleTableDef o2) {
return o1.getName().compareTo(o2.getName());
}
});
return tableDefArray;
}
/**
* Performs an analysis of an available index type in an ElasticSearch
* {@link Client} client and tries to detect the index structure based on
* the metadata provided by the java client.
*
* @param cs
* the ElasticSearch cluster
* @param indexName
* the name of the index
* @param documentType
* the name of the index type
* @return a table definition for ElasticSearch.
*/
public static SimpleTableDef detectTable(ClusterState cs, String indexName, String documentType) throws Exception {
logger.debug("Detecting table for document type '{}' in index '{}'", documentType, indexName);
final IndexMetaData imd = cs.getMetaData().index(indexName);
if (imd == null) {
// index does not exist
throw new IllegalArgumentException("No such index: " + indexName);
}
final MappingMetaData mappingMetaData = imd.mapping(documentType);
if (mappingMetaData == null) {
throw new IllegalArgumentException("No such document type in index '" + indexName + "': " + documentType);
}
final Map<String, Object> mp = mappingMetaData.getSourceAsMap();
final Object metadataProperties = mp.get("properties");
if (metadataProperties != null && metadataProperties instanceof Map) {
@SuppressWarnings("unchecked")
final Map<String, ?> metadataPropertiesMap = (Map<String, ?>) metadataProperties;
final ElasticSearchMetaData metaData = ElasticSearchMetaDataParser.parse(metadataPropertiesMap);
final SimpleTableDef std = new SimpleTableDef(documentType, metaData.getColumnNames(),
metaData.getColumnTypes());
return std;
}
throw new IllegalArgumentException("No mapping properties defined for document type '" + documentType
+ "' in index: " + indexName);
}
@Override
protected Schema getMainSchema() throws MetaModelException {
final MutableSchema theSchema = new MutableSchema(getMainSchemaName());
for (final SimpleTableDef tableDef : staticTableDefinitions) {
addTable(theSchema, tableDef);
}
final SimpleTableDef[] tables = detectSchema();
synchronized (this) {
dynamicTableDefinitions.clear();
dynamicTableDefinitions.addAll(Arrays.asList(tables));
for (final SimpleTableDef tableDef : dynamicTableDefinitions) {
final List<String> tableNames = theSchema.getTableNames();
if (!tableNames.contains(tableDef.getName())) {
addTable(theSchema, tableDef);
}
}
}
return theSchema;
}
private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) {
final MutableTable table = tableDef.toTable().setSchema(theSchema);
final Column idColumn = table.getColumnByName(ElasticSearchUtils.FIELD_ID);
if (idColumn != null && idColumn instanceof MutableColumn) {
final MutableColumn mutableColumn = (MutableColumn) idColumn;
mutableColumn.setPrimaryKey(true);
}
theSchema.addTable(table);
}
@Override
protected String getMainSchemaName() throws MetaModelException {
return indexName;
}
@Override
protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems,
List<FilterItem> whereItems, int firstRow, int maxRows) {
final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
LogicalOperator.AND);
if (queryBuilder != null) {
// where clause can be pushed down to an ElasticSearch query
final SearchRequestBuilder searchRequest = createSearchRequest(table, firstRow, maxRows, queryBuilder);
final SearchResponse response = searchRequest.execute().actionGet();
return new ElasticSearchDataSet(elasticSearchClient, response, selectItems, false);
}
return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows);
}
@Override
protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
final SearchRequestBuilder searchRequest = createSearchRequest(table, 1, maxRows, null);
final SearchResponse response = searchRequest.execute().actionGet();
return new ElasticSearchDataSet(elasticSearchClient, response, columns.stream().map(SelectItem::new).collect(Collectors.toList()), false);
}
private SearchRequestBuilder createSearchRequest(Table table, int firstRow, int maxRows, QueryBuilder queryBuilder) {
final String documentType = table.getName();
final SearchRequestBuilder searchRequest = elasticSearchClient.prepareSearch(indexName).setTypes(documentType);
if (firstRow > 1) {
final int zeroBasedFrom = firstRow - 1;
searchRequest.setFrom(zeroBasedFrom);
}
if (limitMaxRowsIsSet(maxRows)) {
searchRequest.setSize(maxRows);
} else {
searchRequest.setScroll(TIMEOUT_SCROLL);
}
if (queryBuilder != null) {
searchRequest.setQuery(queryBuilder);
}
return searchRequest;
}
@Override
protected Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> selectItems, Column primaryKeyColumn,
Object keyValue) {
if (keyValue == null) {
return null;
}
final String documentType = table.getName();
final String id = keyValue.toString();
final GetResponse response = elasticSearchClient.prepareGet(indexName, documentType, id).execute().actionGet();
if (!response.isExists()) {
return null;
}
final Map<String, Object> source = response.getSource();
final String documentId = response.getId();
final DataSetHeader header = new SimpleDataSetHeader(selectItems);
return NativeElasticSearchUtils.createRow(source, documentId, header);
}
@Override
protected Number executeCountQuery(Table table, List<FilterItem> whereItems, boolean functionApproximationAllowed) {
if (!whereItems.isEmpty()) {
// not supported - will have to be done by counting client-side
return null;
}
final String documentType = table.getName();
final CountResponse response = elasticSearchClient.prepareCount(indexName)
.setQuery(QueryBuilders.termQuery("_type", documentType)).execute().actionGet();
return response.getCount();
}
private boolean limitMaxRowsIsSet(int maxRows) {
return (maxRows != -1);
}
@Override
public UpdateSummary executeUpdate(UpdateScript update) {
final ElasticSearchUpdateCallback callback = new ElasticSearchUpdateCallback(this);
update.run(callback);
callback.onExecuteUpdateFinished();
return callback.getUpdateSummary();
}
/**
* Gets the {@link Client} that this {@link DataContext} is wrapping.
*
* @return
*/
public Client getElasticSearchClient() {
return elasticSearchClient;
}
/**
* Gets the name of the index that this {@link DataContext} is working on.
*
* @return
*/
public String getIndexName() {
return indexName;
}
}