/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.mongodb;
import java.util.Calendar;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.WriteConcern;
import org.apache.camel.ServiceStatus;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Test;
/**
 * Tests for the tailable cursor consumer routes declared in {@link #addTestRoutes()}.
 * <p>
 * Every test starts from a dropped capped collection (see {@link #doPostSetup()}), creates the
 * capped collection it needs, starts one of the routes and checks what arrives at
 * {@code mock:test}.
 * <p>
 * Records are inserted on the test thread itself, so an insert or assertion failure fails the
 * test directly instead of being lost inside a helper thread.
 */
public class MongoDbTailableCursorConsumerTest extends AbstractMongoDbTest {

    /** Upper bound, in milliseconds, on how long a test waits for a route to report {@code Stopped}. */
    private static final long ROUTE_STOP_TIMEOUT_MILLIS = 30000L;

    /** Pause, in milliseconds, between two consecutive polls of the route status. */
    private static final long ROUTE_STOP_POLL_MILLIS = 50L;

    private DBCollection cappedTestCollection;
    private String cappedTestCollectionName;

    /** Consumes 1000 pre-existing records through the route without a read preference. */
    @Test
    public void testThousandRecordsWithoutReadPreference() throws Exception {
        testThousandRecordsWithRouteId("tailableCursorConsumer1");
    }

    /** Consumes 1000 pre-existing records through the route configured with {@code readPreference=primary}. */
    @Test
    public void testThousandRecordsWithReadPreference() throws Exception {
        testThousandRecordsWithRouteId("tailableCursorConsumer1.readPreference");
    }

    /** Starts the consumer on an empty capped collection and expects no message at all. */
    @Test
    public void testNoRecords() throws Exception {
        assertEquals(0, cappedTestCollection.count());
        MockEndpoint mock = getMockEndpoint("mock:test");
        mock.expectedMessageCount(0);

        createCappedTestCollection();
        assertEquals(0, cappedTestCollection.count());

        addTestRoutes();
        context.startRoute("tailableCursorConsumer1");
        // give the consumer a chance to (wrongly) deliver something before asserting on "nothing"
        Thread.sleep(1000);
        mock.assertIsSatisfied();
        context.stopRoute("tailableCursorConsumer1");
    }

    /** Inserts 5 bursts of 1000 records, pausing 500 ms before each burst, and expects all 5000 messages. */
    @Test
    public void testMultipleBursts() throws Exception {
        final int burstCount = 5;
        final int burstSize = 1000;

        assertEquals(0, cappedTestCollection.count());
        MockEndpoint mock = getMockEndpoint("mock:test");
        mock.expectedMessageCount(burstCount * burstSize);

        createCappedTestCollection();
        addTestRoutes();
        context.startRoute("tailableCursorConsumer1");

        for (int burst = 0; burst < burstCount; burst++) {
            // an interruption propagates and fails the test rather than silently ending the pump
            Thread.sleep(500);
            int first = burst * burstSize;
            insertIntRecords(first, first + burstSize - 1);
        }

        mock.assertIsSatisfied();
        context.stopRoute("tailableCursorConsumer1");
    }

    /**
     * Inserts 100000 records in batches of 1000, asserting and resetting the mock endpoint after
     * each batch because the mock endpoint stores all messages and the test would otherwise be
     * sluggish.
     */
    @Test
    public void testHundredThousandRecords() throws Exception {
        final int batchCount = 100;
        final int batchSize = 1000;

        assertEquals(0, cappedTestCollection.count());
        final MockEndpoint mock = getMockEndpoint("mock:test");
        mock.expectedMessageCount(batchSize);

        createCappedTestCollection();
        addTestRoutes();
        context.startRoute("tailableCursorConsumer1");

        for (int batch = 0; batch < batchCount; batch++) {
            int first = batch * batchSize + 1;
            insertIntRecords(first, first + batchSize - 1);
            assertAndResetMockEndpoint(mock);
            // reset() discards the expectation together with the received exchanges, so it has to
            // be re-armed; otherwise every batch after the first would be asserted against nothing
            mock.expectedMessageCount(batchSize);
        }

        context.stopRoute("tailableCursorConsumer1");
    }

