blob: 20a3d855b4a809d81b08e36e96140d88c8592c2d [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie.storage.directentrylogger;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.bookkeeper.common.util.nativeio.NativeIO;
import org.apache.bookkeeper.common.util.nativeio.NativeIOException;
import org.apache.bookkeeper.slogger.Slogger;
import org.apache.commons.lang3.SystemUtils;
class DirectWriter implements LogWriter {
final NativeIO nativeIO;
final int fd;
final int id;
final String filename;
final BufferPool bufferPool;
final ExecutorService writeExecutor;
final Object bufferLock = new Object();
final List<Future<?>> outstandingWrites = new ArrayList<Future<?>>();
final Slogger slog;
Buffer nativeBuffer;
long offset;
private static volatile boolean useFallocate = true;
DirectWriter(int id,
String filename,
long maxFileSize,
ExecutorService writeExecutor,
BufferPool bufferPool,
NativeIO nativeIO, Slogger slog) throws IOException {
checkArgument(maxFileSize > 0, "Max file size (%d) must be positive");
this.id = id;
this.filename = filename;
this.writeExecutor = writeExecutor;
this.nativeIO = nativeIO;
this.slog = slog.ctx(DirectWriter.class);
offset = 0;
try {
fd = nativeIO.open(filename,
NativeIO.O_CREAT | NativeIO.O_WRONLY | NativeIO.O_DIRECT,
00644);
checkState(fd >= 0, "Open should have thrown exception, fd is invalid : %d", fd);
} catch (NativeIOException ne) {
throw new IOException(exMsg(ne.getMessage()).kv("file", filename)
.kv("errno", ne.getErrno()).toString(), ne);
}
if (useFallocate) {
if (!SystemUtils.IS_OS_LINUX) {
disableUseFallocate();
this.slog.warn(Events.FALLOCATE_NOT_AVAILABLE);
} else {
try {
int ret = nativeIO.fallocate(fd, NativeIO.FALLOC_FL_ZERO_RANGE, 0, maxFileSize);
checkState(ret == 0, "Exception should have been thrown on non-zero ret: %d", ret);
} catch (NativeIOException ex) {
// fallocate(2) is not supported on all filesystems. Since this is an optimization, disable
// subsequent usage instead of failing the operation.
disableUseFallocate();
this.slog.kv("message", ex.getMessage())
.kv("file", filename)
.kv("errno", ex.getErrno())
.warn(Events.FALLOCATE_NOT_AVAILABLE);
}
}
}
this.bufferPool = bufferPool;
this.nativeBuffer = bufferPool.acquire();
}
private static void disableUseFallocate() {
DirectWriter.useFallocate = false;
}
@Override
public int logId() {
return id;
}
@Override
public void writeAt(long offset, ByteBuf buf) throws IOException {
checkArgument(Buffer.isAligned(offset),
"Offset to writeAt must be aligned to %d: %d is not", Buffer.ALIGNMENT, offset);
checkArgument(Buffer.isAligned(buf.readableBytes()),
"Buffer must write multiple of alignment bytes (%d), %d is not",
Buffer.ALIGNMENT, buf.readableBytes());
int bytesToWrite = buf.readableBytes();
if (bytesToWrite <= 0) {
return;
}
Buffer tmpBuffer = bufferPool.acquire();
tmpBuffer.reset();
tmpBuffer.writeByteBuf(buf);
Future<?> f = writeExecutor.submit(() -> {
writeByteBuf(tmpBuffer, bytesToWrite, offset);
return null;
});
addOutstandingWrite(f);
}
private void writeByteBuf(Buffer buffer, int bytesToWrite, long offsetToWrite) throws IOException{
try {
if (bytesToWrite <= 0) {
return;
}
int ret = nativeIO.pwrite(fd, buffer.pointer(), bytesToWrite, offsetToWrite);
if (ret != bytesToWrite) {
throw new IOException(exMsg("Incomplete write")
.kv("filename", filename)
.kv("pointer", buffer.pointer())
.kv("offset", offsetToWrite)
.kv("writeSize", bytesToWrite)
.kv("bytesWritten", ret)
.toString());
}
} catch (NativeIOException ne) {
throw new IOException(exMsg("Write error")
.kv("filename", filename)
.kv("offset", offsetToWrite)
.kv("writeSize", bytesToWrite)
.kv("pointer", buffer.pointer())
.kv("errno", ne.getErrno())
.toString());
} finally {
bufferPool.release(buffer);
}
}
@Override
public int writeDelimited(ByteBuf buf) throws IOException {
synchronized (bufferLock) {
if (!nativeBuffer.hasSpace(serializedSize(buf))) {
flushBuffer();
}
int readable = buf.readableBytes();
long bufferPosition = position() + Integer.BYTES;
if (bufferPosition > Integer.MAX_VALUE) {
throw new IOException(exMsg("Cannot write past max int")
.kv("filename", filename)
.kv("writeSize", readable)
.kv("position", bufferPosition)
.toString());
}
nativeBuffer.writeInt(readable);
nativeBuffer.writeByteBuf(buf);
return (int) bufferPosition;
}
}
@Override
public void position(long offset) throws IOException {
synchronized (bufferLock) {
if (nativeBuffer != null && nativeBuffer.position() > 0) {
flushBuffer();
}
if ((offset % Buffer.ALIGNMENT) != 0) {
throw new IOException(exMsg("offset must be multiple of alignment")
.kv("offset", offset)
.kv("alignment", Buffer.ALIGNMENT)
.toString());
}
this.offset = offset;
}
}
@Override
public long position() {
synchronized (bufferLock) {
return this.offset + (nativeBuffer != null ? nativeBuffer.position() : 0);
}
}
@Override
public void flush() throws IOException {
flushBuffer();
waitForOutstandingWrites();
try {
int ret = nativeIO.fsync(fd);
checkState(ret == 0, "Fsync should throw exception on non-zero return (%d)", ret);
} catch (NativeIOException ne) {
throw new IOException(exMsg(ne.getMessage())
.kv("file", filename)
.kv("errno", ne.getErrno()).toString());
}
}
@Override
public void close() throws IOException {
synchronized (bufferLock) {
if (nativeBuffer != null && nativeBuffer.position() > 0) {
flush();
}
}
try {
int ret = nativeIO.close(fd);
checkState(ret == 0, "Close should throw exception on non-zero return (%d)", ret);
} catch (NativeIOException ne) {
throw new IOException(exMsg(ne.getMessage())
.kv("file", filename)
.kv("errno", ne.getErrno()).toString());
} finally {
synchronized (bufferLock) {
bufferPool.release(nativeBuffer);
nativeBuffer = null;
}
}
}
private void addOutstandingWrite(Future<?> toAdd) throws IOException {
synchronized (outstandingWrites) {
outstandingWrites.add(toAdd);
Iterator<Future<?>> iter = outstandingWrites.iterator();
while (iter.hasNext()) { // clear out completed futures
Future<?> f = iter.next();
if (f.isDone()) {
waitForFuture(f);
iter.remove();
} else {
break;
}
}
}
}
private void waitForOutstandingWrites() throws IOException {
synchronized (outstandingWrites) {
Iterator<Future<?>> iter = outstandingWrites.iterator();
while (iter.hasNext()) { // clear out completed futures
Future<?> f = iter.next();
waitForFuture(f);
iter.remove();
}
}
}
private void waitForFuture(Future<?> f) throws IOException {
try {
f.get();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException(ie);
} catch (Throwable t) {
if (t.getCause() instanceof IOException) {
throw (IOException) t.getCause();
} else {
throw new IOException(t);
}
}
}
private void flushBuffer() throws IOException {
synchronized (bufferLock) {
if (this.nativeBuffer != null) {
int bytesToWrite = this.nativeBuffer.padToAlignment();
if (bytesToWrite == 0) {
return;
}
Buffer bufferToFlush = this.nativeBuffer;
this.nativeBuffer = null;
long offsetToWrite = offset;
offset += bytesToWrite;
Future<?> f = writeExecutor.submit(() -> {
writeByteBuf(bufferToFlush, bytesToWrite, offsetToWrite);
return null;
});
addOutstandingWrite(f);
// must acquire after triggering the write
// otherwise it could try to acquire a buffer without kicking off
// a subroutine that will free another
this.nativeBuffer = bufferPool.acquire();
}
}
}
@Override
public int serializedSize(ByteBuf buf) {
return buf.readableBytes() + Integer.BYTES;
}
}