Improve journal throughput when journalSyncData is disabled.

Descriptions of the changes in this PR:

### Motivation

There was a weird behavior observed: when `journalSyncData` is disabled, the journal throughput decreased with multiple journal dirs on same journal disk.

The problem is when `journalSyncData` is disabled, we still sync data to disk when rolling the journal file. The sync time increases to multiple seconds, because it has to flush all the files to disk and wait the whole sync operation to be completed.

### Changes

when `journalSyncData` is disabled, we trigger callbacks when the data is flushed to filesystem but still enqueue a fsync request to force write queue to make sure we still fsync data frequently.

Also added a journal perf tool to do performance testing on journal.




Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Jia Zhai <zhaijia@apache.org>, Matteo Merli <mmerli@apache.org>

This closes #2150 from sijie/journal_perf
diff --git a/bin/bkperf b/bin/bkperf
index 6ec2909..857eda4 100755
--- a/bin/bkperf
+++ b/bin/bkperf
@@ -47,7 +47,7 @@
 fi
 CLI_LOG_DIR=${CLI_LOG_DIR:-"$BK_HOME/logs"}
 CLI_LOG_FILE=${CLI_LOG_FILE:-"bkperf.log"}
-CLI_ROOT_LOGGER=${CLI_ROOT_LOGGER:-"INFO,ROLLINGFILE"}
+CLI_ROOT_LOGGER=${CLI_ROOT_LOGGER:-"INFO,CONSOLE"}
 
 # Configure the classpath
 CLI_CLASSPATH="$CLI_JAR:$CLI_CLASSPATH:$CLI_EXTRA_CLASSPATH"
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 4af728a..19d5a79 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -852,7 +852,8 @@
     }
 
     @VisibleForTesting