    /**
     * Verifies persistent tail tracking with an integer increasing field: after a route restart
     * only the new records are consumed, and the tracking value stored under persistentId
     * {@code darwin} is 300 while the restarted route is still running and 600 once it is stopped.
     */
    @Test
    public void testPersistentTailTrack() throws Exception {
        assertEquals(0, cappedTestCollection.count());
        final MockEndpoint mock = getMockEndpoint("mock:test");

        // drop the tracking collection
        db.getCollection(MongoDbTailTrackingConfig.DEFAULT_COLLECTION).drop();

        createCappedTestCollection();
        cappedTestCollection.createIndex("increasing");
        addTestRoutes();
        context.startRoute("tailableCursorConsumer2");

        mock.expectedMessageCount(300);
        insertIntRecords(1, 300);
        mock.assertIsSatisfied();
        mock.reset();

        stopRouteAndWait("tailableCursorConsumer2");
        context.startRoute("tailableCursorConsumer2");

        // expect 300 messages and not 600
        mock.expectedMessageCount(300);
        insertIntRecords(301, 600);
        mock.assertIsSatisfied();

        // check that the first message received in this second batch corresponds to increasing=301
        assertEquals(301, firstReceivedBody(mock).get("increasing"));

        // check that the lastVal is persisted at the right time: check before and after stopping the route
        assertEquals(300, persistedTrackingValue(
                db.getCollection(MongoDbTailTrackingConfig.DEFAULT_COLLECTION), "lastTrackingValue"));

        // stop the route and verify the last value has been updated
        stopRouteAndWait("tailableCursorConsumer2");
        assertEquals(600, persistedTrackingValue(
                db.getCollection(MongoDbTailTrackingConfig.DEFAULT_COLLECTION), "lastTrackingValue"));
    }

    /**
     * Same scenario as {@link #testPersistentTailTrack()} but the increasing field holds dates:
     * record {@code i} carries the test start time plus {@code i} minutes.
     */
    @Test
    public void testPersistentTailTrackIncreasingDateField() throws Exception {
        assertEquals(0, cappedTestCollection.count());
        final MockEndpoint mock = getMockEndpoint("mock:test");
        final Calendar startTimestamp = Calendar.getInstance();

        // get default tracking collection
        DBCollection trackingCol = db.getCollection(MongoDbTailTrackingConfig.DEFAULT_COLLECTION);
        trackingCol.drop();
        trackingCol = db.getCollection(MongoDbTailTrackingConfig.DEFAULT_COLLECTION);

        createCappedTestCollection();
        addTestRoutes();
        context.startRoute("tailableCursorConsumer2");

        mock.expectedMessageCount(300);
        insertDateRecords(startTimestamp, 1, 300);
        mock.assertIsSatisfied();
        mock.reset();

        // ensure that the persisted lastVal is startTimestamp + 300min
        stopRouteAndWait("tailableCursorConsumer2");
        assertEquals(plusMinutes(startTimestamp, 300).getTime(),
                persistedTrackingValue(trackingCol, MongoDbTailTrackingConfig.DEFAULT_FIELD));

        context.startRoute("tailableCursorConsumer2");

        // expect 300 messages and not 600
        mock.expectedMessageCount(300);
        insertDateRecords(startTimestamp, 301, 600);
        mock.assertIsSatisfied();

        // the first body of the second batch must be record 301, i.e. consumption did not restart from the top
        assertEquals(plusMinutes(startTimestamp, 301).getTime(), firstReceivedBody(mock).get("increasing"));

        // check that the persisted lastVal after stopping the route is startTimestamp + 600min
        stopRouteAndWait("tailableCursorConsumer2");
        assertEquals(plusMinutes(startTimestamp, 600).getTime(),
                persistedTrackingValue(trackingCol, MongoDbTailTrackingConfig.DEFAULT_FIELD));
    }

