| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.gora.mongodb.store; |
| |
| import com.google.common.base.Splitter; |
| import com.mongodb.*; |
| import com.mongodb.client.*; |
| import com.mongodb.client.model.CountOptions; |
| import com.mongodb.client.model.CreateCollectionOptions; |
| import com.mongodb.client.model.UpdateOptions; |
| import com.mongodb.client.result.DeleteResult; |
| import org.apache.avro.Schema; |
| import org.apache.avro.Schema.Field; |
| import org.apache.avro.Schema.Type; |
| import org.apache.avro.generic.GenericArray; |
| import org.apache.avro.util.Utf8; |
| import org.apache.commons.io.IOUtils; |
| import org.apache.gora.mongodb.filters.MongoFilterUtil; |
| import org.apache.gora.mongodb.query.MongoDBQuery; |
| import org.apache.gora.mongodb.query.MongoDBResult; |
| import org.apache.gora.mongodb.store.MongoMapping.DocumentFieldType; |
| import org.apache.gora.mongodb.utils.BSONDecorator; |
| import org.apache.gora.mongodb.utils.Utf8Codec; |
| import org.apache.gora.persistency.impl.BeanFactoryImpl; |
| import org.apache.gora.persistency.impl.DirtyListWrapper; |
| import org.apache.gora.persistency.impl.DirtyMapWrapper; |
| import org.apache.gora.persistency.impl.PersistentBase; |
| import org.apache.gora.query.PartitionQuery; |
| import org.apache.gora.query.Query; |
| import org.apache.gora.query.Result; |
| import org.apache.gora.query.impl.PartitionQueryImpl; |
| import org.apache.gora.store.DataStoreFactory; |
| import org.apache.gora.store.impl.DataStoreBase; |
| import org.apache.gora.util.AvroUtils; |
| import org.apache.gora.util.ClassLoadingUtils; |
| import org.apache.gora.util.GoraException; |
| import org.bson.Document; |
| import org.bson.codecs.configuration.CodecRegistries; |
| import org.bson.codecs.configuration.CodecRegistry; |
| import org.bson.conversions.Bson; |
| import org.bson.types.ObjectId; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.xml.bind.DatatypeConverter; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.UnknownHostException; |
| import java.nio.ByteBuffer; |
| import java.nio.charset.Charset; |
| import java.util.*; |
| import java.util.Map.Entry; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.stream.StreamSupport; |
| |
| import static com.mongodb.AuthenticationMechanism.*; |
| import static com.mongodb.client.model.Filters.and; |
| |
| /** |
| * Implementation of a MongoDB data store to be used by gora. |
| * |
| * @param <K> |
| * class to be used for the key |
| * @param <T> |
| * class to be persisted within the store |
| * @author Fabien Poulard fpoulard@dictanova.com |
| * @author Damien Raude-Morvan draudemorvan@dictanova.com |
| */ |
public class MongoStore<K, T extends PersistentBase> extends
    DataStoreBase<K, T> {

  public static final Logger LOG = LoggerFactory.getLogger(MongoStore.class);

  /**
   * Default value for mapping file, loaded from the classpath root.
   */
  public static final String DEFAULT_MAPPING_FILE = "/gora-mongodb-mapping.xml";

  /**
   * Key at DataStore Properties (same as gora.properties) to hold a mapping loaded from memory, instead from filesystem like
   * the one at PARSE_MAPPING_FILE_KEY. If this key is present, the mapping is loaded from the value instead of gora-mongodb-mapping.xml
   */
  public static final String XML_MAPPING_DEFINITION = "gora.mapping" ;

  /**
   * MongoDB clients shared across store instances, keyed by the
   * comma-separated server list they connect to.
   */
  private static ConcurrentHashMap<String, com.mongodb.client.MongoClient> mapsOfClients = new ConcurrentHashMap<>();

  // Database handle resolved from the connection parameters at initialize time
  private MongoDatabase mongoClientDB;

  // Collection this store reads from / writes to (named by the mapping)
  private MongoCollection<Document> mongoClientColl;

  /**
   * Mapping definition for MongoDB
   */
  private MongoMapping mapping;

  // Translates Gora query filters into MongoDB filters
  private MongoFilterUtil<K, T> filterUtil;

  public MongoStore() {
    // Create a default mapping that will be overridden in initialize method
    this.mapping = new MongoMapping();
  }
| |
| /** |
| * Initialize the data store by reading the credentials, setting the client's |
| * properties up and reading the mapping file. |
| */ |
| public void initialize(final Class<K> keyClass, |
| final Class<T> pPersistentClass, final Properties properties) throws GoraException { |
| try { |
| LOG.debug("Initializing MongoDB store"); |
| MongoStoreParameters parameters = MongoStoreParameters.load(properties, getConf()); |
| super.initialize(keyClass, pPersistentClass, properties); |
| |
| filterUtil = new MongoFilterUtil<>(getConf()); |
| |
| // Load the mapping |
| MongoMappingBuilder<K, T> builder = new MongoMappingBuilder<>(this); |
| LOG.debug("Initializing Mongo store with mapping {}.", |
| new Object[] { parameters.getMappingFile() }); |
| |
| InputStream mappingInputStream; |
| |
| // If there is a mapping definition in the Properties, use it. |
| if (properties.containsKey(XML_MAPPING_DEFINITION)) { |
| if (LOG.isTraceEnabled()) LOG.trace(XML_MAPPING_DEFINITION + " = " + properties.getProperty(XML_MAPPING_DEFINITION)); |
| mappingInputStream = IOUtils.toInputStream(properties.getProperty(XML_MAPPING_DEFINITION), (Charset)null); |
| } |
| // Otherwise use the mapping file from parameters. |
| else { |
| String mappingFile = parameters.getMappingFile(); |
| mappingInputStream = getClass().getResourceAsStream(mappingFile); |
| } |
| |
| builder.fromInputStream(mappingInputStream); |
| mapping = builder.build(); |
| |
| // Prepare MongoDB connection |
| mongoClientDB = getDB(parameters); |
| mongoClientColl = mongoClientDB |
| .getCollection(mapping.getCollectionName()); |
| |
| LOG.info("Initialized Mongo store for database {} of {}.", new Object[] { |
| parameters.getDbname(), parameters.getServers() }); |
| } catch (GoraException e) { |
| throw e; |
| } catch (IOException e) { |
| LOG.error("Error while initializing MongoDB store", e); |
| throw new GoraException(e); |
| } |
| } |
| |
| /** |
| * Retrieve a client connected to the MongoDB server to be used. |
| * |
| * @param params This value should specify the host:port (at least one) for |
| * connecting to remote MongoDB. |
| * @return a {@link com.mongodb.client.MongoClient} instance connected to the server |
| */ |
| private com.mongodb.client.MongoClient getClient(MongoStoreParameters params) { |
| |
| // Utf8 serialization! |
| CodecRegistry codecRegistry = CodecRegistries.fromRegistries( |
| MongoClientSettings.getDefaultCodecRegistry(), |
| CodecRegistries.fromCodecs(new Utf8Codec()) |
| ); |
| // Configure options |
| MongoClientSettings.Builder settings = MongoClientSettings.builder(); |
| settings.codecRegistry(codecRegistry); |
| if (params.getReadPreference() != null) { |
| settings.readPreference(ReadPreference.valueOf(params.getReadPreference())); |
| } |
| if (params.getWriteConcern() != null) { |
| settings.writeConcern(WriteConcern.valueOf(params.getWriteConcern())); |
| } |
| |
| |
| // Build server address |
| List<ServerAddress> seeds = new ArrayList<>(); |
| Iterable<String> serversArray = Splitter.on(",").split(params.getServers()); |
| for (String server : serversArray) { |
| Iterator<String> paramsIterator = Splitter.on(":").trimResults().split(server).iterator(); |
| if (!paramsIterator.hasNext()) { |
| // No server, use default |
| seeds.add(new ServerAddress()); |
| } else { |
| String host = paramsIterator.next(); |
| if (paramsIterator.hasNext()) { |
| String port = paramsIterator.next(); |
| seeds.add(new ServerAddress(host, Integer.parseInt(port))); |
| } else { |
| seeds.add(new ServerAddress(host)); |
| } |
| } |
| } |
| settings.applyToClusterSettings(builder -> builder.hosts(seeds)); |
| |
| // If configuration contains a login + secret, try to authenticated with DB |
| if (params.getLogin() != null && params.getSecret() != null) { |
| MongoCredential credential = createCredential(params.getAuthenticationType(), params.getLogin(), params.getDbname(), params.getSecret()); |
| settings.credential(credential); |
| } |
| |
| return MongoClients.create(settings.build()); |
| } |
| |
| /** |
| * This method creates credentials according to the Authentication type. |
| * |
| * @param authenticationType authentication Type (Authentication Mechanism) |
| * @param username username |
| * @param database database |
| * @param password password |
| * @return Mongo Crendential |
| * @see <a href="http://api.mongodb.com/java/current/com/mongodb/AuthenticationMechanism.html">AuthenticationMechanism in MongoDB Java Driver</a> |
| */ |
| private MongoCredential createCredential(String authenticationType, String username, String database, String password) { |
| MongoCredential credential = null; |
| if (PLAIN.getMechanismName().equals(authenticationType)) { |
| credential = MongoCredential.createPlainCredential(username, database, password.toCharArray()); |
| } else if (SCRAM_SHA_1.getMechanismName().equals(authenticationType)) { |
| credential = MongoCredential.createScramSha1Credential(username, database, password.toCharArray()); |
| } else if (SCRAM_SHA_256.getMechanismName().equals(authenticationType)) { |
| credential = MongoCredential.createScramSha256Credential(username, database, password.toCharArray()); |
| } else if (GSSAPI.getMechanismName().equals(authenticationType)) { |
| credential = MongoCredential.createGSSAPICredential(username); |
| } else if (MONGODB_X509.getMechanismName().equals(authenticationType)) { |
| credential = MongoCredential.createMongoX509Credential(username); |
| } else { |
| credential = MongoCredential.createCredential(username, database, password.toCharArray()); |
| } |
| return credential; |
| } |
| |
| /** |
| * Get reference to Mongo DB, using credentials if not null. |
| */ |
| private MongoDatabase getDB(MongoStoreParameters parameters) throws UnknownHostException { |
| |
| // Get reference to Mongo DB |
| if (!mapsOfClients.containsKey(parameters.getServers())) |
| mapsOfClients.put(parameters.getServers(), getClient(parameters)); |
| return mapsOfClients.get(parameters.getServers()).getDatabase(parameters.getDbname()); |
| } |
| |
  /**
   * Accessor to the MongoDB mapping in use, either the default built by the
   * constructor or the one loaded by {@code initialize}.
   */
  public MongoMapping getMapping() {
    return mapping;
  }
| |
  /**
   * Accessor to the name of the collection used, as declared by the mapping.
   */
  @Override
  public String getSchemaName() {
    return mapping.getCollectionName();
  }
| |
  /**
   * Delegates schema-name resolution to the base store implementation
   * (mapping name when provided, otherwise derived from the persistent class).
   */
  @Override
  public String getSchemaName(final String mappingSchemaName,
      final Class<?> persistentClass) {
    return super.getSchemaName(mappingSchemaName, persistentClass);
  }
| |
| /** |
| * Create a new collection in MongoDB if necessary. |
| */ |
| @Override |
| public void createSchema() throws GoraException { |
| if (mongoClientDB == null) |
| throw new GoraException( |
| "Impossible to create the schema as no database has been selected."); |
| if (schemaExists()) { |
| return; |
| } |
| |
| try { |
| // If initialized create the collection |
| CreateCollectionOptions opts = new CreateCollectionOptions(); |
| String name = mapping.getCollectionName(); |
| mongoClientDB.createCollection(name, opts); |
| mongoClientColl = mongoClientDB.getCollection(name); |
| |
| LOG.debug("Collection {} has been created for Mongo database {}.", |
| new Object[] {name, mongoClientDB.getName() }); |
| } catch (Exception e) { |
| throw new GoraException(e); |
| } |
| } |
| |
| /** |
| * Drop the collection. |
| */ |
| @Override |
| public void deleteSchema() throws GoraException { |
| if (mongoClientColl == null) |
| throw new GoraException( |
| "Impossible to delete the schema as no schema is selected."); |
| |
| try { |
| // If initialized, simply drop the collection |
| mongoClientColl.drop(); |
| |
| LOG.debug( |
| "Collection {} has been dropped.", |
| new Object[] { mongoClientColl.getNamespace().getFullName() }); |
| } catch (Exception e) { |
| throw new GoraException(e); |
| } |
| } |
| |
| /** |
| * Check if the collection already exists or should be created. |
| */ |
| @Override |
| public boolean schemaExists() throws GoraException { |
| try { |
| MongoIterable<String> names = mongoClientDB.listCollectionNames(); |
| String name = mapping.getCollectionName(); |
| return StreamSupport.stream(names.spliterator(), false) |
| .anyMatch(name::equals); |
| } catch (Exception e) { |
| throw new GoraException(e); |
| } |
| } |
| |
  /**
   * Ensure the data is synced to disk. Intentionally a no-op here: writes are
   * issued synchronously through the driver, so there is nothing buffered to
   * flush.
   */
  @Override
  public void flush() throws GoraException {
    // no-op
  }
| |
  /**
   * Release the resources linked to this collection. Intentionally empty: the
   * underlying clients are shared via the static client map and must stay open
   * for other store instances.
   */
  @Override
  public void close() {
  }
| |
| /** |
| * Retrieve an entry from the store with only selected fields. |
| * |
| * @param key |
| * identifier of the document in the database |
| * @param fields |
| * list of fields to be loaded from the database |
| */ |
| @Override |
| public T get(final K key, final String[] fields) throws GoraException { |
| try { |
| String[] dbFields = getFieldsToQuery(fields); |
| // Prepare the MongoDB query |
| Document q = new Document("_id", key); |
| Document proj = new Document(); |
| for (String field : dbFields) { |
| String docf = mapping.getDocumentField(field); |
| if (docf != null) { |
| proj.put(docf, true); |
| } |
| } |
| // Execute the query |
| FindIterable<Document> res = mongoClientColl.find(q).projection(proj); |
| // Build the corresponding persistent |
| return newInstance(res.first(), dbFields); |
| } catch (Exception e) { |
| throw new GoraException(e); |
| } |
| } |
| |
| @Override |
| public boolean exists(final K key) throws GoraException { |
| try { |
| // Prepare the MongoDB query |
| Document q = new Document("_id", key); |
| // Execute the query |
| long res = mongoClientColl.countDocuments(q); |
| return res > 0; |
| } catch (Exception e) { |
| throw new GoraException(e); |
| } |
| } |
| |
| /** |
| * Persist an object into the store. |
| * |
| * @param key |
| * identifier of the object in the store |
| * @param obj |
| * the object to be inserted |
| */ |
| @Override |
| public void put(final K key, final T obj) throws GoraException { |
| try { |
| // Save the object in the database |
| if (obj.isDirty()) { |
| performPut(key, obj); |
| } else { |
| LOG.info("Ignored putting object {} in the store as it is neither " |
| + "new, neither dirty.", new Object[] { obj }); |
| } |
| } catch (Exception e) { |
| throw new GoraException(e); |
| } |
| } |
| |
| /** |
| * Update a object that already exists in the store. The object must exist |
| * already or the update may fail. |
| * |
| * @param key |
| * identifier of the object in the store |
| * @param obj |
| * the object to be inserted |
| */ |
| private void performPut(final K key, final T obj) { |
| // Build the query to select the object to be updated |
| Document qSel = new Document("_id", key); |
| |
| // Build the update query |
| Document qUpdate = new Document(); |
| |
| Document qUpdateSet = newUpdateSetInstance(obj); |
| if (qUpdateSet.size() > 0) { |
| qUpdate.put("$set", qUpdateSet); |
| } |
| |
| Document qUpdateUnset = newUpdateUnsetInstance(obj); |
| if (qUpdateUnset.size() > 0) { |
| qUpdate.put("$unset", qUpdateUnset); |
| } |
| |
| // Execute the update (if there is at least one $set ot $unset |
| if (!qUpdate.isEmpty()) { |
| mongoClientColl.updateOne(qSel, qUpdate, new UpdateOptions().upsert(true)); |
| obj.clearDirty(); |
| } else { |
| LOG.debug("No update to perform, skip {}", key); |
| } |
| } |
| |
| @Override |
| public boolean delete(final K key) throws GoraException { |
| try { |
| Document removeKey = new Document("_id", key); |
| DeleteResult writeResult = mongoClientColl.deleteOne(removeKey); |
| return writeResult.getDeletedCount() > 0; |
| } catch (Exception e) { |
| throw new GoraException(e); |
| } |
| } |
| |
| @Override |
| public long deleteByQuery(final Query<K, T> query) throws GoraException { |
| try { |
| // Build the actual MongoDB query |
| Bson q = MongoDBQuery.toDBQuery(query); |
| DeleteResult writeResult = mongoClientColl.deleteMany(q); |
| return writeResult.getDeletedCount(); |
| } catch (Exception e) { |
| throw new GoraException(e); |
| } |
| } |
| |
| /** |
| * Execute the query and return the result. |
| */ |
| @Override |
| public Result<K, T> execute(final Query<K, T> query) throws GoraException { |
| try { |
| String[] fields = getFieldsToQuery(query.getFields()); |
| // Build the actual MongoDB query |
| Bson q = MongoDBQuery.toDBQuery(query); |
| Bson p = MongoDBQuery.toProjection(fields, mapping); |
| |
| if (query.getFilter() != null) { |
| Optional<Bson> filter = filterUtil.setFilter(query.getFilter(), this); |
| if (!filter.isPresent()) { |
| // don't need local filter |
| query.setLocalFilterEnabled(false); |
| } else { |
| q = and(q, filter.get()); |
| } |
| } |
| |
| // Execute the query on the collection |
| FindIterable<Document> iterable = mongoClientColl.find(q).projection(p); |
| CountOptions countOptions = new CountOptions(); |
| if (query.getLimit() > 0) { |
| iterable.limit((int) query.getLimit()); |
| countOptions.limit((int) query.getLimit()); |
| } |
| iterable.batchSize(100); |
| iterable.noCursorTimeout(true); |
| |
| // Build the result |
| long size = mongoClientColl.countDocuments(q, countOptions); |
| return new MongoDBResult<>(this, query, iterable.cursor(), size); |
| } catch(Exception e) { |
| throw new GoraException(e); |
| } |
| } |
| |
  /**
   * Create a new {@link Query} to query the datastore, pre-populated with the
   * full field list so an unrestricted query loads every mapped field.
   */
  @Override
  public Query<K, T> newQuery() {
    MongoDBQuery<K, T> query = new MongoDBQuery<>(this);
    query.setFields(getFieldsToQuery(null));
    return query;
  }
| |
| /** |
| * Partitions the given query and returns a list of PartitionQuerys, which |
| * will execute on local data. |
| */ |
| @Override |
| public List<PartitionQuery<K, T>> getPartitions(final Query<K, T> query) |
| throws IOException { |
| // FIXME: for now, there is only one partition as we do not handle |
| // MongoDB sharding configuration |
| List<PartitionQuery<K, T>> partitions = new ArrayList<>(); |
| PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>( |
| query); |
| partitionQuery.setConf(getConf()); |
| partitions.add(partitionQuery); |
| return partitions; |
| } |
| |
| // //////////////////////////////////////////////////////// DESERIALIZATION |
| |
| /** |
| * Build a new instance of the persisted class from the {@link Document} |
| * retrieved from the database. |
| * |
| * @param obj |
| * the {@link Document} that results from the query to the database |
| * @param fields |
| * the list of fields to be mapped to the persistence class instance |
| * @return a persistence class instance which content was deserialized from |
| * the {@link Document} |
| * @throws GoraException |
| */ |
| public T newInstance(final Document obj, final String[] fields) throws GoraException { |
| if (obj == null) |
| return null; |
| BSONDecorator easybson = new BSONDecorator(obj); |
| // Create new empty persistent bean instance |
| T persistent = newPersistent(); |
| String[] dbFields = getFieldsToQuery(fields); |
| |
| // Populate each field |
| for (String f : dbFields) { |
| // Check the field exists in the mapping and in the db |
| String docf = mapping.getDocumentField(f); |
| if (docf == null || !easybson.containsField(docf)) |
| continue; |
| |
| DocumentFieldType storeType = mapping.getDocumentFieldType(docf); |
| Field field = fieldMap.get(f); |
| Schema fieldSchema = field.schema(); |
| |
| LOG.debug( |
| "Load from DBObject (MAIN), field:{}, schemaType:{}, docField:{}, storeType:{}", |
| new Object[] { field.name(), fieldSchema.getType(), docf, storeType }); |
| Object result = fromDocument(fieldSchema, storeType, field, docf, |
| easybson); |
| persistent.put(field.pos(), result); |
| } |
| persistent.clearDirty(); |
| return persistent; |
| } |
| |
  /**
   * Deserialize a single BSON document field into the in-memory Avro value
   * dictated by {@code fieldSchema}, dispatching on the Avro type.
   *
   * @param fieldSchema Avro schema of the field being read
   * @param storeType   MongoDB storage type from the mapping (may be null)
   * @param field       Avro field descriptor, forwarded to nested readers
   * @param docf        name of the field inside the BSON document
   * @param easybson    decorated document the value is read from
   * @return the deserialized value; null for NULL schemas, missing records,
   *         or unsupported types
   * @throws GoraException if a nested structure cannot be deserialized
   */
  private Object fromDocument(final Schema fieldSchema,
      final DocumentFieldType storeType, final Field field, final String docf,
      final BSONDecorator easybson) throws GoraException {
    Object result = null;
    switch (fieldSchema.getType()) {
    case MAP:
      result = fromMongoMap(docf, fieldSchema, easybson, field);
      break;
    case ARRAY:
      result = fromMongoList(docf, fieldSchema, easybson, field);
      break;
    case RECORD:
      // A missing sub-document maps to a null record
      Document rec = easybson.getDBObject(docf);
      if (rec == null) {
        result = null;
        break;
      }
      result = fromMongoRecord(fieldSchema, docf, rec);
      break;
    case BOOLEAN:
      result = easybson.getBoolean(docf);
      break;
    case DOUBLE:
      result = easybson.getDouble(docf);
      break;
    case FLOAT:
      // Floats are stored as doubles and narrowed on read.
      // NOTE(review): throws NPE if the stored value is a BSON null — confirm
      // getDouble never returns null for FLOAT-mapped fields
      result = easybson.getDouble(docf).floatValue();
      break;
    case INT:
      result = easybson.getInt(docf);
      break;
    case LONG:
      result = easybson.getLong(docf);
      break;
    case STRING:
      // Strings may be stored as plain strings, ObjectIds or dates
      result = fromMongoString(storeType, docf, easybson);
      break;
    case ENUM:
      // Enums are stored by symbol name
      result = AvroUtils.getEnumValue(fieldSchema, easybson.getUtf8String(docf)
          .toString());
      break;
    case BYTES:
    case FIXED:
      result = easybson.getBytes(docf);
      break;
    case NULL:
      result = null;
      break;
    case UNION:
      // Only two-branch unions containing "null" are supported
      result = fromMongoUnion(fieldSchema, storeType, field, docf, easybson);
      break;
    default:
      LOG.warn("Unable to read {}", docf);
      break;
    }
    return result;
  }
| |
| private Object fromMongoUnion(final Schema fieldSchema, |
| final DocumentFieldType storeType, final Field field, final String docf, |
| final BSONDecorator easybson) throws GoraException { |
| Object result;// schema [type0, type1] |
| Type type0 = fieldSchema.getTypes().get(0).getType(); |
| Type type1 = fieldSchema.getTypes().get(1).getType(); |
| |
| // Check if types are different and there's a "null", like ["null","type"] |
| // or ["type","null"] |
| if (!type0.equals(type1) |
| && (type0.equals(Type.NULL) || type1.equals(Type.NULL))) { |
| Schema innerSchema = fieldSchema.getTypes().get(1); |
| LOG.debug( |
| "Load from DBObject (UNION), schemaType:{}, docField:{}, storeType:{}", |
| new Object[] { innerSchema.getType(), docf, storeType }); |
| // Deserialize as if schema was ["type"] |
| result = fromDocument(innerSchema, storeType, field, docf, easybson); |
| } else { |
| throw new IllegalStateException( |
| "MongoStore doesn't support 3 types union field yet. Please update your mapping"); |
| } |
| return result; |
| } |
| |
| @SuppressWarnings({ "unchecked", "rawtypes" }) |
| private Object fromMongoRecord(final Schema fieldSchema, final String docf, |
| final Document rec) throws GoraException { |
| Object result; |
| BSONDecorator innerBson = new BSONDecorator(rec); |
| Class<?> clazz = null; |
| try { |
| clazz = ClassLoadingUtils.loadClass(fieldSchema.getFullName()); |
| } catch (ClassNotFoundException e) { |
| } |
| PersistentBase record = (PersistentBase) new BeanFactoryImpl(keyClass, clazz).newPersistent(); |
| for (Field recField : fieldSchema.getFields()) { |
| Schema innerSchema = recField.schema(); |
| DocumentFieldType innerStoreType = mapping |
| .getDocumentFieldType(innerSchema.getName()); |
| String innerDocField = mapping.getDocumentField(recField.name()) != null ? mapping |
| .getDocumentField(recField.name()) : recField.name(); |
| String fieldPath = docf + "." + innerDocField; |
| LOG.debug( |
| "Load from DBObject (RECORD), field:{}, schemaType:{}, docField:{}, storeType:{}", |
| new Object[] { recField.name(), innerSchema.getType(), fieldPath, |
| innerStoreType }); |
| record.put( |
| recField.pos(), |
| fromDocument(innerSchema, innerStoreType, recField, innerDocField, |
| innerBson)); |
| } |
| result = record; |
| return result; |
| } |
| |
| /* pp */ Object fromMongoList(final String docf, final Schema fieldSchema, |
| final BSONDecorator easybson, final Field f) throws GoraException { |
| List<Document> list = easybson.getDBList(docf); |
| List<Object> rlist = new ArrayList<>(); |
| if (list == null) { |
| return new DirtyListWrapper(rlist); |
| } |
| |
| |
| for (Object item : list) { |
| DocumentFieldType storeType = mapping.getDocumentFieldType(docf); |
| Object o = fromDocument(fieldSchema.getElementType(), storeType, f, |
| "item", new BSONDecorator(new Document("item", item))); |
| rlist.add(o); |
| } |
| return new DirtyListWrapper<>(rlist); |
| } |
| |
| /* pp */ Object fromMongoMap(final String docf, final Schema fieldSchema, |
| final BSONDecorator easybson, final Field f) throws GoraException { |
| Document map = easybson.getDBObject(docf); |
| Map<Utf8, Object> rmap = new HashMap<>(); |
| if (map == null) { |
| return new DirtyMapWrapper(rmap); |
| } |
| for (Entry<String, Object> e : map.entrySet()) { |
| String mapKey = e.getKey(); |
| String decodedMapKey = decodeFieldKey(mapKey); |
| |
| DocumentFieldType storeType = mapping.getDocumentFieldType(docf); |
| Object o = fromDocument(fieldSchema.getValueType(), storeType, f, mapKey, |
| new BSONDecorator(map)); |
| rmap.put(new Utf8(decodedMapKey), o); |
| } |
| return new DirtyMapWrapper<>(rmap); |
| } |
| |
| private Object fromMongoString(final DocumentFieldType storeType, |
| final String docf, final BSONDecorator easybson) { |
| Object result; |
| if (storeType == DocumentFieldType.OBJECTID) { |
| // Try auto-conversion of BSON data to ObjectId |
| // It will work if data is stored as String or as ObjectId |
| Object bin = easybson.get(docf); |
| if (bin instanceof String) { |
| ObjectId id = new ObjectId((String) bin); |
| result = new Utf8(id.toString()); |
| } else { |
| result = new Utf8(bin.toString()); |
| } |
| } else if (storeType == DocumentFieldType.DATE) { |
| Object bin = easybson.get(docf); |
| if (bin instanceof Date) { |
| Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.getDefault()); |
| calendar.setTime((Date) bin); |
| result = new Utf8(DatatypeConverter.printDateTime(calendar)); |
| } else { |
| result = new Utf8(bin.toString()); |
| } |
| } else { |
| result = easybson.getUtf8String(docf); |
| } |
| return result; |
| } |
| |
| // ////////////////////////////////////////////////////////// SERIALIZATION |
| |
| /** |
| * Build a new instance of {@link Document} from the persistence class |
| * instance in parameter. Limit the {@link Document} to the fields that are |
| * dirty and not null, that is the fields that will need to be updated in the |
| * store. |
| * |
| * @param persistent |
| * a persistence class instance which content is to be serialized as |
| * a {@link Document} for use as parameter of a $set operator |
| * @return a {@link Document} which content corresponds to the fields that |
| * have to be updated... and formatted to be passed in parameter of a |
| * $set operator |
| */ |
| private Document newUpdateSetInstance(final T persistent) { |
| Document result = new Document(); |
| for (Field f : persistent.getSchema().getFields()) { |
| if (persistent.isDirty(f.pos()) && (persistent.get(f.pos()) != null)) { |
| String docf = mapping.getDocumentField(f.name()); |
| Object value = persistent.get(f.pos()); |
| DocumentFieldType storeType = mapping.getDocumentFieldType(docf); |
| LOG.debug( |
| "Transform value to DBObject (MAIN), docField:{}, schemaType:{}, storeType:{}", |
| new Object[] { docf, f.schema().getType(), storeType }); |
| Object o = toDocument(docf, f.schema(), f.schema().getType(), |
| storeType, value); |
| result.put(docf, o); |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * Build a new instance of {@link Document} from the persistence class |
| * instance in parameter. Limit the {@link Document} to the fields that are |
| * dirty and null, that is the fields that will need to be updated in the |
| * store by being removed. |
| * |
| * @param persistent |
| * a persistence class instance which content is to be serialized as |
| * a {@link Document} for use as parameter of a $set operator |
| * @return a {@link Document} which content corresponds to the fields that |
| * have to be updated... and formated to be passed in parameter of a |
| * $unset operator |
| */ |
| private Document newUpdateUnsetInstance(final T persistent) { |
| Document result = new Document(); |
| for (Field f : persistent.getSchema().getFields()) { |
| if (persistent.isDirty(f.pos()) && (persistent.get(f.pos()) == null)) { |
| String docf = mapping.getDocumentField(f.name()); |
| Object value = persistent.get(f.pos()); |
| DocumentFieldType storeType = mapping.getDocumentFieldType(docf); |
| LOG.debug( |
| "Transform value to DBObject (MAIN), docField:{}, schemaType:{}, storeType:{}", |
| new Object[] { docf, f.schema().getType(), storeType }); |
| Object o = toDocument(docf, f.schema(), f.schema().getType(), |
| storeType, value); |
| result.put(docf, o); |
| } |
| } |
| return result; |
| } |
| |
  /**
   * Serialize a single in-memory Avro value into its BSON representation,
   * dispatching on the Avro type and validating the mapping's store type for
   * container fields.
   *
   * @param docf        name of the target field inside the BSON document
   * @param fieldSchema Avro schema of the value being written
   * @param fieldType   Avro type of {@code fieldSchema} (passed separately so
   *                    union branches can override it)
   * @param storeType   MongoDB storage type from the mapping (may be null)
   * @param value       the value to serialize (may be null)
   * @return the BSON-compatible value, or null for null/unknown inputs
   * @throws IllegalStateException when a map/array field is mapped to an
   *         incompatible Mongo store type
   */
  @SuppressWarnings("unchecked")
  private Object toDocument(final String docf, final Schema fieldSchema,
      final Type fieldType, final DocumentFieldType storeType,
      final Object value) {
    Object result = null;
    switch (fieldType) {
    case MAP:
      // Gora maps may only be mapped to Mongo sub-documents
      if (storeType != null && storeType != DocumentFieldType.DOCUMENT) {
        throw new IllegalStateException(
            "Field "
                + fieldSchema.getType()
                + ": to store a Gora 'map', target Mongo mapping have to be of 'document' type");
      }
      Schema valueSchema = fieldSchema.getValueType();
      result = mapToMongo(docf, (Map<CharSequence, ?>) value, valueSchema,
          valueSchema.getType());
      break;
    case ARRAY:
      // Gora arrays may only be mapped to Mongo lists
      if (storeType != null && storeType != DocumentFieldType.LIST) {
        throw new IllegalStateException(
            "Field "
                + fieldSchema.getType()
                + ": To store a Gora 'array', target Mongo mapping have to be of 'list' type");
      }
      Schema elementSchema = fieldSchema.getElementType();
      result = listToMongo(docf, (List<?>) value, elementSchema,
          elementSchema.getType());
      break;
    case BYTES:
      // Store the raw backing bytes of the buffer
      if (value != null) {
        result = ((ByteBuffer) value).array();
      }
      break;
    case INT:
    case LONG:
    case FLOAT:
    case DOUBLE:
    case BOOLEAN:
      // Primitives map directly to their BSON counterparts
      result = value;
      break;
    case STRING:
      // Strings may be stored as plain strings, ObjectIds or dates
      result = stringToMongo(fieldSchema, storeType, value);
      break;
    case ENUM:
      // Enums are stored by symbol name
      if (value != null)
        result = value.toString();
      break;
    case RECORD:
      if (value == null)
        break;
      result = recordToMongo(docf, fieldSchema, value);
      break;
    case UNION:
      // Only two-branch unions containing "null" are supported
      result = unionToMongo(docf, fieldSchema, storeType, value);
      break;
    case FIXED:
      result = value;
      break;

    default:
      LOG.error("Unknown field type: {}", fieldSchema.getType());
      break;
    }

    return result;
  }
| |
| private Object unionToMongo(final String docf, final Schema fieldSchema, |
| final DocumentFieldType storeType, final Object value) { |
| Object result;// schema [type0, type1] |
| Type type0 = fieldSchema.getTypes().get(0).getType(); |
| Type type1 = fieldSchema.getTypes().get(1).getType(); |
| |
| // Check if types are different and there's a "null", like ["null","type"] |
| // or ["type","null"] |
| if (!type0.equals(type1) |
| && (type0.equals(Type.NULL) || type1.equals(Type.NULL))) { |
| Schema innerSchema = fieldSchema.getTypes().get(1); |
| LOG.debug( |
| "Transform value to DBObject (UNION), schemaType:{}, type1:{}, storeType:{}", |
| new Object[] { innerSchema.getType(), type1, storeType }); |
| // Deserialize as if schema was ["type"] |
| result = toDocument(docf, innerSchema, type1, storeType, value); |
| } else { |
| throw new IllegalStateException( |
| "MongoStore doesn't support 3 types union field yet. Please update your mapping"); |
| } |
| return result; |
| } |
| |
| private Document recordToMongo(final String docf, |
| final Schema fieldSchema, final Object value) { |
| Document record = new Document(); |
| for (Field member : fieldSchema.getFields()) { |
| Object innerValue = ((PersistentBase) value).get(member.pos()); |
| String innerDoc = mapping.getDocumentField(member.name()); |
| Type innerType = member.schema().getType(); |
| DocumentFieldType innerStoreType = mapping.getDocumentFieldType(innerDoc); |
| LOG.debug( |
| "Transform value to DBObject (RECORD), docField:{}, schemaType:{}, storeType:{}", |
| new Object[] { member.name(), member.schema().getType(), |
| innerStoreType }); |
| record.put( |
| member.name(), |
| toDocument(docf, member.schema(), innerType, innerStoreType, |
| innerValue)); |
| } |
| return record; |
| } |
| |
| private Object stringToMongo(final Schema fieldSchema, |
| final DocumentFieldType storeType, final Object value) { |
| Object result = null; |
| if (storeType == DocumentFieldType.OBJECTID) { |
| if (value != null) { |
| ObjectId id; |
| try { |
| id = new ObjectId(value.toString()); |
| } catch (IllegalArgumentException e1) { |
| // Unable to parse anything from Utf8 value, throw error |
| throw new IllegalStateException("Field " + fieldSchema.getType() |
| + ": Invalid string: unable to convert to ObjectId"); |
| } |
| result = id; |
| } |
| } else if (storeType == DocumentFieldType.DATE) { |
| if (value != null) { |
| // Try to parse date from Utf8 value |
| Calendar calendar = null; |
| try { |
| // Parse as date + time |
| calendar = DatatypeConverter.parseDateTime(value.toString()); |
| } catch (IllegalArgumentException e1) { |
| try { |
| // Parse as date only |
| calendar = DatatypeConverter.parseDate(value.toString()); |
| } catch (IllegalArgumentException e2) { |
| // No-op |
| } |
| } |
| if (calendar == null) { |
| // Unable to parse anything from Utf8 value, throw error |
| throw new IllegalStateException("Field " + fieldSchema.getType() |
| + ": Invalid date format '" + value + "'"); |
| } |
| result = calendar.getTime(); |
| } |
| } else { |
| if (value != null) { |
| result = value.toString(); |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * Convert a Java Map as used in Gora generated classes to a Map that can |
| * safely be serialized into MongoDB. |
| * |
| * @param value |
| * the Java Map that must be serialized into a MongoDB object |
| * @param fieldType |
| * type of the values within the map |
| * @return a {@link Document} version of the {@link Map} that can be |
| * safely serialized into MongoDB. |
| */ |
| private Document mapToMongo(final String docf, |
| final Map<CharSequence, ?> value, final Schema fieldSchema, |
| final Type fieldType) { |
| Document map = new Document(); |
| // Handle null case |
| if (value == null) |
| return map; |
| |
| // Handle regular cases |
| for (Entry<CharSequence, ?> e : value.entrySet()) { |
| String mapKey = e.getKey().toString(); |
| String encodedMapKey = encodeFieldKey(mapKey); |
| Object mapValue = e.getValue(); |
| |
| DocumentFieldType storeType = mapping.getDocumentFieldType(docf); |
| Object result = toDocument(docf, fieldSchema, fieldType, storeType, |
| mapValue); |
| map.put(encodedMapKey, result); |
| } |
| |
| return map; |
| } |
| |
| /** |
| * Convert a Java {@link GenericArray} as used in Gora generated classes to a |
| * List that can safely be serialized into MongoDB. |
| * |
| * @param array |
| * the {@link GenericArray} to be serialized |
| * @param fieldType |
| * type of the elements within the array |
| * @return a {@link BasicDBList} version of the {@link GenericArray} that can |
| * be safely serialized into MongoDB. |
| */ |
| private BasicDBList listToMongo(final String docf, final Collection<?> array, |
| final Schema fieldSchema, final Type fieldType) { |
| BasicDBList list = new BasicDBList(); |
| // Handle null case |
| if (array == null) |
| return list; |
| |
| // Handle regular cases |
| for (Object item : array) { |
| DocumentFieldType storeType = mapping.getDocumentFieldType(docf); |
| Object result = toDocument(docf, fieldSchema, fieldType, storeType, item); |
| list.add(result); |
| } |
| |
| return list; |
| } |
| |
| // //////////////////////////////////////////////////////// CLEANUP |
| |
| /** |
| * Ensure Key encoding -> dots replaced with middle dots |
| * |
| * @param key |
| * char with only dots. |
| * @return encoded string with "\u00B7" chars.. |
| */ |
| public String encodeFieldKey(final String key) { |
| if (key == null) { |
| return null; |
| } |
| return key.replace(".", "\u00B7"); |
| } |
| |
| /** |
| * Ensure Key decoding -> middle dots replaced with dots |
| * |
| * @param key |
| * encoded string with "\u00B7" chars. |
| * @return Cleanup up char with only dots. |
| */ |
| public String decodeFieldKey(final String key) { |
| if (key == null) { |
| return null; |
| } |
| return key.replace("\u00B7", "."); |
| } |
| |
| } |