blob: 9a50704bf0a3359fb950640b22bf654f331308b3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gora.rethinkdb.store;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.List;
import java.util.ArrayList;
import com.rethinkdb.RethinkDB;
import com.rethinkdb.gen.ast.ReqlExpr;
import com.rethinkdb.model.MapObject;
import com.rethinkdb.net.Connection;
import org.apache.avro.Schema;
import org.apache.avro.util.Utf8;
import org.apache.gora.rethinkdb.query.RethinkDBQuery;
import org.apache.gora.persistency.impl.BeanFactoryImpl;
import org.apache.gora.persistency.impl.DirtyListWrapper;
import org.apache.gora.persistency.impl.DirtyMapWrapper;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.query.impl.PartitionQueryImpl;
import org.apache.gora.rethinkdb.query.RethinkDBResult;
import org.apache.gora.store.impl.DataStoreBase;
import org.apache.gora.util.AvroUtils;
import org.apache.gora.util.ClassLoadingUtils;
import org.apache.gora.util.GoraException;
/**
* {@inheritDoc}
* {@link RethinkDBStore} is the primary class
* responsible for facilitating GORA CRUD operations on RethinkDB documents.
*/
public class RethinkDBStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
public static final String DEFAULT_MAPPING_FILE = "/gora-rethinkdb-mapping.xml";
private RethinkDBStoreParameters rethinkDBStoreParameters;
private RethinkDBMapping rethinkDBMapping;
public static final RethinkDB r = RethinkDB.r;
public Connection connection;
/**
* {@inheritDoc}
* Initialize the RethinkDB dataStore by {@link Properties} parameters.
*
* @param keyClass key class type for dataStore.
* @param persistentClass persistent class type for dataStore.
* @param properties RethinkDB dataStore properties EG:- RethinkDB client credentials.
*/
@Override
public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException {
super.initialize(keyClass, persistentClass, properties);
try {
rethinkDBStoreParameters = RethinkDBStoreParameters.load(properties);
connection = r.connection()
.hostname(rethinkDBStoreParameters.getServerHost())
.port(Integer.valueOf(rethinkDBStoreParameters.getServerPort()))
.user(rethinkDBStoreParameters.getUserName(), rethinkDBStoreParameters.getUserPassword())
.connect();
String databaseIdentifier = rethinkDBStoreParameters.getDatabaseName();
if (!r.dbList()
.run(connection, ArrayList.class)
.first()
.stream()
.anyMatch(db -> db.equals(databaseIdentifier))) {
r.dbCreate(rethinkDBStoreParameters.getDatabaseName()).run(connection);
}
RethinkDBMappingBuilder<K, T> builder = new RethinkDBMappingBuilder<>(this);
rethinkDBMapping = builder.fromFile(rethinkDBStoreParameters.getMappingFile()).build();
if (!schemaExists()) {
createSchema();
}
} catch (Exception e) {
LOG.error("Error while initializing RethinkDB dataStore: {}",
new Object[]{e.getMessage()});
throw new RuntimeException(e);
}
}
/**
* {@inheritDoc}
*/
@Override
public String getSchemaName(final String mappingSchemaName,
final Class<?> persistentClass) {
return super.getSchemaName(mappingSchemaName, persistentClass);
}
/**
* {@inheritDoc}
*/
@Override
public String getSchemaName() {
return rethinkDBMapping.getDocumentClass();
}
/**
* {@inheritDoc}
* Create a new class of RethinkDB documents if necessary. Enforce specified schema over the document class.
*/
@Override
public void createSchema() throws GoraException {
if (schemaExists()) {
return;
}
try {
r.db(rethinkDBStoreParameters.getDatabaseName())
.tableCreate(rethinkDBMapping.getDocumentClass())
.run(connection);
} catch (Exception e) {
throw new GoraException(e);
}
}
/**
* {@inheritDoc}
* Deletes enforced schema over RethinkDB Document class.
*/
@Override
public void deleteSchema() throws GoraException {
try {
r.db(rethinkDBStoreParameters.getDatabaseName())
.tableDrop(rethinkDBMapping.getDocumentClass())
.run(connection);
} catch (Exception e) {
throw new GoraException(e);
}
}
/**
* {@inheritDoc}
* Check whether there exist a schema enforced over RethinkDB document class.
*/
@Override
public boolean schemaExists() throws GoraException {
try {
String collectionIdentifier = rethinkDBMapping.getDocumentClass();
return r.db(rethinkDBStoreParameters.getDatabaseName())
.tableList()
.run(connection, ArrayList.class).first()
.stream()
.anyMatch(db -> db.equals(collectionIdentifier));
} catch (Exception e) {
throw new GoraException(e);
}
}
/**
* {@inheritDoc}
*/
@Override
public T get(K key, String[] fields) throws GoraException {
try {
boolean isExists = r.db(rethinkDBStoreParameters.getDatabaseName())
.table(rethinkDBMapping.getDocumentClass())
.getAll(key)
.count()
.run(connection, Boolean.class)
.first();
if (isExists) {
String[] dbFields = getFieldsToQuery(fields);
MapObject<String, Object> document = r.db(rethinkDBStoreParameters.getDatabaseName())
.table(rethinkDBMapping.getDocumentClass())
.get(key)
.run(connection, MapObject.class)
.first();
return convertRethinkDBDocToAvroBean(document, dbFields);
} else {
return null;
}
} catch (Exception e) {
throw new GoraException(e);
}
}
/**
* {@inheritDoc}
*/
@Override
public void put(K key, T val) throws GoraException {
if (val.isDirty()) {
try {
boolean isExists = r.db(rethinkDBStoreParameters.getDatabaseName())
.table(rethinkDBMapping.getDocumentClass())
.getAll(key)
.count()
.run(connection, Boolean.class)
.first();
if (!isExists) {
MapObject<String, Object> document = convertAvroBeanToRethinkDBDocument(key, val);
r.db(rethinkDBStoreParameters.getDatabaseName())
.table(rethinkDBMapping.getDocumentClass())
.insert(document)
.run(connection);
} else {
MapObject<String, Object> document = convertAvroBeanToRethinkDBDocument(key, val);
r.db(rethinkDBStoreParameters.getDatabaseName())
.table(rethinkDBMapping.getDocumentClass())
.get(key)
.replace(document)
.run(connection);
}
} catch (Exception e) {
throw new GoraException(e);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.info("Ignored putting persistent bean {} in the store as it is neither "
+ "new, neither dirty.", new Object[]{val});
}
}
}
/**
* {@inheritDoc}
*/
@Override
public boolean delete(K key) throws GoraException {
try {
r.db(rethinkDBStoreParameters.getDatabaseName())
.table(rethinkDBMapping.getDocumentClass())
.get(key)
.delete()
.run(connection);
return true;
} catch (Exception e) {
throw new GoraException(e);
}
}
/**
* {@inheritDoc}
*/
@Override
public long deleteByQuery(Query<K, T> query) throws GoraException {
if (query.getFields() == null || (query.getFields().length == getFields().length)) {
String[] fields = getFieldsToQuery(query.getFields());
RethinkDBQuery dataStoreQuery = ((RethinkDBQuery) query);
dataStoreQuery.populateRethinkDBQuery(rethinkDBMapping, rethinkDBStoreParameters, fields, getFields());
ReqlExpr reqlExpr = dataStoreQuery.getRethinkDBDbQuery();
MapObject<String, Object> document = reqlExpr.delete().run(connection, MapObject.class).first();
int deleteCount = Integer.valueOf(document.get("deleted").toString());
if (deleteCount > 0) {
return deleteCount;
} else {
return 0;
}
} else {
RethinkDBQuery<K, T> dataStoreQuery = new RethinkDBQuery<>(this);
dataStoreQuery.setStartKey(query.getStartKey());
dataStoreQuery.setEndKey(query.getEndKey());
dataStoreQuery.populateRethinkDBQuery(rethinkDBMapping, rethinkDBStoreParameters,
getFieldsToQuery(null), getFields());
ReqlExpr reqlExpr = dataStoreQuery.getRethinkDBDbQuery();
String[] projection = new String[query.getFields().length];
int counter = 0;
for (String k : query.getFields()) {
String dbFieldName = rethinkDBMapping.getDocumentField(k);
if (dbFieldName != null && dbFieldName.length() > 0) {
projection[counter] = dbFieldName;
counter++;
}
}
MapObject<String, Object> document = reqlExpr.replace(row -> row.without(projection))
.run(connection, MapObject.class).first();
int replacedCount = Integer.valueOf(document.get("replaced").toString());
if (replacedCount > 0) {
return replacedCount;
} else {
return 0;
}
}
}
/**
* {@inheritDoc}
*/
@Override
public Result<K, T> execute(Query<K, T> query) throws GoraException {
String[] fields = getFieldsToQuery(query.getFields());
RethinkDBQuery dataStoreQuery;
if (query instanceof RethinkDBQuery) {
dataStoreQuery = ((RethinkDBQuery) query);
} else {
dataStoreQuery = (RethinkDBQuery) ((PartitionQueryImpl<K, T>) query).getBaseQuery();
}
dataStoreQuery.populateRethinkDBQuery(rethinkDBMapping, rethinkDBStoreParameters, fields, getFields());
try {
ReqlExpr reqlExpr = dataStoreQuery.getRethinkDBDbQuery();
com.rethinkdb.net.Result<MapObject> result = reqlExpr.run(connection, MapObject.class);
return new RethinkDBResult<>(this, query, result);
} catch (Exception e) {
throw new GoraException(e);
}
}
/**
* {@inheritDoc}
*/
@Override
public Query<K, T> newQuery() {
return new RethinkDBQuery<>(this);
}
/**
* {@inheritDoc}
*/
@Override
public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException {
List<PartitionQuery<K, T>> partitions = new ArrayList<>();
PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>(
query);
partitionQuery.setConf(this.getConf());
partitions.add(partitionQuery);
return partitions;
}
/**
* {@inheritDoc}
* Flushes locally cached to content in memory to remote RethinkDB server.
*/
@Override
public void flush() throws GoraException {
}
/**
* {@inheritDoc}
* Releases resources which have been used dataStore.
*/
@Override
public void close() {
try {
flush();
} catch (Exception ex) {
LOG.error("Error occurred while flushing data to RethinkDB : ", ex);
}
}
@Override
public boolean exists(K key) throws GoraException {
boolean isExists = r.db(rethinkDBStoreParameters.getDatabaseName())
.table(rethinkDBMapping.getDocumentClass())
.getAll(key)
.count()
.run(connection, Boolean.class)
.first();
return isExists;
}
private MapObject<String, Object> convertAvroBeanToRethinkDBDocument(final K key, final T persistent) {
MapObject<String, Object> result = new MapObject();
for (Schema.Field f : persistent.getSchema().getFields()) {
if (persistent.isDirty(f.pos()) && (persistent.get(f.pos()) != null)) {
String docf = rethinkDBMapping.getDocumentField(f.name());
Object value = persistent.get(f.pos());
RethinkDBMapping.DocumentFieldType storeType = rethinkDBMapping.getDocumentFieldType(docf);
LOG.debug("Transform value to ODocument, docField:{}, schemaType:{}, storeType:{}",
new Object[]{docf, f.schema().getType(), storeType});
Object o = convertAvroFieldToRethinkDBField(docf, f.schema(), f.schema().getType(),
storeType, value);
result.put(docf, o);
}
}
result.put("id", key.toString());
return result;
}
private Object convertAvroFieldToRethinkDBField(final String docf, final Schema fieldSchema,
final Schema.Type fieldType,
final RethinkDBMapping.DocumentFieldType storeType,
final Object value) {
Object result = null;
switch (fieldType) {
case MAP:
if (storeType != null && !(storeType == RethinkDBMapping.DocumentFieldType.MAP ||
storeType == RethinkDBMapping.DocumentFieldType.DOCUMENT)) {
throw new IllegalStateException(
"Field " + fieldSchema.getName()
+ ": to store a AVRO 'map', target RethinkDB mapping have to be of type 'Map'" +
"| 'Document'");
}
Schema valueSchema = fieldSchema.getValueType();
result = convertAvroMapToDocField(docf, (Map<CharSequence, ?>) value, valueSchema,
valueSchema.getType(), storeType);
break;
case ARRAY:
if (storeType != null && !(storeType == RethinkDBMapping.DocumentFieldType.LIST)) {
throw new IllegalStateException("Field " + fieldSchema.getName()
+ ": To store a AVRO 'array', target RethinkDB mapping have to be of type 'List'");
}
Schema elementSchema = fieldSchema.getElementType();
result = convertAvroListToDocField(docf, (List<?>) value, elementSchema,
elementSchema.getType(), storeType);
break;
case BYTES:
case FIXED:
if (value != null) {
result = Base64.getEncoder()
.encodeToString(((ByteBuffer) value).array());
}
break;
case INT:
case LONG:
case FLOAT:
case DOUBLE:
case BOOLEAN:
result = value;
break;
case STRING:
if (value != null) {
result = value.toString();
}
break;
case ENUM:
if (value != null)
result = value.toString();
break;
case RECORD:
if (value == null)
break;
result = convertAvroBeanToRethinkDBDocField(docf, fieldSchema, value);
break;
case UNION:
result = convertAvroUnionToRethinkDBField(docf, fieldSchema, storeType, value);
break;
default:
LOG.error("Unknown field type: {}", fieldSchema.getType());
break;
}
return result;
}
private Object convertAvroMapToDocField(final String docf,
final Map<CharSequence, ?> value, final Schema fieldSchema,
final Schema.Type fieldType,
final RethinkDBMapping.DocumentFieldType storeType) {
if (storeType == RethinkDBMapping.DocumentFieldType.MAP) {
HashMap map = new HashMap<String, Object>();
if (value == null)
return map;
for (Map.Entry<CharSequence, ?> e : value.entrySet()) {
String mapKey = e.getKey().toString();
Object mapValue = e.getValue();
RethinkDBMapping.DocumentFieldType fieldStoreType = rethinkDBMapping.getDocumentFieldType(docf);
Object result = convertAvroFieldToRethinkDBField(docf, fieldSchema, fieldType, fieldStoreType,
mapValue);
map.put(mapKey, result);
}
return map;
} else {
MapObject<String, Object> doc = new MapObject<String, Object>();
if (value == null)
return doc;
for (Map.Entry<CharSequence, ?> e : value.entrySet()) {
String mapKey = e.getKey().toString();
Object mapValue = e.getValue();
RethinkDBMapping.DocumentFieldType fieldStoreType = rethinkDBMapping.getDocumentFieldType(docf);
Object result = convertAvroFieldToRethinkDBField(docf, fieldSchema, fieldType, fieldStoreType,
mapValue);
doc.put(mapKey, result);
}
return doc;
}
}
private Object convertAvroListToDocField(final String docf, final Collection<?> array,
final Schema fieldSchema, final Schema.Type fieldType,
final RethinkDBMapping.DocumentFieldType storeType) {
if (storeType == RethinkDBMapping.DocumentFieldType.LIST) {
ArrayList list;
list = new ArrayList<Object>();
if (array == null)
return list;
for (Object item : array) {
RethinkDBMapping.DocumentFieldType fieldStoreType = rethinkDBMapping.getDocumentFieldType(docf);
Object result = convertAvroFieldToRethinkDBField(docf, fieldSchema, fieldType, fieldStoreType, item);
list.add(result);
}
return list;
}
return null;
}
private MapObject<String, Object> convertAvroBeanToRethinkDBDocField(final String docf,
final Schema fieldSchema,
final Object value) {
MapObject<String, Object> record = new MapObject();
for (Schema.Field member : fieldSchema.getFields()) {
Object innerValue = ((PersistentBase) value).get(member.pos());
String innerDoc = rethinkDBMapping.getDocumentField(member.name());
Schema.Type innerType = member.schema().getType();
RethinkDBMapping.DocumentFieldType innerStoreType = rethinkDBMapping.getDocumentFieldType(innerDoc);
LOG.debug("Transform value to BaseDocument , docField:{}, schemaType:{}, storeType:{}",
new Object[]{member.name(), member.schema().getType(),
innerStoreType});
Object fieldValue = convertAvroFieldToRethinkDBField(docf, member.schema()
, innerType, innerStoreType, innerValue);
record.put(member.name(), fieldValue);
}
return record;
}
private Object convertAvroUnionToRethinkDBField(final String docf, final Schema fieldSchema,
final RethinkDBMapping.DocumentFieldType storeType,
final Object value) {
Object result;
Schema.Type type0 = fieldSchema.getTypes().get(0).getType();
Schema.Type type1 = fieldSchema.getTypes().get(1).getType();
if (!type0.equals(type1)
&& (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL))) {
Schema innerSchema = null;
if (type0.equals(Schema.Type.NULL)) {
innerSchema = fieldSchema.getTypes().get(1);
} else {
innerSchema = fieldSchema.getTypes().get(0);
}
LOG.debug("Transform value to ODocument (UNION), type:{}, storeType:{}",
new Object[]{innerSchema.getType(), type1, storeType});
result = convertAvroFieldToRethinkDBField(docf, innerSchema, innerSchema.getType(), storeType, value);
} else {
throw new IllegalStateException("RethinkDBStore only supports Union of two types field.");
}
return result;
}
public T convertRethinkDBDocToAvroBean(final MapObject<String, Object> obj, final String[] fields) throws GoraException {
T persistent = newPersistent();
String[] dbFields = getFieldsToQuery(fields);
for (String f : dbFields) {
String docf = rethinkDBMapping.getDocumentField(f);
if (docf == null || !obj.containsKey(docf))
continue;
RethinkDBMapping.DocumentFieldType storeType = rethinkDBMapping.getDocumentFieldType(docf);
Schema.Field field = fieldMap.get(f);
Schema fieldSchema = field.schema();
LOG.debug("Load from ODocument, field:{}, schemaType:{}, docField:{}, storeType:{}",
new Object[]{field.name(), fieldSchema.getType(), docf, storeType});
Object result = convertDocFieldToAvroField(fieldSchema, storeType, field, docf, obj);
persistent.put(field.pos(), result);
}
persistent.clearDirty();
return persistent;
}
private Object convertDocFieldToAvroField(final Schema fieldSchema,
final RethinkDBMapping.DocumentFieldType storeType,
final Schema.Field field,
final String docf,
final MapObject<String, Object> obj) throws GoraException {
Object result = null;
switch (fieldSchema.getType()) {
case MAP:
result = convertDocFieldToAvroMap(docf, fieldSchema, obj, field, storeType);
break;
case ARRAY:
result = convertDocFieldToAvroList(docf, fieldSchema, obj, field, storeType);
break;
case RECORD:
MapObject<String, Object> record = (MapObject<String, Object>)
decorateMapToODoc((Map<String, Object>) obj.get(docf));
if (record == null) {
result = null;
break;
}
result = convertAvroBeanToRethinkDBDoc(fieldSchema, record);
break;
case BOOLEAN:
result = Boolean.valueOf(obj.get(docf).toString());
break;
case DOUBLE:
result = Double.valueOf(obj.get(docf).toString());
break;
case FLOAT:
result = Float.valueOf(obj.get(docf).toString());
break;
case INT:
result = Integer.valueOf(obj.get(docf).toString());
break;
case LONG:
result = Long.valueOf(obj.get(docf).toString());
break;
case STRING:
result = new Utf8(obj.get(docf).toString());
;
break;
case ENUM:
result = AvroUtils.getEnumValue(fieldSchema, obj.get(docf).toString());
break;
case BYTES:
case FIXED:
if (!obj.containsKey(docf)) {
result = null;
break;
}
result = ByteBuffer.wrap(Base64
.getDecoder()
.decode(obj.get(docf).toString()));
break;
case NULL:
result = null;
break;
case UNION:
result = convertDocFieldToAvroUnion(fieldSchema, storeType, field, docf, obj);
break;
default:
LOG.warn("Unable to read {}", docf);
break;
}
return result;
}
private Object convertDocFieldToAvroUnion(final Schema fieldSchema,
final RethinkDBMapping.DocumentFieldType storeType,
final Schema.Field field,
final String docf,
final MapObject<String, Object> doc) throws GoraException {
Object result;
Schema.Type type0 = fieldSchema.getTypes().get(0).getType();
Schema.Type type1 = fieldSchema.getTypes().get(1).getType();
if (!type0.equals(type1)
&& (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL))) {
Schema innerSchema = null;
if (type0.equals(Schema.Type.NULL)) {
innerSchema = fieldSchema.getTypes().get(1);
} else {
innerSchema = fieldSchema.getTypes().get(0);
}
LOG.debug("Load from ODocument (UNION), schemaType:{}, docField:{}, storeType:{}",
new Object[]{innerSchema.getType(), docf, storeType});
result = convertDocFieldToAvroField(innerSchema, storeType, field, docf, doc);
} else {
throw new GoraException("RethinkDBStore only supports Union of two types field.");
}
return result;
}
private Object convertAvroBeanToRethinkDBDoc(final Schema fieldSchema,
final MapObject<String, Object> doc) throws GoraException {
Object result;
Class<?> clazz = null;
try {
clazz = ClassLoadingUtils.loadClass(fieldSchema.getFullName());
} catch (Exception e) {
throw new GoraException(e);
}
PersistentBase record = (PersistentBase) new BeanFactoryImpl(keyClass, clazz).newPersistent();
for (Schema.Field recField : fieldSchema.getFields()) {
Schema innerSchema = recField.schema();
RethinkDBMapping.DocumentFieldType innerStoreType = rethinkDBMapping
.getDocumentFieldType(recField.name());
String innerDocField = rethinkDBMapping.getDocumentField(recField.name()) != null ? rethinkDBMapping
.getDocumentField(recField.name()) : recField.name();
LOG.debug("Load from ODocument (RECORD), field:{}, schemaType:{}, docField:{}, storeType:{}",
new Object[]{recField.name(), innerSchema.getType(), innerDocField,
innerStoreType});
record.put(recField.pos(),
convertDocFieldToAvroField(innerSchema, innerStoreType, recField, innerDocField,
doc));
}
result = record;
return result;
}
private Object convertDocFieldToAvroList(final String docf,
final Schema fieldSchema,
final MapObject<String, Object> doc,
final Schema.Field f,
final RethinkDBMapping.DocumentFieldType storeType) throws GoraException {
if (storeType == RethinkDBMapping.DocumentFieldType.LIST
|| storeType == null) {
List<Object> list = (List<Object>) doc.get(docf);
List<Object> rlist = new ArrayList<>();
if (list == null) {
return new DirtyListWrapper(rlist);
}
for (Object item : list) {
MapObject<String, Object> innerDoc = new MapObject();
innerDoc.put("item", item);
Object o = convertDocFieldToAvroField(fieldSchema.getElementType(), storeType, f,
"item", innerDoc);
rlist.add(o);
}
return new DirtyListWrapper<>(rlist);
}
return null;
}
private Object convertDocFieldToAvroMap(final String docf, final Schema fieldSchema,
final MapObject<String, Object> doc, final Schema.Field f,
final RethinkDBMapping.DocumentFieldType storeType) throws GoraException {
if (storeType == RethinkDBMapping.DocumentFieldType.MAP) {
Map<String, Object> map = (Map<String, Object>) doc.get(docf);
Map<Utf8, Object> rmap = new HashMap<>();
if (map == null) {
return new DirtyMapWrapper(rmap);
}
for (Map.Entry entry : map.entrySet()) {
String mapKey = entry.getKey().toString();
Object o = convertDocFieldToAvroField(fieldSchema.getValueType(), storeType, f, mapKey,
decorateMapToODoc(map));
rmap.put(new Utf8(mapKey), o);
}
return new DirtyMapWrapper<>(rmap);
} else {
MapObject<String, Object> innerDoc = (MapObject<String, Object>)
decorateMapToODoc((Map<String, Object>) doc.get(docf));
Map<Utf8, Object> rmap = new HashMap<>();
if (innerDoc == null) {
return new DirtyMapWrapper(rmap);
}
for (String fieldName : innerDoc.keySet()) {
String mapKey = fieldName;
Object o = convertDocFieldToAvroField(fieldSchema.getValueType(), storeType, f, mapKey,
innerDoc);
rmap.put(new Utf8(mapKey), o);
}
return new DirtyMapWrapper<>(rmap);
}
}
private MapObject<String, Object> decorateMapToODoc(Map<String, Object> map) {
if (Objects.isNull(map)) {
return null;
}
MapObject<String, Object> doc = new MapObject();
for (Map.Entry entry : map.entrySet()) {
doc.put(entry.getKey().toString(), entry.getValue());
}
return doc;
}
}