blob: 8dc9a5e4dfa7628a5dd6ca3550b739595a2e54b3 [file] [log] [blame]
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.compress.DefaultCodec;
/**
* Implementation of {@link HLog.Writer} that delegates to
* SequenceFile.Writer.
*/
public class SequenceFileLogWriter implements HLog.Writer {
private final Log LOG = LogFactory.getLog(this.getClass());
// The sequence file we delegate to.
private SequenceFile.Writer writer;
// The dfsclient out stream gotten made accessible or null if not available.
private OutputStream dfsClient_out;
// The syncFs method from hdfs-200 or null if not available.
private Method syncFs;
private Class<? extends HLogKey> keyClass;
/**
* Default constructor.
*/
public SequenceFileLogWriter() {
super();
}
/**
* This constructor allows a specific HLogKey implementation to override that
* which would otherwise be chosen via configuration property.
*
* @param keyClass
*/
public SequenceFileLogWriter(Class<? extends HLogKey> keyClass) {
this.keyClass = keyClass;
}
@Override
public void init(FileSystem fs, Path path, Configuration conf)
throws IOException {
if (null == keyClass) {
keyClass = HLog.getKeyClass(conf);
}
// Create a SF.Writer instance.
this.writer = SequenceFile.createWriter(fs, conf, path,
keyClass, WALEdit.class,
fs.getConf().getInt("io.file.buffer.size", 4096),
(short) conf.getInt("hbase.regionserver.hlog.replication",
fs.getDefaultReplication()),
conf.getLong("hbase.regionserver.hlog.blocksize",
fs.getDefaultBlockSize()),
SequenceFile.CompressionType.NONE,
new DefaultCodec(),
null,
new Metadata());
// Get at the private FSDataOutputStream inside in SequenceFile so we can
// call sync on it. Make it accessible. Stash it aside for call up in
// the sync method.
final Field fields [] = this.writer.getClass().getDeclaredFields();
final String fieldName = "out";
for (int i = 0; i < fields.length; ++i) {
if (fieldName.equals(fields[i].getName())) {
try {
// Make the 'out' field up in SF.Writer accessible.
fields[i].setAccessible(true);
FSDataOutputStream out =
(FSDataOutputStream)fields[i].get(this.writer);
this.dfsClient_out = out.getWrappedStream();
break;
} catch (IllegalAccessException ex) {
throw new IOException("Accessing " + fieldName, ex);
}
}
}
// Now do dirty work to see if syncFs is available.
// Test if syncfs is available.
Method m = null;
boolean append = conf.getBoolean("dfs.support.append", false);
if (append) {
try {
// function pointer to writer.syncFs()
m = this.writer.getClass().getMethod("syncFs", new Class<?> []{});
} catch (SecurityException e) {
throw new IOException("Failed test for syncfs", e);
} catch (NoSuchMethodException e) {
// Not available
}
}
this.syncFs = m;
LOG.info((this.syncFs != null)?
"Using syncFs -- HDFS-200":
("syncFs -- HDFS-200 -- not available, dfs.support.append=" + append));
}
@Override
public void append(HLog.Entry entry) throws IOException {
this.writer.append(entry.getKey(), entry.getEdit());
}
@Override
public void close() throws IOException {
this.writer.close();
}
@Override
public void sync() throws IOException {
if (this.syncFs != null) {
try {
this.syncFs.invoke(this.writer, HLog.NO_ARGS);
} catch (Exception e) {
throw new IOException("Reflection", e);
}
}
}
@Override
public long getLength() throws IOException {
return this.writer.getLength();
}
/**
* @return The dfsclient out stream up inside SF.Writer made accessible, or
* null if not available.
*/
public OutputStream getDFSCOutputStream() {
return this.dfsClient_out;
}
}