-    void logAddEntry(long ledgerId, long entryId, ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Object ctx)
+    public void logAddEntry(long ledgerId, long entryId, ByteBuf entry,
+                            boolean ackBeforeSync, WriteCallback cb, Object ctx)
             throws InterruptedException {
         //Retain entry until it gets written to journal
         entry.retain();
@@ -1049,25 +1050,11 @@
                                 .registerSuccessfulValue(batchSize);
 
                             boolean shouldRolloverJournal = (lastFlushPosition > maxJournalSize);
-                            if (syncData) {
-                                // Trigger data sync to disk in the "Force-Write" thread.
-                                // Callback will be triggered after data is committed to disk
-                                forceWriteRequests.put(createForceWriteRequest(logFile, logId, lastFlushPosition,
-                                                                               toFlush, shouldRolloverJournal, false));
-                                toFlush = entryListRecycler.newInstance();
-                                numEntriesToFlush = 0;
-                            } else {
-                                // Data is already written on the file (though it might still be in the OS page-cache)
-                                lastLogMark.setCurLogMark(logId, lastFlushPosition);
-                                toFlush.clear();
-                                numEntriesToFlush = 0;
-                                if (shouldRolloverJournal) {
-                                    forceWriteRequests.put(
-                                            createForceWriteRequest(
-                                                    logFile, logId, lastFlushPosition,
-                                                    EMPTY_ARRAY_LIST, shouldRolloverJournal, false));
-                                }
-                            }
+                            // Trigger data sync to disk in the "Force-Write" thread.
+                            forceWriteRequests.put(createForceWriteRequest(logFile, logId, lastFlushPosition,
+                                                                           toFlush, shouldRolloverJournal, false));
+                            toFlush = entryListRecycler.newInstance();
+                            numEntriesToFlush = 0;
 
                             batchSize = 0L;
                             // check whether journal file is over file limit
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 75e230c..9a2c42d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -713,6 +713,17 @@
     }
 
     /**
+     * Set the size of the write buffers used for the journal.
+     *
+     * @param bufferSizeKB the size of the write buffer used for the journal, in KB.
+     * @return server configuration
+     */
+    public ServerConfiguration setJournalWriteBufferSizeKB(int bufferSizeKB) {
+        setProperty(JOURNAL_WRITE_BUFFER_SIZE, bufferSizeKB);
+        return this;
+    }
+
+    /**
      * Max number of older journal files kept.
      *
      * @return max number of older journal files to kept
@@ -1972,6 +1983,17 @@
     }
 
     /**
+     * Set maximum bytes to buffer to impose on a journal write to achieve grouping.
+     *
+     * @param maxBytes maximum bytes to buffer to impose on a journal write
+     * @return max bytes to buffer
+     */
+    public ServerConfiguration setJournalBufferedWritesThreshold(long maxBytes) {
+        setProperty(JOURNAL_BUFFERED_WRITES_THRESHOLD, maxBytes);
+        return this;
+    }
+
+    /**
      * Maximum entries to buffer to impose on a journal write to achieve grouping.
      * Use {@link #getJournalBufferedWritesThreshold()} if this is set to zero or
      * less than zero.
diff --git a/tools/perf/pom.xml b/tools/perf/pom.xml
index ddbbf37..cb04620 100644
--- a/tools/perf/pom.xml
+++ b/tools/perf/pom.xml
@@ -40,6 +40,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.bookkeeper.stats</groupId>
+      <artifactId>prometheus-metrics-provider</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.hdrhistogram</groupId>
       <artifactId>HdrHistogram</artifactId>
     </dependency>
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/BKPerf.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/BKPerf.java
index 3aac74e..e7ad543 100644
--- a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/BKPerf.java
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/BKPerf.java
@@ -36,7 +36,8 @@
             .withFlags(new BKFlags())
             .withConsole(System.out)
             .addCommand(new DlogPerfCommandGroup())
-            .addCommand(new TablePerfCommandGroup());
+            .addCommand(new TablePerfCommandGroup())
+            .addCommand(new JournalPerfCommandGroup());
 
         CliSpec<BKFlags> spec = specBuilder.build();
 
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/JournalPerfCommandGroup.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/JournalPerfCommandGroup.java
new file mode 100644
index 0000000..982ed5d
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/JournalPerfCommandGroup.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf;
+
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliCommandGroup;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.tools.perf.journal.AppendCommand;
+
+/**
+ * Commands that evaluate performance of the bookkeeper journal library.
+ */
+public class JournalPerfCommandGroup extends CliCommandGroup<BKFlags> implements PerfCommandGroup<BKFlags> {
+
+    private static final String NAME = "journal";
+    private static final String DESC = "Commands on evaluating performance of journal library";
+
+    private static final CliSpec<BKFlags> spec = CliSpec.<BKFlags>newBuilder()
+        .withName(NAME)
+        .withDescription(DESC)
+        .withParent(BKPerf.NAME)
+        .addCommand(new AppendCommand())
+        .build();
+
+    public JournalPerfCommandGroup() {
+        super(spec);
+    }
+
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/AppendCommand.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/AppendCommand.java
new file mode 100644
index 0000000..4d04961
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/AppendCommand.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.journal;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.common.BKCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.tools.perf.journal.JournalWriter.Flags;
+import org.apache.commons.configuration.CompositeConfiguration;
+
+/**
+ * Command to write log records to distributedlog streams.
+ */
+@Slf4j
+public class AppendCommand extends BKCommand<Flags> {
+
+    private static final String NAME = "append";
+    private static final String DESC = "Append entries to a bookie journal";
+
+    public AppendCommand() {
+        super(CliSpec.<Flags>newBuilder()
+            .withName(NAME)
+            .withDescription(DESC)
+            .withFlags(new Flags())
+            .build());
+    }
+
+    @Override
+    protected boolean apply(ServiceURI serviceURI,
+                            CompositeConfiguration conf,
+                            BKFlags globalFlags, Flags cmdFlags) {
+        JournalWriter writer = new JournalWriter(conf, cmdFlags);
+        writer.run();
+        return true;
+    }
+
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java
new file mode 100644
index 0000000..1f38c42
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.journal;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_SCOPE;
+
+import com.beust.jcommander.Parameter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.util.concurrent.RateLimiter;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import lombok.extern.slf4j.Slf4j;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.Recorder;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Journal;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.Stats;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.perf.utils.PaddingDecimalFormat;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.configuration.CompositeConfiguration;
+
+/**
+ * A perf writer to evaluate write performance.
+ */
+@Slf4j
+public class JournalWriter implements Runnable {
+
+    /**
+     * Flags for the write command.
+     */
+    public static class Flags extends CliFlags {
+
+        @Parameter(
+            names = {
+                "-t", "--num-test-threads"
+            },
+            description = "Num of test threads to append entries to journal"
+        )
+        public int numTestThreads = 1;
+
+        @Parameter(
+            names = {
+                "-nl", "--num-ledgers"
+            },
+            description = "Num of ledgers to append entries to journal"
+        )
+        public int numLedgers = 24;
+
+        @Parameter(
+            names = {
+                "-r", "--rate"
+            },
+            description = "Write rate bytes/s across journals")
+        public int writeRate = 0;
+
+        @Parameter(
+            names = {
+                "-s", "--entry-size"
+            },
+            description = "Entry size")
+        public int recordSize = 1024;
+
+        @Parameter(
+            names = {
+                "-j", "--journal-dirs"
+            },
+            description = "The list of journal directories, separated by comma",
+            required = true)
+        public List<String> journalDirs;
+
+        @Parameter(
+            names = {
+                "-mob", "--max-outstanding-megabytes"
+            },
+            description = "Number of threads writing")
+        public long maxOutstandingMB = 200;
+
+        @Parameter(
+            names = {
+                "-n", "--num-entries"
+            },
+            description = "Number of entries to write in total. If 0, it will keep writing")
+        public long numEntries = 0;
+
+        @Parameter(
+            names = {
+                "-b", "--num-bytes"
+            },
+            description = "Number of bytes to write in total. If 0, it will keep writing")
+        public long numBytes = 0;
+
+        @Parameter(
+            names = {
+                "-wb", "--write-buffer-size-kb"
+            },
+            description = "Journal write buffer size"
+        )
+        public int writeBufferSizeKB = 1024;
+
+        @Parameter(
+            names = {
+                "--sync"
+            },
+            description = "Journal sync enabled"
+        )
+        public boolean journalSyncEnabled = false;
+
+        @Parameter(
+            names = {
+                "-gci", "--group-commit-interval-ms"
+            },
+            description = "Journal group commit interval in milliseconds"
+        )
+        public int groupCommitIntervalMs = 1;
+
+        @Parameter(
+            names = {
+                "-gcb", "--group-commit-max-bytes"
+            },
+            description = "Journal group commit max buffered bytes"
+        )
+        public int groupCommitMaxBytes = 512 * 1024;
+
+        @Parameter(
+            names = {
+                "-q", "--journal-queue-size"
+            },
+            description = "Journal queue size"
+        )
+        public int journalQueueSize = 10000;
+
+        @Parameter(
+            names = {
+                "-jt", "--num-journal-callback-threads"
+            },
+            description = "Number of journal callback threads"
+        )
+        public int numJournalCallbackThreads = 8;
+
+    }
+
+
+    // stats
+    private final LongAdder recordsWritten = new LongAdder();
+    private final LongAdder bytesWritten = new LongAdder();
+
+    private final ServerConfiguration conf;
+    private final Flags flags;
+    private final Recorder recorder = new Recorder(
+        TimeUnit.SECONDS.toMillis(120000), 5
+    );
+    private final Recorder cumulativeRecorder = new Recorder(
+        TimeUnit.SECONDS.toMillis(120000), 5
+    );
+    private final AtomicBoolean isDone = new AtomicBoolean(false);
+
+    JournalWriter(CompositeConfiguration conf, Flags flags) {
+        this.conf = new ServerConfiguration();
+        this.conf.addConfiguration(conf);
+        this.flags = flags;
+    }
+
+    @Override
+    public void run() {
+        try {
+            execute();
+        } catch (Exception e) {
+            log.error("Encountered exception at running dlog perf writer", e);
+        }
+    }
+
+    void execute() throws Exception {
+        ObjectMapper m = new ObjectMapper();
+        ObjectWriter w = m.writerWithDefaultPrettyPrinter();
+        log.info("Starting journal perf writer with config : {}", w.writeValueAsString(flags));
+
+        checkArgument(flags.journalDirs.size() > 0, "No journal dirs is provided");
+
+        updateServerConf(conf, flags);
+
+        log.info("Benchmark the journal perf with server config : {}",
+            conf.asJson());
+
+        Stats.loadStatsProvider(conf);
+        Stats.get().start(conf);
+
+        StatsLogger statsLogger = Stats.get().getStatsLogger("")
+            .scope(BOOKIE_SCOPE);
+
+
+        ByteBufAllocator allocator = getAllocator(conf);
+        DiskChecker checker = new DiskChecker(
+            conf.getDiskUsageThreshold(),
+            conf.getDiskUsageWarnThreshold()
+        );
+        LedgerDirsManager manager = new LedgerDirsManager(
+            conf,
+            conf.getLedgerDirs(),
+            checker,
+            NullStatsLogger.INSTANCE
+        );
+        Journal[] journals = new Journal[flags.journalDirs.size()];
+        for (int i = 0; i < journals.length; i++) {
+            Journal journal = new Journal(
+                i,
+                new File(flags.journalDirs.get(i)),
+                conf,
+                manager,
+                statsLogger.scope(JOURNAL_SCOPE),
+                allocator);
+            journals[i] = journal;
+            journal.start();
+        }
+        try {
+            execute(journals);
+        } finally {
+            for (Journal journal : journals) {
+                journal.shutdown();
+            }
+
+            Stats.get().stop();
+        }
+    }
+
+    void execute(Journal[] journals) throws Exception {
+        // register shutdown hook to aggregate stats
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            isDone.set(true);
+            printAggregatedStats(cumulativeRecorder);
+        }));
+
+        ScheduledExecutorService flushExecutor = Executors.newSingleThreadScheduledExecutor();
+        flushExecutor.scheduleAtFixedRate(() -> {
+            for (Journal journal : journals) {
+                Checkpoint cp = journal.newCheckpoint();
+                try {
+                    journal.checkpointComplete(cp, true);
+                } catch (IOException e) {
+                    log.error("Failed to complete checkpoint {}", cp, e);
+                }
+            }
+        }, 30L, 30L, TimeUnit.SECONDS);
+
+        ExecutorService executor = Executors.newFixedThreadPool(flags.numTestThreads);
+        try {
+            for (int i = 0; i < flags.numTestThreads; i++) {
+                final int idx = i;
+                final long numRecordsForThisThread = flags.numEntries / flags.numTestThreads;
+                final long numBytesForThisThread = flags.numBytes / flags.numTestThreads;
+                final double writeRateForThisThread = flags.writeRate / (double) flags.numTestThreads;
+                final long maxOutstandingBytesForThisThread =
+                    flags.maxOutstandingMB * 1024 * 1024 / flags.numTestThreads;
+                final int numLedgersForThisThread = flags.numLedgers / flags.numTestThreads;
+                executor.submit(() -> {
+                    try {
+                        write(
+                            idx,
+                            journals,
+                            numLedgersForThisThread,
+                            writeRateForThisThread,
+                            (int) maxOutstandingBytesForThisThread,
+                            numRecordsForThisThread,
+                            numBytesForThisThread);
+                    } catch (Throwable t) {
+                        log.error("Encountered error at writing records", t);
+                    }
+                });
+            }
+            log.info("Started {} write threads", flags.numTestThreads);
+            reportStats();
+        } finally {
+            flushExecutor.shutdown();
+            executor.shutdown();
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    void write(int threadIdx,
+               Journal[] journals,
+               int numLedgersForThisThread,
+               double writeRate,
+               int maxOutstandingBytesForThisThread,
+               long numRecordsForThisThread,
+               long numBytesForThisThread) throws Exception {
+        log.info("Write thread {} started with : rate = {},"
+                + " num records = {}, num bytes = {}, max outstanding bytes = {}",
+            threadIdx,
+            writeRate,
+            numRecordsForThisThread,
+            numBytesForThisThread,
+            maxOutstandingBytesForThisThread);
+
+        RateLimiter limiter;
+        if (writeRate > 0) {
+            limiter = RateLimiter.create(writeRate);
+        } else {
+            limiter = null;
+        }
+        final Semaphore semaphore;
+        if (maxOutstandingBytesForThisThread > 0) {
+            semaphore = new Semaphore(maxOutstandingBytesForThisThread);
+        } else {
+            semaphore = null;
+        }
+
+        // Acquire 1 second worth of records to have a slower ramp-up
+        if (limiter != null) {
+            limiter.acquire((int) writeRate);
+        }
+
+        long totalWritten = 0L;
+        long totalBytesWritten = 0L;
+        final int numJournals = journals.length;
+        byte[] payload = new byte[flags.recordSize];
+        ThreadLocalRandom.current().nextBytes(payload);
+        ByteBuf payloadBuf = Unpooled.wrappedBuffer(payload);
+        long[] entryIds = new long[numLedgersForThisThread];
+        Arrays.fill(entryIds, 0L);
+        while (true) {
+            for (int i = 0; i < numJournals; i++) {
+                int ledgerIdx = ThreadLocalRandom.current().nextInt(numLedgersForThisThread);
+                long lid = threadIdx * numLedgersForThisThread + ledgerIdx;
+                long eid = entryIds[ledgerIdx]++;
+                ByteBuf buf = payloadBuf.retainedDuplicate();
+                int len = buf.readableBytes();
+
+                if (numRecordsForThisThread > 0
+                    && totalWritten >= numRecordsForThisThread) {
+                    markPerfDone();
+                }
+                if (numBytesForThisThread > 0
+                    && totalBytesWritten >= numBytesForThisThread) {
+                    markPerfDone();
+                }
+                if (null != semaphore) {
+                    semaphore.acquire(len);
+                }
+
+                totalWritten++;
+                totalBytesWritten += len;
+                if (null != limiter) {
+                    limiter.acquire(len);
+                }
+                final long sendTime = System.nanoTime();
+                journals[i].logAddEntry(
+                    lid,
+                    eid,
+                    buf,
+                    false,
+                    (rc, ledgerId, entryId, addr, ctx) -> {
+                        buf.release();
+                        if (0 == rc) {
+                            if (null != semaphore) {
+                                semaphore.release(len);
+                            }
+
+                            recordsWritten.increment();
+                            bytesWritten.add(len);
+
+                            long latencyMicros = TimeUnit.NANOSECONDS.toMicros(
+                                System.nanoTime() - sendTime
+                            );
+                            recorder.recordValue(latencyMicros);
+                            cumulativeRecorder.recordValue(latencyMicros);
+                        } else {
+                            log.warn("Error at writing records : {}", BookieException.create(rc));
+                            Runtime.getRuntime().exit(-1);
+                        }
+                    },
+                    null
+                );
+            }
+        }
+    }
+
+    @SuppressFBWarnings("DM_EXIT")
+    void markPerfDone() throws Exception {
+        log.info("------------------- DONE -----------------------");
+        printAggregatedStats(cumulativeRecorder);
+        isDone.set(true);
+        Thread.sleep(5000);
+        System.exit(0);
+    }
+
+    void reportStats() {
+        // Print report stats
+        long oldTime = System.nanoTime();
+
+        Histogram reportHistogram = null;
+
+        while (true) {
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException e) {
+                break;
+            }
+
+            if (isDone.get()) {
+                break;
+            }
+
+            long now = System.nanoTime();
+            double elapsed = (now - oldTime) / 1e9;
+
+            double rate = recordsWritten.sumThenReset() / elapsed;
+            double throughput = bytesWritten.sumThenReset() / elapsed / 1024 / 1024;
+
+            reportHistogram = recorder.getIntervalHistogram(reportHistogram);
+
+            log.info(
+                    "Throughput written : {}  records/s --- {} MB/s --- Latency: mean:"
+                        + " {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
+                    throughputFormat.format(rate), throughputFormat.format(throughput),
+                    dec.format(reportHistogram.getMean() / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
+                    dec.format(reportHistogram.getMaxValue() / 1000.0));
+
+            reportHistogram.reset();
+
+            oldTime = now;
+        }
+
+    }
+
+    private static void updateServerConf(ServerConfiguration conf, Flags flags) {
+
+        conf.setJournalWriteBufferSizeKB(flags.writeBufferSizeKB);
+        conf.setJournalMaxGroupWaitMSec(flags.groupCommitIntervalMs);
+        conf.setJournalBufferedWritesThreshold(flags.groupCommitMaxBytes);
+        conf.setNumJournalCallbackThreads(flags.numJournalCallbackThreads);
+        conf.setJournalQueueSize(flags.journalQueueSize);
+        conf.setJournalSyncData(flags.journalSyncEnabled);
+        conf.setLedgerDirNames(flags.journalDirs.toArray(new String[0]));
+        conf.setStatsProviderClass(PrometheusMetricsProvider.class);
+        File[] currentDirs = Bookie.getCurrentDirectories(conf.getLedgerDirs());
+        for (File dir : currentDirs) {
+            if (dir.mkdirs()) {
+                log.info("Successfully created dir {}", dir);
+            }
+        }
+    }
+
+    private static ByteBufAllocator getAllocator(ServerConfiguration conf) {
+        return ByteBufAllocatorBuilder.create()
+                .poolingPolicy(conf.getAllocatorPoolingPolicy())
+                .poolingConcurrency(conf.getAllocatorPoolingConcurrency())
+                .outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy())
+                .outOfMemoryListener((ex) -> {
+                    log.error("Unable to allocate memory, exiting bookie", ex);
+                })
+                .leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
+                .build();
+    }
+
+    private static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
+    private static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
+
+    private static void printAggregatedStats(Recorder recorder) {
+        Histogram reportHistogram = recorder.getIntervalHistogram();
+
+        log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {}"
+                + " - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
+                dec.format(reportHistogram.getMean() / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99.999) / 1000.0),
+                dec.format(reportHistogram.getMaxValue() / 1000.0));
+    }
+
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/package-info.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/package-info.java
new file mode 100644
index 0000000..0f432e9
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * BookKeeper Journal related perf command.
+ */
+package org.apache.bookkeeper.tools.perf.journal;
\ No newline at end of file