= MongoDB Component
:doctitle: MongoDB
:shortname: mongodb
:artifactid: camel-mongodb
:description: Perform operations on MongoDB documents and collections.
:since: 2.19
:supportlevel: Stable
:tabs-sync-option:
:component-header: Both producer and consumer are supported
//Manually maintained attributes
:camel-spring-boot-name: mongodb
*Since Camel {since}*
*{component-header}*
According to Wikipedia: "NoSQL is a movement promoting a loosely defined
class of non-relational data stores that break with a long history of
relational databases and ACID guarantees." NoSQL solutions have grown in
popularity in the last few years, and major, heavily-used sites and
services such as Facebook, LinkedIn and Twitter are known to use them
extensively to achieve scalability and agility.
Basically, NoSQL solutions differ from traditional RDBMS (Relational
Database Management Systems) in that they don't use SQL as their query
language and generally don't offer ACID-like transactional behaviour nor
relational data. Instead, they are designed around the concept of
flexible data structures and schemas (meaning that the traditional
concept of a database table with a fixed schema is dropped), extreme
scalability on commodity hardware and blazing-fast processing.
MongoDB is a very popular NoSQL solution and the camel-mongodb component
integrates Camel with MongoDB allowing you to interact with MongoDB
collections both as a producer (performing operations on the collection)
and as a consumer (consuming documents from a MongoDB collection).
MongoDB revolves around the concepts of documents (not as in office
documents, but rather hierarchical data defined in JSON/BSON) and
collections. This component page assumes you are familiar with them.
Otherwise, visit http://www.mongodb.org/[http://www.mongodb.org/].
[NOTE]
====
The MongoDB Camel component uses Mongo Java Driver 4.x.
====
Maven users will need to add the following dependency to their `pom.xml`
for this component:
[source,xml]
------------------------------------------------------------
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-mongodb</artifactId>
    <version>x.y.z</version>
    <!-- use the same version as your Camel core version -->
</dependency>
------------------------------------------------------------
== URI formats
---------------------------------------------------------------------------------------------------------------
mongodb:connectionBean?database=databaseName&collection=collectionName&operation=operationName[&moreOptions...]
mongodb:dummy?hosts=hostnames&database=databaseName&collection=collectionName&operation=operationName[&moreOptions...]
---------------------------------------------------------------------------------------------------------------
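For example, with a registered connection bean named `mongoBean`, or connecting directly to a host (the host, database and collection values below are illustrative), endpoint URIs could look like:

---------------------------------------------------------------------------------------------------------------
mongodb:mongoBean?database=flights&collection=tickets&operation=findAll
mongodb:dummy?hosts=localhost:27017&database=flights&collection=tickets&operation=findAll
---------------------------------------------------------------------------------------------------------------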
// component-configure options: START
// component-configure options: END
// component options: START
include::partial$component-configure-options.adoc[]
include::partial$component-endpoint-options.adoc[]
// component options: END
// endpoint options: START
// endpoint options: END
// component headers: START
include::partial$component-endpoint-headers.adoc[]
// component headers: END
[[MongoDB-ConfigurationofdatabaseinSpringXML]]
== Configuration of database in Spring XML
The following Spring XML creates a bean defining the connection to a
MongoDB instance.
Since MongoDB Java Driver 3.x, the `WriteConcern` and `readPreference` options are no longer dynamically modifiable. They are defined in the `mongoClient` object instead.
[source,xml]
----------------------------------------------------------------------------------------------------------------------------------
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:mongo="http://www.springframework.org/schema/data/mongo"
       xsi:schemaLocation="http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd
                           http://www.springframework.org/schema/data/mongo
                           http://www.springframework.org/schema/data/mongo/spring-mongo.xsd
                           http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <mongo:mongo-client id="mongoBean" host="${mongo.url}" port="${mongo.port}" credentials="${mongo.user}:${mongo.pass}@${mongo.dbname}">
        <mongo:client-options write-concern="NORMAL" />
    </mongo:mongo-client>
</beans>
----------------------------------------------------------------------------------------------------------------------------------
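If you are not using Spring XML, the connection bean can also be bound programmatically in the Camel registry. The following is a minimal sketch, assuming a plain `DefaultCamelContext` and an illustrative connection string:

[source,java]
----------------------------------------------------------------------------------------------------------------------------------
// com.mongodb.client.MongoClients comes from the Mongo Java Driver 4.x used by this component
CamelContext context = new DefaultCamelContext();
MongoClient mongoBean = MongoClients.create("mongodb://localhost:27017");
// bind under the name referenced in the endpoint URI, e.g. mongodb:mongoBean?...
context.getRegistry().bind("mongoBean", mongoBean);
----------------------------------------------------------------------------------------------------------------------------------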
== Sample route
The following route defined in Spring XML executes the operation
<<getDbStats>> on a collection.
*Get DB stats for specified collection*
[source,xml]
---------------------------------------------------------------------------------------------------------------------------
<route>
    <from uri="direct:start" />
    <!-- using bean 'mongoBean' defined above -->
    <to uri="mongodb:mongoBean?database=${mongodb.database}&amp;collection=${mongodb.collection}&amp;operation=getDbStats" />
    <to uri="direct:result" />
</route>
---------------------------------------------------------------------------------------------------------------------------
== MongoDB operations - producer endpoints
=== Query operations
==== findById
This operation retrieves only one element from the collection whose _id
field matches the content of the IN message body. The incoming object
can be anything that has an equivalent `Bson` representation. See
http://bsonspec.org/spec.html[http://bsonspec.org/spec.html]
and
http://www.mongodb.org/display/DOCS/Java+Types[http://www.mongodb.org/display/DOCS/Java+Types].
[source,java]
------------------------------------------------------------------------------
from("direct:findById")
.to("mongodb:myDb?database=flights&collection=tickets&operation=findById")
.to("mock:resultFindById");
------------------------------------------------------------------------------
Please note that the default `_id` is treated by MongoDB as an `ObjectId` type, so you may need to convert it properly:
[source,java]
------------------------------------------------------------------------------
from("direct:findById")
.convertBodyTo(ObjectId.class)
.to("mongodb:myDb?database=flights&collection=tickets&operation=findById")
.to("mock:resultFindById");
------------------------------------------------------------------------------
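For example, a producer template can send the id value as the message body. A minimal sketch (the hex id value below is purely illustrative):

[source,java]
------------------------------------------------------------------------------
// route: from("direct:findById").to("mongodb:myDb?database=flights&collection=tickets&operation=findById");
// the body carries the _id to look up
Document ticket = template.requestBody("direct:findById", new ObjectId("5fa1c4d1e2a7b90d6e1c0f3a"), Document.class);
------------------------------------------------------------------------------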
[TIP]
====
*Supports optional parameters*
This operation supports projection operators.
See <<Specifying a fields filter (projection)>>.
====
==== findOneByQuery
Retrieve the first element from a collection matching a MongoDB query selector.
*If the `CamelMongoDbCriteria` header is set, then its value is used as the query selector*.
If the `CamelMongoDbCriteria` header is _null_, then the IN message body is used as the query
selector. In both cases, the query selector should be of type `Bson` or convertible to
`Bson` (for instance, a JSON string or `HashMap`). See <<Type conversions>> for more info.
Create query selectors using the `Filters` provided by the MongoDB Driver.
===== Example without a query selector (returns the first document in a collection)
[source,java]
------------------------------------------------------------------------------------
from("direct:findOneByQuery")
.to("mongodb:myDb?database=flights&collection=tickets&operation=findOneByQuery")
.to("mock:resultFindOneByQuery");
------------------------------------------------------------------------------------
===== Example with a query selector (returns the first matching document in a collection):
[source,java]
------------------------------------------------------------------------------------
from("direct:findOneByQuery")
.setHeader(MongoDbConstants.CRITERIA, constant(Filters.eq("name", "Raul Kripalani")))
.to("mongodb:myDb?database=flights&collection=tickets&operation=findOneByQuery")
.to("mock:resultFindOneByQuery");
------------------------------------------------------------------------------------
[TIP]
====
*Supports optional parameters*
This operation supports projection operators and sort clauses.
See <<Specifying a fields filter (projection)>>, <<Specifying a sort clause>>.
====
==== findAll
The `findAll` operation returns all documents matching a query, or none
at all, in which case all documents contained in the collection are
returned. *The query object is extracted from the `CamelMongoDbCriteria` header*.
If the `CamelMongoDbCriteria` header is null, the query object is extracted from the
message body, i.e. it should be of type `Bson` or convertible to `Bson`.
It can be a JSON String or a Hashmap.
See <<Type conversions>> for more info.
===== Example without a query selector (returns all documents in a collection)
[source,java]
-----------------------------------------------------------------------------
from("direct:findAll")
.to("mongodb:myDb?database=flights&collection=tickets&operation=findAll")
.to("mock:resultFindAll");
-----------------------------------------------------------------------------
===== Example with a query selector (returns all matching documents in a collection)
[source,java]
-----------------------------------------------------------------------------
from("direct:findAll")
.setHeader(MongoDbConstants.CRITERIA, Filters.eq("name", "Raul Kripalani"))
.to("mongodb:myDb?database=flights&collection=tickets&operation=findAll")
.to("mock:resultFindAll");
-----------------------------------------------------------------------------
===== Example with option _outputType=MongoIterable_ and batch size
[source,java]
-----------------------------------------------------------------------------
from("direct:findAll")
.setHeader(MongoDbConstants.BATCH_SIZE).constant(10)
.setHeader(MongoDbConstants.CRITERIA, constant(Filters.eq("name", "Raul Kripalani")))
.to("mongodb:myDb?database=flights&collection=tickets&operation=findAll&outputType=MongoIterable")
.to("mock:resultFindAll");
-----------------------------------------------------------------------------
[TIP]
====
*Supports optional parameters*
This operation supports projection operators and sort clauses.
See <<Specifying a fields filter (projection)>>, <<Specifying a sort clause>>.
====
==== count
Returns the total number of objects in a collection, returning a Long as
the OUT message body. +
The following example will count the number of records in the
"dynamicCollectionName" collection. Notice how dynamicity is enabled,
and as a result, the operation will not run against the
"notableScientists" collection, but against the "dynamicCollectionName"
collection.
[source,java]
------------------------------------------------------------------------------------------------------------------------------------
from("direct:count")
.to("mongodb:myDb?database=tickets&collection=flights&operation=count&dynamicity=true");
Long result = template.requestBodyAndHeader("direct:count", "irrelevantBody", MongoDbConstants.COLLECTION, "dynamicCollectionName");
assertTrue("Result is not of type Long", result instanceof Long);
------------------------------------------------------------------------------------------------------------------------------------
You can provide a query.
*The query object is extracted from the `CamelMongoDbCriteria` header*.
If the `CamelMongoDbCriteria` header is null, the query object is extracted from the
message body, i.e. it should be of type `Bson` or convertible to `Bson`.
The operation will then return the number of documents matching this criteria.
[source,java]
------------------------------------------------------------------------------------------------------------------------
Document query = new Document("scientist", "Darwin"); // illustrative filter; any query document works
Long count = template.requestBodyAndHeader("direct:count", query, MongoDbConstants.COLLECTION, "dynamicCollectionName");
------------------------------------------------------------------------------------------------------------------------
==== Specifying a fields filter (projection)
Query operations will, by default, return the matching objects in their
entirety (with all their fields). If your documents are large and you
only require retrieving a subset of their fields, you can specify a
field filter in all query operations, simply by setting the relevant
`Bson` (or type convertible to `Bson`, such as a JSON String,
Map, etc.) on the `CamelMongoDbFieldsProjection` header (constant shortcut:
`MongoDbConstants.FIELDS_PROJECTION`).
Here is an example that uses MongoDB's `Projections` to simplify
the creation of Bson. It retrieves all fields except `_id` and
`boringField`:
[source,java]
----------------------------------------------------------------------------------------------------------------------------
// route: from("direct:findAll").to("mongodb:myDb?database=flights&collection=tickets&operation=findAll")
Bson fieldProjection = Projections.exclude("_id", "boringField");
Object result = template.requestBodyAndHeader("direct:findAll", ObjectUtils.NULL, MongoDbConstants.FIELDS_PROJECTION, fieldProjection);
----------------------------------------------------------------------------------------------------------------------------
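The same projection can also be set inside a route before calling the endpoint. A minimal sketch (the route name is illustrative):

[source,java]
----------------------------------------------------------------------------------------------------------------------------
from("direct:findAllProjected")
    .setHeader(MongoDbConstants.FIELDS_PROJECTION).constant(Projections.exclude("_id", "boringField"))
    .to("mongodb:myDb?database=flights&collection=tickets&operation=findAll")
    .to("mock:resultFindAll");
----------------------------------------------------------------------------------------------------------------------------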
==== Specifying a sort clause
There is often a requirement to fetch the min/max record from a
collection based on sorting by a particular field. Here is an example
that uses MongoDB's `Sorts` to simplify the creation of the sort `Bson`.
It sorts the results in descending order by `_id`:
[source,java]
----------------------------------------------------------------------------------------------------------------------------
// route: from("direct:findAll").to("mongodb:myDb?database=flights&collection=tickets&operation=findAll")
Bson sorts = Sorts.descending("_id");
Object result = template.requestBodyAndHeader("direct:findAll", ObjectUtils.NULL, MongoDbConstants.SORT_BY, sorts);
----------------------------------------------------------------------------------------------------------------------------
In a Camel route the SORT_BY header can be used with the findOneByQuery
operation to achieve the same result. If the FIELDS_PROJECTION header is also
specified the operation will return a single field/value pair
that can be passed directly to another component (for example, a
parameterized MyBatis SELECT query). This example demonstrates fetching
the temporally newest document from a collection and reducing the result
to a single field, based on the `documentTimestamp` field:
[source,java]
----------------------------------------------------------------------------------------------------------------------------
.from("direct:someTriggeringEvent")
.setHeader(MongoDbConstants.SORT_BY).constant(Sorts.descending("documentTimestamp"))
.setHeader(MongoDbConstants.FIELDS_PROJECTION).constant(Projections.include("documentTimestamp"))
.setBody().constant("{}")
.to("mongodb:myDb?database=local&collection=myDemoCollection&operation=findOneByQuery")
.to("direct:aMyBatisParameterizedSelect");
----------------------------------------------------------------------------------------------------------------------------
=== Create/update operations
==== insert
Inserts a new object into the MongoDB collection, taken from the IN
message body. Type conversion is attempted to turn it into a `Document` or
a `List`. +
Two modes are supported: single insert and multiple insert. For
multiple insert, the endpoint will expect a List, Array or Collection
of objects of any type, as long as they are - or can be converted to -
`Document`.
Example:
[source,java]
-----------------------------------------------------------------------------
from("direct:insert")
.to("mongodb:myDb?database=flights&collection=tickets&operation=insert");
-----------------------------------------------------------------------------
The operation will return a WriteResult, and depending on the
`WriteConcern` or the value of the `invokeGetLastError` option,
`getLastError()` would have been called already or not. If you want to
access the ultimate result of the write operation, you need to retrieve
the `CommandResult` by calling `getLastError()` or
`getCachedLastError()` on the `WriteResult`. Then you can verify the
result by calling `CommandResult.ok()`,
`CommandResult.getErrorMessage()` and/or `CommandResult.getException()`.
Note that the new object's `_id` must be unique in the collection. If
you don't specify the value, MongoDB will automatically generate one for
you. But if you do specify it and it is not unique, the insert operation
will fail (and for Camel to notice, you will need to enable
invokeGetLastError or set a WriteConcern that waits for the write
result).
This is not a limitation of the component, but it is how things work in
MongoDB for higher throughput. If you are using a custom `_id`, you are
expected to ensure at the application level that it is unique (and this is
a good practice too).
The OID(s) of the inserted record(s) are stored in the
message header under the `CamelMongoOid` key (`MongoDbConstants.OID`
constant). The value stored is an `org.bson.types.ObjectId` for a single
insert, or a `java.util.List<org.bson.types.ObjectId>` if multiple records
have been inserted.
Since MongoDB Java Driver 3.x, the insertOne and insertMany operations return void.
The Camel insert operation returns the Document or List of Documents inserted. Note that each Document is updated with a new OID if needed.
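For example, a producer template can send a single `Document` or a `List` of them and read the generated OID(s) back from the header. A minimal sketch with illustrative field values:

[source,java]
-----------------------------------------------------------------------------
// route: from("direct:insert").to("mongodb:myDb?database=flights&collection=tickets&operation=insert");
// single insert: the returned body is the inserted Document, the generated OID is in the CamelMongoOid header
Document ticket = new Document("passenger", "Raul Kripalani");
Exchange out = template.request("direct:insert", exchange -> exchange.getMessage().setBody(ticket));
ObjectId oid = out.getMessage().getHeader(MongoDbConstants.OID, ObjectId.class);

// multiple insert: a List of Documents yields a List<ObjectId> in the same header
List<Document> tickets = Arrays.asList(new Document("passenger", "A"), new Document("passenger", "B"));
template.requestBody("direct:insert", tickets);
-----------------------------------------------------------------------------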
==== save
The save operation is equivalent to an _upsert_ (UPdate, inSERT)
operation, where the record will be updated, and if it doesn't exist, it
will be inserted, all in one atomic operation. MongoDB will perform the
matching based on the `_id` field.
Beware that in case of an update, the object is replaced entirely and
the usage of
http://www.mongodb.org/display/DOCS/Updating#Updating-ModifierOperations[MongoDB's
$modifiers] is not permitted. Therefore, if you want to manipulate the
object if it already exists, you have two options:
1. perform a query to retrieve the entire object first along with all
its fields (may not be efficient), alter it inside Camel and then save
it.
2. use the update operation with
http://www.mongodb.org/display/DOCS/Updating#Updating-ModifierOperations[$modifiers],
which will execute the update at the server-side instead. You can enable
the upsert flag, in which case if an insert is required, MongoDB will
apply the $modifiers to the filter query object and insert the result.
If the document to be saved does not contain the `_id` attribute, the operation will be an insert, and the new `_id` created will be placed in the `CamelMongoOid` header.
For example:
[source,java]
---------------------------------------------------------------------------
from("direct:insert")
.to("mongodb:myDb?database=flights&collection=tickets&operation=save");
---------------------------------------------------------------------------
[source,java]
------------------------------------------------------------------------------------------------------------------------------------------
// route: from("direct:insert").to("mongodb:myDb?database=flights&collection=tickets&operation=save");
org.bson.Document docForSave = new org.bson.Document();
docForSave.put("key", "value");
Object result = template.requestBody("direct:insert", docForSave);
------------------------------------------------------------------------------------------------------------------------------------------
==== update
Update one or multiple records in the collection. Requires a filter query and
update rules.
You can define the filter using the `MongoDbConstants.CRITERIA` header as `Bson`
and define the update rules as `Bson` in the body.
[NOTE]
====
*Update after enrich*
While defining the filter by using the `MongoDbConstants.CRITERIA` header as `Bson`
to query MongoDB before you do the update, note that you need to remove it from the resulting Camel exchange
during aggregation if you use the enrich pattern with an aggregation strategy and then apply the MongoDB update.
If you don't remove this header during aggregation, or redefine the `MongoDbConstants.CRITERIA` header before sending
the Camel exchange to the MongoDB producer endpoint, you may end up with an invalid Camel exchange payload while updating MongoDB.
====
The second way requires a
`List<Bson>` as the IN message body containing exactly two elements:
* Element 1 (index 0) => filter query => determines what objects will be
affected, same as a typical query object
* Element 2 (index 1) => update rules => how matched objects will be
updated. All
http://www.mongodb.org/display/DOCS/Updating#Updating-ModifierOperations[modifier
operations] from MongoDB are supported.
[NOTE]
====
*Multiupdates*
By default, MongoDB will only update 1 object even if multiple objects
match the filter query. To instruct MongoDB to update *all* matching
records, set the `CamelMongoDbMultiUpdate` IN message header to `true`.
====
A header with key `CamelMongoDbRecordsAffected` will be returned
(`MongoDbConstants.RECORDS_AFFECTED` constant) with the number of
records updated (copied from `WriteResult.getN()`).
For example, the following will update *all* records whose filterField
field equals true by setting the value of the "scientist" field to
"Darwin":
[source,java]
------------------------------------------------------------------------------------------------------------------------------------------
// route: from("direct:update").to("mongodb:myDb?database=science&collection=notableScientists&operation=update");
List<Bson> body = new ArrayList<>();
Bson filterField = Filters.eq("filterField", true);
body.add(filterField);
BsonDocument updateObj = new BsonDocument().append("$set", new BsonDocument("scientist", new BsonString("Darwin")));
body.add(updateObj);
Object result = template.requestBodyAndHeader("direct:update", body, MongoDbConstants.MULTIUPDATE, true);
------------------------------------------------------------------------------------------------------------------------------------------
[source,java]
------------------------------------------------------------------------------------------------------------------------------------------
// route: from("direct:update").to("mongodb:myDb?database=science&collection=notableScientists&operation=update");
Map<String, Object> headers = new HashMap<>(2);
headers.put(MongoDbConstants.MULTIUPDATE, true);
headers.put(MongoDbConstants.FIELDS_FILTER, Filters.eq("filterField", true));
Bson updateObj = Updates.set("scientist", "Darwin");
Object result = template.requestBodyAndHeaders("direct:update", updateObj, headers);
------------------------------------------------------------------------------------------------------------------------------------------
[source,java]
------------------------------------------------------------------------------------------------------------------------------------------
// route: from("direct:update").to("mongodb:myDb?database=science&collection=notableScientists&operation=update");
String updateObj = "[{\"filterField\": true}, {\"$set\": {\"scientist\": \"Darwin\"}}]";
Object result = template.requestBodyAndHeader("direct:update", updateObj, MongoDbConstants.MULTIUPDATE, true);
------------------------------------------------------------------------------------------------------------------------------------------
=== Delete operations
==== remove
Remove matching records from the collection. The IN message body will
act as the removal filter query, and is expected to be of type
`Bson` or a type convertible to it. +
The following example will remove all objects whose field
'conditionField' equals true, in the science database, notableScientists
collection:
[source,java]
------------------------------------------------------------------------------------------------------------------
// route: from("direct:remove").to("mongodb:myDb?database=science&collection=notableScientists&operation=remove");
Bson conditionField = Filters.eq("conditionField", true);
Object result = template.requestBody("direct:remove", conditionField);
------------------------------------------------------------------------------------------------------------------
A header with key `CamelMongoDbRecordsAffected` is returned
(`MongoDbConstants.RECORDS_AFFECTED` constant) with type `int`,
containing the number of records deleted (copied from
`WriteResult.getN()`).
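To read that header, you can use `template.request(...)` and inspect the reply message. A minimal sketch, reusing the filter from the example above:

[source,java]
------------------------------------------------------------------------------------------------------------------
// route: from("direct:remove").to("mongodb:myDb?database=science&collection=notableScientists&operation=remove");
Exchange out = template.request("direct:remove",
        exchange -> exchange.getMessage().setBody(Filters.eq("conditionField", true)));
Integer recordsDeleted = out.getMessage().getHeader(MongoDbConstants.RECORDS_AFFECTED, Integer.class);
------------------------------------------------------------------------------------------------------------------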
=== Bulk Write Operations
==== bulkWrite
Performs write operations in bulk with controls for order of execution.
Requires a `List<WriteModel<Document>>` as the IN message body containing commands for insert, update, and delete operations.
The following example will insert a new scientist "Pierre Curie", update the record with id "5" by setting the value of the "scientist" field to
"Marie Curie", and delete the record with id "3":
[source,java]
------------------------------------------------------------------------------------------------------------------
// route: from("direct:bulkWrite").to("mongodb:myDb?database=science&collection=notableScientists&operation=bulkWrite");
List<WriteModel<Document>> bulkOperations = Arrays.asList(
new InsertOneModel<>(new Document("scientist", "Pierre Curie")),
new UpdateOneModel<>(new Document("_id", "5"),
new Document("$set", new Document("scientist", "Marie Curie"))),
new DeleteOneModel<>(new Document("_id", "3")));
BulkWriteResult result = template.requestBody("direct:bulkWrite", bulkOperations, BulkWriteResult.class);
------------------------------------------------------------------------------------------------------------------
By default, operations are executed in order and interrupted on the first write error without processing any remaining write operations in the list.
To instruct MongoDB to continue to process remaining write operations in the list, set the `CamelMongoDbBulkOrdered` IN message header to `false`.
Unordered operations are executed in parallel, and their order of execution is not guaranteed.
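For example, to run the same bulk operations unordered (reusing the `bulkOperations` list from the example above; the call below is a minimal sketch):

[source,java]
------------------------------------------------------------------------------------------------------------------
// route: from("direct:bulkWrite").to("mongodb:myDb?database=science&collection=notableScientists&operation=bulkWrite");
BulkWriteResult result = template.requestBodyAndHeader("direct:bulkWrite", bulkOperations,
        "CamelMongoDbBulkOrdered", false, BulkWriteResult.class);
------------------------------------------------------------------------------------------------------------------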
=== Other operations
==== aggregate
Perform an aggregation with the given pipeline contained in the message
body.
*Aggregations could be long and heavy operations. Use with care.*
[source,java]
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
// route: from("direct:aggregate").to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate");
List<Bson> aggregate = Arrays.asList(match(or(eq("scientist", "Darwin"), eq("scientist", "Einstein"))),
        group("$scientist", sum("count", 1)));
from("direct:aggregate")
.setBody().constant(aggregate)
.to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate")
.to("mock:resultAggregate");
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
By default, a List of all results is returned. This can be heavy on memory depending on the size of the results. A safer alternative is to set your
outputType=MongoIterable. The next Processor will see an iterable in the message body allowing it to step through the results one by one. Thus setting
a batch size and returning an iterable allows for efficient retrieval and processing of the result.
An example would look like:
[source,java]
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
List<Bson> aggregate = Arrays.asList(match(or(eq("scientist", "Darwin"), eq("scientist", "Einstein"))),
        group("$scientist", sum("count", 1)));
from("direct:aggregate")
.setHeader(MongoDbConstants.BATCH_SIZE).constant(10)
.setBody().constant(aggregate)
.to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate&outputType=MongoIterable")
.split(body())
.streaming()
.to("mock:resultAggregate");
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
Note that calling `.split(body())` is enough to send the entries down the route one-by-one, however it would still load all the entries into memory first.
Calling `.streaming()` is thus required to load data into memory by batches.
==== getDbStats
Equivalent of running the `db.stats()` command in the MongoDB shell,
which displays useful statistic figures about the database. +
For example:
-------------------------------------
> db.stats();
{
"db" : "test",
"collections" : 7,
"objects" : 719,
"avgObjSize" : 59.73296244784423,
"dataSize" : 42948,
"storageSize" : 1000058880,
"numExtents" : 9,
"indexes" : 4,
"indexSize" : 32704,
"fileSize" : 1275068416,
"nsSizeMB" : 16,
"ok" : 1
}
-------------------------------------
Usage example:
[source,java]
---------------------------------------------------------------------------------------------------------
// from("direct:getDbStats").to("mongodb:myDb?database=flights&collection=tickets&operation=getDbStats");
Object result = template.requestBody("direct:getDbStats", "irrelevantBody");
assertTrue("Result is not of type Document", result instanceof Document);
---------------------------------------------------------------------------------------------------------
The operation will return a data structure similar to the one displayed
in the shell, in the form of a `Document` in the OUT message body.
==== getColStats
Equivalent of running the `db.collection.stats()` command in the MongoDB
shell, which displays useful statistic figures about the collection. +
For example:
-----------------------------
> db.camelTest.stats();
{
"ns" : "test.camelTest",
"count" : 100,
"size" : 5792,
"avgObjSize" : 57.92,
"storageSize" : 20480,
"numExtents" : 2,
"nindexes" : 1,
"lastExtentSize" : 16384,
"paddingFactor" : 1,
"flags" : 1,
"totalIndexSize" : 8176,
"indexSizes" : {
"_id_" : 8176
},
"ok" : 1
}
-----------------------------
Usage example:
[source,java]
-----------------------------------------------------------------------------------------------------------
// from("direct:getColStats").to("mongodb:myDb?database=flights&collection=tickets&operation=getColStats");
Object result = template.requestBody("direct:getColStats", "irrelevantBody");
assertTrue("Result is not of type Document", result instanceof Document);
-----------------------------------------------------------------------------------------------------------
The operation will return a data structure similar to the one displayed
in the shell, in the form of a `Document` in the OUT message body.
==== command
Run the body as a command on the database. Useful for admin operations such as
getting host information, replication or sharding status.
The collection parameter is not used for this operation.
[source,java]
--------------------------------------------------------------------------------
// route: from("command").to("mongodb:myDb?database=science&operation=command");
DBObject commandBody = new BasicDBObject("hostInfo", "1");
Object result = template.requestBody("direct:command", commandBody);
--------------------------------------------------------------------------------
=== Dynamic operations
An Exchange can override the endpoint's fixed operation by setting the
`CamelMongoDbOperation` header, defined by the
`MongoDbConstants.OPERATION_HEADER` constant. +
The values supported are determined by the MongoDbOperation enumeration
and match the accepted values for the `operation` parameter on the
endpoint URI.
For example:
[source,java]
-----------------------------------------------------------------------------------------------------------------------------
// from("direct:insert").to("mongodb:myDb?database=flights&collection=tickets&operation=insert");
Object result = template.requestBodyAndHeader("direct:insert", "irrelevantBody", MongoDbConstants.OPERATION_HEADER, "count");
assertTrue("Result is not of type Long", result instanceof Long);
-----------------------------------------------------------------------------------------------------------------------------
== Consumers
There are several types of consumers:
. Tailable Cursor Consumer
. Change Streams Consumer
=== Tailable Cursor Consumer
MongoDB offers a mechanism to instantaneously consume ongoing data from
a collection, by keeping the cursor open just like the `tail -f` command
of *nix systems. This mechanism is significantly more efficient than a
scheduled poll, due to the fact that the server pushes new data to the
client as it becomes available, rather than making the client ping back
at scheduled intervals to fetch new data. It also reduces otherwise
redundant network traffic.
There is only one requisite to use tailable cursors: the collection must
be a "capped collection", meaning that it will only hold N objects, and
when the limit is reached, MongoDB flushes old objects in the same order
they were originally inserted. For more information, please refer to:
http://www.mongodb.org/display/DOCS/Tailable+Cursors[http://www.mongodb.org/display/DOCS/Tailable+Cursors].
The Camel MongoDB component implements a tailable cursor consumer,
making this feature available for you to use in your Camel routes. As
new objects are inserted, MongoDB will push them as `Document` in natural
order to your tailable cursor consumer, which will transform them into an
Exchange and trigger your route logic.
==== How the tailable cursor consumer works
To turn a cursor into a tailable cursor, a few special flags are to be
signalled to MongoDB when first generating the cursor. Once created, the
cursor will then stay open and will block upon calling the
`MongoCursor.next()` method until new data arrives. However, the MongoDB
server reserves itself the right to kill your cursor if new data doesn't
appear after an indeterminate period. If you are interested in continuing to
consume new data, you have to regenerate the cursor. To do so, you
will have to remember the position where you left off, or else you will
start consuming from the top again.
The Camel MongoDB tailable cursor consumer takes care of all these tasks
for you. You will just need to provide the key to some field in your
data of increasing nature, which will act as a marker to position your
cursor every time it is regenerated, e.g. a timestamp, a sequential ID,
etc. It can be of any datatype supported by MongoDB. Date, Strings and
Integers are found to work well. We call this mechanism "tail tracking"
in the context of this component.
The consumer will remember the last value of this field and whenever the
cursor is to be regenerated, it will run the query with a filter like:
`increasingField > lastValue`, so that only unread data is consumed.
*Setting the increasing field:* Set the key of the increasing field via
the endpoint URI `tailTrackIncreasingField` option. It
must be a top-level field in your data, as nested navigation for this
field is not yet supported. That is, the "timestamp" field is okay, but
"nested.timestamp" will not work. Please open a ticket in the Camel JIRA
if you do require support for nested increasing fields.
*Cursor regeneration delay:* One thing to note is that if new data is
not already available upon initialisation, MongoDB will kill the cursor
instantly. Since we don't want to overwhelm the server in this case, a
`cursorRegenerationDelay` option has been introduced (with a default
value of 1000ms.), which you can modify to suit your needs.
An example:
[source,java]
-----------------------------------------------------------------------------------------------------
from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime")
.id("tailableCursorConsumer1")
.autoStartup(false)
.to("mock:test");
-----------------------------------------------------------------------------------------------------
The above route will consume from the "flights.cancellations" capped
collection, using "departureTime" as the increasing field, with a
default regeneration cursor delay of 1000ms.
==== Persistent tail tracking
Standard tail tracking is volatile and the last value is only kept in
memory. However, in practice you will need to restart your Camel
container every now and then, but your last value would then be lost and
your tailable cursor consumer would start consuming from the top again,
very likely sending duplicate records into your route.
To overcome this situation, you can enable the *persistent tail
tracking* feature to keep track of the last consumed increasing value in
a special collection inside your MongoDB database too. When the consumer
initialises again, it will restore the last tracked value and continue
as if nothing happened.
The last read value is persisted on two occasions: every time the cursor
is regenerated and when the consumer shuts down. We may consider
persisting at regular intervals too in the future (flush every 5
seconds) for added robustness if the demand is there. To request this
feature, please open a ticket in the Camel JIRA.
==== Enabling persistent tail tracking
To enable this function, set at least the following options on the
endpoint URI:
* `persistentTailTracking` option to `true`
* `persistentId` option to a unique identifier for this consumer, so
that the same collection can be reused across many consumers
Additionally, you can set the `tailTrackDb`, `tailTrackCollection` and
`tailTrackField` options to customise where the runtime information will
be stored. Refer to the endpoint options table at the top of this page
for descriptions of each option.
For example, the following route will consume from the
"flights.cancellations" capped collection, using "departureTime" as the
increasing field, with a default regeneration cursor delay of 1000ms,
with persistent tail tracking turned on, and persisting under the
"cancellationsTracker" id on the "flights.camelTailTracking", storing
the last processed value under the "lastTrackingValue" field
(`camelTailTracking` and `lastTrackingValue` are defaults).
[source,java]
-----------------------------------------------------------------------------------------------------------------------------------
from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" +
"&persistentId=cancellationsTracker")
.id("tailableCursorConsumer2")
.autoStartup(false)
.to("mock:test");
-----------------------------------------------------------------------------------------------------------------------------------
Below is another example identical to the one above, but where the
persistent tail tracking runtime information will be stored under the
"trackers.camelTrackers" collection, in the "lastProcessedDepartureTime"
field:
[source,java]
-----------------------------------------------------------------------------------------------------------------------------------
from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" +
"&persistentId=cancellationsTracker&tailTrackDb=trackers&tailTrackCollection=camelTrackers" +
"&tailTrackField=lastProcessedDepartureTime")
.id("tailableCursorConsumer3")
.autoStartup(false)
.to("mock:test");
-----------------------------------------------------------------------------------------------------------------------------------
=== Change Streams Consumer
Change Streams allow applications to access real-time data changes without the complexity and risk of tailing the MongoDB oplog.
Applications can use change streams to subscribe to all data changes on a collection and immediately react to them.
Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.
The exchange body will contain the full document of any change.
To configure Change Streams Consumer you need to specify `consumerType`, `database`, `collection`
and optional JSON property `streamFilter` to filter events.
That JSON property is standard MongoDB `$match` aggregation.
It could be easily specified using XML DSL configuration:
[source,xml]
-------------
<route id="filterConsumer">
<from uri="mongodb:myDb?consumerType=changeStreams&amp;database=flights&amp;collection=tickets&amp;streamFilter={ '$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]} }"/>
<to uri="mock:test"/>
</route>
-------------
Java configuration:
[source,java]
-------------
from("mongodb:myDb?consumerType=changeStreams&database=flights&collection=tickets&streamFilter={ '$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]} }")
.to("mock:test");
-------------
TIP: You can externalize the streamFilter value into a property placeholder which allows the endpoint
URI parameters to be _cleaner_ and easier to read.
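A minimal sketch of that approach, assuming a property named `myStreamFilter` has been defined (for example, in a properties file loaded by the properties component):

[source,java]
-------------
// myStreamFilter={ '$match':{'fullDocument.stringValue': 'specificValue'} }
from("mongodb:myDb?consumerType=changeStreams&database=flights&collection=tickets&streamFilter={{myStreamFilter}}")
    .to("mock:test");
-------------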
== Type conversions
The `MongoDbBasicConverters` type converter included with the
camel-mongodb component provides the following conversions:
[width="100%",cols="10%,10%,10%,70%",options="header",]
|=======================================================================
|Name |From type |To type |How?
|fromMapToDocument |`Map` |`Document` |constructs a new `Document` via the `new Document(Map m)`
constructor.
|fromDocumentToMap |`Document` |`Map` |`Document` already implements `Map`.
|fromStringToDocument |`String` |`Document` |uses `org.bson.Document.parse(String s)`.
|fromStringToObjectId |`String` |`ObjectId` |constructs a new `ObjectId` via the `new ObjectId(s)` constructor.
|fromFileToDocument|`File` |`Document` |uses `fromInputStreamToDocument` under the hood
|fromInputStreamToDocument|`InputStream` |`Document` |converts the inputstream bytes to a `Document`
|fromStringToList |`String` |`List<Bson>` |uses `org.bson.codecs.configuration.CodecRegistries` to convert to `BsonArray`, then to `List<Bson>`.
|=======================================================================
This type converter is auto-discovered, so you don't need to configure anything manually.
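Thanks to these converters, for example, a JSON `String` can be sent directly as the message body of a producer operation and it will be converted on the way in. A minimal sketch, reusing the insert route shown earlier (the field value is illustrative):

[source,java]
---------------------------------------------------------------------------------------------------------
// route: from("direct:insert").to("mongodb:myDb?database=flights&collection=tickets&operation=insert");
// the String body is converted to a Document by fromStringToDocument before the insert runs
Object result = template.requestBody("direct:insert", "{\"scientist\": \"Newton\"}");
---------------------------------------------------------------------------------------------------------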
include::spring-boot:partial$starter.adoc[]