blob: 8e210b40b8d70e11f3564c1ac8c73987388c1409 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.elasticsearch.dao;
import org.apache.metron.indexing.dao.search.FieldType;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
/**
* Responsible for retrieving column-level metadata for Elasticsearch search indices.
*/
public class ElasticsearchColumnMetadataDao implements ColumnMetadataDao {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static Map<String, FieldType> elasticsearchTypeMap;
static {
Map<String, FieldType> fieldTypeMap = new HashMap<>();
fieldTypeMap.put("string", FieldType.STRING);
fieldTypeMap.put("ip", FieldType.IP);
fieldTypeMap.put("integer", FieldType.INTEGER);
fieldTypeMap.put("long", FieldType.LONG);
fieldTypeMap.put("date", FieldType.DATE);
fieldTypeMap.put("float", FieldType.FLOAT);
fieldTypeMap.put("double", FieldType.DOUBLE);
fieldTypeMap.put("boolean", FieldType.BOOLEAN);
elasticsearchTypeMap = Collections.unmodifiableMap(fieldTypeMap);
}
/**
* An Elasticsearch administrative client.
*/
private transient AdminClient adminClient;
/**
* @param adminClient The Elasticsearch admin client.
*/
public ElasticsearchColumnMetadataDao(AdminClient adminClient) {
this.adminClient = adminClient;
}
@SuppressWarnings("unchecked")
@Override
public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException {
Map<String, FieldType> indexColumnMetadata = new HashMap<>();
Map<String, String> previousIndices = new HashMap<>();
Set<String> fieldBlackList = new HashSet<>();
String[] latestIndices = getLatestIndices(indices);
if (latestIndices.length > 0) {
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = adminClient
.indices()
.getMappings(new GetMappingsRequest().indices(latestIndices))
.actionGet()
.getMappings();
// for each index
for (Object key : mappings.keys().toArray()) {
String indexName = key.toString();
ImmutableOpenMap<String, MappingMetaData> mapping = mappings.get(indexName);
// for each mapping in the index
Iterator<String> mappingIterator = mapping.keysIt();
while (mappingIterator.hasNext()) {
MappingMetaData mappingMetaData = mapping.get(mappingIterator.next());
Map<String, Map<String, String>> map = (Map<String, Map<String, String>>) mappingMetaData
.getSourceAsMap().get("properties");
// for each field in the mapping
for (String field : map.keySet()) {
if (!fieldBlackList.contains(field)) {
FieldType type = toFieldType(map.get(field).get("type"));
if(!indexColumnMetadata.containsKey(field)) {
indexColumnMetadata.put(field, type);
// record the last index in which a field exists, to be able to print helpful error message on type mismatch
previousIndices.put(field, indexName);
} else {
FieldType previousType = indexColumnMetadata.get(field);
if (!type.equals(previousType)) {
String previousIndexName = previousIndices.get(field);
LOG.error(String.format(
"Field type mismatch: %s.%s has type %s while %s.%s has type %s. Defaulting type to %s.",
indexName, field, type.getFieldType(),
previousIndexName, field, previousType.getFieldType(),
FieldType.OTHER.getFieldType()));
indexColumnMetadata.put(field, FieldType.OTHER);
// the field is defined in multiple indices with different types; ignore the field as type has been set to OTHER
fieldBlackList.add(field);
}
}
}
}
}
}
} else {
LOG.info(String.format("Unable to find any latest indices; indices=%s", indices));
}
return indexColumnMetadata;
}
/**
* Retrieves the latest indices.
* @param includeIndices
* @return
*/
@Override
public String[] getLatestIndices(List<String> includeIndices) {
LOG.debug("Getting latest indices; indices={}", includeIndices);
Map<String, String> latestIndices = new HashMap<>();
String[] indices = adminClient
.indices()
.prepareGetIndex()
.setFeatures()
.get()
.getIndices();
for (String index : indices) {
int prefixEnd = index.indexOf(INDEX_NAME_DELIMITER);
if (prefixEnd != -1) {
String prefix = index.substring(0, prefixEnd);
if (includeIndices.contains(prefix)) {
String latestIndex = latestIndices.get(prefix);
if (latestIndex == null || index.compareTo(latestIndex) > 0) {
latestIndices.put(prefix, index);
}
}
}
}
return latestIndices.values().toArray(new String[latestIndices.size()]);
}
/**
* Converts a string type to the corresponding FieldType.
* @param type The type to convert.
* @return The corresponding FieldType or FieldType.OTHER, if no match.
*/
private FieldType toFieldType(String type) {
return elasticsearchTypeMap.getOrDefault(type, FieldType.OTHER);
}
}