| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.gora.orientdb.store; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.List; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.ArrayList; |
| import java.util.Date; |
| import java.util.Calendar; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.TimeZone; |
| import java.util.Locale; |
| |
| import com.github.raymanrt.orientqb.query.Parameter; |
| import com.gitub.raymanrt.orientqb.delete.Delete; |
| import com.orientechnologies.orient.core.config.OGlobalConfiguration; |
| import com.orientechnologies.orient.core.db.ODatabasePool; |
| import com.orientechnologies.orient.core.db.ODatabaseSession; |
| import com.orientechnologies.orient.core.db.ODatabaseType; |
| import com.orientechnologies.orient.core.db.OPartitionedDatabasePool; |
| import com.orientechnologies.orient.core.db.OrientDB; |
| import com.orientechnologies.orient.core.db.OrientDBConfig; |
| import com.orientechnologies.orient.core.db.OrientDBConfigBuilder; |
| import com.orientechnologies.orient.core.db.record.OTrackedList; |
| import com.orientechnologies.orient.core.db.record.OTrackedMap; |
| import com.orientechnologies.orient.core.db.record.OTrackedSet; |
| import com.orientechnologies.orient.core.metadata.schema.OClass; |
| import com.orientechnologies.orient.core.metadata.schema.OType; |
| import com.orientechnologies.orient.core.record.impl.ODocument; |
| import com.orientechnologies.orient.core.sql.OCommandSQL; |
| import com.orientechnologies.orient.core.sql.query.OConcurrentLegacyResultSet; |
| import com.orientechnologies.orient.core.sql.query.OSQLSynchQuery; |
| import org.apache.avro.Schema; |
| import org.apache.avro.util.Utf8; |
| import org.apache.gora.orientdb.query.OrientDBQuery; |
| import org.apache.gora.orientdb.query.OrientDBResult; |
| import org.apache.gora.persistency.impl.BeanFactoryImpl; |
| import org.apache.gora.persistency.impl.DirtyListWrapper; |
| import org.apache.gora.persistency.impl.DirtyMapWrapper; |
| import org.apache.gora.persistency.impl.PersistentBase; |
| import org.apache.gora.query.PartitionQuery; |
| import org.apache.gora.query.Query; |
| import org.apache.gora.query.Result; |
| import org.apache.gora.query.impl.PartitionQueryImpl; |
| import org.apache.gora.store.impl.DataStoreBase; |
| import org.apache.gora.util.AvroUtils; |
| import org.apache.gora.util.ClassLoadingUtils; |
| import org.apache.gora.util.GoraException; |
| |
| import javax.xml.bind.DatatypeConverter; |
| |
| import static com.github.raymanrt.orientqb.query.Projection.projection; |
| |
| /** |
| * {@inheritDoc} |
| * {@link org.apache.gora.orientdb.store.OrientDBStore} is the primary class |
| * responsible for facilitating GORA CRUD operations on OrientDB documents. |
| */ |
| public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, T> { |
| |
| public static final String DEFAULT_MAPPING_FILE = "/gora-orientdb-mapping.xml"; |
| private String ROOT_URL; |
| private String ROOT_DATABASE_URL; |
| private OrientDBStoreParameters orientDbStoreParams; |
| private OrientDBMapping orientDBMapping; |
| private OrientDB remoteServerAdmin; |
| private ODatabasePool connectionPool; |
| private List<ODocument> docBatch = Collections.synchronizedList(new ArrayList<>()); |
| private ReentrantLock flushLock = new ReentrantLock(); |
| private int DEFAULT_DB_POOL_MIN_SIZE = 5; |
| private int DEFAULT_DB_POOL_MAX_SIZE = 10; |
| |
| /** |
| * {@inheritDoc} |
| * Initialize the OrientDB dataStore by {@link Properties} parameters. |
| * |
| * @param keyClass key class type for dataStore. |
| * @param persistentClass persistent class type for dataStore. |
| * @param properties OrientDB dataStore properties EG:- OrientDB client credentials. |
| */ |
| @Override |
| public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException { |
| super.initialize(keyClass, persistentClass, properties); |
| try { |
| orientDbStoreParams = OrientDBStoreParameters.load(properties); |
| ROOT_URL = "remote:".concat(orientDbStoreParams.getServerHost()).concat(":") |
| .concat(orientDbStoreParams.getServerPort()); |
| ROOT_DATABASE_URL = ROOT_URL.concat("/").concat(orientDbStoreParams.getDatabaseName()); |
| remoteServerAdmin = new OrientDB(ROOT_URL, orientDbStoreParams.getUserName(), |
| orientDbStoreParams.getUserPassword(), OrientDBConfig.defaultConfig()); |
| if (!remoteServerAdmin.exists(orientDbStoreParams.getDatabaseName())) { |
| remoteServerAdmin.create(orientDbStoreParams.getDatabaseName(), |
| ODatabaseType.valueOf(orientDbStoreParams.getStorageType().toUpperCase())); |
| } |
| |
| if (orientDbStoreParams.getConnectionPoolMinSize() != null && |
| orientDbStoreParams.getConnectionPoolMaxSize() != null) { |
| OrientDBConfigBuilder poolCfg = OrientDBConfig.builder(); |
| poolCfg.addConfig(OGlobalConfiguration.DB_POOL_MIN, |
| orientDbStoreParams.getConnectionPoolMinSize()); |
| poolCfg.addConfig(OGlobalConfiguration.DB_POOL_MAX, |
| orientDbStoreParams.getConnectionPoolMaxSize()); |
| |
| connectionPool = new ODatabasePool(remoteServerAdmin, |
| orientDbStoreParams.getDatabaseName(), |
| orientDbStoreParams.getUserName(), |
| orientDbStoreParams.getUserPassword(), poolCfg.build()); |
| } else { |
| OrientDBConfigBuilder poolCfg = OrientDBConfig.builder(); |
| poolCfg.addConfig(OGlobalConfiguration.DB_POOL_MIN, |
| DEFAULT_DB_POOL_MIN_SIZE); |
| poolCfg.addConfig(OGlobalConfiguration.DB_POOL_MAX, |
| DEFAULT_DB_POOL_MAX_SIZE); |
| |
| connectionPool = new ODatabasePool(remoteServerAdmin, |
| orientDbStoreParams.getDatabaseName(), |
| orientDbStoreParams.getUserName(), |
| orientDbStoreParams.getUserPassword(), poolCfg.build()); |
| } |
| |
| OrientDBMappingBuilder<K, T> builder = new OrientDBMappingBuilder<>(this); |
| orientDBMapping = builder.fromFile(orientDbStoreParams.getMappingFile()).build(); |
| |
| if (!schemaExists()) { |
| createSchema(); |
| } |
| } catch (Exception e) { |
| LOG.error("Error while initializing OrientDB dataStore: {}", |
| new Object[]{e.getMessage()}); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public String getSchemaName(final String mappingSchemaName, |
| final Class<?> persistentClass) { |
| return super.getSchemaName(mappingSchemaName, persistentClass); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public String getSchemaName() { |
| return orientDBMapping.getDocumentClass(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * Create a new class of OrientDB documents if necessary. Enforce specified schema over the document class. * |
| */ |
| @Override |
| public void createSchema() throws GoraException { |
| if (schemaExists()) { |
| return; |
| } |
| |
| try (ODatabaseSession schemaTx = connectionPool.acquire()) { |
| schemaTx.activateOnCurrentThread(); |
| |
| OClass documentClass = schemaTx.getMetadata().getSchema().createClass(orientDBMapping.getDocumentClass()); |
| documentClass.createProperty("_id", |
| OType.getTypeByClass(super.getKeyClass())).createIndex(OClass.INDEX_TYPE.UNIQUE); |
| for (String docField : orientDBMapping.getDocumentFields()) { |
| documentClass.createProperty(docField, |
| OType.valueOf(orientDBMapping.getDocumentFieldType(docField).name())); |
| } |
| schemaTx.getMetadata().getSchema().reload(); |
| } catch (Exception e) { |
| throw new GoraException(e); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| * Deletes enforced schema over OrientDB Document class. |
| */ |
| @Override |
| public void deleteSchema() throws GoraException { |
| if (!schemaExists()) { |
| return; |
| } |
| |
| try (ODatabaseSession schemaTx = connectionPool.acquire()) { |
| schemaTx.activateOnCurrentThread(); |
| schemaTx.getMetadata().getSchema().dropClass(orientDBMapping.getDocumentClass()); |
| } catch (Exception e) { |
| throw new GoraException(e); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| * Check whether there exist a schema enforced over OrientDB document class. |
| */ |
| @Override |
| public boolean schemaExists() throws GoraException { |
| try (ODatabaseSession schemaTx = connectionPool.acquire()) { |
| schemaTx.activateOnCurrentThread(); |
| return schemaTx.getMetadata().getSchema() |
| .existsClass(orientDBMapping.getDocumentClass()); |
| } catch (Exception e) { |
| throw new GoraException(e); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public T get(K key, String[] fields) throws GoraException { |
| String[] dbFields = getFieldsToQuery(fields); |
| com.github.raymanrt.orientqb.query.Query selectQuery = new com.github.raymanrt.orientqb.query.Query(); |
| for (String k : dbFields) { |
| String dbFieldName = orientDBMapping.getDocumentField(k); |
| if (dbFieldName != null && dbFieldName.length() > 0) { |
| selectQuery.select(dbFieldName); |
| } |
| } |
| selectQuery.from(orientDBMapping.getDocumentClass()) |
| .where(projection("_id").eq(Parameter.parameter("key"))); |
| Map<String, Object> params = new HashMap<String, Object>(); |
| params.put("key", key); |
| OSQLSynchQuery<ODocument> query = new OSQLSynchQuery<ODocument>(selectQuery.toString()); |
| |
| try (ODatabaseSession selectTx = connectionPool.acquire()) { |
| selectTx.activateOnCurrentThread(); |
| List<ODocument> result = selectTx.command(query).execute(params); |
| if (result.size() == 1) { |
| return convertOrientDocToAvroBean(result.get(0), dbFields); |
| } else { |
| return null; |
| } |
| } catch (Exception e) { |
| throw new GoraException(e); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void put(K key, T val) throws GoraException { |
| if (val.isDirty()) { |
| OrientDBQuery<K, T> dataStoreQuery = new OrientDBQuery<>(this); |
| dataStoreQuery.setStartKey(key); |
| dataStoreQuery.setEndKey(key); |
| dataStoreQuery.populateOrientDBQuery(orientDBMapping, getFieldsToQuery(null), getFields()); |
| |
| try (ODatabaseSession selectTx = connectionPool.acquire()) { |
| selectTx.activateOnCurrentThread(); |
| // TODO : further optimize for queries to separate cases update / insert == get rid of select all query |
| // TODO : for update |
| List<ODocument> result = selectTx.command(dataStoreQuery.getOrientDBQuery()) |
| .execute(dataStoreQuery.getParams()); |
| if (result.size() == 1) { |
| ODocument document = updateOrientDocFromAvroBean(key, val, result.get(0)); |
| docBatch.add(document); |
| } else { |
| ODocument document = convertAvroBeanToOrientDoc(key, val); |
| docBatch.add(document); |
| } |
| } catch (Exception e) { |
| throw new GoraException(e); |
| } |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.info("Ignored putting persistent bean {} in the store as it is neither " |
| + "new, neither dirty.", new Object[]{val}); |
| } |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public boolean delete(K key) throws GoraException { |
| Delete delete = new Delete(); |
| delete.from(orientDBMapping.getDocumentClass()) |
| .where(projection("_id").eq(Parameter.parameter("key"))); |
| Map<String, Object> params = new HashMap<String, Object>(); |
| params.put("key", key); |
| OCommandSQL query = new OCommandSQL(delete.toString().replace("DELETE", "DELETE FROM")); |
| try (ODatabaseSession deleteTx = connectionPool.acquire()) { |
| deleteTx.activateOnCurrentThread(); |
| int deleteCount = deleteTx.command(query).execute(params); |
| if (deleteCount == 1) { |
| return true; |
| } else { |
| return false; |
| } |
| } catch (Exception e) { |
| throw new GoraException(e); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public long deleteByQuery(Query<K, T> query) throws GoraException { |
| Delete delete = new Delete(); |
| delete.from(orientDBMapping.getDocumentClass()); |
| Map<String, Object> params = new HashMap<String, Object>(); |
| if (query.getFields() == null || (query.getFields().length == getFields().length)) { |
| if (query.getStartKey() != null) { |
| delete.where(projection("_id").ge(Parameter.parameter("start"))); |
| params.put("start", query.getStartKey()); |
| } |
| if (query.getEndKey() != null) { |
| delete.where(projection("_id").le(Parameter.parameter("end"))); |
| params.put("end", query.getEndKey()); |
| } |
| |
| OCommandSQL dbQuery = new OCommandSQL(delete.toString().replace("DELETE", "DELETE FROM")); |
| try (ODatabaseSession deleteTx = connectionPool.acquire()) { |
| deleteTx.activateOnCurrentThread(); |
| int deleteCount; |
| if (params.isEmpty()) { |
| deleteCount = deleteTx.command(dbQuery).execute(); |
| } else { |
| deleteCount = deleteTx.command(dbQuery).execute(params); |
| } |
| if (deleteCount > 0) { |
| return deleteCount; |
| } else { |
| return 0; |
| } |
| } catch (Exception e) { |
| throw new GoraException(e); |
| } |
| } else { |
| |
| OrientDBQuery<K, T> dataStoreQuery = new OrientDBQuery<>(this); |
| dataStoreQuery.setStartKey(query.getStartKey()); |
| dataStoreQuery.setEndKey(query.getEndKey()); |
| dataStoreQuery.populateOrientDBQuery(orientDBMapping, getFieldsToQuery(null), getFields()); |
| |
| try (ODatabaseSession selectTx = connectionPool.acquire()) { |
| selectTx.activateOnCurrentThread(); |
| List<ODocument> result = selectTx.command(dataStoreQuery.getOrientDBQuery()) |
| .execute(dataStoreQuery.getParams()); |
| if (result != null && result.isEmpty()) { |
| return 0; |
| } else { |
| for (ODocument doc : result) { |
| for (String docField : query.getFields()) { |
| if (doc.containsField(orientDBMapping.getDocumentField(docField))) { |
| doc.removeField(orientDBMapping.getDocumentField(docField)); |
| } |
| } |
| doc.save(); |
| } |
| return result.size(); |
| } |
| } catch (Exception e) { |
| throw new GoraException(e); |
| } |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public Result<K, T> execute(Query<K, T> query) throws GoraException { |
| String[] fields = getFieldsToQuery(query.getFields()); |
| OrientDBQuery dataStoreQuery; |
| if (query instanceof OrientDBQuery) { |
| dataStoreQuery = ((OrientDBQuery) query); |
| } else { |
| dataStoreQuery = (OrientDBQuery) ((PartitionQueryImpl<K, T>) query).getBaseQuery(); |
| } |
| dataStoreQuery.populateOrientDBQuery(orientDBMapping, fields, getFields()); |
| try (ODatabaseSession selectTx = connectionPool.acquire()) { |
| selectTx.activateOnCurrentThread(); |
| OConcurrentLegacyResultSet<ODocument> result = selectTx.command(dataStoreQuery.getOrientDBQuery()) |
| .execute(dataStoreQuery.getParams()); |
| result.setLimit((int) query.getLimit()); |
| return new OrientDBResult<K, T>(this, query, result); |
| } catch (Exception e) { |
| throw new GoraException(e); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public Query<K, T> newQuery() { |
| OrientDBQuery<K, T> query = new OrientDBQuery<K, T>(this); |
| query.setFields(getFieldsToQuery(null)); |
| return new OrientDBQuery<K, T>(this); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException { |
| // TODO : Improve code on OrientDB clusters |
| List<PartitionQuery<K, T>> partitions = new ArrayList<>(); |
| PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>( |
| query); |
| partitionQuery.setConf(this.getConf()); |
| partitions.add(partitionQuery); |
| return partitions; |
| } |
| |
| /** |
| * {@inheritDoc} |
| * Flushes locally cached to content in memory to remote OrientDB server. |
| */ |
| @Override |
| public void flush() throws GoraException { |
| try (ODatabaseSession updateTx = connectionPool.acquire()) { |
| updateTx.activateOnCurrentThread(); |
| flushLock.lock(); |
| for (ODocument document : docBatch) { |
| updateTx.save(document); |
| } |
| } catch (Exception e) { |
| throw new GoraException(e); |
| } finally { |
| docBatch.clear(); |
| flushLock.unlock(); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| * Releases resources which have been used dataStore. Eg:- OrientDB Client connection pool. |
| */ |
| @Override |
| public void close() { |
| try { |
| flush(); |
| } catch (Exception ex) { |
| LOG.error("Error occurred while flushing data to OrientDB : ", ex); |
| } |
| docBatch.clear(); |
| remoteServerAdmin.close(); |
| connectionPool.close(); |
| } |
| |
| /** |
| * Returns OrientDB client connection pool maintained at Gora dataStore. |
| * |
| * @return {@link OPartitionedDatabasePool} OrientDB client connection pool. |
| */ |
| public ODatabasePool getConnectionPool() { |
| return connectionPool; |
| } |
| |
| public T convertOrientDocToAvroBean(final ODocument obj, final String[] fields) throws GoraException { |
| T persistent = newPersistent(); |
| String[] dbFields = getFieldsToQuery(fields); |
| for (String f : dbFields) { |
| String docf = orientDBMapping.getDocumentField(f); |
| if (docf == null || !obj.containsField(docf)) |
| continue; |
| |
| OrientDBMapping.DocumentFieldType storeType = orientDBMapping.getDocumentFieldType(docf); |
| Schema.Field field = fieldMap.get(f); |
| Schema fieldSchema = field.schema(); |
| |
| LOG.debug("Load from ODocument, field:{}, schemaType:{}, docField:{}, storeType:{}", |
| new Object[]{field.name(), fieldSchema.getType(), docf, storeType}); |
| Object result = convertDocFieldToAvroField(fieldSchema, storeType, field, docf, obj); |
| persistent.put(field.pos(), result); |
| } |
| persistent.clearDirty(); |
| return persistent; |
| } |
| |
| private Object convertDocFieldToAvroField(final Schema fieldSchema, |
| final OrientDBMapping.DocumentFieldType storeType, |
| final Schema.Field field, |
| final String docf, |
| final ODocument obj) throws GoraException { |
| Object result = null; |
| switch (fieldSchema.getType()) { |
| case MAP: |
| result = convertDocFieldToAvroMap(docf, fieldSchema, obj, field, storeType); |
| break; |
| case ARRAY: |
| result = convertDocFieldToAvroList(docf, fieldSchema, obj, field, storeType); |
| break; |
| case RECORD: |
| ODocument record = obj.field(docf); |
| if (record == null) { |
| result = null; |
| break; |
| } |
| result = convertAvroBeanToOrientDoc(fieldSchema, record); |
| break; |
| case BOOLEAN: |
| result = OType.convert(obj.field(docf), Boolean.class); |
| break; |
| case DOUBLE: |
| result = OType.convert(obj.field(docf), Double.class); |
| break; |
| case FLOAT: |
| result = OType.convert(obj.field(docf), Float.class); |
| break; |
| case INT: |
| result = OType.convert(obj.field(docf), Integer.class); |
| break; |
| case LONG: |
| result = OType.convert(obj.field(docf), Long.class); |
| break; |
| case STRING: |
| result = convertDocFieldToAvroString(storeType, docf, obj); |
| break; |
| case ENUM: |
| result = AvroUtils.getEnumValue(fieldSchema, obj.field(docf)); |
| break; |
| case BYTES: |
| case FIXED: |
| if (obj.field(docf) == null) { |
| result = null; |
| break; |
| } |
| result = ByteBuffer.wrap((byte[]) obj.field(docf)); |
| break; |
| case NULL: |
| result = null; |
| break; |
| case UNION: |
| result = convertDocFieldToAvroUnion(fieldSchema, storeType, field, docf, obj); |
| break; |
| default: |
| LOG.warn("Unable to read {}", docf); |
| break; |
| } |
| return result; |
| } |
| |
| private Object convertDocFieldToAvroList(final String docf, |
| final Schema fieldSchema, |
| final ODocument doc, |
| final Schema.Field f, |
| final OrientDBMapping.DocumentFieldType storeType) throws GoraException { |
| if (storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDSET) { |
| OTrackedSet<Object> set = doc.field(docf); |
| List<Object> rlist = new ArrayList<>(); |
| if (set == null) { |
| return new DirtyListWrapper(rlist); |
| } |
| |
| for (Object item : set) { |
| Object o = convertDocFieldToAvroField(fieldSchema.getElementType(), storeType, f, |
| "item", new ODocument("item", item)); |
| rlist.add(o); |
| } |
| return new DirtyListWrapper<>(rlist); |
| |
| } else { |
| OTrackedList<Object> list = doc.field(docf); |
| List<Object> rlist = new ArrayList<>(); |
| if (list == null) { |
| return new DirtyListWrapper(rlist); |
| } |
| |
| for (Object item : list) { |
| Object o = convertDocFieldToAvroField(fieldSchema.getElementType(), storeType, f, |
| "item", new ODocument("item", item)); |
| rlist.add(o); |
| } |
| return new DirtyListWrapper<>(rlist); |
| } |
| } |
| |
| private Object convertAvroListToDocField(final String docf, final Collection<?> array, |
| final Schema fieldSchema, final Schema.Type fieldType, |
| final OrientDBMapping.DocumentFieldType storeType) { |
| if (storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDLIST) { |
| ArrayList list; |
| list = new ArrayList<Object>(); |
| if (array == null) |
| return list; |
| for (Object item : array) { |
| OrientDBMapping.DocumentFieldType fieldStoreType = orientDBMapping.getDocumentFieldType(docf); |
| Object result = convertAvroFieldToOrientField(docf, fieldSchema, fieldType, fieldStoreType, item); |
| list.add(result); |
| } |
| return list; |
| } else if (storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDSET) { |
| HashSet set; |
| set = new HashSet<Object>(); |
| if (array == null) |
| return set; |
| for (Object item : array) { |
| OrientDBMapping.DocumentFieldType fieldStoreType = orientDBMapping.getDocumentFieldType(docf); |
| Object result = convertAvroFieldToOrientField(docf, fieldSchema, fieldType, fieldStoreType, item); |
| set.add(result); |
| } |
| return set; |
| } |
| return null; |
| } |
| |
| private Object convertDocFieldToAvroMap(final String docf, final Schema fieldSchema, |
| final ODocument doc, final Schema.Field f, |
| final OrientDBMapping.DocumentFieldType storeType) throws GoraException { |
| if (storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDMAP) { |
| OTrackedMap<Object> map = doc.field(docf); |
| Map<Utf8, Object> rmap = new HashMap<>(); |
| if (map == null) { |
| return new DirtyMapWrapper(rmap); |
| } |
| |
| for (Map.Entry entry : map.entrySet()) { |
| String mapKey = decodeFieldKey((String) entry.getKey()); |
| Object o = convertDocFieldToAvroField(fieldSchema.getValueType(), storeType, f, mapKey, |
| decorateOTrackedMapToODoc(map)); |
| rmap.put(new Utf8(mapKey), o); |
| } |
| return new DirtyMapWrapper<>(rmap); |
| } else { |
| ODocument innerDoc = doc.field(docf); |
| Map<Utf8, Object> rmap = new HashMap<>(); |
| if (innerDoc == null) { |
| return new DirtyMapWrapper(rmap); |
| } |
| |
| for (String fieldName : innerDoc.fieldNames()) { |
| String mapKey = decodeFieldKey(fieldName); |
| Object o = convertDocFieldToAvroField(fieldSchema.getValueType(), storeType, f, mapKey, |
| innerDoc); |
| rmap.put(new Utf8(mapKey), o); |
| } |
| return new DirtyMapWrapper<>(rmap); |
| } |
| } |
| |
| private ODocument decorateOTrackedMapToODoc(OTrackedMap<Object> map) { |
| ODocument doc = new ODocument(); |
| for (Map.Entry entry : map.entrySet()) { |
| doc.field((String) entry.getKey(), entry.getValue()); |
| } |
| return doc; |
| } |
| |
| private Object convertAvroMapToDocField(final String docf, |
| final Map<CharSequence, ?> value, final Schema fieldSchema, |
| final Schema.Type fieldType, |
| final OrientDBMapping.DocumentFieldType storeType) { |
| if (storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDMAP) { |
| HashMap map = new HashMap<String, Object>(); |
| if (value == null) |
| return map; |
| |
| for (Map.Entry<CharSequence, ?> e : value.entrySet()) { |
| String mapKey = encodeFieldKey(e.getKey().toString()); |
| Object mapValue = e.getValue(); |
| |
| OrientDBMapping.DocumentFieldType fieldStoreType = orientDBMapping.getDocumentFieldType(docf); |
| Object result = convertAvroFieldToOrientField(docf, fieldSchema, fieldType, fieldStoreType, |
| mapValue); |
| map.put(mapKey, result); |
| } |
| return map; |
| } else { |
| ODocument doc = new ODocument("map" + docf); |
| if (value == null) |
| return doc; |
| for (Map.Entry<CharSequence, ?> e : value.entrySet()) { |
| String mapKey = encodeFieldKey(e.getKey().toString()); |
| Object mapValue = e.getValue(); |
| |
| OrientDBMapping.DocumentFieldType fieldStoreType = orientDBMapping.getDocumentFieldType(docf); |
| Object result = convertAvroFieldToOrientField(docf, fieldSchema, fieldType, fieldStoreType, |
| mapValue); |
| doc.field(mapKey, result); |
| } |
| return doc; |
| } |
| } |
| |
| private Object convertAvroBeanToOrientDoc(final Schema fieldSchema, |
| final ODocument doc) throws GoraException { |
| Object result; |
| Class<?> clazz = null; |
| try { |
| clazz = ClassLoadingUtils.loadClass(fieldSchema.getFullName()); |
| } catch (Exception e) { |
| throw new GoraException(e); |
| } |
| PersistentBase record = (PersistentBase) new BeanFactoryImpl(keyClass, clazz).newPersistent(); |
| for (Schema.Field recField : fieldSchema.getFields()) { |
| Schema innerSchema = recField.schema(); |
| OrientDBMapping.DocumentFieldType innerStoreType = orientDBMapping |
| .getDocumentFieldType(recField.name()); |
| String innerDocField = orientDBMapping.getDocumentField(recField.name()) != null ? orientDBMapping |
| .getDocumentField(recField.name()) : recField.name(); |
| LOG.debug("Load from ODocument (RECORD), field:{}, schemaType:{}, docField:{}, storeType:{}", |
| new Object[]{recField.name(), innerSchema.getType(), innerDocField, |
| innerStoreType}); |
| record.put(recField.pos(), |
| convertDocFieldToAvroField(innerSchema, innerStoreType, recField, innerDocField, |
| doc)); |
| } |
| result = record; |
| return result; |
| } |
| |
| private Object convertDocFieldToAvroString(final OrientDBMapping.DocumentFieldType storeType, |
| final String docf, final ODocument doc) { |
| Object result; |
| if (storeType == OrientDBMapping.DocumentFieldType.DATE || |
| storeType == OrientDBMapping.DocumentFieldType.DATETIME) { |
| Date dateTime = doc.field(docf); |
| Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.getDefault()); |
| calendar.setTime(dateTime); |
| result = new Utf8(DatatypeConverter.printDateTime(calendar)); |
| } else { |
| result = new Utf8((String) doc.field(encodeFieldKey(docf))); |
| } |
| return result; |
| } |
| |
| private Object convertDocFieldToAvroUnion(final Schema fieldSchema, |
| final OrientDBMapping.DocumentFieldType storeType, |
| final Schema.Field field, |
| final String docf, |
| final ODocument doc) throws GoraException { |
| Object result; |
| Schema.Type type0 = fieldSchema.getTypes().get(0).getType(); |
| Schema.Type type1 = fieldSchema.getTypes().get(1).getType(); |
| |
| if (!type0.equals(type1) |
| && (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL))) { |
| Schema innerSchema = null; |
| if (type0.equals(Schema.Type.NULL)) { |
| innerSchema = fieldSchema.getTypes().get(1); |
| } else { |
| innerSchema = fieldSchema.getTypes().get(0); |
| } |
| |
| LOG.debug("Load from ODocument (UNION), schemaType:{}, docField:{}, storeType:{}", |
| new Object[]{innerSchema.getType(), docf, storeType}); |
| |
| result = convertDocFieldToAvroField(innerSchema, storeType, field, docf, doc); |
| } else { |
| throw new GoraException("OrientDBStore only supports Union of two types field."); |
| } |
| return result; |
| } |
| |
| private Object convertAvroUnionToOrientDBField(final String docf, final Schema fieldSchema, |
| final OrientDBMapping.DocumentFieldType storeType, |
| final Object value) { |
| Object result; |
| Schema.Type type0 = fieldSchema.getTypes().get(0).getType(); |
| Schema.Type type1 = fieldSchema.getTypes().get(1).getType(); |
| |
| if (!type0.equals(type1) |
| && (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL))) { |
| Schema innerSchema = null; |
| if (type0.equals(Schema.Type.NULL)) { |
| innerSchema = fieldSchema.getTypes().get(1); |
| } else { |
| innerSchema = fieldSchema.getTypes().get(0); |
| } |
| |
| LOG.debug("Transform value to ODocument (UNION), type:{}, storeType:{}", |
| new Object[]{innerSchema.getType(), type1, storeType}); |
| |
| result = convertAvroFieldToOrientField(docf, innerSchema, innerSchema.getType(), storeType, value); |
| } else { |
| throw new IllegalStateException("OrientDBStore only supports Union of two types field."); |
| } |
| return result; |
| } |
| |
| private ODocument convertAvroBeanToOrientDoc(final K key, final T persistent) { |
| ODocument result = new ODocument(orientDBMapping.getDocumentClass()); |
| for (Schema.Field f : persistent.getSchema().getFields()) { |
| if (persistent.isDirty(f.pos()) && (persistent.get(f.pos()) != null)) { |
| String docf = orientDBMapping.getDocumentField(f.name()); |
| Object value = persistent.get(f.pos()); |
| OrientDBMapping.DocumentFieldType storeType = orientDBMapping.getDocumentFieldType(docf); |
| LOG.debug("Transform value to ODocument, docField:{}, schemaType:{}, storeType:{}", |
| new Object[]{docf, f.schema().getType(), storeType}); |
| Object o = convertAvroFieldToOrientField(docf, f.schema(), f.schema().getType(), |
| storeType, value); |
| result.field(docf, o); |
| } |
| } |
| result.field("_id", key); |
| return result; |
| } |
| |
| private ODocument updateOrientDocFromAvroBean(final K key, final T persistent, final ODocument result) { |
| for (Schema.Field f : persistent.getSchema().getFields()) { |
| if (persistent.isDirty(f.pos()) /*&& (persistent.get(f.pos()) != null)*/) { |
| String docf = orientDBMapping.getDocumentField(f.name()); |
| if (persistent.get(f.pos()) == null) { |
| result.removeField(docf); |
| continue; |
| } |
| Object value = persistent.get(f.pos()); |
| OrientDBMapping.DocumentFieldType storeType = orientDBMapping.getDocumentFieldType(docf); |
| LOG.debug("Transform value to ODocument, docField:{}, schemaType:{}, storeType:{}", |
| new Object[]{docf, f.schema().getType(), storeType}); |
| Object o = convertAvroFieldToOrientField(docf, f.schema(), f.schema().getType(), |
| storeType, value); |
| result.field(docf, o); |
| } |
| } |
| return result; |
| } |
| |
| private Object convertAvroFieldToOrientField(final String docf, final Schema fieldSchema, |
| final Schema.Type fieldType, |
| final OrientDBMapping.DocumentFieldType storeType, |
| final Object value) { |
| Object result = null; |
| switch (fieldType) { |
| case MAP: |
| if (storeType != null && !(storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDMAP || |
| storeType == OrientDBMapping.DocumentFieldType.EMBEDDED)) { |
| throw new IllegalStateException( |
| "Field " + fieldSchema.getName() |
| + ": to store a AVRO 'map', target OrientDB mapping have to be of type 'EmbeddedMap'" + |
| "| 'Embedded'"); |
| } |
| Schema valueSchema = fieldSchema.getValueType(); |
| result = convertAvroMapToDocField(docf, (Map<CharSequence, ?>) value, valueSchema, |
| valueSchema.getType(), storeType); |
| break; |
| case ARRAY: |
| if (storeType != null && !(storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDLIST || |
| storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDSET)) { |
| throw new IllegalStateException("Field " + fieldSchema.getName() |
| + ": To store a AVRO 'array', target Mongo mapping have to be of type 'EmbeddedMap'" + |
| "|'EmbeddedList'"); |
| } |
| Schema elementSchema = fieldSchema.getElementType(); |
| result = convertAvroListToDocField(docf, (List<?>) value, elementSchema, |
| elementSchema.getType(), storeType); |
| break; |
| case BYTES: |
| if (value != null) { |
| result = ((ByteBuffer) value).array(); |
| } |
| break; |
| case INT: |
| case LONG: |
| case FLOAT: |
| case DOUBLE: |
| case BOOLEAN: |
| result = value; |
| break; |
| case STRING: |
| result = convertAvroStringToDocField(fieldSchema, storeType, value); |
| break; |
| case ENUM: |
| if (value != null) |
| result = value.toString(); |
| break; |
| case RECORD: |
| if (value == null) |
| break; |
| result = convertAvroBeanToOrientDoc(docf, fieldSchema, value); |
| break; |
| case UNION: |
| result = convertAvroUnionToOrientDBField(docf, fieldSchema, storeType, value); |
| break; |
| case FIXED: |
| result = value; |
| break; |
| default: |
| LOG.error("Unknown field type: {}", fieldSchema.getType()); |
| break; |
| } |
| return result; |
| } |
| |
| private Object convertAvroStringToDocField(final Schema fieldSchema, |
| final OrientDBMapping.DocumentFieldType storeType, |
| final Object value) { |
| Object result = null; |
| if (storeType == OrientDBMapping.DocumentFieldType.DATETIME) { |
| if (value != null) { |
| Calendar dateTime = null; |
| try { |
| dateTime = DatatypeConverter.parseDateTime(value.toString()); |
| } catch (IllegalArgumentException e) { |
| throw new IllegalStateException("Field " + fieldSchema.getType() |
| + ": Invalid date and time format '" + value + "'", e); |
| |
| } |
| result = dateTime.getTime(); |
| } |
| } else if (storeType == OrientDBMapping.DocumentFieldType.DATE) { |
| Calendar date = null; |
| try { |
| date = DatatypeConverter.parseDate(value.toString()); |
| } catch (IllegalArgumentException e) { |
| throw new IllegalStateException("Field " + fieldSchema.getType() |
| + ": Invalid date format '" + value + "'", e); |
| } |
| result = date.getTime(); |
| } else { |
| if (value != null) { |
| result = value.toString(); |
| } |
| } |
| return result; |
| } |
| |
| private ODocument convertAvroBeanToOrientDoc(final String docf, |
| final Schema fieldSchema, |
| final Object value) { |
| ODocument record = new ODocument("record" + docf); |
| for (Schema.Field member : fieldSchema.getFields()) { |
| Object innerValue = ((PersistentBase) value).get(member.pos()); |
| String innerDoc = orientDBMapping.getDocumentField(member.name()); |
| Schema.Type innerType = member.schema().getType(); |
| OrientDBMapping.DocumentFieldType innerStoreType = orientDBMapping.getDocumentFieldType(innerDoc); |
| LOG.debug("Transform value to ODocument , docField:{}, schemaType:{}, storeType:{}", |
| new Object[]{member.name(), member.schema().getType(), |
| innerStoreType}); |
| Object fieldValue = convertAvroFieldToOrientField(docf, member.schema() |
| , innerType, innerStoreType, innerValue); |
| record.field(member.name(), fieldValue); |
| } |
| return record; |
| } |
| |
| private String encodeFieldKey(final String key) { |
| if (key == null) { |
| return null; |
| } |
| return key.replace(".", "\u00B7") |
| .replace(":", "\u00FF") |
| .replace(";", "\u00FE") |
| .replace(" ", "\u00FD") |
| .replace("%", "\u00FC") |
| .replace("=", "\u00FB"); |
| } |
| |
| private String decodeFieldKey(final String key) { |
| if (key == null) { |
| return null; |
| } |
| return key.replace("\u00B7", ".") |
| .replace("\u00FF", ":") |
| .replace("\u00FE", ";") |
| .replace("\u00FD", " ") |
| .replace("\u00FC", "%") |
| .replace("\u00FB", "="); |
| } |
| |
| @Override |
| public boolean exists(K key) throws GoraException { |
| return get(key) != null; |
| } |
| } |