    /**
     * Verifies persistent tail tracking when the tracking data lives in a custom location
     * (tailTrackDb=einstein&amp;tailTrackCollection=curie&amp;tailTrackField=newton).
     */
    @Test
    public void testCustomTailTrackLocation() throws Exception {
        assertEquals(0, cappedTestCollection.count());
        final MockEndpoint mock = getMockEndpoint("mock:test");

        // get the custom tracking collection and drop it
        DBCollection trackingCol = mongo.getDB("einstein").getCollection("curie");
        trackingCol.drop();
        trackingCol = mongo.getDB("einstein").getCollection("curie");

        createCappedTestCollection();
        addTestRoutes();
        context.startRoute("tailableCursorConsumer3");

        mock.expectedMessageCount(300);
        insertIntRecords(1, 300);
        mock.assertIsSatisfied();
        mock.reset();

        // stop the route to ensure that our lastVal is persisted, and check it;
        // newton is the name of the trackingField we are using
        stopRouteAndWait("tailableCursorConsumer3");
        assertEquals(300, persistedTrackingValue(trackingCol, "newton"));

        context.startRoute("tailableCursorConsumer3");

        // expect 300 messages and not 600
        mock.expectedMessageCount(300);
        insertIntRecords(301, 600);
        mock.assertIsSatisfied();

        // check that the first received body contains increasing=301 and not increasing=1, i.e. it's not starting from the top
        assertEquals(301, firstReceivedBody(mock).get("increasing"));

        // check that the persisted lastVal after stopping the route is 600
        stopRouteAndWait("tailableCursorConsumer3");
        assertEquals(600, persistedTrackingValue(trackingCol, "newton"));
    }

    /**
     * Asserts the expectations currently set on the mock endpoint and then resets it.
     * Callers that want to assert again afterwards must set new expectations.
     *
     * @param mock the mock endpoint to assert and reset
     * @throws Exception if the assertion is interrupted
     */
    public void assertAndResetMockEndpoint(MockEndpoint mock) throws Exception {
        mock.assertIsSatisfied();
        mock.reset();
    }

    /**
     * Fills the capped collection with 1000 records before the route is started and expects the
     * route to deliver all of them.
     *
     * @param routeId id of the route (as declared in {@link #addTestRoutes()}) to start and stop
     */
    private void testThousandRecordsWithRouteId(String routeId) throws Exception {
        assertEquals(0, cappedTestCollection.count());
        MockEndpoint mock = getMockEndpoint("mock:test");
        mock.expectedMessageCount(1000);

        createCappedTestCollection();
        insertIntRecords(0, 999);
        assertEquals(1000, cappedTestCollection.count());

        addTestRoutes();
        context.startRoute(routeId);
        Thread.sleep(1000);
        mock.assertIsSatisfied();
        context.stopRoute(routeId);
    }

    /** Creates the capped test collection with max = 1000 documents and stores it in the field. */
    private void createCappedTestCollection() {
        cappedTestCollection = db.createCollection(cappedTestCollectionName,
                BasicDBObjectBuilder.start().add("capped", true).add("size", 1000000000).add("max", 1000).get());
    }

    /**
     * Inserts one record per value in {@code [first, last]}, in ascending order, with
     * {@code increasing} set to the value and {@code string} set to {@code "value" + value}.
     */
    private void insertIntRecords(int first, int last) {
        for (int i = first; i <= last; i++) {
            cappedTestCollection.insert(
                    BasicDBObjectBuilder.start("increasing", i).add("string", "value" + i).get(),
                    WriteConcern.SAFE);
        }
    }

