blob: c5090dbceba327e7857e8305b26f1436decdb3b2 [file] [log] [blame]
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
package org.rocksdb;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
@RunWith(Parameterized.class)
public class WriteBatchThreadedTest {
@Parameters(name = "WriteBatchThreadedTest(threadCount={0})")
public static Iterable<Integer> data() {
return Arrays.asList(new Integer[]{1, 10, 50, 100});
}
@Parameter
public int threadCount;
@Rule
public TemporaryFolder dbFolder = new TemporaryFolder();
RocksDB db;
@Before
public void setUp() throws Exception {
RocksDB.loadLibrary();
final Options options = new Options()
.setCreateIfMissing(true)
.setIncreaseParallelism(32);
db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath());
assert (db != null);
}
@After
public void tearDown() throws Exception {
if (db != null) {
db.close();
}
}
@Test
public void threadedWrites() throws InterruptedException, ExecutionException {
final List<Callable<Void>> callables = new ArrayList<>();
for (int i = 0; i < 100; i++) {
final int offset = i * 100;
callables.add(new Callable<Void>() {
@Override
public Void call() throws RocksDBException {
try (final WriteBatch wb = new WriteBatch();
final WriteOptions w_opt = new WriteOptions()) {
for (int i = offset; i < offset + 100; i++) {
wb.put(ByteBuffer.allocate(4).putInt(i).array(), "parallel rocks test".getBytes());
}
db.write(w_opt, wb);
}
return null;
}
});
}
//submit the callables
final ExecutorService executorService =
Executors.newFixedThreadPool(threadCount);
try {
final ExecutorCompletionService<Void> completionService =
new ExecutorCompletionService<>(executorService);
final Set<Future<Void>> futures = new HashSet<>();
for (final Callable<Void> callable : callables) {
futures.add(completionService.submit(callable));
}
while (futures.size() > 0) {
final Future<Void> future = completionService.take();
futures.remove(future);
try {
future.get();
} catch (final ExecutionException e) {
for (final Future<Void> f : futures) {
f.cancel(true);
}
throw e;
}
}
} finally {
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
}
}
}