blob: 4ce1348f9694e71b2a4baf1f96d3ed0aa1b6d879 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.mongodb;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.BasicConfigurator;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaStatement.RyaStatementBuilder;
import org.apache.rya.api.domain.RyaIRI;
import org.apache.rya.mongodb.batch.MongoDbBatchWriter;
import org.apache.rya.mongodb.batch.MongoDbBatchWriterConfig;
import org.apache.rya.mongodb.batch.MongoDbBatchWriterUtils;
import org.apache.rya.mongodb.batch.collection.DbCollectionType;
import org.apache.rya.mongodb.batch.collection.MongoCollectionType;
import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
import org.bson.Document;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.collect.Lists;
import com.mongodb.DBObject;
/**
* Integration tests for the {@link MongoDbBatchWriter}.
*/
public class MongoDBRyaBatchWriterIT extends MongoITBase {
private static void setupLogging() {
BasicConfigurator.configure();
}
@BeforeClass
public static void setUpClass() throws Exception {
setupLogging();
}
@Override
protected void updateConfiguration(final MongoDBRdfConfiguration conf) {
conf.setBoolean("rya.mongodb.dao.flusheachupdate", false);
conf.setInt("rya.mongodb.dao.batchwriter.size", 50_000);
conf.setLong("rya.mongodb.dao.batchwriter.flushtime", 100L);
}
@Test
public void testDuplicateKeys() throws Exception {
final List<RyaStatement> statements = new ArrayList<>();
statements.add(statement(1));
statements.add(statement(2));
statements.add(statement(1));
statements.add(statement(3));
statements.add(statement(1));
statements.add(statement(4));
statements.add(statement(1));
statements.add(statement(5));
statements.add(statement(1));
statements.add(statement(6));
final MongoDBRyaDAO dao = new MongoDBRyaDAO();
try {
dao.setConf(conf);
dao.init();
dao.add(statements.iterator());
dao.flush();
assertEquals(6, getRyaCollection().count());
} finally {
dao.destroy();
}
}
@Test
public void testDbCollectionFlush() throws Exception {
final MongoDBStorageStrategy<RyaStatement> storageStrategy = new SimpleMongoDBStorageStrategy();
final List<DBObject> objects = Lists.newArrayList(
storageStrategy.serialize(statement(1)),
storageStrategy.serialize(statement(2)),
storageStrategy.serialize(statement(2)),
null,
storageStrategy.serialize(statement(3)),
storageStrategy.serialize(statement(3)),
storageStrategy.serialize(statement(4))
);
final DbCollectionType collectionType = new DbCollectionType(getRyaDbCollection());
final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf);
final MongoDbBatchWriter<DBObject> mongoDbBatchWriter = new MongoDbBatchWriter<>(collectionType, mongoDbBatchWriterConfig);
mongoDbBatchWriter.start();
mongoDbBatchWriter.addObjectsToQueue(objects);
mongoDbBatchWriter.flush();
Thread.sleep(1_000);
mongoDbBatchWriter.addObjectsToQueue(objects);
mongoDbBatchWriter.flush();
Thread.sleep(1_000);
mongoDbBatchWriter.shutdown();
assertEquals(4, getRyaDbCollection().count());
}
@Test
public void testMongoCollectionFlush() throws Exception {
final MongoDBStorageStrategy<RyaStatement> storageStrategy = new SimpleMongoDBStorageStrategy();
final List<Document> documents = Lists.newArrayList(
toDocument(storageStrategy.serialize(statement(1))),
toDocument(storageStrategy.serialize(statement(2))),
toDocument(storageStrategy.serialize(statement(2))),
null,
toDocument(storageStrategy.serialize(statement(3))),
toDocument(storageStrategy.serialize(statement(3))),
toDocument(storageStrategy.serialize(statement(4)))
);
final MongoCollectionType mongoCollectionType = new MongoCollectionType(getRyaCollection());
final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf);
final MongoDbBatchWriter<Document> mongoDbBatchWriter = new MongoDbBatchWriter<>(mongoCollectionType, mongoDbBatchWriterConfig);
mongoDbBatchWriter.start();
mongoDbBatchWriter.addObjectsToQueue(documents);
mongoDbBatchWriter.flush();
Thread.sleep(1_000);
mongoDbBatchWriter.addObjectsToQueue(documents);
mongoDbBatchWriter.flush();
Thread.sleep(1_000);
mongoDbBatchWriter.shutdown();
assertEquals(4, getRyaCollection().count());
}
private static Document toDocument(final DBObject dbObject) {
if (dbObject == null) {
return null;
}
final Document document = Document.parse(dbObject.toString());
return document;
}
private static RyaIRI ryaIRI(final int v) {
return new RyaIRI("u:" + v);
}
private static RyaStatement statement(final int v) {
final RyaStatementBuilder builder = new RyaStatementBuilder();
builder.setPredicate(ryaIRI(v));
builder.setSubject(ryaIRI(v));
builder.setObject(ryaIRI(v));
return builder.build();
}
}