Impose a memory limit on the bookie journal (#2710)

* Impose a memory limit on the bookie journal

* Fixed checkstyle issues

* Fixed more checkstyle issues

* Added metrics for journal memory

* More checkstyle..

* Unused import
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/MemoryLimitController.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/MemoryLimitController.java
new file mode 100644
index 0000000..7fddfd7
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/MemoryLimitController.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.common.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Controller for tracking the amount of memory used for some task.
+ */
+public class MemoryLimitController {
+
+    private final long memoryLimit;
+    private final AtomicLong currentUsage = new AtomicLong();
+    private final ReentrantLock mutex = new ReentrantLock(false);
+    private final Condition condition = mutex.newCondition();
+
+    public MemoryLimitController(long memoryLimitBytes) {
+        this.memoryLimit = memoryLimitBytes;
+    }
+
+    public boolean tryReserveMemory(long size) {
+        while (true) {
+            long current = currentUsage.get();
+            long newUsage = current + size;
+
+            // We allow one request to go over the limit, to make the notification
+            // path simpler and more efficient
+            if (current > memoryLimit && memoryLimit > 0) {
+                return false;
+            }
+
+            if (currentUsage.compareAndSet(current, newUsage)) {
+                return true;
+            }
+        }
+    }
+
+    public void reserveMemory(long size) throws InterruptedException {
+        if (!tryReserveMemory(size)) {
+            mutex.lock();
+
+            try {
+                while (!tryReserveMemory(size)) {
+                    condition.await();
+                }
+            } finally {
+                mutex.unlock();
+            }
+        }
+    }
+
+    public void releaseMemory(long size) {
+        long newUsage = currentUsage.addAndGet(-size);
+        if (newUsage + size > memoryLimit && newUsage <= memoryLimit) {
+            // We just crossed the limit. Now we have more space
+            mutex.lock();
+            try {
+                condition.signalAll();
+            } finally {
+                mutex.unlock();
+            }
+        }
+    }
+
+    public long currentUsage() {
+        return currentUsage.get();
+    }
+}
\ No newline at end of file
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/MemoryLimitControllerTest.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/MemoryLimitControllerTest.java
new file mode 100644
index 0000000..ffd056c
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/MemoryLimitControllerTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.common.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link MemoryLimitController}.
+ */
+public class MemoryLimitControllerTest {
+
+    private ExecutorService executor;
+
+    @Before
+    public void setup() {
+        executor = Executors.newCachedThreadPool();
+    }
+
+    @After
+    public void teardown() {
+        executor.shutdownNow();
+    }
+
+    @Test
+    public void testLimit() throws Exception {
+        MemoryLimitController mlc = new MemoryLimitController(100);
+
+        for (int i = 0; i < 101; i++) {
+            mlc.reserveMemory(1);
+        }
+
+        assertEquals(101, mlc.currentUsage());
+        assertFalse(mlc.tryReserveMemory(1));
+        mlc.releaseMemory(1);
+        assertEquals(100, mlc.currentUsage());
+
+        assertTrue(mlc.tryReserveMemory(1));
+        assertEquals(101, mlc.currentUsage());
+    }
+
+    @Test
+    public void testBlocking() throws Exception {
+        MemoryLimitController mlc = new MemoryLimitController(100);
+
+        for (int i = 0; i < 101; i++) {
+            mlc.reserveMemory(1);
+        }
+
+        CountDownLatch l1 = new CountDownLatch(1);
+        executor.submit(() -> {
+            try {
+                mlc.reserveMemory(1);
+                l1.countDown();
+            } catch (InterruptedException e) {
+            }
+        });
+
+        CountDownLatch l2 = new CountDownLatch(1);
+        executor.submit(() -> {
+            try {
+                mlc.reserveMemory(1);
+                l2.countDown();
+            } catch (InterruptedException e) {
+            }
+        });
+
+        CountDownLatch l3 = new CountDownLatch(1);
+        executor.submit(() -> {
+            try {
+                mlc.reserveMemory(1);
+                l3.countDown();
+            } catch (InterruptedException e) {
+            }
+        });
+
+        // The threads are blocked since the quota is full
+        assertFalse(l1.await(100, TimeUnit.MILLISECONDS));
+        assertFalse(l2.await(100, TimeUnit.MILLISECONDS));
+        assertFalse(l3.await(100, TimeUnit.MILLISECONDS));
+
+        assertEquals(101, mlc.currentUsage());
+        mlc.releaseMemory(3);
+
+        assertTrue(l1.await(1, TimeUnit.SECONDS));
+        assertTrue(l2.await(1, TimeUnit.SECONDS));
+        assertTrue(l3.await(1, TimeUnit.SECONDS));
+        assertEquals(101, mlc.currentUsage());
+    }
+
+    @Test
+    public void testStepRelease() throws Exception {
+        MemoryLimitController mlc = new MemoryLimitController(100);
+
+        for (int i = 0; i < 101; i++) {
+            mlc.reserveMemory(1);
+        }
+
+        CountDownLatch l1 = new CountDownLatch(1);
+        executor.submit(() -> {
+            try {
+                mlc.reserveMemory(1);
+                l1.countDown();
+            } catch (InterruptedException e) {
+            }
+        });
+
+        CountDownLatch l2 = new CountDownLatch(1);
+        executor.submit(() -> {
+            try {
+                mlc.reserveMemory(1);
+                l2.countDown();
+            } catch (InterruptedException e) {
+            }
+        });
+
+        CountDownLatch l3 = new CountDownLatch(1);
+        executor.submit(() -> {
+            try {
+                mlc.reserveMemory(1);
+                l3.countDown();
+            } catch (InterruptedException e) {
+            }
+        });
+
+        // The threads are blocked since the quota is full
+        assertFalse(l1.await(100, TimeUnit.MILLISECONDS));
+        assertFalse(l2.await(100, TimeUnit.MILLISECONDS));
+        assertFalse(l3.await(100, TimeUnit.MILLISECONDS));
+
+        assertEquals(101, mlc.currentUsage());
+
+        mlc.releaseMemory(1);
+        mlc.releaseMemory(1);
+        mlc.releaseMemory(1);
+
+        assertTrue(l1.await(1, TimeUnit.SECONDS));
+        assertTrue(l2.await(1, TimeUnit.SECONDS));
+        assertTrue(l3.await(1, TimeUnit.SECONDS));
+        assertEquals(101, mlc.currentUsage());
+    }
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index bc6f726..a5ae48f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -113,6 +113,8 @@
     String JOURNAL_QUEUE_LATENCY = "JOURNAL_QUEUE_LATENCY";
     String JOURNAL_PROCESS_TIME_LATENCY = "JOURNAL_PROCESS_TIME_LATENCY";
     String JOURNAL_CREATION_LATENCY = "JOURNAL_CREATION_LATENCY";
+    String JOURNAL_MEMORY_MAX = "JOURNAL_MEMORY_MAX";
+    String JOURNAL_MEMORY_USED = "JOURNAL_MEMORY_USED";
 
     // Ledger Storage Stats
     String STORAGE_GET_OFFSET = "STORAGE_GET_OFFSET";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index a8430c9..87143f2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -21,6 +21,8 @@
 
 package org.apache.bookkeeper.bookie;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_MEMORY_MAX;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_MEMORY_USED;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_SCOPE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE;
@@ -86,8 +88,10 @@
 import org.apache.bookkeeper.net.DNS;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.SimpleBookieServiceInfoProvider;
+import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.annotations.StatsDoc;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.IOUtils;
@@ -146,6 +150,18 @@
 
     private final ByteBufAllocator allocator;
 
+    @StatsDoc(
+            name = JOURNAL_MEMORY_MAX,
+            help = "The max amount of memory in bytes that can be used by the bookie journal"
+    )
+    private final Gauge<Long> journalMemoryMaxStats;
+
+    @StatsDoc(
+            name = JOURNAL_MEMORY_USED,
+            help = "The actual amount of memory in bytes currently used by the bookie journal"
+    )
+    private final Gauge<Long> journalMemoryUsedStats;
+
     /**
      * Exception is thrown when no such a ledger is found in this bookie.
      */
@@ -812,6 +828,37 @@
 
         // Expose Stats
         this.bookieStats = new BookieStats(statsLogger);
+        journalMemoryMaxStats = new Gauge<Long>() {
+            final long journalMaxMemory = conf.getJournalMaxMemorySizeMb() * 1024 * 1024;
+
+            @Override
+            public Long getDefaultValue() {
+                return journalMaxMemory;
+            }
+
+            @Override
+            public Long getSample() {
+                return journalMaxMemory;
+            }
+        };
+        statsLogger.scope(JOURNAL_SCOPE).registerGauge(JOURNAL_MEMORY_MAX, journalMemoryMaxStats);
+
+        journalMemoryUsedStats = new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return -1L;
+            }
+
+            @Override
+            public Long getSample() {
+                long totalMemory = 0L;
+                for (int i = 0; i < journals.size(); i++) {
+                    totalMemory += journals.get(i).getMemoryUsage();
+                }
+                return totalMemory;
+            }
+        };
+        statsLogger.scope(JOURNAL_SCOPE).registerGauge(JOURNAL_MEMORY_USED, journalMemoryUsedStats);
     }
 
     StateManager initializeStateManager() throws IOException {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 3486415..02e5de5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -52,6 +52,7 @@
 import org.apache.bookkeeper.bookie.stats.JournalStats;
 import org.apache.bookkeeper.common.collections.BlockingMpscQueue;
 import org.apache.bookkeeper.common.collections.RecyclableArrayList;
+import org.apache.bookkeeper.common.util.MemoryLimitController;
 import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -295,7 +296,7 @@
     /**
      * Journal Entry to Record.
      */
-    private static class QueueEntry implements Runnable {
+    static class QueueEntry implements Runnable {
         ByteBuf entry;
         long ledgerId;
         long entryId;
@@ -631,6 +632,7 @@
     volatile boolean running = true;
     private final LedgerDirsManager ledgerDirsManager;
     private final ByteBufAllocator allocator;
+    private final MemoryLimitController memoryLimitController;
 
     // Expose Stats
     private final JournalStats journalStats;
@@ -655,6 +657,9 @@
             forceWriteRequests = new ArrayBlockingQueue<>(conf.getJournalQueueSize());
         }
 
+        // Adjust the journal max memory in case there are multiple journals configured.
+        long journalMaxMemory = conf.getJournalMaxMemorySizeMb() / conf.getJournalDirNames().length * 1024 * 1024;
+        this.memoryLimitController = new MemoryLimitController(journalMaxMemory);
         this.ledgerDirsManager = ledgerDirsManager;
         this.conf = conf;
         this.journalDirectory = journalDirectory;
@@ -860,11 +865,14 @@
     public void logAddEntry(long ledgerId, long entryId, ByteBuf entry,
                             boolean ackBeforeSync, WriteCallback cb, Object ctx)
             throws InterruptedException {
-        //Retain entry until it gets written to journal
+        // Retain entry until it gets written to journal
         entry.retain();
 
         journalStats.getJournalQueueSize().inc();
         journalStats.getJournalCbQueueSize().inc();
+
+        memoryLimitController.reserveMemory(entry.readableBytes());
+
         queue.put(QueueEntry.create(
                 entry, ackBeforeSync,  ledgerId, entryId, cb, ctx, MathUtils.nowInNano(),
                 journalStats.getJournalAddEntryStats(),
@@ -1110,6 +1118,7 @@
                      * shouldn't write this special entry
                      * (METAENTRY_ID_LEDGER_EXPLICITLAC) to Journal.
                      */
+                    memoryLimitController.releaseMemory(qe.entry.readableBytes());
                     qe.entry.release();
                 } else if (qe.entryId != Bookie.METAENTRY_ID_FORCE_LEDGER) {
                     int entrySize = qe.entry.readableBytes();
@@ -1125,6 +1134,7 @@
 
                     bc.write(lenBuff);
                     bc.write(qe.entry);
+                    memoryLimitController.releaseMemory(qe.entry.readableBytes());
                     qe.entry.release();
                 }
 
@@ -1202,4 +1212,7 @@
         join();
     }
 
+    long getMemoryUsage() {
+        return memoryLimitController.currentUsage();
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index ae82220..f5e28d1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -22,6 +22,9 @@
 import com.google.common.annotations.Beta;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+// CHECKSTYLE.OFF: IllegalImport
+import io.netty.util.internal.PlatformDependent;
+// CHECKSTYLE.ON: IllegalImport
 import java.io.File;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
@@ -140,6 +143,7 @@
     protected static final String NUM_JOURNAL_CALLBACK_THREADS = "numJournalCallbackThreads";
     protected static final String JOURNAL_FORMAT_VERSION_TO_WRITE = "journalFormatVersionToWrite";
     protected static final String JOURNAL_QUEUE_SIZE = "journalQueueSize";
+    protected static final String JOURNAL_MAX_MEMORY_SIZE_MB = "journalMaxMemorySizeMb";
     protected static final String JOURNAL_PAGECACHE_FLUSH_INTERVAL_MSEC = "journalPageCacheFlushIntervalMSec";
     // backpressure control
     protected static final String MAX_ADDS_IN_PROGRESS_LIMIT = "maxAddsInProgressLimit";
@@ -820,6 +824,29 @@
     }
 
     /**
+     * Set the max amount of memory that can be used by the journal.
+     *
+     * @param journalMaxMemorySizeMb
+     *            the max amount of memory for the journal
+     * @return server configuration.
+     */
+    public ServerConfiguration setJournalMaxMemorySizeMb(long journalMaxMemorySizeMb) {
+        this.setProperty(JOURNAL_MAX_MEMORY_SIZE_MB, journalMaxMemorySizeMb);
+        return this;
+    }
+
+    /**
+     * Get the max amount of memory that can be used by the journal.
+     *
+     * @return the max amount of memory for the journal
+     */
+    public long getJournalMaxMemorySizeMb() {
+        // Default is taking 5% of max direct memory (and convert to MB).
+        long defaultValue = (long) (PlatformDependent.maxDirectMemory() * 0.05 / 1024 / 1024);
+        return this.getLong(JOURNAL_MAX_MEMORY_SIZE_MB, defaultValue);
+    }
+
+    /**
      * Set PageCache flush interval in second.
      *
      * @Param journalPageCacheFlushInterval
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalMaxMemoryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalMaxMemoryTest.java
new file mode 100644
index 0000000..02299ad
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalMaxMemoryTest.java
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.util.MemoryLimitController;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+/**
+ * Test the bookie journal max memory controller.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({JournalChannel.class, Journal.class})
+@Slf4j
+public class BookieJournalMaxMemoryTest {
+
+    private static final ByteBuf DATA = Unpooled.wrappedBuffer(new byte[1024 * 1024]);
+
+    @Rule
+    public TemporaryFolder tempDir = new TemporaryFolder();
+
+    @Test
+    public void testAckAfterSyncPageCacheFlush() throws Exception {
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+                .setJournalDirName(journalDir.getPath())
+                .setJournalMaxMemorySizeMb(1);
+
+        JournalChannel jc = spy(new JournalChannel(journalDir, 1));
+        whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
+
+        LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
+        Journal journal = spy(new Journal(0, journalDir, conf, ledgerDirsManager));
+        Whitebox.setInternalState(journal, "memoryLimitController",
+                spy(new MemoryLimitController(1)));
+        MemoryLimitController mlc = Whitebox.getInternalState(journal, "memoryLimitController");
+
+        journal.start();
+
+        CountDownLatch latch = new CountDownLatch(10);
+
+        for (int i = 0; i < 10; i++) {
+            long ledgerId = 1;
+            long entryId = i;
+
+            journal.logAddEntry(ledgerId, entryId, DATA, false,
+                    (rc, ledgerId1, entryId1, addr, ctx) -> latch.countDown(),
+                    null);
+        }
+
+        latch.await();
+
+        verify(mlc, times(10)).reserveMemory(DATA.readableBytes());
+        verify(mlc, times(10)).releaseMemory(DATA.readableBytes());
+
+        journal.shutdown();
+    }
+}
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index b400899..897ea33 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -358,6 +358,11 @@
 # Set the size of the journal queue.
 # journalQueueSize=10000
 
+# Set the max amount of memory that can be used by the journal.
+# If empty, this will be set to use 5% of available direct memory
+# Setting it to 0, it will disable the max memory control for the journal.
+# journalMaxMemorySizeMb=
+
 # Set PageCache flush interval (millisecond) when journalSyncData disabled
 # journalPageCacheFlushIntervalMSec = 1000