Abstract the FileChannel in the JournalChannel
### Motivation
Make the FileChannel in the JournalChannel can use different implement.
We found we can use [pmemstore](https://github.com/pmem/pcj)
as the JournalChannel read from. So abstract the FileChannel in the
JournnalChannel to make us can have a different implementation.
### Changes
- Add interface for supporting implement different FileChannel
This PR doesn't introduce any new things in the code. So make sure it can pass the CI
Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Hang Chen <chenhang@apache.org>, Nicolò Boschi <boschi1997@gmail.com>, Andrey Yegorov <None>
This closes #2742 from zymap/pmem-provider
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieFileChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieFileChannel.java
new file mode 100644
index 0000000..6ab77bb
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieFileChannel.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+
+/**
+ * A FileChannel for the JournalChannel read and write, we can use this interface to extend the FileChannel
+ * which we use in the JournalChannel.
+ */
+interface BookieFileChannel {
+
+ /**
+ * An interface for get the FileChannel from the provider.
+ * @return
+ */
+ FileChannel getFileChannel() throws FileNotFoundException, IOException;
+
+ /**
+ * Check the given file if exists.
+ *
+ * @param file
+ * @return
+ */
+ boolean fileExists(File file);
+
+ /**
+ * Get the file descriptor of the opened file.
+ *
+ * @return
+ * @throws IOException
+ */
+ FileDescriptor getFD() throws IOException;
+
+ /**
+ * Close file channel and release all resources.
+ */
+ void close() throws IOException;
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultFileChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultFileChannel.java
new file mode 100644
index 0000000..2829685
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultFileChannel.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+class DefaultFileChannel implements BookieFileChannel {
+ private final File file;
+ private RandomAccessFile randomAccessFile;
+ private final ServerConfiguration configuration;
+
+ DefaultFileChannel(File file, ServerConfiguration serverConfiguration) throws IOException {
+ this.file = file;
+ this.configuration = serverConfiguration;
+ }
+
+ @Override
+ public FileChannel getFileChannel() throws FileNotFoundException {
+ synchronized (this) {
+ if (randomAccessFile == null) {
+ randomAccessFile = new RandomAccessFile(file, "rw");
+ }
+ return randomAccessFile.getChannel();
+ }
+ }
+
+ @Override
+ public boolean fileExists(File file) {
+ return file.exists();
+ }
+
+ @Override
+ public FileDescriptor getFD() throws IOException {
+ synchronized (this) {
+ if (randomAccessFile == null) {
+ throw new IOException("randomAccessFile is null, please initialize it by calling getFileChannel");
+ }
+ return randomAccessFile.getFD();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ synchronized (this) {
+ if (randomAccessFile != null) {
+ randomAccessFile.close();
+ }
+ }
+ }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultFileChannelProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultFileChannelProvider.java
new file mode 100644
index 0000000..e9444a7
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultFileChannelProvider.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * A wrapper of FileChannel.
+ */
+public class DefaultFileChannelProvider implements FileChannelProvider{
+ @Override
+ public BookieFileChannel open(File file, ServerConfiguration configuration) throws IOException {
+ return new DefaultFileChannel(file, configuration);
+ }
+
+ @Override
+ public void close(BookieFileChannel bookieFileChannel) throws IOException {
+ bookieFileChannel.close();
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileChannelProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileChannelProvider.java
new file mode 100644
index 0000000..b1c5548
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileChannelProvider.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * An interface of the FileChannelProvider.
+ */
+public interface FileChannelProvider extends Closeable {
+ /**
+ *
+ * @param providerClassName Provided class name for file channel.
+ * @return FileChannelProvider. A file channel provider loaded from providerClassName
+ * @throws IOException Possible IOException.
+ */
+ static FileChannelProvider newProvider(String providerClassName) throws IOException {
+ try {
+ Class<?> providerClass = Class.forName(providerClassName);
+ Object obj = providerClass.getConstructor().newInstance();
+ return (FileChannelProvider) obj;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Get the BookieFileChannel with the given file and configuration.
+ *
+ * @param file File path related to bookie.
+ * @param configuration Server configuration.
+ * @return BookieFileChannel related to file parameter.
+ * @throws IOException Possible IOException.
+ */
+ BookieFileChannel open(File file, ServerConfiguration configuration) throws IOException;
+
+ /**
+ * Close bookieFileChannel.
+ * @param bookieFileChannel The bookieFileChannel to be closed.
+ * @throws IOException Possible IOException.
+ */
+ void close(BookieFileChannel bookieFileChannel) throws IOException;
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 1d35b32..489974c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -782,10 +782,10 @@
throws IOException {
JournalChannel recLog;
if (journalPos <= 0) {
- recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize);
+ recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, conf);
} else {
recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize,
- journalPos);
+ journalPos, conf);
}
int journalVersion = recLog.getFormatVersion();
try {
@@ -960,7 +960,7 @@
journalCreationWatcher.reset().start();
logFile = new JournalChannel(journalDirectory, logId, journalPreAllocSize, journalWriteBufferSize,
journalAlignmentSize, removePagesFromCache,
- journalFormatVersionToWrite, getBufferedChannelBuilder());
+ journalFormatVersionToWrite, getBufferedChannelBuilder(), conf);
journalStats.getJournalCreationStats().registerSuccessfulEvent(
journalCreationWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
index 5434d16..abaef88 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
@@ -33,6 +33,7 @@
import java.nio.channels.FileChannel;
import java.util.Arrays;
+import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.util.NativeIO;
import org.apache.bookkeeper.util.ZeroBuffer;
import org.slf4j.Logger;
@@ -45,7 +46,9 @@
class JournalChannel implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(JournalChannel.class);
- final RandomAccessFile randomAccessFile;
+ static final long MB = 1024 * 1024L;
+ final FileChannelProvider fileChannelProvider;
+ final BookieFileChannel channel;
final int fd;
final FileChannel fc;
final BufferedChannel bc;
@@ -56,7 +59,7 @@
static final int SECTOR_SIZE = 512;
private static final int START_OF_FILE = -12345;
- private static long cacheDropLagBytes = 8 * 1024 * 1024;
+ private static long cacheDropLagBytes = 8 * MB;
// No header
static final int V1 = 1;
@@ -85,37 +88,41 @@
// The position of the file channel's last drop position
private long lastDropPosition = 0L;
+ final ServerConfiguration configuration;
+
// Mostly used by tests
JournalChannel(File journalDirectory, long logId) throws IOException {
- this(journalDirectory, logId, 4 * 1024 * 1024, 65536, START_OF_FILE);
+ this(journalDirectory, logId, 4 * MB, 65536, START_OF_FILE, new ServerConfiguration());
}
// Open journal for scanning starting from the first record in journal.
- JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize) throws IOException {
- this(journalDirectory, logId, preAllocSize, writeBufferSize, START_OF_FILE);
+ JournalChannel(File journalDirectory, long logId,
+ long preAllocSize, int writeBufferSize, ServerConfiguration conf) throws IOException {
+ this(journalDirectory, logId, preAllocSize, writeBufferSize, START_OF_FILE, conf);
}
// Open journal for scanning starting from given position.
JournalChannel(File journalDirectory, long logId,
- long preAllocSize, int writeBufferSize, long position) throws IOException {
+ long preAllocSize, int writeBufferSize, long position, ServerConfiguration conf) throws IOException {
this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE,
- position, false, V5, Journal.BufferedChannelBuilder.DEFAULT_BCBUILDER);
+ position, false, V5, Journal.BufferedChannelBuilder.DEFAULT_BCBUILDER, conf);
}
// Open journal to write
JournalChannel(File journalDirectory, long logId,
long preAllocSize, int writeBufferSize, int journalAlignSize,
- boolean fRemoveFromPageCache, int formatVersionToWrite) throws IOException {
- this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize,
- fRemoveFromPageCache, formatVersionToWrite, Journal.BufferedChannelBuilder.DEFAULT_BCBUILDER);
+ boolean fRemoveFromPageCache, int formatVersionToWrite,
+ ServerConfiguration configuration) throws IOException {
+ this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize, fRemoveFromPageCache,
+ formatVersionToWrite, Journal.BufferedChannelBuilder.DEFAULT_BCBUILDER, configuration);
}
JournalChannel(File journalDirectory, long logId,
long preAllocSize, int writeBufferSize, int journalAlignSize,
boolean fRemoveFromPageCache, int formatVersionToWrite,
- Journal.BufferedChannelBuilder bcBuilder) throws IOException {
+ Journal.BufferedChannelBuilder bcBuilder, ServerConfiguration configuration) throws IOException {
this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize,
- START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite, bcBuilder);
+ START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite, bcBuilder, configuration);
}
/**
@@ -143,27 +150,31 @@
private JournalChannel(File journalDirectory, long logId,
long preAllocSize, int writeBufferSize, int journalAlignSize,
long position, boolean fRemoveFromPageCache,
- int formatVersionToWrite, Journal.BufferedChannelBuilder bcBuilder) throws IOException {
+ int formatVersionToWrite, Journal.BufferedChannelBuilder bcBuilder,
+ ServerConfiguration configuration) throws IOException {
this.journalAlignSize = journalAlignSize;
this.zeros = ByteBuffer.allocate(journalAlignSize);
this.preAllocSize = preAllocSize - preAllocSize % journalAlignSize;
this.fRemoveFromPageCache = fRemoveFromPageCache;
+ this.configuration = configuration;
+
File fn = new File(journalDirectory, Long.toHexString(logId) + ".txn");
+ fileChannelProvider = FileChannelProvider.newProvider(configuration.getJournalChannelProvider());
+ channel = fileChannelProvider.open(fn, configuration);
if (formatVersionToWrite < V4) {
throw new IOException("Invalid journal format to write : version = " + formatVersionToWrite);
}
LOG.info("Opening journal {}", fn);
- if (!fn.exists()) { // new file, write version
+ if (!channel.fileExists(fn)) { // new file, write version
if (!fn.createNewFile()) {
LOG.error("Journal file {}, that shouldn't exist, already exists. "
+ " is there another bookie process running?", fn);
throw new IOException("File " + fn
+ " suddenly appeared, is another bookie process running?");
}
- randomAccessFile = new RandomAccessFile(fn, "rw");
- fc = openFileChannel(randomAccessFile);
+ fc = channel.getFileChannel();
formatVersion = formatVersionToWrite;
int headerSize = (V4 == formatVersion) ? VERSION_HEADER_SIZE : HEADER_SIZE;
@@ -180,8 +191,7 @@
nextPrealloc = this.preAllocSize;
fc.write(zeros, nextPrealloc - journalAlignSize);
} else { // open an existing file
- randomAccessFile = new RandomAccessFile(fn, "r");
- fc = openFileChannel(randomAccessFile);
+ fc = channel.getFileChannel();
bc = null; // readonly
ByteBuffer bb = ByteBuffer.allocate(VERSION_HEADER_SIZE);
@@ -231,7 +241,7 @@
}
}
if (fRemoveFromPageCache) {
- this.fd = NativeIO.getSysFileDescriptor(randomAccessFile.getFD());
+ this.fd = NativeIO.getSysFileDescriptor(channel.getFD());
} else {
this.fd = -1;
}
@@ -266,6 +276,9 @@
if (bc != null) {
bc.close();
}
+ if (fileChannelProvider != null) {
+ fileChannelProvider.close();
+ }
}
public void forceWrite(boolean forceMetadata) throws IOException {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 00ef028..dbc94bd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -27,6 +27,7 @@
// CHECKSTYLE.ON: IllegalImport
import java.io.File;
import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.bookie.FileChannelProvider;
import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.SortedLedgerStorage;
@@ -146,6 +147,7 @@
protected static final String JOURNAL_QUEUE_SIZE = "journalQueueSize";
protected static final String JOURNAL_MAX_MEMORY_SIZE_MB = "journalMaxMemorySizeMb";
protected static final String JOURNAL_PAGECACHE_FLUSH_INTERVAL_MSEC = "journalPageCacheFlushIntervalMSec";
+ protected static final String JOURNAL_CHANNEL_PROVIDER = "journalChannelProvider";
// backpressure control
protected static final String MAX_ADDS_IN_PROGRESS_LIMIT = "maxAddsInProgressLimit";
protected static final String MAX_READS_IN_PROGRESS_LIMIT = "maxReadsInProgressLimit";
@@ -869,6 +871,26 @@
}
/**
+ * Set JournalChannelProvider classname.
+ * @param journalChannelProvider
+ * The JournalChannelProvider classname. The class must implements {@link FileChannelProvider} and
+ * no args constructor is needed.
+ * @return
+ */
+ public ServerConfiguration setJournalChannelProvider(String journalChannelProvider) {
+ this.setProperty(JOURNAL_CHANNEL_PROVIDER, journalChannelProvider);
+ return this;
+ }
+
+ /**
+ *
+ * @return
+ */
+ public String getJournalChannelProvider() {
+ return this.getString(JOURNAL_CHANNEL_PROVIDER, "org.apache.bookkeeper.bookie.DefaultFileChannelProvider");
+ }
+
+ /**
* Get max number of adds in progress. 0 == unlimited.
*
* @return Max number of adds in progress.
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
index 984b085..6d0aaba 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
@@ -63,7 +63,7 @@
* Test the bookie journal.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({JournalChannel.class, Journal.class})
+@PrepareForTest({JournalChannel.class, Journal.class, DefaultFileChannel.class})
@Slf4j
public class BookieJournalForceTest {
@@ -379,4 +379,26 @@
journal.shutdown();
}
+ @Test
+ public void testFileChannelProvider() throws Exception {
+ File bookieFileDirectory = tempDir.newFile();
+ ServerConfiguration config = TestBKConfiguration.newServerConfiguration();
+
+ DefaultFileChannel defaultFileChannel = spy(new DefaultFileChannel(bookieFileDirectory, config));
+
+ FileChannelProvider provider = spy(DefaultFileChannelProvider.class);
+ when(provider.open(bookieFileDirectory, config)).thenReturn(defaultFileChannel);
+ log.info("Journal Channel Provider: " + config.getJournalChannelProvider());
+ // Open should return spied DefaultFileChannel here.
+ BookieFileChannel bookieFileChannel = provider.open(bookieFileDirectory, config);
+ bookieFileChannel.getFileChannel();
+ verify(defaultFileChannel, times (1)).getFileChannel();
+ bookieFileChannel.getFD();
+ verify(defaultFileChannel, times (1)).getFD();
+ bookieFileChannel.fileExists(bookieFileDirectory);
+ verify(defaultFileChannel, times (1)).fileExists(bookieFileDirectory);
+ provider.close(bookieFileChannel);
+ verify(defaultFileChannel, times (1)).close();
+ }
+
}
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 575d8a4..53e4135 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -377,6 +377,9 @@
# Set PageCache flush interval (millisecond) when journalSyncData disabled
# journalPageCacheFlushIntervalMSec = 1000
+# Set the Channel Provider for journal.
+# The default value is
+# journalChannelProvider=org.apache.bookkeeper.bookie.DefaultFileChannelProvider
#############################################################################
## Ledger storage settings
#############################################################################