// blob: 3211384c2800a50903ffd730cd27342a19e336ce [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import org.junit.Ignore;
import org.junit.Test;
import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
import static org.junit.Assert.assertNotNull;
public class BlobThroughPutTest {
/** Number of documents inserted into the nodes collection before every run. */
private static final int NO_OF_NODES = 100;

/** Size in bytes (2 MiB) of the payload used for every blob write. */
private static final int BLOB_SIZE = 1024 * 1024 * 2;

/** Database that always holds the nodes collection. */
private static final String TEST_DB1 = "tptest1";

/** Alternate database used for the blobs collection when nodes and blobs are kept apart. */
private static final String TEST_DB2 = "tptest2";

/** Duration of a single benchmark run, in seconds. */
private static final int MAX_EXEC_TIME = 5;

/** Reader thread counts exercised by the reader/writer matrix. */
private static final int[] READERS = {5, 10, 15, 20};

/** Writer thread counts exercised by the reader/writer matrix. */
private static final int[] WRITERS = {0, 1, 2, 4};

/** Results collected from every run, in execution order. */
private final List<Result> results = new ArrayList<Result>();

/** Display names for the write concerns exercised by the write concern benchmark. */
private static final BiMap<WriteConcern,String> namedConcerns;

static {
    BiMap<WriteConcern,String> bimap = HashBiMap.create();
    bimap.put(WriteConcern.FSYNC_SAFE,"FSYNC_SAFE");
    bimap.put(WriteConcern.JOURNAL_SAFE,"JOURNAL_SAFE");
    // MAJORITY and REPLICAS_SAFE are left disabled, as in the original code.
    // bimap.put(WriteConcern.MAJORITY,"MAJORITY");
    // NOTE(review): depending on the driver version, NORMAL may be equal to
    // UNACKNOWLEDGED; in that case the second put replaces the first label - confirm.
    bimap.put(WriteConcern.UNACKNOWLEDGED,"UNACKNOWLEDGED");
    bimap.put(WriteConcern.NORMAL,"NORMAL");
    // bimap.put(WriteConcern.REPLICAS_SAFE,"REPLICAS_SAFE");
    bimap.put(WriteConcern.SAFE,"SAFE");
    namedConcerns = Maps.unmodifiableBiMap(bimap);
}

// These strings are handed to MongoClientURI, which only accepts connection
// strings that carry the "mongodb://" scheme, so the scheme must be spelled out.

/** Connection string of the MongoDB instance running on the local machine. */
private final String localServer = "mongodb://localhost:27017/test";

/** Connection string of the remote MongoDB instance; adjust the host name before running. */
private final String remoteServer = "mongodb://remote:27017/test";
/**
 * Runs the reader/writer matrix four times: against the local server with
 * separate and with shared databases, then against the remote server with
 * the same two layouts, and finally prints all collected results.
 *
 * <p>Both clients are closed when the runs finish or fail, so that their
 * connections are not leaked.
 *
 * @throws InterruptedException if the thread is interrupted while a run is in progress
 */
@Ignore
@Test
public void performBenchMark() throws InterruptedException {
    MongoClient local = new MongoClient(new MongoClientURI(localServer));
    try {
        MongoClient remote = new MongoClient(new MongoClientURI(remoteServer));
        try {
            run(local, false, false);
            run(local, true, false);
            run(remote, false, true);
            run(remote, true, true);
        } finally {
            remote.close();
        }
    } finally {
        local.close();
    }
    dumpResult();
}
/**
 * Measures write throughput against the remote server for every write
 * concern registered in {@code namedConcerns}, using two writers and no
 * readers, then prints the collected results.
 *
 * <p>The client is closed when the runs finish or fail, so that its
 * connections are not leaked.
 *
 * @throws InterruptedException if the thread is interrupted while a run is in progress
 */
@Ignore
@Test
public void performBenchMark_WriteConcern() throws InterruptedException {
    MongoClient mongo = new MongoClient(new MongoClientURI(remoteServer));
    try {
        final MongoDatabase db = mongo.getDatabase(TEST_DB1);
        final MongoCollection<BasicDBObject> nodes = db.getCollection("nodes", BasicDBObject.class);
        final MongoCollection<BasicDBObject> blobs = db.getCollection("blobs", BasicDBObject.class);
        final int readers = 0;
        final int writers = 2;
        for (WriteConcern wc : namedConcerns.keySet()) {
            // Start every write concern from the same freshly prepared state.
            prepareDB(nodes, blobs);
            final Benchmark b = new Benchmark(nodes, blobs);
            Result r = b.run(readers, writers, true, wc);
            results.add(r);
        }
        // Reset once more so the blobs written by the last run do not linger.
        prepareDB(nodes, blobs);
    } finally {
        mongo.close();
    }
    dumpResult();
}
/**
 * Prints the CSV header followed by one line per collected result to
 * standard output.
 */
private void dumpResult() {
    final PrintStream out = System.out;
    out.println(Result.OUTPUT_FORMAT);
    final int count = results.size();
    for (int index = 0; index < count; index++) {
        Result entry = results.get(index);
        out.println(entry.toString());
    }
}
/**
 * Executes one benchmark for every combination of {@code READERS} and
 * {@code WRITERS} and appends each outcome to {@code results}.
 *
 * @param mongo     client used to obtain the node and blob collections
 * @param useSameDB whether blobs are stored in the same database as the nodes
 * @param remote    flag recorded in the result to mark runs against the remote server
 * @throws InterruptedException if the thread is interrupted while a run is in progress
 */
private void run(MongoClient mongo, boolean useSameDB, boolean remote) throws InterruptedException {
    final String blobDbName = useSameDB ? TEST_DB1 : TEST_DB2;
    final MongoDatabase nodeDatabase = mongo.getDatabase(TEST_DB1);
    final MongoDatabase blobDatabase = mongo.getDatabase(blobDbName);
    final MongoCollection<BasicDBObject> nodeCollection =
            nodeDatabase.getCollection("nodes", BasicDBObject.class);
    final MongoCollection<BasicDBObject> blobCollection =
            blobDatabase.getCollection("blobs", BasicDBObject.class);

    for (int r = 0; r < READERS.length; r++) {
        for (int w = 0; w < WRITERS.length; w++) {
            // Every combination starts from a freshly prepared database.
            prepareDB(nodeCollection, blobCollection);
            Benchmark benchmark = new Benchmark(nodeCollection, blobCollection);
            results.add(benchmark.run(READERS[r], WRITERS[w], remote, WriteConcern.SAFE));
        }
    }
}
/**
 * Resets the databases backing both collections through
 * {@code MongoUtils.dropCollections} and then repopulates the nodes
 * collection with the test documents.
 *
 * @param nodes collection that receives the test documents
 * @param blobs collection whose database is cleared as well
 */
private void prepareDB(MongoCollection<BasicDBObject> nodes,
                       MongoCollection<BasicDBObject> blobs) {
    final String nodeDbName = nodes.getNamespace().getDatabaseName();
    MongoUtils.dropCollections(nodeDbName);

    final String blobDbName = blobs.getNamespace().getDatabaseName();
    MongoUtils.dropCollections(blobDbName);

    createTestNodes(nodes);
}
/**
 * Inserts {@code NO_OF_NODES} documents, one at a time, with ids
 * {@code 0 .. NO_OF_NODES - 1} and a {@code foo} property derived from the id.
 *
 * @param nodes collection that receives the documents
 */
private void createTestNodes(MongoCollection<BasicDBObject> nodes) {
    int id = 0;
    while (id < NO_OF_NODES) {
        BasicDBObject document = new BasicDBObject("_id", id);
        document.append("foo", "bar1" + id);
        nodes.insertOne(document);
        id++;
    }
}
/**
 * Outcome of a single benchmark run, rendered as one CSV line by
 * {@link #toString()}.
 */
private static class Result {
    /** CSV header matching the column order produced by {@link #toString()}. */
    final static String OUTPUT_FORMAT = "remote, samedb, readers, writers, reads, writes, " +
            "time, readThroughPut, writeThroughPut, writeConcern";