    /**
     * Inserts one record per value in {@code [first, last]}, in ascending order, with
     * {@code increasing} set to {@code start} plus that many minutes.
     */
    private void insertDateRecords(Calendar start, int first, int last) {
        for (int i = first; i <= last; i++) {
            cappedTestCollection.insert(
                    BasicDBObjectBuilder.start("increasing", plusMinutes(start, i).getTime())
                            .add("string", "value" + i).get(),
                    WriteConcern.SAFE);
        }
    }

    /** Returns a copy of {@code base} moved forward by {@code minutes}; {@code base} is left untouched. */
    private static Calendar plusMinutes(Calendar base, int minutes) {
        Calendar shifted = (Calendar) base.clone();
        shifted.add(Calendar.MINUTE, minutes);
        return shifted;
    }

    /** Returns the body of the first exchange received by the mock, asserting that it is a {@link DBObject}. */
    private DBObject firstReceivedBody(MockEndpoint mock) {
        Object firstBody = mock.getExchanges().get(0).getIn().getBody();
        assertTrue(firstBody instanceof DBObject);
        return (DBObject) firstBody;
    }

    /** Reads {@code field} from the tracking document whose persistentId is {@code darwin}. */
    private Object persistedTrackingValue(DBCollection trackingCol, String field) {
        return trackingCol.findOne(new BasicDBObject("persistentId", "darwin")).get(field);
    }

    /**
     * Stops the route and waits until the context reports it as {@code Stopped}, sleeping between
     * polls and failing after {@link #ROUTE_STOP_TIMEOUT_MILLIS} instead of spinning forever.
     *
     * @param routeId id of the route to stop
     * @throws Exception if stopping fails or the wait is interrupted
     */
    private void stopRouteAndWait(String routeId) throws Exception {
        context.stopRoute(routeId);
        long deadline = System.currentTimeMillis() + ROUTE_STOP_TIMEOUT_MILLIS;
        while (context.getRouteStatus(routeId) != ServiceStatus.Stopped) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("Route " + routeId + " did not reach status Stopped within "
                        + ROUTE_STOP_TIMEOUT_MILLIS + " ms");
            }
            Thread.sleep(ROUTE_STOP_POLL_MILLIS);
        }
    }

    @Override
    public void doPostSetup() {
        super.doPostSetup();
        // drop the capped collection and let each test create what it needs
        cappedTestCollectionName = "camelTestCapped";
        cappedTestCollection = db.getCollection(cappedTestCollectionName);
        cappedTestCollection.drop();
    }

    /**
     * Registers the four tailable cursor consumer routes used by the tests. None of them starts
     * automatically; each test starts the one it needs by id. All of them deliver to
     * {@code mock:test}.
     */
    protected void addTestRoutes() throws Exception {
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // plain tail tracking, nothing persisted
                from("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.cappedTestCollection}}&tailTrackIncreasingField=increasing")
                    .id("tailableCursorConsumer1")
                    .autoStartup(false)
                    .to("mock:test");

                // persistent tail tracking in the default tracking location
                from("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.cappedTestCollection}}&tailTrackIncreasingField=increasing&persistentTailTracking=true&persistentId=darwin")
                    .id("tailableCursorConsumer2")
                    .autoStartup(false)
                    .to("mock:test");

                // persistent tail tracking in a custom database, collection and field
                from("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.cappedTestCollection}}&tailTrackIncreasingField=increasing&"
                     + "persistentTailTracking=true&persistentId=darwin&tailTrackDb=einstein&tailTrackCollection=curie&tailTrackField=newton")
                    .id("tailableCursorConsumer3")
                    .autoStartup(false)
                    .to("mock:test");

                // plain tail tracking with an explicit read preference
                from("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.cappedTestCollection}}&tailTrackIncreasingField=increasing&readPreference=primary")
                    .id("tailableCursorConsumer1.readPreference")
                    .autoStartup(false)
                    .to("mock:test");
            }
        });
    }
}