// blob: 901c5c0a89e5fe5d88385012585949d5ac0bf4d9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.IOUtils;
import com.google.common.annotations.VisibleForTesting;
/**
 * An implementation of the abstract class {@link EditLogOutputStream}, which
 * stores edits in a local file.
 *
 * <p>Operations are accumulated in an {@link EditsDoubleBuffer} and only reach
 * the file when {@link #flushAndSync(boolean)} is called. Before each flush the
 * file is extended, when needed, in blocks of
 * {@link #MIN_PREALLOCATION_LENGTH} bytes filled with the
 * {@code OP_INVALID} opcode; {@link #close()} truncates that padding away.
 */
@InterfaceAudience.Private
public class EditLogFileOutputStream extends EditLogOutputStream {
  private static final Log LOG =
      LogFactory.getLog(EditLogFileOutputStream.class);

  /** Size, in bytes, of one preallocation block appended to the file. */
  public static final int MIN_PREALLOCATION_LENGTH = 1024 * 1024;

  private final File file;
  private FileOutputStream fp; // file stream for storing edit logs
  private FileChannel fc; // channel of the file stream for sync
  private EditsDoubleBuffer doubleBuf;

  /** Padding block: MIN_PREALLOCATION_LENGTH bytes of the OP_INVALID opcode. */
  static final ByteBuffer fill =
      ByteBuffer.allocateDirect(MIN_PREALLOCATION_LENGTH);

  private static boolean shouldSkipFsyncForTests = false;

  static {
    // Fill the whole padding block once; preallocate() rewinds it before use.
    fill.position(0);
    for (int i = 0; i < fill.capacity(); i++) {
      fill.put(FSEditLogOpCodes.OP_INVALID.getOpCode());
    }
  }

  /**
   * Creates output buffers and file object.
   *
   * @param name
   *          File name to store edit log
   * @param size
   *          Size of flush buffer
   * @throws IOException
   *           if the file cannot be opened or positioned at its end
   */
  public EditLogFileOutputStream(File name, int size) throws IOException {
    super();
    file = name;
    doubleBuf = new EditsDoubleBuffer(size);
    RandomAccessFile rp = new RandomAccessFile(name, "rw");
    boolean success = false;
    try {
      fp = new FileOutputStream(rp.getFD()); // open for append
      fc = rp.getChannel();
      fc.position(fc.size());
      success = true;
    } finally {
      if (!success) {
        // Do not leak the descriptor when setup fails part-way: closing the
        // RandomAccessFile also closes its associated channel.
        IOUtils.cleanup(LOG, rp);
        fp = null;
        fc = null;
      }
    }
  }

  /**
   * Buffers a single operation; nothing is written to the file until the
   * buffer is flushed.
   *
   * @param op the operation to buffer
   * @throws IOException if the double buffer fails to record the operation
   */
  @Override
  public void write(FSEditLogOp op) throws IOException {
    doubleBuf.writeOp(op);
  }

