blob: b185f08418810e612e334872219a0746637ae1ec [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.mongodb3;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.DistinctIterable;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import org.apache.camel.Exchange;
import org.apache.camel.InvalidPayloadException;
import org.apache.camel.Processor;
import org.apache.camel.TypeConverter;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.ObjectHelper;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.mongodb.client.model.Filters.eq;
import static org.apache.camel.component.mongodb3.MongoDbConstants.BATCH_SIZE;
import static org.apache.camel.component.mongodb3.MongoDbConstants.COLLECTION;
import static org.apache.camel.component.mongodb3.MongoDbConstants.COLLECTION_INDEX;
import static org.apache.camel.component.mongodb3.MongoDbConstants.CRITERIA;
import static org.apache.camel.component.mongodb3.MongoDbConstants.DATABASE;
import static org.apache.camel.component.mongodb3.MongoDbConstants.FIELDS_PROJECTION;
import static org.apache.camel.component.mongodb3.MongoDbConstants.LIMIT;
import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
import static org.apache.camel.component.mongodb3.MongoDbConstants.MULTIUPDATE;
import static org.apache.camel.component.mongodb3.MongoDbConstants.NUM_TO_SKIP;
import static org.apache.camel.component.mongodb3.MongoDbConstants.OID;
import static org.apache.camel.component.mongodb3.MongoDbConstants.OPERATION_HEADER;
import static org.apache.camel.component.mongodb3.MongoDbConstants.RECORDS_AFFECTED;
import static org.apache.camel.component.mongodb3.MongoDbConstants.RECORDS_MATCHED;
import static org.apache.camel.component.mongodb3.MongoDbConstants.RESULT_PAGE_SIZE;
import static org.apache.camel.component.mongodb3.MongoDbConstants.RESULT_TOTAL_SIZE;
import static org.apache.camel.component.mongodb3.MongoDbConstants.SORT_BY;
import static org.apache.camel.component.mongodb3.MongoDbConstants.UPSERT;
import static org.apache.camel.component.mongodb3.MongoDbConstants.WRITERESULT;
/**
* The MongoDb producer.
*/
public class MongoDbProducer extends DefaultProducer {
private static final Logger LOG = LoggerFactory.getLogger(MongoDbProducer.class);
// Dispatch table: one Processor per supported MongoDbOperation, each built by
// a createDoXxx() factory and wrapped by wrap() with common post-processing.
private final Map<MongoDbOperation, Processor> operations = new HashMap<>();
private MongoDbEndpoint endpoint;
// Instance initializer: registers every supported operation so the table is
// fully populated before process() can ever be invoked.
{
    bind(MongoDbOperation.aggregate, createDoAggregate());
    bind(MongoDbOperation.bulkWrite, createDoBulkWrite());
    bind(MongoDbOperation.command, createDoCommand());
    bind(MongoDbOperation.count, createDoCount());
    bind(MongoDbOperation.findDistinct, createDoDistinct());
    bind(MongoDbOperation.findAll, createDoFindAll());
    bind(MongoDbOperation.findById, createDoFindById());
    bind(MongoDbOperation.findOneByQuery, createDoFindOneByQuery());
    bind(MongoDbOperation.getColStats, createDoGetColStats());
    bind(MongoDbOperation.getDbStats, createDoGetDbStats());
    bind(MongoDbOperation.insert, createDoInsert());
    bind(MongoDbOperation.remove, createDoRemove());
    bind(MongoDbOperation.save, createDoSave());
    bind(MongoDbOperation.update, createDoUpdate());
}
/**
 * Creates the producer bound to the given MongoDB endpoint, from which the
 * default operation, database, collection and write concern are taken.
 */
public MongoDbProducer(MongoDbEndpoint endpoint) {
    super(endpoint);
    this.endpoint = endpoint;
}
/**
 * Resolves the operation to execute (endpoint default, overridable per message
 * via the operation header) and dispatches it.
 */
public void process(Exchange exchange) throws Exception {
    MongoDbOperation operation = endpoint.getOperation();
    Object header = exchange.getIn().getHeader(OPERATION_HEADER);
    if (header != null) {
        LOG.debug("Overriding default operation with operation specified on header: {}", header);
        try {
            // Accept either the enum itself or anything convertible to its String name.
            operation = header instanceof MongoDbOperation
                    ? ObjectHelper.cast(MongoDbOperation.class, header)
                    : MongoDbOperation.valueOf(exchange.getIn().getHeader(OPERATION_HEADER, String.class));
        } catch (Exception e) {
            throw new CamelMongoDbException("Operation specified on header is not supported. Value: " + header, e);
        }
    }
    try {
        invokeOperation(operation, exchange);
    } catch (Exception e) {
        throw MongoDbComponent.wrapInCamelMongoDbException(e);
    }
}
/**
 * Entry method that selects the appropriate MongoDB operation and executes it
 */
protected void invokeOperation(MongoDbOperation operation, Exchange exchange) throws Exception {
    Processor processor = operations.get(operation);
    // Guard clause: fail fast on operations without a registered processor.
    if (processor == null) {
        throw new CamelMongoDbException("Operation not supported. Value: " + operation);
    }
    processor.process(exchange);
}
// Registers an operation in the dispatch table, wrapping the raw function with
// the shared header/result post-processing. Returns this for chaining.
private MongoDbProducer bind(MongoDbOperation operation, Function<Exchange, Object> mongoDbFunction) {
    Processor wrapped = wrap(mongoDbFunction, operation);
    operations.put(operation, wrapped);
    return this;
}
// ----------- MongoDB operations ----------------
// Builds the server command { dbStats: 1, scale: 1 } (statistics in bytes).
private Document createDbStatsCommand() {
    Document command = new Document("dbStats", 1);
    command.append("scale", 1);
    return command;
}
// Builds the server command { collStats: "<collectionName>" }.
private Document createCollStatsCommand(String collectionName) {
    return new Document().append("collStats", collectionName);
}
// --------- Convenience methods -----------------------
// Resolves the target database. Dynamic resolution is opt-in so that users who
// never override the database per message pay no header-lookup cost per Exchange.
private MongoDatabase calculateDb(Exchange exchange) {
    if (!endpoint.isDynamicity()) {
        return endpoint.getMongoDatabase();
    }
    String dynamicDB = exchange.getIn().getHeader(DATABASE, String.class);
    MongoDatabase db = dynamicDB != null
            ? endpoint.getMongoConnection().getDatabase(dynamicDB)
            : endpoint.getMongoDatabase();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Dynamic database selected: {}", db.getName());
    }
    return db;
}
// Resolves the target collection name: the COLLECTION header when dynamicity is
// enabled and the header is present, the endpoint's configured name otherwise.
private String calculateCollectionName(Exchange exchange) {
    if (!endpoint.isDynamicity()) {
        return endpoint.getCollection();
    }
    String dynamicCollection = exchange.getIn().getHeader(COLLECTION, String.class);
    return dynamicCollection != null ? dynamicCollection : endpoint.getCollection();
}
// Resolves the target collection, honoring the DATABASE/COLLECTION headers when
// dynamicity is enabled. Dynamic calculation is an option; in most cases it is
// unused, so the resolution logic is skipped entirely unless enabled to avoid
// penalising every Exchange.
private MongoCollection<Document> calculateCollection(Exchange exchange) {
    if (!endpoint.isDynamicity()) {
        return endpoint.getMongoCollection().withWriteConcern(endpoint.getWriteConcern());
    }
    String dynamicDB = exchange.getIn().getHeader(DATABASE, String.class);
    String dynamicCollection = exchange.getIn().getHeader(COLLECTION, String.class);
    @SuppressWarnings("unchecked")
    List<Bson> dynamicIndex = exchange.getIn().getHeader(COLLECTION_INDEX, List.class);
    MongoCollection<Document> dbCol;
    if (dynamicDB == null && dynamicCollection == null) {
        dbCol = endpoint.getMongoCollection().withWriteConcern(endpoint.getWriteConcern());
    } else {
        MongoDatabase db = calculateDb(exchange);
        if (dynamicCollection == null) {
            dbCol = db.getCollection(endpoint.getCollection(), Document.class);
        } else {
            dbCol = db.getCollection(dynamicCollection, Document.class);
            // A dynamically selected collection may not exist yet: create the
            // configured (or header-supplied) indexes on the fly.
            if (dynamicIndex == null) {
                endpoint.ensureIndex(dbCol, endpoint.createIndex());
            } else {
                endpoint.ensureIndex(dbCol, dynamicIndex);
            }
        }
    }
    if (LOG.isDebugEnabled()) {
        // Fix: log the namespace actually selected for this exchange; the
        // previous code logged the static endpoint database/collection even
        // when a dynamic one had been chosen.
        LOG.debug("Dynamic database and/or collection selected: {}", dbCol.getNamespace());
    }
    return dbCol;
}
// Converts every element of the incoming list to a Document via the Camel type
// converter, failing with a CamelMongoDbException on the first non-convertible item.
@SuppressWarnings("rawtypes")
private List<Document> attemptConvertToList(List insertList, Exchange exchange) throws CamelMongoDbException {
    TypeConverter converter = exchange.getContext().getTypeConverter();
    List<Document> documentList = new ArrayList<>(insertList.size());
    for (Object item : insertList) {
        try {
            documentList.add(converter.mandatoryConvertTo(Document.class, item));
        } catch (Exception e) {
            throw new CamelMongoDbException("MongoDB operation = insert, Assuming List variant of MongoDB insert operation, but List contains non-Document items", e);
        }
    }
    return documentList;
}
// True when the operation mutates data (membership in the component's
// WRITE_OPERATIONS set), which drives the writeResultAsHeader behavior.
private boolean isWriteOperation(MongoDbOperation operation) {
    boolean isWrite = MongoDbComponent.WRITE_OPERATIONS.contains(operation);
    return isWrite;
}
// Wraps an operation function with the post-processing common to every
// operation: propagate IN headers, optionally keep the IN body, then place the
// operation result on the exchange (body or WRITERESULT header).
private Processor wrap(Function<Exchange, Object> supplier, MongoDbOperation operation) {
    return exchange -> {
        Object operationResult = supplier.apply(exchange);
        copyHeaders(exchange);
        moveBodyToOutIfResultIsReturnedAsHeader(exchange, operation);
        processAndTransferResult(operationResult, exchange, operation);
    };
}
// Copies all IN headers to the OUT message without overriding existing OUT headers.
private void copyHeaders(Exchange exchange) {
    MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), false);
}
// When the write result is delivered via header, the original IN body is
// carried over as the OUT body so the message payload passes through untouched.
private void moveBodyToOutIfResultIsReturnedAsHeader(Exchange exchange, MongoDbOperation operation) {
    boolean resultGoesToHeader = isWriteOperation(operation) && endpoint.isWriteResultAsHeader();
    if (resultGoesToHeader) {
        exchange.getOut().setBody(exchange.getIn().getBody());
    }
}
// Determines where the operation result goes: for write operations with
// writeResultAsHeader enabled it becomes the WRITERESULT header, otherwise it
// becomes the OUT body.
private void processAndTransferResult(Object result, Exchange exchange, MongoDbOperation operation) {
    boolean resultGoesToHeader = isWriteOperation(operation) && endpoint.isWriteResultAsHeader();
    if (resultGoesToHeader) {
        exchange.getOut().setHeader(WRITERESULT, result);
    } else {
        exchange.getOut().setBody(result);
    }
}
// getColStats: runs { collStats: <collection> } against the resolved database.
private Function<Exchange, Object> createDoGetColStats() {
    return exchange -> {
        Document command = createCollStatsCommand(calculateCollectionName(exchange));
        return calculateDb(exchange).runCommand(command);
    };
}
// findOneByQuery: returns the first document matching the filter taken from the
// CRITERIA header (falling back to the mandatory body), honoring optional
// SORT_BY and FIELDS_PROJECTION headers. Sets RESULT_TOTAL_SIZE to 0 or 1.
private Function<Exchange, Object> createDoFindOneByQuery() {
    return exchange -> {
        try {
            MongoCollection<Document> dbCol = calculateCollection(exchange);
            Bson query = exchange.getIn().getHeader(CRITERIA, Bson.class);
            if (query == null) {
                query = exchange.getIn().getMandatoryBody(Bson.class);
            }
            // Empty documents are valid no-op sort/projection specifications.
            Bson sortBy = exchange.getIn().getHeader(SORT_BY, Bson.class);
            if (sortBy == null) {
                sortBy = new Document();
            }
            Bson fieldFilter = exchange.getIn().getHeader(FIELDS_PROJECTION, Bson.class);
            if (fieldFilter == null) {
                fieldFilter = new Document();
            }
            Document ret = dbCol.find(query).projection(fieldFilter).sort(sortBy).first();
            exchange.getOut().setHeader(RESULT_TOTAL_SIZE, ret == null ? 0 : 1);
            return ret;
        } catch (InvalidPayloadException e) {
            throw new CamelMongoDbException("Payload is no Document", e);
        }
    };
}
// count: counts documents matching the CRITERIA header, or the body converted
// to Bson, or all documents when neither yields a filter.
private Function<Exchange, Object> createDoCount() {
    return exchange -> {
        Bson query = exchange.getIn().getHeader(CRITERIA, Bson.class);
        if (query == null) {
            query = exchange.getContext().getTypeConverter().tryConvertTo(Bson.class, exchange, exchange.getIn().getBody());
        }
        return calculateCollection(exchange).count(query != null ? query : new Document());
    };
}
// findDistinct: returns the distinct String values of the field named by the
// DISTINCT_QUERY_FIELD header, optionally filtered by the body converted to
// Bson. Sets RESULT_PAGE_SIZE to the number of values returned.
private Function<Exchange, Object> createDoDistinct() {
    return exchange -> {
        MongoCollection<Document> dbCol = calculateCollection(exchange);
        // get the parameters out of the Exchange Header
        String distinctFieldName = exchange.getIn().getHeader(MongoDbConstants.DISTINCT_QUERY_FIELD, String.class);
        Bson query = exchange.getContext().getTypeConverter().tryConvertTo(Bson.class, exchange, exchange.getIn().getBody());
        DistinctIterable<String> ret;
        if (query != null) {
            ret = dbCol.distinct(distinctFieldName, query, String.class);
        } else {
            ret = dbCol.distinct(distinctFieldName, String.class);
        }
        // Fix: into() drains the server cursor into the list and closes it.
        // The previous code called ret.iterator() twice — once to iterate and
        // once in the finally block — but iterator() opens a NEW cursor each
        // call, so a fresh cursor was closed while the iterated one leaked.
        List<String> result = ret.into(new ArrayList<>());
        exchange.getOut().setHeader(MongoDbConstants.RESULT_PAGE_SIZE, result.size());
        return result;
    };
}
// findAll: queries the collection using the CRITERIA header or the body
// converted to Bson (an empty body means "all documents"), honoring optional
// projection, batch size, skip, limit and sort headers. Returns either a
// materialized List<Document> (setting RESULT_PAGE_SIZE) or, when the endpoint
// output type is MongoIterable, the live iterable itself.
private Function<Exchange, Object> createDoFindAll() {
    return exchange -> {
        MongoCollection<Document> dbCol = calculateCollection(exchange);
        // do not use getMandatoryBody, because if the body is empty we want
        // to retrieve all objects in the collection
        Bson query = exchange.getIn().getHeader(CRITERIA, Bson.class);
        // do not run around looking for a type converter unless there is a
        // need for it
        if (query == null && exchange.getIn().getBody() != null) {
            query = exchange.getContext().getTypeConverter().tryConvertTo(Bson.class, exchange, exchange.getIn().getBody());
        }
        Bson fieldFilter = exchange.getIn().getHeader(FIELDS_PROJECTION, Bson.class);
        // paging/sorting options from headers
        Integer batchSize = exchange.getIn().getHeader(BATCH_SIZE, Integer.class);
        Integer numToSkip = exchange.getIn().getHeader(NUM_TO_SKIP, Integer.class);
        Integer limit = exchange.getIn().getHeader(LIMIT, Integer.class);
        Document sortBy = exchange.getIn().getHeader(SORT_BY, Document.class);
        FindIterable<Document> ret;
        if (query == null && fieldFilter == null) {
            ret = dbCol.find();
        } else if (fieldFilter == null) {
            ret = dbCol.find(query);
        } else if (query != null) {
            ret = dbCol.find(query).projection(fieldFilter);
        } else {
            ret = dbCol.find().projection(fieldFilter);
        }
        if (sortBy != null) {
            ret.sort(sortBy);
        }
        if (batchSize != null) {
            ret.batchSize(batchSize);
        }
        if (numToSkip != null) {
            ret.skip(numToSkip);
        }
        if (limit != null) {
            ret.limit(limit);
        }
        if (MongoDbOutputType.MongoIterable.equals(endpoint.getOutputType())) {
            // Caller consumes (and thereby closes) the cursor itself.
            return ret;
        }
        // Fix: into() drains the cursor into the list and closes it. The
        // previous code called ret.iterator() twice (iterate + finally),
        // closing a brand-new cursor while leaking the one actually iterated.
        List<Document> resultList = ret.into(new ArrayList<>());
        exchange.getOut().setHeader(RESULT_PAGE_SIZE, resultList.size());
        return resultList;
    };
}
// insert: inserts the body as a single Document, or as a List<Document> when
// the body is a list. Exposes the inserted _id (or list of _ids) via the OID
// header and returns the inserted payload.
private Function<Exchange, Object> createDoInsert() {
    return exchange -> {
        MongoCollection<Document> dbCol = calculateCollection(exchange);
        Object converted = exchange.getContext().getTypeConverter().tryConvertTo(Document.class, exchange, exchange.getIn().getBody());
        if (converted != null) {
            // Single-document variant.
            Document insertObject = (Document) converted;
            dbCol.insertOne(insertObject);
            exchange.getIn().setHeader(OID, insertObject.get(MONGO_ID));
            return insertObject;
        }
        // Body could not be converted to Document; try the List variant,
        // ensuring every item converts to Document.
        List<?> bodyList = exchange.getIn().getBody(List.class);
        if (bodyList == null) {
            throw new CamelMongoDbException("MongoDB operation = insert, Body is not conversible to type Document nor List<Document>");
        }
        List<Document> insertObjects = attemptConvertToList(bodyList, exchange);
        dbCol.insertMany(insertObjects);
        List<Object> objectIdentification = insertObjects.stream().map(doc -> doc.get(MONGO_ID)).collect(Collectors.toList());
        exchange.getIn().setHeader(OID, objectIdentification);
        return insertObjects;
    };
}
// update: updates documents matching the CRITERIA header using the body as the
// update spec; without the header, the body must be a two-element list of
// [criteria, update]. MULTIUPDATE selects updateMany, UPSERT enables upsert.
// Sets RECORDS_AFFECTED (when available) and RECORDS_MATCHED headers.
private Function<Exchange, Object> createDoUpdate() {
    return exchange -> {
        try {
            MongoCollection<Document> dbCol = calculateCollection(exchange);
            Bson updateCriteria = exchange.getIn().getHeader(CRITERIA, Bson.class);
            Bson objNew;
            if (null == updateCriteria) {
                @SuppressWarnings("unchecked")
                List<Bson> saveObj = exchange.getIn().getMandatoryBody((Class<List<Bson>>)Class.class.cast(List.class));
                if (saveObj.size() != 2) {
                    // Fix: message previously said "operation = insert" for the update operation.
                    throw new CamelMongoDbException("MongoDB operation = update, failed because body is not a List of Document objects with size = 2");
                }
                updateCriteria = saveObj.get(0);
                objNew = saveObj.get(1);
            } else {
                objNew = exchange.getIn().getMandatoryBody(Bson.class);
            }
            Boolean multi = exchange.getIn().getHeader(MULTIUPDATE, Boolean.class);
            Boolean upsert = exchange.getIn().getHeader(UPSERT, Boolean.class);
            UpdateResult result;
            UpdateOptions options = new UpdateOptions();
            if (upsert != null) {
                options.upsert(upsert);
            }
            if (multi == null || !multi) {
                result = dbCol.updateOne(updateCriteria, objNew, options);
            } else {
                result = dbCol.updateMany(updateCriteria, objNew, options);
            }
            // Modified count is unavailable on pre-2.6 servers.
            if (result.isModifiedCountAvailable()) {
                exchange.getOut().setHeader(RECORDS_AFFECTED, result.getModifiedCount());
            }
            exchange.getOut().setHeader(RECORDS_MATCHED, result.getMatchedCount());
            return result;
        } catch (InvalidPayloadException e) {
            throw new CamelMongoDbException("Invalid payload for update", e);
        }
    };
}
// remove: deletes every document matching the filter carried in the body.
// Sets RECORDS_AFFECTED when the write was acknowledged.
private Function<Exchange, Object> createDoRemove() {
    return exchange -> {
        try {
            MongoCollection<Document> dbCol = calculateCollection(exchange);
            Bson removeFilter = exchange.getIn().getMandatoryBody(Bson.class);
            DeleteResult deleteResult = dbCol.deleteMany(removeFilter);
            if (deleteResult.wasAcknowledged()) {
                exchange.getOut().setHeader(RECORDS_AFFECTED, deleteResult.getDeletedCount());
            }
            return deleteResult;
        } catch (InvalidPayloadException e) {
            throw new CamelMongoDbException("Invalid payload for remove", e);
        }
    };
}
// aggregate: runs the aggregation pipeline carried in the body — either a
// List<Bson> of stages or a single Bson stage. Honors BATCH_SIZE and
// ALLOW_DISK_USE headers. Returns a materialized List<Document> (setting
// RESULT_PAGE_SIZE) unless the endpoint output type is MongoIterable.
// @see http://docs.mongodb.org/manual/core/aggregation/
private Function<Exchange, Object> createDoAggregate() {
    return exchange -> {
        try {
            MongoCollection<Document> dbCol = calculateCollection(exchange);
            @SuppressWarnings("unchecked")
            List<Bson> query = exchange.getIn().getMandatoryBody((Class<List<Bson>>)Class.class.cast(List.class));
            List<Bson> queryList;
            if (query != null) {
                queryList = query.stream().collect(Collectors.toList());
            } else {
                // Single-stage pipeline supplied as a bare Bson body.
                queryList = Arrays.asList(Bson.class.cast(exchange.getIn().getMandatoryBody(Bson.class)));
            }
            AggregateIterable<Document> aggregationResult = dbCol.aggregate(queryList);
            Integer batchSize = exchange.getIn().getHeader(MongoDbConstants.BATCH_SIZE, Integer.class);
            if (batchSize != null) {
                aggregationResult.batchSize(batchSize);
            }
            Boolean allowDiskUse = exchange.getIn().getHeader(MongoDbConstants.ALLOW_DISK_USE, Boolean.FALSE, Boolean.class);
            aggregationResult.allowDiskUse(allowDiskUse);
            if (MongoDbOutputType.MongoIterable.equals(endpoint.getOutputType())) {
                // Caller consumes (and thereby closes) the cursor itself.
                return aggregationResult;
            }
            // Fix: into() drains the cursor into the list and closes it. The
            // previous code called aggregationResult.iterator() twice (iterate
            // + finally), closing a fresh cursor while leaking the iterated one.
            List<Document> result = aggregationResult.into(new ArrayList<>());
            exchange.getOut().setHeader(MongoDbConstants.RESULT_PAGE_SIZE, result.size());
            return result;
        } catch (InvalidPayloadException e) {
            throw new CamelMongoDbException("Invalid payload for aggregate", e);
        }
    };
}
// command: runs the raw command Document carried in the body against the
// resolved database and returns the server's response document.
private Function<Exchange, Object> createDoCommand() {
    return exchange -> {
        try {
            MongoDatabase db = calculateDb(exchange);
            Document command = exchange.getIn().getMandatoryBody(Document.class);
            return db.runCommand(command);
        } catch (InvalidPayloadException e) {
            throw new CamelMongoDbException("Invalid payload for command", e);
        }
    };
}
// getDbStats: runs { dbStats: 1, scale: 1 } against the resolved database.
private Function<Exchange, Object> createDoGetDbStats() {
    return exchange -> calculateDb(exchange).runCommand(createDbStatsCommand());
}
// findById: looks up the document whose _id equals the body value, honoring an
// optional FIELDS_PROJECTION header. Sets RESULT_TOTAL_SIZE to 0 or 1.
private Function<Exchange, Object> createDoFindById() {
    return exchange -> {
        try {
            MongoCollection<Document> dbCol = calculateCollection(exchange);
            Object id = exchange.getIn().getMandatoryBody();
            Bson idFilter = eq(MONGO_ID, id);
            Bson fieldFilter = exchange.getIn().getHeader(FIELDS_PROJECTION, Bson.class);
            if (fieldFilter == null) {
                fieldFilter = new Document();
            }
            Document ret = dbCol.find(idFilter).projection(fieldFilter).first();
            exchange.getOut().setHeader(RESULT_TOTAL_SIZE, ret == null ? 0 : 1);
            return ret;
        } catch (InvalidPayloadException e) {
            throw new CamelMongoDbException("Invalid payload for findById", e);
        }
    };
}
// save: upsert-by-_id semantics. When the body document already carries an _id,
// the matching document is replaced (or inserted). When it carries none, the
// replace runs against a never-matching filter ($where:"false") with
// upsert=true so the server performs an insert and generates the id. In both
// cases the resulting _id is exposed via the OID header.
private Function<Exchange, Object> createDoSave() {
    return exchange -> {
        try {
            MongoCollection<Document> dbCol = calculateCollection(exchange);
            Document saveObj = exchange.getIn().getMandatoryBody(Document.class);
            UpdateOptions options = new UpdateOptions().upsert(true);
            UpdateResult result;
            if (null == saveObj.get(MONGO_ID)) {
                // Never-matching filter forces the upsert down the insert path.
                // NOTE(review): assumes an acknowledged write so getUpsertedId()
                // is non-null — confirm behavior under unacknowledged WriteConcerns.
                result = dbCol.replaceOne(Filters.where("false"), saveObj, options);
                exchange.getIn().setHeader(OID, result.getUpsertedId().asObjectId().getValue());
            } else {
                result = dbCol.replaceOne(eq(MONGO_ID, saveObj.get(MONGO_ID)), saveObj, options);
                exchange.getIn().setHeader(OID, saveObj.get(MONGO_ID));
            }
            return result;
        } catch (InvalidPayloadException e) {
            throw new CamelMongoDbException("Body incorrect type for save", e);
        }
    };
}
// bulkWrite: executes the List<WriteModel<Document>> carried in the body as a
// single bulk operation. Ordered by default; the BULK_ORDERED header may
// request unordered execution.
private Function<Exchange, Object> createDoBulkWrite() {
    return exchange -> {
        try {
            MongoCollection<Document> dbCol = calculateCollection(exchange);
            Boolean ordered = exchange.getIn().getHeader(MongoDbConstants.BULK_ORDERED, Boolean.TRUE, Boolean.class);
            BulkWriteOptions options = new BulkWriteOptions().ordered(ordered);
            @SuppressWarnings("unchecked")
            List<WriteModel<Document>> requests = exchange.getIn().getMandatoryBody((Class<List<WriteModel<Document>>>)Class.class.cast(List.class));
            BulkWriteResult bulkResult = dbCol.bulkWrite(requests, options);
            return bulkResult;
        } catch (InvalidPayloadException e) {
            throw new CamelMongoDbException("Invalid payload for bulk write", e);
        }
    };
}
}