blob: 5432545417484fc13a834bced6a90da7e213d802 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.test;
import static org.junit.Assert.assertEquals;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests writing to concurrent ledgers.
*/
public class ConcurrentLedgerTest {
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentLedgerTest.class);
Bookie bookie;
File txnDir, ledgerDir;
int recvTimeout = 10000;
Semaphore throttle;
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
final List<File> tempDirs = new ArrayList<File>();
private File createTempDir(String prefix, String suffix, File parent) throws IOException {
File dir = File.createTempFile(prefix, suffix, parent);
dir.delete();
tempDirs.add(dir);
return dir;
}
@Before
public void setUp() throws Exception {
String txnDirName = System.getProperty("txnDir");
if (txnDirName != null) {
txnDir = new File(txnDirName);
}
String ledgerDirName = System.getProperty("ledgerDir");
if (ledgerDirName != null) {
ledgerDir = new File(ledgerDirName);
}
File tmpFile = createTempDir("book", ".txn", txnDir);
txnDir = new File(tmpFile.getParent(), tmpFile.getName() + ".dir");
txnDir.mkdirs();
tmpFile = createTempDir("book", ".ledger", ledgerDir);
ledgerDir = new File(tmpFile.getParent(), tmpFile.getName() + ".dir");
ledgerDir.mkdirs();
conf.setBookiePort(5000);
conf.setMetadataServiceUri(null);
conf.setJournalDirName(txnDir.getPath());
conf.setLedgerDirNames(new String[] { ledgerDir.getPath() });
bookie = new BookieImpl(conf);
bookie.start();
}
static void recursiveDelete(File f) {
if (f.isFile()) {
f.delete();
} else {
for (File i: f.listFiles()) {
recursiveDelete(i);
}
f.delete();
}
}
@After
public void tearDown() {
bookie.shutdown();
recursiveDelete(txnDir);
recursiveDelete(ledgerDir);
}
byte[] zeros = new byte[16];
int iterations = 51;
{
String iterationsString = System.getProperty("iterations");
if (iterationsString != null) {
iterations = Integer.parseInt(iterationsString);
}
}
int iterationStep = 25;
{
String iterationsString = System.getProperty("iterationStep");
if (iterationsString != null) {
iterationStep = Integer.parseInt(iterationsString);
}
}
@Test
public void testConcurrentWrite() throws IOException, InterruptedException, BookieException {
int size = 1024;
int totalwrites = 128;
if (System.getProperty("totalwrites") != null) {
totalwrites = Integer.parseInt(System.getProperty("totalwrites"));
}
LOG.info("Running up to " + iterations + " iterations");
LOG.info("Total writes = " + totalwrites);
int ledgers;
for (ledgers = 1; ledgers <= iterations; ledgers += iterationStep) {
long duration = doWrites(ledgers, size, totalwrites);
LOG.info(totalwrites + " on " + ledgers + " took " + duration + " ms");
}
LOG.info("ledgers " + ledgers);
for (ledgers = 1; ledgers <= iterations; ledgers += iterationStep) {
long duration = doReads(ledgers, size, totalwrites);
LOG.info(ledgers + " read " + duration + " ms");
}
}
private long doReads(int ledgers, int size, int totalwrites)
throws IOException, InterruptedException, BookieException {
long start = System.currentTimeMillis();
for (int i = 1; i <= totalwrites / ledgers; i++) {
for (int j = 1; j <= ledgers; j++) {
ByteBuf entry = bookie.readEntry(j, i);
// skip the ledger id and the entry id
entry.readLong();
entry.readLong();
assertEquals(j + "@" + i, j + 2, entry.readLong());
assertEquals(j + "@" + i, i + 3, entry.readLong());
}
}
long finish = System.currentTimeMillis();
return finish - start;
}
private long doWrites(int ledgers, int size, int totalwrites)
throws IOException, InterruptedException, BookieException {
throttle = new Semaphore(10000);
WriteCallback cb = new WriteCallback() {
@Override
public void writeComplete(int rc, long ledgerId, long entryId,
BookieId addr, Object ctx) {
AtomicInteger counter = (AtomicInteger) ctx;
counter.getAndIncrement();
throttle.release();
}
};
AtomicInteger counter = new AtomicInteger();
long start = System.currentTimeMillis();
for (int i = 1; i <= totalwrites / ledgers; i++) {
for (int j = 1; j <= ledgers; j++) {
ByteBuffer bytes = ByteBuffer.allocate(size);
bytes.putLong(j);
bytes.putLong(i);
bytes.putLong(j + 2);
bytes.putLong(i + 3);
bytes.put(("This is ledger " + j + " entry " + i).getBytes());
bytes.position(0);
bytes.limit(bytes.capacity());
throttle.acquire();
bookie.addEntry(Unpooled.wrappedBuffer(bytes), false, cb, counter, zeros);
}
}
long finish = System.currentTimeMillis();
return finish - start;
}
}