| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.metamodel.elasticsearch.nativeclient; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.stream.Collectors; |
| |
| import org.apache.metamodel.DataContext; |
| import org.apache.metamodel.UpdateScript; |
| import org.apache.metamodel.UpdateSummary; |
| import org.apache.metamodel.data.DataSet; |
| import org.apache.metamodel.data.DataSetHeader; |
| import org.apache.metamodel.data.Row; |
| import org.apache.metamodel.data.SimpleDataSetHeader; |
| import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataContext; |
| import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData; |
| import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaDataParser; |
| import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; |
| import org.apache.metamodel.query.FilterItem; |
| import org.apache.metamodel.query.LogicalOperator; |
| import org.apache.metamodel.query.SelectItem; |
| import org.apache.metamodel.schema.Column; |
| import org.apache.metamodel.schema.Table; |
| import org.apache.metamodel.util.SimpleTableDef; |
| import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder; |
| import org.elasticsearch.action.get.GetResponse; |
| import org.elasticsearch.action.search.SearchRequestBuilder; |
| import org.elasticsearch.action.search.SearchResponse; |
| import org.elasticsearch.client.Client; |
| import org.elasticsearch.cluster.ClusterState; |
| import org.elasticsearch.cluster.metadata.IndexMetaData; |
| import org.elasticsearch.cluster.metadata.MappingMetaData; |
| import org.elasticsearch.common.collect.ImmutableOpenMap; |
| import org.elasticsearch.index.query.QueryBuilder; |
| import org.elasticsearch.index.query.QueryBuilders; |
| import org.elasticsearch.index.query.TermQueryBuilder; |
| import org.elasticsearch.search.builder.SearchSourceBuilder; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.carrotsearch.hppc.ObjectLookupContainer; |
| import com.carrotsearch.hppc.cursors.ObjectCursor; |
| |
| /** |
| * DataContext implementation for ElasticSearch analytics engine. |
| * |
| * ElasticSearch has a data storage structure hierarchy that briefly goes like |
| * this: |
| * <ul> |
| * <li>Index</li> |
| * <li>Document type (short: Type) (within an index)</li> |
| * <li>Documents (of a particular type)</li> |
| * </ul> |
| * |
| * When instantiating this DataContext, an index name is provided. Within this |
| * index, each document type is represented as a table. |
| * |
| * This implementation supports either automatic discovery of a schema or manual |
| * specification of a schema, through the {@link SimpleTableDef} class. |
| */ |
| public class ElasticSearchDataContext extends AbstractElasticSearchDataContext { |
| |
| private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContext.class); |
| |
| private final Client elasticSearchClient; |
| |
| /** |
| * Constructs a {@link ElasticSearchDataContext}. This constructor accepts a |
| * custom array of {@link SimpleTableDef}s which allows the user to define |
| * his own view on the indexes in the engine. |
| * |
| * @param client |
| * the ElasticSearch client |
| * @param indexName |
| * the name of the ElasticSearch index to represent |
| * @param tableDefinitions |
| * an array of {@link SimpleTableDef}s, which define the table |
| * and column model of the ElasticSearch index. |
| */ |
| public ElasticSearchDataContext(Client client, String indexName, SimpleTableDef... tableDefinitions) { |
| super(indexName, tableDefinitions); |
| |
| if (client == null) { |
| throw new IllegalArgumentException("ElasticSearch Client cannot be null"); |
| } |
| this.elasticSearchClient = client; |
| this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema())); |
| } |
| |
| /** |
| * Constructs a {@link ElasticSearchDataContext} and automatically detects |
| * the schema structure/view on all indexes (see |
| * {@link #detectTable(ClusterState, String, String)}). |
| * |
| * @param client |
| * the ElasticSearch client |
| * @param indexName |
| * the name of the ElasticSearch index to represent |
| */ |
| public ElasticSearchDataContext(Client client, String indexName) { |
| this(client, indexName, new SimpleTableDef[0]); |
| } |
| |
| @Override |
| protected SimpleTableDef[] detectSchema() { |
| logger.info("Detecting schema for index '{}'", indexName); |
| |
| final ClusterStateRequestBuilder clusterStateRequestBuilder = getElasticSearchClient().admin().cluster() |
| .prepareState().setIndices(indexName); |
| final ClusterState cs = clusterStateRequestBuilder.execute().actionGet().getState(); |
| |
| final List<SimpleTableDef> result = new ArrayList<>(); |
| |
| final IndexMetaData imd = cs.getMetaData().index(indexName); |
| if (imd == null) { |
| // index does not exist |
| logger.warn("No metadata returned for index name '{}' - no tables will be detected."); |
| } else { |
| final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings(); |
| final ObjectLookupContainer<String> documentTypes = mappings.keys(); |
| for (final ObjectCursor<?> documentTypeCursor : documentTypes) { |
| final String documentType = documentTypeCursor.value.toString(); |
| try { |
| final SimpleTableDef table = detectTable(cs, indexName, documentType); |
| result.add(table); |
| } catch (Exception e) { |
| logger.error("Unexpected error during detectTable for document type '{}'", documentType, e); |
| } |
| } |
| } |
| return sortTables(result); |
| } |
| |
| /** |
| * Performs an analysis of an available index type in an ElasticSearch |
| * {@link Client} client and tries to detect the index structure based on |
| * the metadata provided by the java client. |
| * |
| * @param cs |
| * the ElasticSearch cluster |
| * @param indexName |
| * the name of the index |
| * @param documentType |
| * the name of the index type |
| * @return a table definition for ElasticSearch. |
| */ |
| public static SimpleTableDef detectTable(ClusterState cs, String indexName, String documentType) throws Exception { |
| logger.debug("Detecting table for document type '{}' in index '{}'", documentType, indexName); |
| final IndexMetaData imd = cs.getMetaData().index(indexName); |
| if (imd == null) { |
| // index does not exist |
| throw new IllegalArgumentException("No such index: " + indexName); |
| } |
| final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings(); |
| final MappingMetaData mappingMetaData = mappings.get(documentType); |
| if (mappingMetaData == null) { |
| throw new IllegalArgumentException("No such document type in index '" + indexName + "': " + documentType); |
| } |
| final Map<String, Object> mp = mappingMetaData.getSourceAsMap(); |
| final Object metadataProperties = mp.get("properties"); |
| if (metadataProperties != null && metadataProperties instanceof Map) { |
| @SuppressWarnings("unchecked") |
| final Map<String, ?> metadataPropertiesMap = (Map<String, ?>) metadataProperties; |
| final ElasticSearchMetaData metaData = ElasticSearchMetaDataParser.parse(metadataPropertiesMap); |
| final SimpleTableDef std = new SimpleTableDef(documentType, metaData.getColumnNames(), |
| metaData.getColumnTypes()); |
| return std; |
| } |
| throw new IllegalArgumentException("No mapping properties defined for document type '" + documentType |
| + "' in index: " + indexName); |
| } |
| |
| @Override |
| protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems, |
| List<FilterItem> whereItems, int firstRow, int maxRows) { |
| final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems, |
| LogicalOperator.AND); |
| if (queryBuilder != null) { |
| // where clause can be pushed down to an ElasticSearch query |
| final SearchRequestBuilder searchRequest = createSearchRequest(table, firstRow, maxRows, queryBuilder); |
| final SearchResponse response = searchRequest.execute().actionGet(); |
| return new ElasticSearchDataSet(getElasticSearchClient(), response, selectItems); |
| } |
| return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows); |
| } |
| |
| @Override |
| protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) { |
| final SearchRequestBuilder searchRequest = createSearchRequest(table, 1, maxRows, null); |
| final SearchResponse response = searchRequest.execute().actionGet(); |
| return new ElasticSearchDataSet(getElasticSearchClient(), response, columns.stream().map(SelectItem::new) |
| .collect(Collectors.toList())); |
| } |
| |
| private SearchRequestBuilder createSearchRequest(Table table, int firstRow, int maxRows, QueryBuilder queryBuilder) { |
| final String documentType = table.getName(); |
| final SearchRequestBuilder searchRequest = getElasticSearchClient().prepareSearch(indexName).setTypes(documentType); |
| if (firstRow > 1) { |
| final int zeroBasedFrom = firstRow - 1; |
| searchRequest.setFrom(zeroBasedFrom); |
| } |
| if (limitMaxRowsIsSet(maxRows)) { |
| searchRequest.setSize(maxRows); |
| } else { |
| searchRequest.setScroll(TIMEOUT_SCROLL); |
| } |
| |
| if (queryBuilder != null) { |
| searchRequest.setQuery(queryBuilder); |
| } |
| |
| return searchRequest; |
| } |
| |
| @Override |
| protected Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> selectItems, Column primaryKeyColumn, |
| Object keyValue) { |
| if (keyValue == null) { |
| return null; |
| } |
| |
| final String documentType = table.getName(); |
| final String id = keyValue.toString(); |
| |
| final GetResponse response = getElasticSearchClient().prepareGet(indexName, documentType, id).execute().actionGet(); |
| |
| if (!response.isExists()) { |
| return null; |
| } |
| |
| final Map<String, Object> source = response.getSource(); |
| final String documentId = response.getId(); |
| |
| final DataSetHeader header = new SimpleDataSetHeader(selectItems); |
| |
| return ElasticSearchUtils.createRow(source, documentId, header); |
| } |
| |
| @Override |
| protected Number executeCountQuery(Table table, List<FilterItem> whereItems, boolean functionApproximationAllowed) { |
| if (!whereItems.isEmpty()) { |
| // not supported - will have to be done by counting client-side |
| return null; |
| } |
| final String documentType = table.getName(); |
| final TermQueryBuilder query = QueryBuilders.termQuery("_type", documentType); |
| final SearchResponse searchResponse = |
| getElasticSearchClient().prepareSearch(indexName).setSource(new SearchSourceBuilder().size(0).query(query)) |
| .execute().actionGet(); |
| return searchResponse.getHits().getTotalHits(); |
| } |
| |
| @Override |
| public UpdateSummary executeUpdate(UpdateScript update) { |
| final ElasticSearchUpdateCallback callback = new ElasticSearchUpdateCallback(this); |
| update.run(callback); |
| callback.onExecuteUpdateFinished(); |
| return callback.getUpdateSummary(); |
| } |
| |
| @Override |
| protected void onSchemaCacheRefreshed() { |
| getElasticSearchClient().admin().indices().prepareRefresh(indexName).get(); |
| |
| detectSchema(); |
| } |
| |
| /** |
| * Gets the {@link Client} that this {@link DataContext} is wrapping. |
| * |
| * @return |
| */ |
| public Client getElasticSearchClient() { |
| return elasticSearchClient; |
| } |
| } |