    /** Number of reads completed by all readers together. */
    int totalReads;
    /** Number of blob writes completed by all writers together. */
    int totalWrites = 0;
    int noOfReaders;
    int noOfWriters;
    /** Duration of the run, in seconds. */
    int execTime;
    /** Size in bytes of every written blob. */
    int dataSize = BLOB_SIZE;
    boolean sameDB;
    boolean remote;
    WriteConcern writeConcern;

    /**
     * @return reads per second; computed in floating point so that the
     *         fractional part is not lost to integer division
     */
    double readThroughPut() {
        return (double) totalReads / execTime;
    }

    /**
     * @return written bytes per second; computed in floating point because
     *         {@code totalWrites * dataSize} exceeds the {@code int} range
     *         after only 1024 writes of 2 MiB each
     */
    double writeThroughPut() {
        return (double) totalWrites * dataSize / execTime;
    }

    /**
     * @return the display name registered for the write concern, or
     *         {@code null} when it has none
     */
    String getWriteConcern(){
        return namedConcerns.get(writeConcern);
    }

    @Override
    public String toString() {
        return String.format("%s,%s,%d,%d,%d,%d,%d,%1.0f,%s,%s",
                remote,
                sameDB,
                noOfReaders,
                noOfWriters,
                totalReads,
                totalWrites,
                execTime,
                readThroughPut(),
                humanReadableByteCount((long) writeThroughPut()),
                getWriteConcern());
    }
}
/**
 * Drives a fixed number of reader and writer threads against the given
 * collections for {@code MAX_EXEC_TIME} seconds and counts the operations
 * they complete.
 *
 * <p>An instance is meant for a single {@link #run} call: the start latch
 * is created once per instance and stays open after the first run.
 */
private static class Benchmark {
    private final MongoCollection<BasicDBObject> nodes;
    private final MongoCollection<BasicDBObject> blobs;
    /** Source of the node ids picked by the readers; shared by all reader threads. */
    private final Random random = new Random();
    /** Set to {@code true} to ask all workers to leave their loops. */
    private final AtomicBoolean stopTest = new AtomicBoolean(false);
    /** Payload stored with every written blob; {@code BLOB_SIZE} bytes are requested from the stream. */
    private final static byte[] DATA;
    /** Holds back all workers until every thread has been started. */
    private final CountDownLatch startLatch = new CountDownLatch(1);

