blob: 1471ddfc1f1c55a9600b5e4595b685345622a3d5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.nfs.NfsFileType;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.util.Daemon;
import org.jboss.netty.channel.Channel;
import com.google.common.collect.Maps;
/**
* Manage the writes and responds asynchronously.
*/
public class WriteManager {
  public static final Log LOG = LogFactory.getLog(WriteManager.class);

  /** Maximum time a stable write / commit waits for data to be committed. */
  private static final long COMMIT_WAIT_TIMEOUT_MS = 30 * 1000; // 30 seconds
  /** Polling interval while waiting for a commit to finish. */
  private static final long COMMIT_POLL_INTERVAL_MS = 100;

  private final Configuration config;
  private final IdUserGroup iug;
  private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps
      .newConcurrentMap();

  // Written only inside the synchronized start/shutdown methods but read
  // without the lock on the write path, so both are volatile. The service is
  // assigned before the flag is set, so a reader that sees the flag as true
  // also sees a non-null service.
  private volatile AsyncDataService asyncDataService;
  private volatile boolean asyncDataServiceStarted = false;
  private final StreamMonitor streamMonitor;

  /**
   * The time limit to wait for accumulating reordered sequential writes to
   * the same file before the write is considered done.
   */
  private final long streamTimeout;

  public static final long DEFAULT_STREAM_TIMEOUT = 10 * 60 * 1000; //10 minutes
  public static final long MINIMIUM_STREAM_TIMEOUT = 10 * 1000; //10 seconds

