| = MongoDB Component |
| :doctitle: MongoDB |
| :shortname: mongodb |
| :artifactid: camel-mongodb |
| :description: Perform operations on MongoDB documents and collections. |
| :since: 2.19 |
| :supportlevel: Stable |
| :tabs-sync-option: |
| :component-header: Both producer and consumer are supported |
| //Manually maintained attributes |
| :camel-spring-boot-name: mongodb |
| |
| *Since Camel {since}* |
| |
| *{component-header}* |
| |
According to Wikipedia: "NoSQL is a movement promoting a loosely defined
class of non-relational data stores that break with a long history of
relational databases and ACID guarantees." NoSQL solutions have grown in
popularity in recent years, and major high-traffic sites and services
such as Facebook, LinkedIn and Twitter are known to use them extensively
to achieve scalability and agility.
| |
| Basically, NoSQL solutions differ from traditional RDBMS (Relational |
| Database Management Systems) in that they don't use SQL as their query |
| language and generally don't offer ACID-like transactional behaviour nor |
| relational data. Instead, they are designed around the concept of |
| flexible data structures and schemas (meaning that the traditional |
| concept of a database table with a fixed schema is dropped), extreme |
| scalability on commodity hardware and blazing-fast processing. |
| |
| MongoDB is a very popular NoSQL solution and the camel-mongodb component |
| integrates Camel with MongoDB allowing you to interact with MongoDB |
| collections both as a producer (performing operations on the collection) |
| and as a consumer (consuming documents from a MongoDB collection). |
| |
MongoDB revolves around the concepts of documents (not as in office
documents, but rather hierarchical data defined in JSON/BSON) and
collections. This component page assumes you are familiar with them.
Otherwise, visit http://www.mongodb.org/[http://www.mongodb.org/].
| |
| [NOTE] |
| ==== |
| The MongoDB Camel component uses Mongo Java Driver 4.x. |
| ==== |
| |
| Maven users will need to add the following dependency to their `pom.xml` |
| for this component: |
| |
| [source,xml] |
| ------------------------------------------------------------ |
| <dependency> |
| <groupId>org.apache.camel</groupId> |
| <artifactId>camel-mongodb</artifactId> |
| <version>x.y.z</version> |
| <!-- use the same version as your Camel core version --> |
| </dependency> |
| ------------------------------------------------------------ |
| |
| == URI formats |
| |
| --------------------------------------------------------------------------------------------------------------- |
| mongodb:connectionBean?database=databaseName&collection=collectionName&operation=operationName[&moreOptions...] |
| mongodb:dummy?hosts=hostnames&database=databaseName&collection=collectionName&operation=operationName[&moreOptions...] |
| --------------------------------------------------------------------------------------------------------------- |
| |
| // component-configure options: START |
| |
| // component-configure options: END |
| |
| // component options: START |
| include::partial$component-configure-options.adoc[] |
| include::partial$component-endpoint-options.adoc[] |
| // component options: END |
| |
| // endpoint options: START |
| |
| // endpoint options: END |
| // component headers: START |
| include::partial$component-endpoint-headers.adoc[] |
| // component headers: END |
| |
| |
| [[MongoDB-ConfigurationofdatabaseinSpringXML]] |
| == Configuration of database in Spring XML |
| |
| The following Spring XML creates a bean defining the connection to a |
| MongoDB instance. |
| |
Since MongoDB Java Driver 3.x, the `WriteConcern` and `readPreference` options are not dynamically modifiable; they are defined in the `mongoClient` object.
| |
| [source,xml] |
| ---------------------------------------------------------------------------------------------------------------------------------- |
| <beans xmlns="http://www.springframework.org/schema/beans" |
| xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
| xmlns:context="http://www.springframework.org/schema/context" |
| xmlns:mongo="http://www.springframework.org/schema/data/mongo" |
| xsi:schemaLocation="http://www.springframework.org/schema/context |
| http://www.springframework.org/schema/context/spring-context.xsd |
| http://www.springframework.org/schema/data/mongo |
| http://www.springframework.org/schema/data/mongo/spring-mongo.xsd |
| http://www.springframework.org/schema/beans |
| http://www.springframework.org/schema/beans/spring-beans.xsd"> |
| |
| <mongo:mongo-client id="mongoBean" host="${mongo.url}" port="${mongo.port}" credentials="${mongo.user}:${mongo.pass}@${mongo.dbname}"> |
| <mongo:client-options write-concern="NORMAL" /> |
| </mongo:mongo-client> |
| </beans> |
| ---------------------------------------------------------------------------------------------------------------------------------- |
| |
| == Sample route |
| |
| The following route defined in Spring XML executes the operation |
| <<getDbStats>> on a collection. |
| |
| *Get DB stats for specified collection* |
| |
| [source,xml] |
| --------------------------------------------------------------------------------------------------------------------------- |
| <route> |
| <from uri="direct:start" /> |
| <!-- using bean 'mongoBean' defined above --> |
  <to uri="mongodb:mongoBean?database=${mongodb.database}&amp;collection=${mongodb.collection}&amp;operation=getDbStats" />
| <to uri="direct:result" /> |
| </route> |
| --------------------------------------------------------------------------------------------------------------------------- |
| |
| == MongoDB operations - producer endpoints |
| |
| === Query operations |
| |
| ==== findById |
| |
| This operation retrieves only one element from the collection whose _id |
| field matches the content of the IN message body. The incoming object |
| can be anything that has an equivalent to a `Bson` type. See |
| http://bsonspec.org/spec.html[http://bsonspec.org/spec.html] |
| and |
| http://www.mongodb.org/display/DOCS/Java+Types[http://www.mongodb.org/display/DOCS/Java+Types]. |
| |
| [source,java] |
| ------------------------------------------------------------------------------ |
| from("direct:findById") |
| .to("mongodb:myDb?database=flights&collection=tickets&operation=findById") |
| .to("mock:resultFindById"); |
| ------------------------------------------------------------------------------ |
| |
Please note that the default `_id` is treated by MongoDB as an `ObjectId` type, so you may need to convert it properly:
| |
| [source,java] |
| ------------------------------------------------------------------------------ |
| from("direct:findById") |
| .convertBodyTo(ObjectId.class) |
| .to("mongodb:myDb?database=flights&collection=tickets&operation=findById") |
| .to("mock:resultFindById"); |
| ------------------------------------------------------------------------------ |
| |
| [TIP] |
| ==== |
| *Supports optional parameters* |
| |
| This operation supports projection operators. |
| See <<Specifying a fields filter (projection)>>. |
| ==== |
| |
| ==== findOneByQuery |
| |
| Retrieve the first element from a collection matching a MongoDB query selector. |
| *If the `CamelMongoDbCriteria` header is set, then its value is used as the query selector*. |
| If the `CamelMongoDbCriteria` header is _null_, then the IN message body is used as the query |
| selector. In both cases, the query selector should be of type `Bson` or convertible to |
| `Bson` (for instance, a JSON string or `HashMap`). See <<Type conversions>> for more info. |
| |
| Create query selectors using the `Filters` provided by the MongoDB Driver. |
| |
| ===== Example without a query selector (returns the first document in a collection) |
| |
| [source,java] |
| ------------------------------------------------------------------------------------ |
| from("direct:findOneByQuery") |
| .to("mongodb:myDb?database=flights&collection=tickets&operation=findOneByQuery") |
| .to("mock:resultFindOneByQuery"); |
| ------------------------------------------------------------------------------------ |
| |
===== Example with a query selector (returns the first matching document in a collection)
| |
| [source,java] |
| ------------------------------------------------------------------------------------ |
| from("direct:findOneByQuery") |
| .setHeader(MongoDbConstants.CRITERIA, constant(Filters.eq("name", "Raul Kripalani"))) |
| .to("mongodb:myDb?database=flights&collection=tickets&operation=findOneByQuery") |
| .to("mock:resultFindOneByQuery"); |
| ------------------------------------------------------------------------------------ |
| |
| [TIP] |
| ==== |
| *Supports optional parameters* |
| |
| This operation supports projection operators and sort clauses. |
| See <<Specifying a fields filter (projection)>>, <<Specifying a sort clause>>. |
| ==== |
| |
| ==== findAll |
| |
The `findAll` operation returns all documents matching a query, or none
at all, in which case all documents contained in the collection are
returned. *The query object is extracted from the `CamelMongoDbCriteria` header*.
If the `CamelMongoDbCriteria` header is null, the query object is extracted
from the message body, i.e. it should be of type `Bson` or convertible to
`Bson` (for instance, a JSON string or `HashMap`).
See <<Type conversions>> for more info.
| |
| ===== Example without a query selector (returns all documents in a collection) |
| |
| [source,java] |
| ----------------------------------------------------------------------------- |
| from("direct:findAll") |
| .to("mongodb:myDb?database=flights&collection=tickets&operation=findAll") |
| .to("mock:resultFindAll"); |
| ----------------------------------------------------------------------------- |
| |
| ===== Example with a query selector (returns all matching documents in a collection) |
| |
| [source,java] |
| ----------------------------------------------------------------------------- |
| from("direct:findAll") |
  .setHeader(MongoDbConstants.CRITERIA, constant(Filters.eq("name", "Raul Kripalani")))
| .to("mongodb:myDb?database=flights&collection=tickets&operation=findAll") |
| .to("mock:resultFindAll"); |
| ----------------------------------------------------------------------------- |
| |
| ===== Example with option _outputType=MongoIterable_ and batch size |
| |
| [source,java] |
| ----------------------------------------------------------------------------- |
| from("direct:findAll") |
| .setHeader(MongoDbConstants.BATCH_SIZE).constant(10) |
| .setHeader(MongoDbConstants.CRITERIA, constant(Filters.eq("name", "Raul Kripalani"))) |
| .to("mongodb:myDb?database=flights&collection=tickets&operation=findAll&outputType=MongoIterable") |
| .to("mock:resultFindAll"); |
| ----------------------------------------------------------------------------- |
| |
| [TIP] |
| ==== |
| *Supports optional parameters* |
| |
| This operation supports projection operators and sort clauses. |
| See <<Specifying a fields filter (projection)>>, <<Specifying a sort clause>>. |
| ==== |
| |
| ==== count |
| |
| Returns the total number of objects in a collection, returning a Long as |
| the OUT message body. + |
The following example will count the number of records in the
"dynamicCollectionName" collection. Notice how dynamicity is enabled;
as a result, the operation will not run against the
"flights" collection, but against the "dynamicCollectionName"
collection.
| |
| [source,java] |
| ------------------------------------------------------------------------------------------------------------------------------------ |
| from("direct:count") |
| .to("mongodb:myDb?database=tickets&collection=flights&operation=count&dynamicity=true"); |
| |
| Long result = template.requestBodyAndHeader("direct:count", "irrelevantBody", MongoDbConstants.COLLECTION, "dynamicCollectionName"); |
| assertTrue("Result is not of type Long", result instanceof Long); |
| ------------------------------------------------------------------------------------------------------------------------------------ |
| |
You can provide a query.
*The query object is extracted from the `CamelMongoDbCriteria` header*.
If the `CamelMongoDbCriteria` header is null, the query object is extracted
from the message body, i.e. it should be of type `Bson` or convertible to
`Bson`. The operation will then return the number of documents matching
this criteria.
| |
| [source,java] |
| ------------------------------------------------------------------------------------------------------------------------ |
| Document query = ... |
| Long count = template.requestBodyAndHeader("direct:count", query, MongoDbConstants.COLLECTION, "dynamicCollectionName"); |
| ------------------------------------------------------------------------------------------------------------------------ |
| |
| ==== Specifying a fields filter (projection) |
| |
| Query operations will, by default, return the matching objects in their |
| entirety (with all their fields). If your documents are large and you |
| only require retrieving a subset of their fields, you can specify a |
| field filter in all query operations, simply by setting the relevant |
| `Bson` (or type convertible to `Bson`, such as a JSON String, |
| Map, etc.) on the `CamelMongoDbFieldsProjection` header, constant shortcut: |
| `MongoDbConstants.FIELDS_PROJECTION`. |
| |
| Here is an example that uses MongoDB's `Projections` to simplify |
| the creation of Bson. It retrieves all fields except `_id` and |
| `boringField`: |
| |
| [source,java] |
| ---------------------------------------------------------------------------------------------------------------------------- |
| // route: from("direct:findAll").to("mongodb:myDb?database=flights&collection=tickets&operation=findAll") |
Bson fieldProjection = Projections.exclude("_id", "boringField");
| Object result = template.requestBodyAndHeader("direct:findAll", ObjectUtils.NULL, MongoDbConstants.FIELDS_PROJECTION, fieldProjection); |
| ---------------------------------------------------------------------------------------------------------------------------- |
| |
| ==== Specifying a sort clause |
| |
There is often a requirement to fetch the min/max record from a
collection based on sorting by a particular field. The following example
uses MongoDB's `Sorts` helper to simplify the creation of the `Bson`
sort clause, sorting by `_id` in descending order:
| |
| [source,java] |
| ---------------------------------------------------------------------------------------------------------------------------- |
| // route: from("direct:findAll").to("mongodb:myDb?database=flights&collection=tickets&operation=findAll") |
| Bson sorts = Sorts.descending("_id"); |
| Object result = template.requestBodyAndHeader("direct:findAll", ObjectUtils.NULL, MongoDbConstants.SORT_BY, sorts); |
| ---------------------------------------------------------------------------------------------------------------------------- |
| |
| In a Camel route the SORT_BY header can be used with the findOneByQuery |
| operation to achieve the same result. If the FIELDS_PROJECTION header is also |
| specified the operation will return a single field/value pair |
| that can be passed directly to another component (for example, a |
| parameterized MyBatis SELECT query). This example demonstrates fetching |
| the temporally newest document from a collection and reducing the result |
| to a single field, based on the `documentTimestamp` field: |
| |
| [source,java] |
| ---------------------------------------------------------------------------------------------------------------------------- |
from("direct:someTriggeringEvent")
.setHeader(MongoDbConstants.SORT_BY).constant(Sorts.descending("documentTimestamp"))
.setHeader(MongoDbConstants.FIELDS_PROJECTION).constant(Projections.include("documentTimestamp"))
| .setBody().constant("{}") |
| .to("mongodb:myDb?database=local&collection=myDemoCollection&operation=findOneByQuery") |
| .to("direct:aMyBatisParameterizedSelect"); |
| ---------------------------------------------------------------------------------------------------------------------------- |
| |
| === Create/update operations |
| |
| ==== insert |
| |
Inserts a new object into the MongoDB collection, taken from the IN
message body. Type conversion is attempted to turn it into `Document` or
a `List`. +
Two modes are supported: single insert and multiple insert. For
multiple insert, the endpoint will expect a `List`, array or collection
of objects of any type, as long as they are - or can be converted to -
`Document`.
Example:
| |
| [source,java] |
| ----------------------------------------------------------------------------- |
| from("direct:insert") |
| .to("mongodb:myDb?database=flights&collection=tickets&operation=insert"); |
| ----------------------------------------------------------------------------- |
| |
The operation will return a `WriteResult`, and depending on the
`WriteConcern` or the value of the `invokeGetLastError` option,
`getLastError()` may or may not have been called already. If you want to
access the ultimate result of the write operation, you need to retrieve
the `CommandResult` by calling `getLastError()` or
`getCachedLastError()` on the `WriteResult`. You can then verify the
result by calling `CommandResult.ok()`,
`CommandResult.getErrorMessage()` and/or `CommandResult.getException()`.
| |
Note that the new object's `_id` must be unique in the collection. If
you don't specify the value, MongoDB will automatically generate one for
you. But if you do specify it and it is not unique, the insert operation
will fail (and for Camel to notice, you will need to enable
`invokeGetLastError` or set a `WriteConcern` that waits for the write
result).

This is not a limitation of the component; it is how things work in
MongoDB for higher throughput. If you are using a custom `_id`, you are
expected to ensure at the application level that it is unique (and this
is good practice too).
| |
The OID(s) of the inserted record(s) are stored in the
message header under the `CamelMongoOid` key (`MongoDbConstants.OID`
constant). The value stored is `org.bson.types.ObjectId` for a single
insert, or `java.util.List<org.bson.types.ObjectId>` if multiple records
have been inserted.

Since MongoDB Java Driver 3.x, the `insertOne` and `insertMany` operations return void.
The Camel insert operation returns the `Document` or `List` of `Document`s inserted. Note that each document is updated with a new OID if needed.
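For illustration, the generated OID can be read back from the exchange after a single insert. This is a minimal sketch in the style of the examples on this page; it assumes the `direct:insert` route shown above and a `ProducerTemplate` named `template`:

```java
// route: from("direct:insert").to("mongodb:myDb?database=flights&collection=tickets&operation=insert");
Document ticket = new Document("ticketNumber", 145);
Exchange out = template.request("direct:insert", exchange -> exchange.getMessage().setBody(ticket));
// Single insert: the CamelMongoOid header holds one org.bson.types.ObjectId
ObjectId oid = out.getMessage().getHeader(MongoDbConstants.OID, ObjectId.class);
```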
| |
| ==== save |
| |
| The save operation is equivalent to an _upsert_ (UPdate, inSERT) |
| operation, where the record will be updated, and if it doesn't exist, it |
| will be inserted, all in one atomic operation. MongoDB will perform the |
| matching based on the `_id` field. |
| |
Beware that in case of an update, the object is replaced entirely and
the usage of
http://www.mongodb.org/display/DOCS/Updating#Updating-ModifierOperations[MongoDB's
$modifiers] is not permitted. Therefore, if you want to manipulate the
object when it already exists, you have two options:

1. perform a query to retrieve the entire object first along with all
its fields (which may not be efficient), alter it inside Camel and then save
it.
2. use the update operation with
http://www.mongodb.org/display/DOCS/Updating#Updating-ModifierOperations[$modifiers],
which will execute the update at the server side instead. You can enable
the upsert flag, in which case if an insert is required, MongoDB will
apply the $modifiers to the filter query object and insert the result.
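Option 2 can be sketched as follows, reusing the update route shown later on this page. It is assumed here that the upsert flag is enabled via the `CamelMongoDbUpsert` header (`MongoDbConstants.UPSERT` constant):

```java
// route: from("direct:update").to("mongodb:myDb?database=science&collection=notableScientists&operation=update");
List<Bson> body = Arrays.asList(Filters.eq("_id", "DarwinsTicket"),
        Updates.set("scientist", "Darwin"));
// If no document matches the filter, the $set modifiers are applied
// to the filter query object and the result is inserted
Object result = template.requestBodyAndHeader("direct:update", body, MongoDbConstants.UPSERT, true);
```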
| |
| If the document to be saved does not contain the `_id` attribute, the operation will be an insert, and the new `_id` created will be placed in the `CamelMongoOid` header. |
| |
| For example: |
| |
| [source,java] |
| --------------------------------------------------------------------------- |
| from("direct:insert") |
| .to("mongodb:myDb?database=flights&collection=tickets&operation=save"); |
| --------------------------------------------------------------------------- |
| |
| [source,java] |
| ------------------------------------------------------------------------------------------------------------------------------------------ |
| // route: from("direct:insert").to("mongodb:myDb?database=flights&collection=tickets&operation=save"); |
| org.bson.Document docForSave = new org.bson.Document(); |
| docForSave.put("key", "value"); |
| Object result = template.requestBody("direct:insert", docForSave); |
| ------------------------------------------------------------------------------------------------------------------------------------------ |
| |
| ==== update |
| |
Update one or multiple records in the collection. Requires a filter query and
an update rule.

You can define the filter using the `MongoDbConstants.CRITERIA` header as `Bson`
and define the update rules as `Bson` in the body.
| |
[NOTE]
====
*Update after enrich*

When you use the `MongoDbConstants.CRITERIA` header as `Bson` to query MongoDB
before you do the update, you need to remove that header from the resulting Camel
exchange during aggregation if you use the enrich pattern with an aggregation
strategy and then apply the MongoDB update. If you don't remove this header during
aggregation, and don't redefine the `MongoDbConstants.CRITERIA` header before sending
the Camel exchange to the MongoDB producer endpoint, you may end up with an invalid
Camel exchange payload while updating MongoDB.
====
| |
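As a sketch of the pattern described in the note above, an aggregation strategy used with `enrich` can keep the enriched body while dropping the criteria header, so that it cannot leak into the subsequent update (the class name here is hypothetical):

```java
public class RemoveCriteriaStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // Keep the enrichment result as the new body...
        oldExchange.getMessage().setBody(newExchange.getMessage().getBody());
        // ...but drop the query criteria so the later update does not reuse it
        oldExchange.getMessage().removeHeader(MongoDbConstants.CRITERIA);
        return oldExchange;
    }
}
```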
| |
The second way requires a `List<Bson>` as the IN message body containing
exactly 2 elements:
| |
| * Element 1 (index 0) => filter query => determines what objects will be |
| affected, same as a typical query object |
| * Element 2 (index 1) => update rules => how matched objects will be |
| updated. All |
| http://www.mongodb.org/display/DOCS/Updating#Updating-ModifierOperations[modifier |
| operations] from MongoDB are supported. |
| |
| [NOTE] |
| ==== |
| *Multiupdates* |
| |
| By default, MongoDB will only update 1 object even if multiple objects |
| match the filter query. To instruct MongoDB to update *all* matching |
| records, set the `CamelMongoDbMultiUpdate` IN message header to `true`. |
| ==== |
| |
| |
| A header with key `CamelMongoDbRecordsAffected` will be returned |
| (`MongoDbConstants.RECORDS_AFFECTED` constant) with the number of |
| records updated (copied from `WriteResult.getN()`). |
| |
| For example, the following will update *all* records whose filterField |
| field equals true by setting the value of the "scientist" field to |
| "Darwin": |
| |
| [source,java] |
| ------------------------------------------------------------------------------------------------------------------------------------------ |
| // route: from("direct:update").to("mongodb:myDb?database=science&collection=notableScientists&operation=update"); |
| List<Bson> body = new ArrayList<>(); |
| Bson filterField = Filters.eq("filterField", true); |
| body.add(filterField); |
| BsonDocument updateObj = new BsonDocument().append("$set", new BsonDocument("scientist", new BsonString("Darwin"))); |
| body.add(updateObj); |
| Object result = template.requestBodyAndHeader("direct:update", body, MongoDbConstants.MULTIUPDATE, true); |
| ------------------------------------------------------------------------------------------------------------------------------------------ |
| |
| [source,java] |
| ------------------------------------------------------------------------------------------------------------------------------------------ |
| // route: from("direct:update").to("mongodb:myDb?database=science&collection=notableScientists&operation=update"); |
Map<String, Object> headers = new HashMap<>(2);
headers.put(MongoDbConstants.MULTIUPDATE, true);
headers.put(MongoDbConstants.CRITERIA, Filters.eq("filterField", true));
Bson updateObj = Updates.set("scientist", "Darwin");
Object result = template.requestBodyAndHeaders("direct:update", updateObj, headers);
| |
| ------------------------------------------------------------------------------------------------------------------------------------------ |
| |
| [source,java] |
| ------------------------------------------------------------------------------------------------------------------------------------------ |
| // route: from("direct:update").to("mongodb:myDb?database=science&collection=notableScientists&operation=update"); |
String updateObj = "[{\"filterField\": true}, {\"$set\": {\"scientist\": \"Darwin\"}}]";
| Object result = template.requestBodyAndHeader("direct:update", updateObj, MongoDbConstants.MULTIUPDATE, true); |
| |
| ------------------------------------------------------------------------------------------------------------------------------------------ |
| |
| === Delete operations |
| |
| ==== remove |
| |
Remove matching records from the collection. The IN message body will
act as the removal filter query, and is expected to be of type
`Bson` or a type convertible to it. +
| The following example will remove all objects whose field |
| 'conditionField' equals true, in the science database, notableScientists |
| collection: |
| |
| [source,java] |
| ------------------------------------------------------------------------------------------------------------------ |
| // route: from("direct:remove").to("mongodb:myDb?database=science&collection=notableScientists&operation=remove"); |
| Bson conditionField = Filters.eq("conditionField", true); |
| Object result = template.requestBody("direct:remove", conditionField); |
| ------------------------------------------------------------------------------------------------------------------ |
| |
| A header with key `CamelMongoDbRecordsAffected` is returned |
| (`MongoDbConstants.RECORDS_AFFECTED` constant) with type `int`, |
| containing the number of records deleted (copied from |
| `WriteResult.getN()`). |
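Continuing the example above, this header can be read from the reply exchange (a minimal sketch, assuming the same route and a `ProducerTemplate` named `template`):

```java
// route: from("direct:remove").to("mongodb:myDb?database=science&collection=notableScientists&operation=remove");
Exchange out = template.request("direct:remove",
        exchange -> exchange.getMessage().setBody(Filters.eq("conditionField", true)));
int recordsDeleted = out.getMessage().getHeader(MongoDbConstants.RECORDS_AFFECTED, Integer.class);
```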
| |
| === Bulk Write Operations |
| |
| ==== bulkWrite |
| |
| Performs write operations in bulk with controls for order of execution. |
| Requires a `List<WriteModel<Document>>` as the IN message body containing commands for insert, update, and delete operations. |
| |
The following example will insert a new scientist "Pierre Curie", update the record with id "5" by setting the value of the "scientist" field to
"Marie Curie", and delete the record with id "3":
| |
| [source,java] |
| ------------------------------------------------------------------------------------------------------------------ |
| // route: from("direct:bulkWrite").to("mongodb:myDb?database=science&collection=notableScientists&operation=bulkWrite"); |
| List<WriteModel<Document>> bulkOperations = Arrays.asList( |
| new InsertOneModel<>(new Document("scientist", "Pierre Curie")), |
| new UpdateOneModel<>(new Document("_id", "5"), |
| new Document("$set", new Document("scientist", "Marie Curie"))), |
| new DeleteOneModel<>(new Document("_id", "3"))); |
| |
| BulkWriteResult result = template.requestBody("direct:bulkWrite", bulkOperations, BulkWriteResult.class); |
| ------------------------------------------------------------------------------------------------------------------ |
| |
By default, operations are executed in order and are interrupted on the first write error, without processing any remaining write operations in the list.
To instruct MongoDB to continue to process remaining write operations in the list, set the `CamelMongoDbBulkOrdered` IN message header to `false`.
Unordered operations are executed in parallel and their order of execution is not guaranteed.
| |
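For instance, reusing the `bulkOperations` list from the example above, an unordered bulk write can be requested via the header (a sketch; the constant for the `CamelMongoDbBulkOrdered` header is assumed to be `MongoDbConstants.BULK_ORDERED`):

```java
// route: from("direct:bulkWrite").to("mongodb:myDb?database=science&collection=notableScientists&operation=bulkWrite");
// Remaining writes are attempted even if an earlier one fails
BulkWriteResult result = template.requestBodyAndHeader("direct:bulkWrite", bulkOperations,
        MongoDbConstants.BULK_ORDERED, Boolean.FALSE, BulkWriteResult.class);
```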
| |
| === Other operations |
| |
| ==== aggregate |
| |
Performs an aggregation with the given pipeline contained in the
body.
*Aggregations could be long and heavy operations. Use with care.*
| |
| [source,java] |
| ---------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| // route: from("direct:aggregate").to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate"); |
List<Bson> aggregate = Arrays.asList(match(or(eq("scientist", "Darwin"), eq("scientist", "Einstein"))),
        group("$scientist", sum("count", 1)));
| from("direct:aggregate") |
| .setBody().constant(aggregate) |
| .to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate") |
| .to("mock:resultAggregate"); |
| ---------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| |
By default, a `List` of all results is returned. This can be heavy on memory, depending on the size of the results. A safer alternative is to set
`outputType=MongoIterable`. The next processor will see an iterable in the message body, allowing it to step through the results one by one. Thus, setting
a batch size and returning an iterable allows for efficient retrieval and processing of the result.
| |
| An example would look like: |
| |
| [source,java] |
| ---------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
List<Bson> aggregate = Arrays.asList(match(or(eq("scientist", "Darwin"), eq("scientist", "Einstein"))),
        group("$scientist", sum("count", 1)));
| from("direct:aggregate") |
| .setHeader(MongoDbConstants.BATCH_SIZE).constant(10) |
| .setBody().constant(aggregate) |
| .to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate&outputType=MongoIterable") |
| .split(body()) |
| .streaming() |
| .to("mock:resultAggregate"); |
| ---------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| |
Note that calling `.split(body())` is enough to send the entries down the route one by one; however, it would still load all the entries into memory first.
Calling `.streaming()` is thus required to load data into memory in batches.
| |
| |
| ==== getDbStats |
| |
Equivalent of running the `db.stats()` command in the MongoDB shell,
which displays useful statistics about the database. +
| For example: |
| |
| ------------------------------------- |
| > db.stats(); |
| { |
| "db" : "test", |
| "collections" : 7, |
| "objects" : 719, |
| "avgObjSize" : 59.73296244784423, |
| "dataSize" : 42948, |
| "storageSize" : 1000058880, |
| "numExtents" : 9, |
| "indexes" : 4, |
| "indexSize" : 32704, |
| "fileSize" : 1275068416, |
| "nsSizeMB" : 16, |
| "ok" : 1 |
| } |
| ------------------------------------- |
| |
| Usage example: |
| |
| [source,java] |
| --------------------------------------------------------------------------------------------------------- |
| // from("direct:getDbStats").to("mongodb:myDb?database=flights&collection=tickets&operation=getDbStats"); |
| Object result = template.requestBody("direct:getDbStats", "irrelevantBody"); |
| assertTrue("Result is not of type Document", result instanceof Document); |
| --------------------------------------------------------------------------------------------------------- |
| |
| The operation will return a data structure similar to the one displayed |
| in the shell, in the form of a `Document` in the OUT message body. |
| |
| ==== getColStats |
| |
| Equivalent of running the `db.collection.stats()` command in the MongoDB |
shell, which displays useful statistics about the collection. +
| For example: |
| |
| ----------------------------- |
| > db.camelTest.stats(); |
| { |
| "ns" : "test.camelTest", |
| "count" : 100, |
| "size" : 5792, |
| "avgObjSize" : 57.92, |
| "storageSize" : 20480, |
| "numExtents" : 2, |
| "nindexes" : 1, |
| "lastExtentSize" : 16384, |
| "paddingFactor" : 1, |
| "flags" : 1, |
| "totalIndexSize" : 8176, |
| "indexSizes" : { |
| "_id_" : 8176 |
| }, |
| "ok" : 1 |
| } |
| ----------------------------- |
| |
| Usage example: |
| |
| [source,java] |
| ----------------------------------------------------------------------------------------------------------- |
| // from("direct:getColStats").to("mongodb:myDb?database=flights&collection=tickets&operation=getColStats"); |
| Object result = template.requestBody("direct:getColStats", "irrelevantBody"); |
| assertTrue("Result is not of type Document", result instanceof Document); |
| ----------------------------------------------------------------------------------------------------------- |
| |
| The operation will return a data structure similar to the one displayed |
| in the shell, in the form of a `Document` in the OUT message body. |
| |
| ==== command |
| |
Runs the body as a command on the database. Useful for admin operations
such as getting host information, or checking replication or sharding status.

The collection parameter is not used for this operation.
| |
| [source,java] |
| -------------------------------------------------------------------------------- |
// route: from("direct:command").to("mongodb:myDb?database=science&operation=command");
Document commandBody = new Document("hostInfo", "1");
| Object result = template.requestBody("direct:command", commandBody); |
| -------------------------------------------------------------------------------- |
| |
| === Dynamic operations |
| |
| An Exchange can override the endpoint's fixed operation by setting the |
| `CamelMongoDbOperation` header, defined by the |
| `MongoDbConstants.OPERATION_HEADER` constant. + |
| The values supported are determined by the MongoDbOperation enumeration |
| and match the accepted values for the `operation` parameter on the |
| endpoint URI. |
| |
| For example: |
| |
| [source,java] |
| ----------------------------------------------------------------------------------------------------------------------------- |
| // from("direct:insert").to("mongodb:myDb?database=flights&collection=tickets&operation=insert"); |
| Object result = template.requestBodyAndHeader("direct:insert", "irrelevantBody", MongoDbConstants.OPERATION_HEADER, "count"); |
| assertTrue("Result is not of type Long", result instanceof Long); |
| ----------------------------------------------------------------------------------------------------------------------------- |
| |
| == Consumers |
There are two types of consumers:
| |
| . Tailable Cursor Consumer |
| . Change Streams Consumer |
| |
| === Tailable Cursor Consumer |
| |
| MongoDB offers a mechanism to instantaneously consume ongoing data from |
| a collection, by keeping the cursor open just like the `tail -f` command |
| of *nix systems. This mechanism is significantly more efficient than a |
| scheduled poll, due to the fact that the server pushes new data to the |
| client as it becomes available, rather than making the client ping back |
| at scheduled intervals to fetch new data. It also reduces otherwise |
| redundant network traffic. |
| |
| There is only one requisite to use tailable cursors: the collection must |
| be a "capped collection", meaning that it will only hold N objects, and |
| when the limit is reached, MongoDB flushes old objects in the same order |
| they were originally inserted. For more information, please refer to: |
| http://www.mongodb.org/display/DOCS/Tailable+Cursors[http://www.mongodb.org/display/DOCS/Tailable+Cursors]. |
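As a refresher, a capped collection can be created up front with the MongoDB Java driver. A minimal sketch, assuming a connected `MongoClient` instance (the database name, collection name and size limits are hypothetical):

[source,java]
-----------------------------------------------------------------------------------------------
// Create a 1 MiB capped collection holding at most 1000 documents;
// tailable cursors can only be opened against capped collections.
MongoDatabase db = mongoClient.getDatabase("flights");
db.createCollection("cancellations",
        new CreateCollectionOptions().capped(true)
                .sizeInBytes(1024 * 1024)
                .maxDocuments(1000));
-----------------------------------------------------------------------------------------------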
| |
| The Camel MongoDB component implements a tailable cursor consumer, |
| making this feature available for you to use in your Camel routes. As |
| new objects are inserted, MongoDB will push them as `Document` in natural |
order to your tailable cursor consumer, which will transform them into an
Exchange and trigger your route logic.
| |
| == How the tailable cursor consumer works |
| |
To turn a cursor into a tailable cursor, a few special flags must be
signalled to MongoDB when the cursor is first generated. Once created, the
cursor will stay open and will block upon calling the
`MongoCursor.next()` method until new data arrives. However, the MongoDB
server reserves the right to kill your cursor if new data doesn't
appear after an indeterminate period. If you want to continue
consuming new data, you have to regenerate the cursor, and to do so you
will have to remember the position where you left off, or else you will
start consuming from the top again.
| |
| The Camel MongoDB tailable cursor consumer takes care of all these tasks |
| for you. You will just need to provide the key to some field in your |
| data of increasing nature, which will act as a marker to position your |
| cursor every time it is regenerated, e.g. a timestamp, a sequential ID, |
etc. It can be of any datatype supported by MongoDB; Dates, Strings and
| Integers are found to work well. We call this mechanism "tail tracking" |
| in the context of this component. |
| |
| The consumer will remember the last value of this field and whenever the |
| cursor is to be regenerated, it will run the query with a filter like: |
`increasingField > lastValue`, so that only unread data is consumed.
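Conceptually, the regenerated query corresponds to a filter like the following, sketched here with the driver's `Filters` helper (the field name and variable are hypothetical):

[source,java]
-----------------------------------------------------------------------------------------------
// Only documents newer than the last tracked value are returned,
// so previously consumed entries are not re-delivered.
Bson tailFilter = Filters.gt("departureTime", lastTrackedValue);
-----------------------------------------------------------------------------------------------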
| |
*Setting the increasing field:* Set the key of the increasing field via
the `tailTrackIncreasingField` option on the endpoint URI. It must be a
top-level field in your data, as nested navigation for this field is not
currently supported. That is, the "timestamp" field is okay, but
"nested.timestamp" will not work. Please open a ticket in the Camel JIRA
if you do require support for nested increasing fields.
| |
| *Cursor regeneration delay:* One thing to note is that if new data is |
| not already available upon initialisation, MongoDB will kill the cursor |
| instantly. Since we don't want to overwhelm the server in this case, a |
| `cursorRegenerationDelay` option has been introduced (with a default |
value of 1000 ms), which you can modify to suit your needs.
| |
| An example: |
| |
| [source,java] |
| ----------------------------------------------------------------------------------------------------- |
| from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime") |
| .id("tailableCursorConsumer1") |
| .autoStartup(false) |
| .to("mock:test"); |
| ----------------------------------------------------------------------------------------------------- |
| |
| The above route will consume from the "flights.cancellations" capped |
| collection, using "departureTime" as the increasing field, with a |
| default regeneration cursor delay of 1000ms. |
| |
| == Persistent tail tracking |
| |
Standard tail tracking is volatile: the last value is only kept in
memory. In practice, however, you will need to restart your Camel
container every now and then; your last value would then be lost, and
your tailable cursor consumer would start consuming from the top again,
very likely sending duplicate records into your route.
| |
| To overcome this situation, you can enable the *persistent tail |
| tracking* feature to keep track of the last consumed increasing value in |
| a special collection inside your MongoDB database too. When the consumer |
| initialises again, it will restore the last tracked value and continue |
| as if nothing happened. |
| |
| The last read value is persisted on two occasions: every time the cursor |
| is regenerated and when the consumer shuts down. We may consider |
| persisting at regular intervals too in the future (flush every 5 |
| seconds) for added robustness if the demand is there. To request this |
| feature, please open a ticket in the Camel JIRA. |
| |
| == Enabling persistent tail tracking |
| |
| To enable this function, set at least the following options on the |
| endpoint URI: |
| |
| * `persistentTailTracking` option to `true` |
| * `persistentId` option to a unique identifier for this consumer, so |
| that the same collection can be reused across many consumers |
| |
| Additionally, you can set the `tailTrackDb`, `tailTrackCollection` and |
| `tailTrackField` options to customise where the runtime information will |
| be stored. Refer to the endpoint options table at the top of this page |
| for descriptions of each option. |
| |
| For example, the following route will consume from the |
| "flights.cancellations" capped collection, using "departureTime" as the |
| increasing field, with a default regeneration cursor delay of 1000ms, |
| with persistent tail tracking turned on, and persisting under the |
| "cancellationsTracker" id on the "flights.camelTailTracking", storing |
| the last processed value under the "lastTrackingValue" field |
| (`camelTailTracking` and `lastTrackingValue` are defaults). |
| |
| [source,java] |
| ----------------------------------------------------------------------------------------------------------------------------------- |
| from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" + |
| "&persistentId=cancellationsTracker") |
| .id("tailableCursorConsumer2") |
| .autoStartup(false) |
| .to("mock:test"); |
| ----------------------------------------------------------------------------------------------------------------------------------- |
| |
| Below is another example identical to the one above, but where the |
| persistent tail tracking runtime information will be stored under the |
| "trackers.camelTrackers" collection, in the "lastProcessedDepartureTime" |
| field: |
| |
| [source,java] |
| ----------------------------------------------------------------------------------------------------------------------------------- |
| from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" + |
| "&persistentId=cancellationsTracker&tailTrackDb=trackers&tailTrackCollection=camelTrackers" + |
| "&tailTrackField=lastProcessedDepartureTime") |
| .id("tailableCursorConsumer3") |
| .autoStartup(false) |
| .to("mock:test"); |
| ----------------------------------------------------------------------------------------------------------------------------------- |
| |
| === Change Streams Consumer |
| |
| Change Streams allow applications to access real-time data changes without the complexity and risk of tailing the MongoDB oplog. |
| Applications can use change streams to subscribe to all data changes on a collection and immediately react to them. |
| Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will. |
| The exchange body will contain the full document of any change. |
| |
To configure the Change Streams Consumer, you need to specify `consumerType`, `database`, `collection`
and, optionally, the `streamFilter` property to filter events.
The filter is a standard MongoDB `$match` aggregation stage, expressed as JSON.
It can easily be specified using the XML DSL configuration:
| |
| [source,xml] |
| ------------- |
| <route id="filterConsumer"> |
| <from uri="mongodb:myDb?consumerType=changeStreams&database=flights&collection=tickets&streamFilter={ '$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]} }"/> |
| <to uri="mock:test"/> |
| </route> |
| ------------- |
| |
| Java configuration: |
| [source,java] |
| ------------- |
| from("mongodb:myDb?consumerType=changeStreams&database=flights&collection=tickets&streamFilter={ '$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]} }") |
| .to("mock:test"); |
| ------------- |
| |
| TIP: You can externalize the streamFilter value into a property placeholder which allows the endpoint |
| URI parameters to be _cleaner_ and easier to read. |
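For instance, the filter could be moved into a properties file and referenced with a Camel property placeholder. A sketch, assuming a `streamFilter` key is defined in your Camel properties (the key name is illustrative):

[source,java]
-----------------------------------------------------------------------------------------------
// application.properties (hypothetical):
//   streamFilter={ '$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]} }
from("mongodb:myDb?consumerType=changeStreams&database=flights&collection=tickets&streamFilter={{streamFilter}}")
    .to("mock:test");
-----------------------------------------------------------------------------------------------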
| |
| == Type conversions |
| |
| The `MongoDbBasicConverters` type converter included with the |
| camel-mongodb component provides the following conversions: |
| |
| [width="100%",cols="10%,10%,10%,70%",options="header",] |
| |======================================================================= |
| |Name |From type |To type |How? |
| |fromMapToDocument |`Map` |`Document` |constructs a new `Document` via the `new Document(Map m)` |
| constructor. |
| |fromDocumentToMap |`Document` |`Map` |`Document` already implements `Map`. |
|fromStringToDocument |`String` |`Document` |uses `org.bson.Document.parse(String s)`.
|fromStringToObjectId |`String` |`ObjectId` |constructs a new `ObjectId` via the `new ObjectId(s)` constructor.
|fromFileToDocument |`File` |`Document` |uses `fromInputStreamToDocument` under the hood.
|fromInputStreamToDocument |`InputStream` |`Document` |converts the input stream bytes to a `Document`.
|fromStringToList |`String` |`List<Bson>` |uses `org.bson.codecs.configuration.CodecRegistries` to convert to a `BsonArray`, then to a `List<Bson>`.
| |======================================================================= |
| |
| This type converter is auto-discovered, so you don't need to configure anything manually. |
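In practice this means, for example, that a plain JSON `String` body can be sent to an insert endpoint and will be converted to a `Document` automatically. A hypothetical sketch:

[source,java]
-----------------------------------------------------------------------------------------------
// The String body is converted to a Document by MongoDbBasicConverters
// (fromStringToDocument) before the insert operation is executed.
from("direct:jsonInsert")
    .setBody().constant("{ \"scientist\": \"Newton\" }")
    .to("mongodb:myDb?database=science&collection=notableScientists&operation=insert");
-----------------------------------------------------------------------------------------------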
| |
| |
| |
| include::spring-boot:partial$starter.adoc[] |