    static {
        try {
            // NOTE(review): the arguments are presumably (size, seed) - confirm against RandomStream.
            DATA = ByteStreams.toByteArray(new RandomStream(BLOB_SIZE, 100));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    private Benchmark(MongoCollection<BasicDBObject> nodes,
                      MongoCollection<BasicDBObject> blobs) {
        this.nodes = nodes;
        this.blobs = blobs;
    }

    /**
     * Starts the workers, lets them run for {@code MAX_EXEC_TIME} seconds,
     * waits until all of them have finished and sums up their counters.
     *
     * @param noOfReaders  number of reader threads
     * @param noOfWriters  number of writer threads
     * @param remote       recorded in the result only
     * @param writeConcern write concern applied to every blob insert
     * @return the totals of this run
     * @throws InterruptedException if the calling thread is interrupted while
     *         sleeping or while waiting for the workers to finish
     */
    public Result run(int noOfReaders, int noOfWriters, boolean remote, WriteConcern writeConcern) throws InterruptedException {
        boolean sameDB = nodes.getNamespace().getDatabaseName().equals(blobs.getNamespace().getDatabaseName());

        List<Reader> readers = new ArrayList<Reader>(noOfReaders);
        List<Writer> writers = new ArrayList<Writer>(noOfWriters);
        List<Runnable> runnables = new ArrayList<Runnable>(noOfReaders + noOfWriters);
        final CountDownLatch stopLatch = new CountDownLatch(noOfReaders + noOfWriters);

        for (int i = 0; i < noOfReaders; i++) {
            readers.add(new Reader(stopLatch));
        }
        for (int i = 0; i < noOfWriters; i++) {
            writers.add(new Writer(i, stopLatch, writeConcern));
        }
        runnables.addAll(readers);
        runnables.addAll(writers);

        stopTest.set(false);
        for (Runnable runnable : runnables) {
            new Thread(runnable).start();
        }

        System.err.printf("Running with [%d] readers and [%d] writers. " +
                        "Same DB [%s], Remote server [%s], Max Time [%d] seconds, WriteConcern [%s] %n",
                noOfReaders, noOfWriters, sameDB, remote, MAX_EXEC_TIME,namedConcerns.get(writeConcern));

        startLatch.countDown();
        try {
            TimeUnit.SECONDS.sleep(MAX_EXEC_TIME);
        } finally {
            // Even when the sleep is interrupted the workers must be told to
            // stop, otherwise they would keep hitting the database forever.
            stopTest.set(true);
        }

        // The latch also makes the workers' plain int counters visible here.
        stopLatch.await();

        int totalReads = 0;
        for (Reader r : readers) {
            totalReads += r.readCount;
        }
        int totalWrites = 0;
        for (Writer w : writers) {
            totalWrites += w.writeCount;
        }

        Result r = new Result();
        r.execTime = MAX_EXEC_TIME;
        r.noOfReaders = noOfReaders;
        r.noOfWriters = noOfWriters;
        r.totalReads = totalReads;
        r.totalWrites = totalWrites;
        r.remote = remote;
        r.sameDB = sameDB;
        r.writeConcern = writeConcern;

        System.err.printf("Run complete. Reads [%d] and writes [%d] %n", totalReads, totalWrites);
        System.err.println(r.toString());
        return r;
    }

    /**
     * Blocks until the coordinating thread opens the start latch.
     *
     * @throws IllegalStateException if the waiting thread is interrupted;
     *         the interrupt status is restored before throwing
     */
    private void waitForStart() {
        try {
            startLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Worker that repeatedly looks up a randomly chosen node by id. */
    private class Reader implements Runnable {
        int readCount = 0;
        final CountDownLatch stopLatch;

        public Reader(CountDownLatch stopLatch) {
            this.stopLatch = stopLatch;
        }

        @Override
        public void run() {
            try {
                waitForStart();
                while (!stopTest.get()) {
                    int id = random.nextInt(NO_OF_NODES);
                    BasicDBObject o = nodes.find(Filters.eq("_id", id)).first();
                    assertNotNull("did not found object with id " + id, o);
                    readCount++;
                }
            } finally {
                // Always report completion: a failed lookup or a driver
                // exception must not leave the coordinator waiting forever.
                stopLatch.countDown();
            }
        }
    }

    /** Worker that repeatedly inserts a document carrying the blob payload. */
    private class Writer implements Runnable {
        int writeCount = 0;
        final int id;
        final CountDownLatch stopLatch;
        final WriteConcern writeConcern;

        private Writer(int id, CountDownLatch stopLatch, WriteConcern writeConcern) {
            this.id = id;
            this.stopLatch = stopLatch;
            this.writeConcern = writeConcern;
        }

        @Override
        public void run() {
            try {
                waitForStart();
                while (!stopTest.get()) {
                    // Writer id plus running counter identifies the document.
                    String _id = id + "-" + writeCount;
                    BasicDBObject obj = new BasicDBObject()
                            .append("foo", _id);
                    obj.put("blob", DATA);
                    blobs.withWriteConcern(writeConcern).insertOne(obj);
                    writeCount++;
                }
            } finally {
                // Always report completion: a failed insert must not leave
                // the coordinator waiting forever.
                stopLatch.countDown();
            }
        }
    }
}
}