  /**
   * Register (or replace) the open-file context for a file handle.
   *
   * @param h the file handle used as the map key
   * @param ctx the context tracking pending writes for that file
   */
  void addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
    openFileMap.put(h, ctx);
    if (LOG.isDebugEnabled()) {
      LOG.debug("After add the new stream " + h.getFileId()
          + ", the stream number:" + openFileMap.size());
    }
  }

  /**
   * Create a write manager.
   *
   * The stream timeout is read from {@code dfs.nfs3.stream.timeout}
   * (milliseconds, default {@link #DEFAULT_STREAM_TIMEOUT}) and clamped to at
   * least {@link #MINIMIUM_STREAM_TIMEOUT}. The stream monitor thread is
   * created here but only started by the first write.
   *
   * @param iug user/group id mapping passed through to attribute lookups
   * @param config configuration used for timeouts, buffer size and dump dir
   */
  WriteManager(IdUserGroup iug, final Configuration config) {
    this.iug = iug;
    this.config = config;
    long timeout = config.getLong("dfs.nfs3.stream.timeout",
        DEFAULT_STREAM_TIMEOUT);
    LOG.info("Stream timeout is " + timeout + "ms.");
    if (timeout < MINIMIUM_STREAM_TIMEOUT) {
      LOG.info("Reset stream timeout to minimum value "
          + MINIMIUM_STREAM_TIMEOUT + "ms.");
      timeout = MINIMIUM_STREAM_TIMEOUT;
    }
    this.streamTimeout = timeout;
    this.streamMonitor = new StreamMonitor();
  }

  /**
   * Start the stream monitor and the async data service exactly once.
   * Synchronized with a re-check so concurrent first writes cannot start the
   * monitor thread twice, which would throw IllegalThreadStateException.
   */
  private synchronized void startAsyncDataService() {
    if (asyncDataServiceStarted) {
      return;
    }
    streamMonitor.start();
    this.asyncDataService = new AsyncDataService();
    asyncDataServiceStarted = true;
  }

  /**
   * Shut down the async data service and interrupt the stream monitor.
   * This is a no-op if the service was never started.
   *
   * NOTE(review): a Java thread cannot be restarted once it has terminated,
   * so starting the service again after this call would fail in
   * streamMonitor.start().
   */
  private synchronized void shutdownAsyncDataService() {
    if (!asyncDataServiceStarted) {
      return;
    }
    asyncDataService.shutdown();
    asyncDataServiceStarted = false;
    streamMonitor.interrupt();
  }

  /**
   * Handle an NFS3 WRITE request and reply on the channel when the write is
   * rejected or, for stable writes, once it is committed.
   *
   * If no open stream exists for the file, one is opened via
   * {@code DFSClient.append} and registered. The write is then queued on the
   * file's {@link OpenFileCtx}. For non-UNSTABLE writes this method blocks in
   * {@link #handleCommit} and replies NFS3_OK or NFS3ERR_IO itself.
   *
   * NOTE(review): the get-then-put on openFileMap is not atomic. Two
   * concurrent first writes to the same file may both try to append. This
   * was left as in the original.
   *
   * @param dfsClient client used to open the file and read attributes
   * @param request decoded WRITE3 request
   * @param channel channel on which responses are written
   * @param xid RPC transaction id echoed in responses
   * @param preOpAttr file attributes before the operation (for WCC data)
   * @throws IOException if fetching post-op attributes fails
   */
  void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
      int xid, Nfs3FileAttributes preOpAttr) throws IOException {
    // First write request starts the async data service
    if (!asyncDataServiceStarted) {
      startAsyncDataService();
    }

    long offset = request.getOffset();
    int count = request.getCount();
    WriteStableHow stableHow = request.getStableHow();
    byte[] data = request.getData().array();
    if (data.length < count) {
      WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL);
      Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
          new XDR(), xid, new VerifierNone()), xid);
      return;
    }

    FileHandle fileHandle = request.getHandle();
    if (LOG.isDebugEnabled()) {
      LOG.debug("handleWrite fileId: " + fileHandle.getFileId() + " offset: "
          + offset + " length:" + count + " stableHow:" + stableHow.getValue());
    }

    // Check if there is a stream to write
    OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
    if (openFileCtx == null) {
      LOG.info("No opened stream for fileId:" + fileHandle.getFileId());
      String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle.getFileId());
      HdfsDataOutputStream fos = null;
      Nfs3FileAttributes latestAttr = null;
      try {
        int bufferSize = config.getInt(
            CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
            CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);

        fos = dfsClient.append(fileIdPath, bufferSize, null, null);
        latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
      } catch (IOException e) {
        LOG.error("Can't append to file:" + fileIdPath, e);
        if (fos != null) {
          // Best-effort close: a failure here must not prevent the error
          // reply below from reaching the client.
          try {
            fos.close();
          } catch (IOException closeError) {
            LOG.warn("Can't close stream for file:" + fileIdPath, closeError);
          }
        }
        WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr),
            preOpAttr);
        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
            new XDR(), xid, new VerifierNone()), xid);
        return;
      }

      // Add open stream
      String writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
          Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
      openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
          + fileHandle.getFileId());
      addOpenFileStream(fileHandle, openFileCtx);
      if (LOG.isDebugEnabled()) {
        LOG.debug("opened stream for file:" + fileHandle.getFileId());
      }
    }

    // Add write into the async job queue
    openFileCtx.receivedNewWrite(dfsClient, request, channel, xid,
        asyncDataService, iug);

    // Block stable write
    if (stableHow != WriteStableHow.UNSTABLE) {
      if (handleCommit(fileHandle, offset + count)) {
        Nfs3FileAttributes postOpAttr = getFileAttr(dfsClient, fileHandle, iug);
        WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr),
            postOpAttr);
        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
            new XDR(), xid, new VerifierNone()), xid);
      } else {
        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
            new XDR(), xid, new VerifierNone()), xid);
      }
    }
  }

  /**
   * Wait until data up to {@code commitOffset} is committed for the file.
   *
   * The method polls {@link OpenFileCtx#checkCommit} every
   * {@link #COMMIT_POLL_INTERVAL_MS} ms for at most
   * {@link #COMMIT_WAIT_TIMEOUT_MS} ms.
   *
   * @param fileHandle the file to commit
   * @param commitOffset the offset that must be committed
   * @return true if there is no open stream, the commit finished, or the
   *         stream is inactive with nothing pending; false on commit error,
   *         inactive stream with pending writes, timeout, or interruption
   */
  boolean handleCommit(FileHandle fileHandle, long commitOffset) {
    OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
    if (openFileCtx == null) {
      LOG.info("No opened stream for fileId:" + fileHandle.getFileId()
          + " commitOffset=" + commitOffset);
      return true;
    }
    long startCommit = System.currentTimeMillis();
    while (true) {
      int ret = openFileCtx.checkCommit(commitOffset);
      if (ret == OpenFileCtx.COMMIT_FINISHED) {
        // Committed
        return true;
      } else if (ret == OpenFileCtx.COMMIT_INACTIVE_CTX) {
        LOG.info("Inactive stream, fileId=" + fileHandle.getFileId()
            + " commitOffset=" + commitOffset);
        return true;
      } else if (ret == OpenFileCtx.COMMIT_INACTIVE_WITH_PENDING_WRITE) {
        LOG.info("Inactive stream with pending writes, fileId="
            + fileHandle.getFileId() + " commitOffset=" + commitOffset);
        return false;
      }
      assert (ret == OpenFileCtx.COMMIT_WAIT || ret == OpenFileCtx.COMMIT_ERROR);
      if (ret == OpenFileCtx.COMMIT_ERROR) {
        return false;
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("Not committed yet, wait., fileId=" + fileHandle.getFileId()
            + " commitOffset=" + commitOffset);
      }
      if (System.currentTimeMillis() - startCommit > COMMIT_WAIT_TIMEOUT_MS) {
        // Commit took too long, return error
        return false;
      }
      try {
        Thread.sleep(COMMIT_POLL_INTERVAL_MS);
      } catch (InterruptedException e) {
        LOG.info("Commit is interrupted, fileId=" + fileHandle.getFileId()
            + " commitOffset=" + commitOffset);
        // Preserve the interrupt status for code further up the stack.
        Thread.currentThread().interrupt();
        return false;
      }
    }// while
  }

  /**
   * Get the attributes of the file identified by {@code fileHandle}.
   * If the file has an open stream in this manager, its size and used bytes
   * are overridden with the stream's next write offset.
   *
   * @return the attributes, or null if {@code Nfs3Utils.getFileAttr} returned
   *         null
   */
  Nfs3FileAttributes getFileAttr(DFSClient client, FileHandle fileHandle,
      IdUserGroup iug) throws IOException {
    String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle);
    Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug);
    if (attr != null) {
      OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
      if (openFileCtx != null) {
        attr.setSize(openFileCtx.getNextOffset());
        attr.setUsed(openFileCtx.getNextOffset());
      }
    }
    return attr;
  }

  /**
   * Get the attributes of {@code fileName} inside the directory
   * {@code dirHandle}. For regular files with an open stream, the size and
   * used bytes are overridden with the stream's next write offset.
   *
   * @return the attributes, or null if {@code Nfs3Utils.getFileAttr} returned
   *         null
   */
  Nfs3FileAttributes getFileAttr(DFSClient client, FileHandle dirHandle,
      String fileName) throws IOException {
    String fileIdPath = Nfs3Utils.getFileIdPath(dirHandle) + "/" + fileName;
    Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug);

    if ((attr != null) && (attr.getType() == NfsFileType.NFSREG.toValue())) {
      OpenFileCtx openFileCtx = openFileMap
          .get(new FileHandle(attr.getFileId()));

      if (openFileCtx != null) {
        attr.setSize(openFileCtx.getNextOffset());
        attr.setUsed(openFileCtx.getNextOffset());
      }
    }
    return attr;
  }

  /**
   * StreamMonitor wakes up periodically to find and close idle streams.
   * An entry is removed from openFileMap when its
   * {@code OpenFileCtx.streamCleanup} returns true. The thread exits when it
   * is interrupted.
   */
  class StreamMonitor extends Daemon {
    private final int rotation = 5 * 1000; // 5 seconds
    private long lastWakeupTime = 0;

    @Override
    public void run() {
      while (true) {
        Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet()
            .iterator();
        if (LOG.isTraceEnabled()) {
          LOG.trace("openFileMap size:" + openFileMap.size());
        }
        while (it.hasNext()) {
          Entry<FileHandle, OpenFileCtx> pairs = it.next();
          OpenFileCtx ctx = pairs.getValue();
          if (ctx.streamCleanup((pairs.getKey()).getFileId(), streamTimeout)) {
            it.remove();
            if (LOG.isDebugEnabled()) {
              LOG.debug("After remove stream " + pairs.getKey().getFileId()
                  + ", the stream number:" + openFileMap.size());
            }
          }
        }

        // Sleep for whatever is left of the rotation period since the last
        // wakeup, so scans start at most once every `rotation` ms.
        try {
          long workedTime = System.currentTimeMillis() - lastWakeupTime;
          if (workedTime < rotation) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("StreamMonitor can still have a sleep:"
                  + ((rotation - workedTime) / 1000));
            }
            Thread.sleep(rotation - workedTime);
          }
          lastWakeupTime = System.currentTimeMillis();

        } catch (InterruptedException e) {
          LOG.info("StreamMonitor got interrupted");
          return;
        }
      }
    }
  }
}