blob: a82de7f773578575ec254e996e1e123d2ca1c9b9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gora.elasticsearch.store;
import org.apache.avro.Schema;
import org.apache.avro.util.Utf8;
import org.apache.gora.elasticsearch.mapping.ElasticsearchMapping;
import org.apache.gora.elasticsearch.mapping.ElasticsearchMappingBuilder;
import org.apache.gora.elasticsearch.mapping.Field;
import org.apache.gora.elasticsearch.query.ElasticsearchQuery;
import org.apache.gora.elasticsearch.query.ElasticsearchResult;
import org.apache.gora.elasticsearch.utils.ElasticsearchParameters;
import org.apache.gora.filter.Filter;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.BeanFactoryImpl;
import org.apache.gora.persistency.impl.DirtyListWrapper;
import org.apache.gora.persistency.impl.DirtyMapWrapper;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.query.impl.PartitionQueryImpl;
import org.apache.gora.store.impl.DataStoreBase;
import org.apache.gora.util.AvroUtils;
import org.apache.gora.util.ClassLoadingUtils;
import org.apache.gora.util.GoraException;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;
/**
* Implementation of a Apache Elasticsearch data store to be used by Apache Gora.
*
* @param <K> class to be used for the key
* @param <T> class to be persisted within the store
*/
public class ElasticsearchStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
public static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStore.class);
private static final String DEFAULT_MAPPING_FILE = "gora-elasticsearch-mapping.xml";
public static final String PARSE_MAPPING_FILE_KEY = "gora.elasticsearch.mapping.file";
private static final String XML_MAPPING_DEFINITION = "gora.mapping";
public static final String XSD_VALIDATION = "gora.xsd_validation";
/**
* Elasticsearch client
*/
private RestHighLevelClient client;
/**
* Mapping definition for Elasticsearch
*/
private ElasticsearchMapping elasticsearchMapping;
@Override
public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException {
try {
LOG.debug("Initializing Elasticsearch store");
ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf());
super.initialize(keyClass, persistentClass, properties);
ElasticsearchMappingBuilder<K, T> builder = new ElasticsearchMappingBuilder<>(this);
InputStream mappingStream;
if (properties.containsKey(XML_MAPPING_DEFINITION)) {
if (LOG.isTraceEnabled()) {
LOG.trace("{} = {}", XML_MAPPING_DEFINITION, properties.getProperty(XML_MAPPING_DEFINITION));
}
mappingStream = org.apache.commons.io.IOUtils.toInputStream(properties.getProperty(XML_MAPPING_DEFINITION), (Charset) null);
} else {
mappingStream = getClass().getClassLoader().getResourceAsStream(properties.getProperty(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
}
String xsdValidation = properties.getProperty(XSD_VALIDATION, "false");
builder.readMappingFile(mappingStream, Boolean.parseBoolean(xsdValidation));
elasticsearchMapping = builder.getElasticsearchMapping();
client = createClient(parameters);
LOG.info("Elasticsearch store was successfully initialized.");
} catch (Exception ex) {
LOG.error("Error while initializing Elasticsearch store", ex);
throw new GoraException(ex);
}
}
public static RestHighLevelClient createClient(ElasticsearchParameters parameters) {
RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(parameters.getHost(), parameters.getPort()));
// Choosing the authentication method.
switch (parameters.getAuthenticationType()) {
case BASIC:
if (parameters.getUsername() != null && parameters.getPassword() != null) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(parameters.getUsername(), parameters.getPassword()));
clientBuilder.setHttpClientConfigCallback(
httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
} else {
throw new IllegalArgumentException("Missing username or password for BASIC authentication.");
}
break;
case TOKEN:
if (parameters.getAuthorizationToken() != null) {
Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization",
parameters.getAuthorizationToken())};
clientBuilder.setDefaultHeaders(defaultHeaders);
} else {
throw new IllegalArgumentException("Missing authorization token for TOKEN authentication.");
}
break;
case APIKEY:
if (parameters.getApiKeyId() != null && parameters.getApiKeySecret() != null) {
String apiKeyAuth = Base64.getEncoder()
.encodeToString((parameters.getApiKeyId() + ":" + parameters.getApiKeySecret())
.getBytes(StandardCharsets.UTF_8));
Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization", "ApiKey " + apiKeyAuth)};
clientBuilder.setDefaultHeaders(defaultHeaders);
} else {
throw new IllegalArgumentException("Missing API Key ID or API Key Secret for APIKEY authentication.");
}
break;
}
if (parameters.getConnectTimeout() != 0) {
clientBuilder.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder.setConnectTimeout(parameters.getConnectTimeout()));
}
if (parameters.getSocketTimeout() != 0) {
clientBuilder.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder.setSocketTimeout(parameters.getSocketTimeout()));
}
if (parameters.getIoThreadCount() != 0) {
clientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom()
.setIoThreadCount(parameters.getIoThreadCount()).build()));
}
return new RestHighLevelClient(clientBuilder);
}
public ElasticsearchMapping getMapping() {
return elasticsearchMapping;
}
@Override
public String getSchemaName() {
return elasticsearchMapping.getIndexName();
}
@Override
public String getSchemaName(final String mappingSchemaName, final Class<?> persistentClass) {
return super.getSchemaName(mappingSchemaName, persistentClass);
}
@Override
public void createSchema() throws GoraException {
CreateIndexRequest request = new CreateIndexRequest(elasticsearchMapping.getIndexName());
Map<String, Object> properties = new HashMap<>();
for (Map.Entry<String, Field> entry : elasticsearchMapping.getFields().entrySet()) {
Map<String, Object> fieldType = new HashMap<>();
fieldType.put("type", entry.getValue().getDataType().getType().name().toLowerCase(Locale.ROOT));
if (entry.getValue().getDataType().getType() == Field.DataType.SCALED_FLOAT) {
fieldType.put("scaling_factor", entry.getValue().getDataType().getScalingFactor());
}
properties.put(entry.getKey(), fieldType);
}
// Special field for range query
properties.put("gora_id", new HashMap<String, Object>() {{
put("type", "keyword");
}});
Map<String, Object> mapping = new HashMap<>();
mapping.put("properties", properties);
request.mapping(mapping);
try {
if (!client.indices().exists(
new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) {
client.indices().create(request, RequestOptions.DEFAULT);
}
} catch (IOException ex) {
throw new GoraException(ex);
}
}
@Override
public void deleteSchema() throws GoraException {
DeleteIndexRequest request = new DeleteIndexRequest(elasticsearchMapping.getIndexName());
try {
if (client.indices().exists(
new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) {
client.indices().delete(request, RequestOptions.DEFAULT);
}
} catch (IOException ex) {
throw new GoraException(ex);
}
}
@Override
public boolean schemaExists() throws GoraException {
try {
return client.indices().exists(
new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
} catch (IOException ex) {
throw new GoraException(ex);
}
}
@Override
public boolean exists(K key) throws GoraException {
GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key);
getRequest.fetchSourceContext(new FetchSourceContext(false)).storedFields("_none_");
try {
return client.exists(getRequest, RequestOptions.DEFAULT);
} catch (IOException ex) {
throw new GoraException(ex);
}
}
@Override
public T get(K key, String[] fields) throws GoraException {
String[] requestedFields = getFieldsToQuery(fields);
List<String> documentFields = new ArrayList<>();
for (String requestedField : requestedFields) {
documentFields.add(elasticsearchMapping.getFields().get(requestedField).getName());
}
try {
// Prepare the Elasticsearch request
GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key);
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
if (getResponse.isExists()) {
Map<String, Object> sourceMap = getResponse.getSourceAsMap();
// Map of field's name and its value from the Document
Map<String, Object> fieldsAndValues = new HashMap<>();
for (String field : documentFields) {
fieldsAndValues.put(field, sourceMap.get(field));
}
// Build the corresponding persistent
return newInstance(fieldsAndValues, requestedFields);
} else {
return null;
}
} catch (IOException ex) {
throw new GoraException(ex);
}
}
@Override
public void put(K key, T obj) throws GoraException {
if (obj.isDirty()) {
Schema schemaObj = obj.getSchema();
List<Schema.Field> fields = schemaObj.getFields();
Map<String, Object> jsonMap = new HashMap<>();
for (Schema.Field field : fields) {
Field mappedField = elasticsearchMapping.getFields().get(field.name());
if (mappedField != null) {
Object fieldValue = obj.get(field.pos());
if (fieldValue != null) {
Schema fieldSchema = field.schema();
Object serializedObj = serializeFieldValue(fieldSchema, fieldValue);
jsonMap.put(mappedField.getName(), serializedObj);
}
}
}
// Special field for range query
jsonMap.put("gora_id", key);
// Prepare the Elasticsearch request
IndexRequest request = new IndexRequest(elasticsearchMapping.getIndexName()).id((String) key).source(jsonMap);
try {
client.index(request, RequestOptions.DEFAULT);
} catch (IOException ex) {
throw new GoraException(ex);
}
} else {
LOG.info("Ignored putting object {} in the store as it is neither "
+ "new, neither dirty.", new Object[]{obj});
}
}
@Override
public boolean delete(K key) throws GoraException {
DeleteRequest request = new DeleteRequest(elasticsearchMapping.getIndexName(), (String) key);
try {
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
return deleteResponse.getResult() != DocWriteResponse.Result.NOT_FOUND;
} catch (IOException ex) {
throw new GoraException(ex);
}
}
@Override
public long deleteByQuery(Query<K, T> query) throws GoraException {
try {
BulkByScrollResponse bulkResponse;
if (query.getFields() != null && query.getFields().length < elasticsearchMapping.getFields().size()) {
UpdateByQueryRequest updateRequest = new UpdateByQueryRequest(elasticsearchMapping.getIndexName());
QueryBuilder matchDocumentsWithinRange = QueryBuilders
.rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey());
updateRequest.setQuery(matchDocumentsWithinRange);
// Create a script for deleting fields
StringBuilder toDelete = new StringBuilder();
String[] fieldsToDelete = query.getFields();
for (String field : fieldsToDelete) {
String elasticsearchField = elasticsearchMapping.getFields().get(field).getName();
toDelete.append(String.format("ctx._source.remove('%s');", elasticsearchField));
}
//toDelete.deleteCharAt(toDelete.length() - 1);
updateRequest.setScript(new Script(ScriptType.INLINE, "painless", toDelete.toString(), Collections.emptyMap()));
bulkResponse = client.updateByQuery(updateRequest, RequestOptions.DEFAULT);
return bulkResponse.getUpdated();
} else {
DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(elasticsearchMapping.getIndexName());
QueryBuilder matchDocumentsWithinRange = QueryBuilders
.rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey());
deleteRequest.setQuery(matchDocumentsWithinRange);
bulkResponse = client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
return bulkResponse.getDeleted();
}
} catch (IOException ex) {
throw new GoraException(ex);
}
}
@Override
public Result<K, T> execute(Query<K, T> query) throws GoraException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// Set the query result limit
int size = (int) query.getLimit();
if (size != -1) {
searchSourceBuilder.size(size);
}
try {
// Build the actual Elasticsearch range query
QueryBuilder rangeQueryBuilder = QueryBuilders
.rangeQuery("gora_id").gte(query.getStartKey()).lte(query.getEndKey());
searchSourceBuilder.query(rangeQueryBuilder);
SearchRequest searchRequest = new SearchRequest(elasticsearchMapping.getIndexName());
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
String[] avroFields = getFieldsToQuery(query.getFields());
SearchHits hits = searchResponse.getHits();
SearchHit[] searchHits = hits.getHits();
List<K> hitId = new ArrayList<>();
// Check filter
Filter<K, T> queryFilter = query.getFilter();
List<T> filteredObjects = new ArrayList<>();
for (SearchHit hit : searchHits) {
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
if (queryFilter == null || !queryFilter.filter((K) hit.getId(), newInstance(sourceAsMap, avroFields))) {
filteredObjects.add(newInstance(sourceAsMap, avroFields));
hitId.add((K) hit.getId());
}
}
return new ElasticsearchResult<>(this, query, hitId, filteredObjects);
} catch (IOException ex) {
throw new GoraException(ex);
}
}
@Override
public Query<K, T> newQuery() {
ElasticsearchQuery<K, T> query = new ElasticsearchQuery<>(this);
query.setFields(getFieldsToQuery(null));
return query;
}
@Override
public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException {
List<PartitionQuery<K, T>> partitions = new ArrayList<>();
PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>(
query);
partitionQuery.setConf(getConf());
partitions.add(partitionQuery);
return partitions;
}
@Override
public void flush() throws GoraException {
try {
client.indices().refresh(new RefreshRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
client.indices().flush(new FlushRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
} catch (IOException ex) {
throw new GoraException(ex);
}
}
@Override
public void close() {
try {
client.close();
LOG.info("Elasticsearch datastore destroyed successfully.");
} catch (IOException ex) {
LOG.error(ex.getMessage(), ex);
}
}
/**
* Build a new instance of the persisted class from the Document retrieved from the database.
*
* @param fieldsAndValues Map of field's name and its value from the Document
* that results from the query to the database
* @param requestedFields the list of fields to be mapped to the persistence class instance
* @return a persistence class instance which content was deserialized from the Document
* @throws IOException
*/
public T newInstance(Map<String, Object> fieldsAndValues, String[] requestedFields) throws IOException {
// Create new empty persistent bean instance
T persistent = newPersistent();
requestedFields = getFieldsToQuery(requestedFields);
// Populate each field
for (String objField : requestedFields) {
Schema.Field field = fieldMap.get(objField);
Schema fieldSchema = field.schema();
String docFieldName = elasticsearchMapping.getFields().get(objField).getName();
Object fieldValue = fieldsAndValues.get(docFieldName);
Object result = deserializeFieldValue(field, fieldSchema, fieldValue);
persistent.put(field.pos(), result);
}
persistent.clearDirty();
return persistent;
}
/**
* Deserialize an Elasticsearch object to a persistent Avro object.
*
* @param avroField persistent Avro class field to which the value will be deserialized
* @param avroFieldSchema schema for the persistent Avro class field
* @param elasticsearchValue Elasticsearch field value to be deserialized
* @return deserialized Avro object from the Elasticsearch object
* @throws GoraException when the given Elasticsearch value cannot be deserialized
*/
private Object deserializeFieldValue(Schema.Field avroField, Schema avroFieldSchema,
Object elasticsearchValue) throws GoraException {
Object fieldValue;
switch (avroFieldSchema.getType()) {
case MAP:
fieldValue = fromElasticsearchMap(avroField, avroFieldSchema.getValueType(), (Map<String, Object>) elasticsearchValue);
break;
case RECORD:
fieldValue = fromElasticsearchRecord(avroFieldSchema, (Map<String, Object>) elasticsearchValue);
break;
case ARRAY:
fieldValue = fromElasticsearchList(avroField, avroFieldSchema.getElementType(), elasticsearchValue);
break;
case BOOLEAN:
fieldValue = Boolean.parseBoolean(elasticsearchValue.toString());
break;
case BYTES:
fieldValue = ByteBuffer.wrap(Base64.getDecoder().decode(elasticsearchValue.toString()));
break;
case FIXED:
case NULL:
fieldValue = null;
break;
case UNION:
fieldValue = fromElasticsearchUnion(avroField, avroFieldSchema, elasticsearchValue);
break;
case DOUBLE:
fieldValue = Double.parseDouble(elasticsearchValue.toString());
break;
case ENUM:
fieldValue = AvroUtils.getEnumValue(avroFieldSchema, elasticsearchValue.toString());
break;
case FLOAT:
fieldValue = Float.parseFloat(elasticsearchValue.toString());
break;
case INT:
fieldValue = Integer.parseInt(elasticsearchValue.toString());
break;
case LONG:
fieldValue = Long.parseLong(elasticsearchValue.toString());
break;
case STRING:
fieldValue = new Utf8(elasticsearchValue.toString());
break;
default:
fieldValue = elasticsearchValue;
}
return fieldValue;
}
/**
* Deserialize an Elasticsearch List to an Avro List as used in Gora generated classes
* that can safely be written into Avro persistent object.
*
* @param avroField persistent Avro class field to which the value will be deserialized
* @param avroFieldSchema schema for the persistent Avro class field
* @param elasticsearchValue Elasticsearch field value to be deserialized
* @return deserialized Avro List from the given Elasticsearch value
* @throws GoraException when one of the underlying values cannot be deserialized
*/
private Object fromElasticsearchList(Schema.Field avroField, Schema avroFieldSchema,
Object elasticsearchValue) throws GoraException {
List<Object> list = new ArrayList<>();
if (elasticsearchValue != null) {
for (Object item : (List<Object>) elasticsearchValue) {
Object result = deserializeFieldValue(avroField, avroFieldSchema, item);
list.add(result);
}
}
return new DirtyListWrapper<>(list);
}
/**
* Deserialize an Elasticsearch Map to an Avro Map as used in Gora generated classes
* that can safely be written into Avro persistent object.
*
* @param avroField persistent Avro class field to which the value will be deserialized
* @param avroFieldSchema schema for the persistent Avro class field
* @param elasticsearchMap Elasticsearch Map value to be deserialized
* @return deserialized Avro Map from the given Elasticsearch Map value
* @throws GoraException when one of the underlying values cannot be deserialized
*/
private Object fromElasticsearchMap(Schema.Field avroField, Schema avroFieldSchema,
Map<String, Object> elasticsearchMap) throws GoraException {
Map<Utf8, Object> deserializedMap = new HashMap<>();
if (elasticsearchMap != null) {
for (Map.Entry<String, Object> entry : elasticsearchMap.entrySet()) {
String mapKey = entry.getKey();
Object mapValue = deserializeFieldValue(avroField, avroFieldSchema, entry.getValue());
deserializedMap.put(new Utf8(mapKey), mapValue);
}
}
return new DirtyMapWrapper<>(deserializedMap);
}
/**
* Deserialize an Elasticsearch Record to an Avro Object as used in Gora generated classes
* that can safely be written into Avro persistent object.
*
* @param avroFieldSchema schema for the persistent Avro class field
* @param elasticsearchRecord Elasticsearch Record value to be deserialized
* @return deserialized Avro Object from the given Elasticsearch Record value
* @throws GoraException when one of the underlying values cannot be deserialized
*/
private Object fromElasticsearchRecord(Schema avroFieldSchema,
Map<String, Object> elasticsearchRecord) throws GoraException {
Class<?> clazz;
try {
clazz = ClassLoadingUtils.loadClass(avroFieldSchema.getFullName());
} catch (ClassNotFoundException ex) {
throw new GoraException(ex);
}
PersistentBase record = (PersistentBase) new BeanFactoryImpl(keyClass, clazz).newPersistent();
for (Schema.Field recField : avroFieldSchema.getFields()) {
Schema innerSchema = recField.schema();
Field innerDocField = elasticsearchMapping.getFields().getOrDefault(recField.name(), new Field(recField.name(), null));
record.put(recField.pos(), deserializeFieldValue(recField, innerSchema, elasticsearchRecord.get(innerDocField.getName())));
}
return record;
}
/**
* Deserialize an Elasticsearch Union to an Avro Object as used in Gora generated classes
* that can safely be written into Avro persistent object.
*
* @param avroField persistent Avro class field to which the value will be deserialized
* @param avroFieldSchema schema for the persistent Avro class field
* @param elasticsearchUnion Elasticsearch Union value to be deserialized
* @return deserialized Avro Object from the given Elasticsearch Union value
* @throws GoraException when one of the underlying values cannot be deserialized
*/
private Object fromElasticsearchUnion(Schema.Field avroField, Schema avroFieldSchema, Object elasticsearchUnion) throws GoraException {
Object deserializedUnion;
Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType();
Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType();
if (avroFieldSchema.getTypes().size() == 2 &&
(type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) &&
!type0.equals(type1)) {
int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema);
Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
deserializedUnion = deserializeFieldValue(avroField, unionSchema, elasticsearchUnion);
} else if (avroFieldSchema.getTypes().size() == 3) {
Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType();
if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) || type2.equals(Schema.Type.NULL)) &&
(type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) {
if (elasticsearchUnion == null) {
deserializedUnion = null;
} else if (elasticsearchUnion instanceof String) {
throw new GoraException("Elasticsearch supports Union data type only represented as Record or Null.");
} else {
int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema);
Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
deserializedUnion = fromElasticsearchRecord(unionSchema, (Map<String, Object>) elasticsearchUnion);
}
} else {
throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
}
} else {
throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
}
return deserializedUnion;
}
/**
* Serialize a persistent Avro object as used in Gora generated classes to
* an object that can be written into Elasticsearch.
*
* @param avroFieldSchema schema for the persistent Avro class field
* @param avroFieldValue persistent Avro field value to be serialized
* @return serialized field value
* @throws GoraException when the given Avro object cannot be serialized
*/
private Object serializeFieldValue(Schema avroFieldSchema, Object avroFieldValue) throws GoraException {
Object output = avroFieldValue;
switch (avroFieldSchema.getType()) {
case ARRAY:
output = arrayToElasticsearch((List<?>) avroFieldValue, avroFieldSchema.getElementType());
break;
case MAP:
output = mapToElasticsearch((Map<CharSequence, ?>) avroFieldValue, avroFieldSchema.getValueType());
break;
case RECORD:
output = recordToElasticsearch(avroFieldValue, avroFieldSchema);
break;
case BYTES:
output = Base64.getEncoder().encodeToString(((ByteBuffer) avroFieldValue).array());
break;
case UNION:
output = unionToElasticsearch(avroFieldValue, avroFieldSchema);
break;
case BOOLEAN:
case DOUBLE:
case ENUM:
case FLOAT:
case INT:
case LONG:
case STRING:
output = avroFieldValue.toString();
break;
case FIXED:
break;
case NULL:
output = null;
break;
}
return output;
}
/**
* Serialize a Java collection of persistent Avro objects as used in Gora generated classes to a
* List that can safely be written into Elasticsearch.
*
* @param collection the collection to be serialized
* @param avroFieldSchema field schema for the underlying type
* @return a List version of the collection that can be safely written into Elasticsearch
* @throws GoraException when one of the underlying values cannot be serialized
*/
private List<Object> arrayToElasticsearch(Collection<?> collection, Schema avroFieldSchema) throws GoraException {
List<Object> list = new ArrayList<>();
for (Object item : collection) {
Object result = serializeFieldValue(avroFieldSchema, item);
list.add(result);
}
return list;
}
/**
* Serialize a Java map of persistent Avro objects as used in Gora generated classes to a
* map that can safely be written into Elasticsearch.
*
* @param map the map to be serialized
* @param avroFieldSchema field schema for the underlying type
* @return a Map version of the Java map that can be safely written into Elasticsearch
* @throws GoraException when one of the underlying values cannot be serialized
*/
private Map<CharSequence, ?> mapToElasticsearch(Map<CharSequence, ?> map, Schema avroFieldSchema) throws GoraException {
Map<CharSequence, Object> serializedMap = new HashMap<>();
for (Map.Entry<CharSequence, ?> entry : map.entrySet()) {
String mapKey = entry.getKey().toString();
Object mapValue = entry.getValue();
Object result = serializeFieldValue(avroFieldSchema, mapValue);
serializedMap.put(mapKey, result);
}
return serializedMap;
}
/**
* Serialize a Java object of persistent Avro objects as used in Gora generated classes to a
* record that can safely be written into Elasticsearch.
*
* @param record the object to be serialized
* @param avroFieldSchema field schema for the underlying type
* @return a record version of the Java object that can be safely written into Elasticsearch
* @throws GoraException when one of the underlying values cannot be serialized
*/
private Map<CharSequence, Object> recordToElasticsearch(Object record, Schema avroFieldSchema) throws GoraException {
Map<CharSequence, Object> serializedRecord = new HashMap<>();
for (Schema.Field member : avroFieldSchema.getFields()) {
Object innerValue = ((PersistentBase) record).get(member.pos());
serializedRecord.put(member.name(), serializeFieldValue(member.schema(), innerValue));
}
return serializedRecord;
}
/**
* Serialize a Java object of persistent Avro objects as used in Gora generated classes to a
* object that can safely be written into Elasticsearch.
*
* @param union the object to be serialized
* @param avroFieldSchema field schema for the underlying type
* @return a object version of the Java object that can be safely written into Elasticsearch
* @throws GoraException when one of the underlying values cannot be serialized
*/
private Object unionToElasticsearch(Object union, Schema avroFieldSchema) throws GoraException {
Object serializedUnion;
Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType();
Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType();
if (avroFieldSchema.getTypes().size() == 2 &&
(type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) &&
!type0.equals(type1)) {
int schemaPos = getUnionSchema(union, avroFieldSchema);
Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
serializedUnion = serializeFieldValue(unionSchema, union);
} else if (avroFieldSchema.getTypes().size() == 3) {
Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType();
if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) || type2.equals(Schema.Type.NULL)) &&
(type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) {
if (union == null) {
serializedUnion = null;
} else if (union instanceof String) {
throw new GoraException("Elasticsearch does not support foreign key IDs in Union data type.");
} else {
int schemaPos = getUnionSchema(union, avroFieldSchema);
Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
serializedUnion = recordToElasticsearch(union, unionSchema);
}
} else {
throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
}
} else {
throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
}
return serializedUnion;
}
/**
* Method to retrieve the corresponding schema type index of a particular
* object having UNION schema. As UNION type can have one or more types and at
* a given instance, it holds an object of only one type of the defined types,
* this method is used to figure out the corresponding instance's schema type
* index.
*
* @param instanceValue value that the object holds
* @param unionSchema union schema containing all of the data types
* @return the unionSchemaPosition corresponding schema position
*/
private int getUnionSchema(Object instanceValue, Schema unionSchema) {
int unionSchemaPos = 0;
for (Schema currentSchema : unionSchema.getTypes()) {
Schema.Type schemaType = currentSchema.getType();
if (instanceValue instanceof CharSequence && schemaType.equals(Schema.Type.STRING)) {
return unionSchemaPos;
}
if (instanceValue instanceof ByteBuffer && schemaType.equals(Schema.Type.BYTES)) {
return unionSchemaPos;
}
if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.BYTES)) {
return unionSchemaPos;
}
if (instanceValue instanceof String && schemaType.equals(Schema.Type.BYTES)) {
return unionSchemaPos;
}
if (instanceValue instanceof Integer && schemaType.equals(Schema.Type.INT)) {
return unionSchemaPos;
}
if (instanceValue instanceof Long && schemaType.equals(Schema.Type.LONG)) {
return unionSchemaPos;
}
if (instanceValue instanceof Double && schemaType.equals(Schema.Type.DOUBLE)) {
return unionSchemaPos;
}
if (instanceValue instanceof Float && schemaType.equals(Schema.Type.FLOAT)) {
return unionSchemaPos;
}
if (instanceValue instanceof Boolean && schemaType.equals(Schema.Type.BOOLEAN)) {
return unionSchemaPos;
}
if (instanceValue instanceof Map && schemaType.equals(Schema.Type.MAP)) {
return unionSchemaPos;
}
if (instanceValue instanceof List && schemaType.equals(Schema.Type.ARRAY)) {
return unionSchemaPos;
}
if (instanceValue instanceof Persistent && schemaType.equals(Schema.Type.RECORD)) {
return unionSchemaPos;
}
if (instanceValue instanceof Map && schemaType.equals(Schema.Type.RECORD)) {
return unionSchemaPos;
}
if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.MAP)) {
return unionSchemaPos;
}
if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.RECORD)) {
return unionSchemaPos;
}
if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.ARRAY)) {
return unionSchemaPos;
}
unionSchemaPos++;
}
return 0;
}
}