  /**
   * Write a transaction to the stream. The serialization format is:
   * <ul>
   * <li>the opcode (byte)</li>
   * <li>the transaction id (long)</li>
   * <li>the actual Writables for the transaction</li>
   * </ul>
   *
   * @param bytes array holding the serialized transaction
   * @param offset index of the first byte to buffer
   * @param length number of bytes to buffer
   * @throws IOException if the double buffer fails to record the bytes
   */
  @Override
  public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
    doubleBuf.writeRaw(bytes, offset, length);
  }

  /**
   * Create empty edits logs file: the file is truncated to zero length, the
   * header is buffered and then immediately flushed.
   *
   * @throws IOException if truncating, positioning or flushing fails
   */
  @Override
  public void create() throws IOException {
    fc.truncate(0);
    fc.position(0);
    writeHeader(doubleBuf.getCurrentBuf());
    setReadyToFlush();
    flush();
  }

  /**
   * Write header information for this EditLogFileOutputStream to the provided
   * DataOutputStream. The header consists of the layout version as an int.
   *
   * @param out the output stream to write the header to.
   * @throws IOException in the event of error writing to the stream.
   */
  @VisibleForTesting
  public static void writeHeader(DataOutputStream out) throws IOException {
    out.writeInt(HdfsConstants.LAYOUT_VERSION);
  }

  /**
   * Closes the stream, truncating any preallocated padding that lies beyond
   * the current channel position. Whatever happens, all resources are released
   * and the stream is marked as no longer open.
   *
   * @throws IOException if the stream was already closed or aborted, or if
   *           closing the buffer, truncating or closing the file fails
   */
  @Override
  public void close() throws IOException {
    if (fp == null) {
      throw new IOException("Trying to use aborted output stream");
    }

    try {
      // close should have been called after all pending transactions
      // have been flushed & synced.
      // if already closed, just skip
      if (doubleBuf != null) {
        doubleBuf.close();
        doubleBuf = null;
      }

      // remove any preallocated padding bytes from the transaction log.
      if (fc != null && fc.isOpen()) {
        fc.truncate(fc.position());
        fc.close();
        fc = null;
      }
      fp.close();
      fp = null;
    } finally {
      // On the failure path some of these are still open; close them quietly
      // so the original exception is the one that propagates.
      IOUtils.cleanup(LOG, fc, fp);
      doubleBuf = null;
      fc = null;
      fp = null;
    }
  }

  /**
   * Closes the underlying file stream without flushing buffered data and marks
   * this stream as aborted, so that later calls to {@link #close()} or
   * {@link #flushAndSync(boolean)} fail. Calling it on a stream that is
   * already closed or aborted is a no-op.
   */
  @Override
  public void abort() throws IOException {
    if (fp == null) {
      return;
    }
    IOUtils.cleanup(LOG, fp);
    fp = null;
  }

  /**
   * All data that has been written to the stream so far will be flushed. New
   * data can be still written to the stream while flushing is performed.
   */
  @Override
  public void setReadyToFlush() throws IOException {
    doubleBuf.setReadyToFlush();
  }

  /**
   * Flush ready buffer to persistent store. currentBuffer is not flushed as it
   * accumulates new log records while readyBuffer will be flushed and synced.
   *
   * @param durable if true, and fsync has not been disabled for tests, the
   *          channel is forced to storage after the buffer is written
   * @throws IOException if the stream was closed or aborted, or on I/O failure
   */
  @Override
  public void flushAndSync(boolean durable) throws IOException {
    if (fp == null) {
      throw new IOException("Trying to use aborted output stream");
    }
    if (doubleBuf.isFlushed()) {
      LOG.info("Nothing to flush");
      return;
    }
    preallocate(); // preallocate file if necessary
    doubleBuf.flushTo(fp);
    if (durable && !shouldSkipFsyncForTests) {
      fc.force(false); // metadata updates not needed
    }
  }

  /**
   * @return true if the double buffer reports that a sync should be forced
   *         (the amount of buffered data exceeds the initial buffer size)
   */
  @Override
  public boolean shouldForceSync() {
    return doubleBuf.shouldForceSync();
  }

  /**
   * Extends the file with padding blocks when the ready buffer would not fit
   * between the current channel position and the current end of file.
   *
   * @throws IOException if querying or writing to the channel fails
   */
  private void preallocate() throws IOException {
    long position = fc.position();
    long size = fc.size();
    int bufSize = doubleBuf.getReadyBuf().getLength();
    // Bytes of the ready buffer that do not fit in the space already
    // available between the write position and the end of the file.
    long need = bufSize - (size - position);
    if (need <= 0) {
      return;
    }
    long oldSize = size;
    long total = 0;
    long fillCapacity = fill.capacity();
    // Always append whole padding blocks, each written at the current end of
    // the file, until the shortfall is covered.
    while (need > 0) {
      fill.position(0);
      IOUtils.writeFully(fc, fill, size);
      need -= fillCapacity;
      size += fillCapacity;
      total += fillCapacity;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Preallocated " + total + " bytes at the end of " +
          "the edit log (offset " + oldSize + ")");
    }
  }

  /**
   * Returns the file associated with this stream.
   */
  File getFile() {
    return file;
  }

  @Override
  public String toString() {
    return "EditLogFileOutputStream(" + file + ")";
  }

  /**
   * @return true if this stream is currently open.
   */
  public boolean isOpen() {
    return fp != null;
  }

  /**
   * Replaces the file channel used by this stream; intended for tests only.
   *
   * @param fc the channel to use from now on
   */
  @VisibleForTesting
  public void setFileChannelForTesting(FileChannel fc) {
    this.fc = fc;
  }

  /**
   * @return the file channel currently used by this stream; intended for
   *         tests only
   */
  @VisibleForTesting
  public FileChannel getFileChannelForTesting() {
    return fc;
  }

  /**
   * For the purposes of unit tests, we don't need to actually
   * write durably to disk. So, we can skip the fsync() calls
   * for a speed improvement.
   * @param skip true if fsync should <em>not</em> be called
   */
  @VisibleForTesting
  public static void setShouldSkipFsyncForTesting(boolean skip) {
    shouldSkipFsyncForTests = skip;
  }
}