/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.security.InvalidParameterException;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.nfs.nfs3.WriteCtx.DataState;
import org.apache.hadoop.io.BytesWritable.Comparator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
import org.apache.hadoop.nfs.nfs3.response.WccAttr;
import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.util.Daemon;
import org.jboss.netty.channel.Channel;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* OpenFileCtx saves the context of one HDFS file output stream. Access to it is
* synchronized by its member lock.
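* <p>
* Out-of-order writes are cached in pendingWrites and written back to HDFS
* sequentially by a single writer thread; once the cached non-sequential
* data exceeds a water mark it is dumped to a local file, and the whole
* context is cleaned up after the stream times out.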
*/
class OpenFileCtx {
public static final Log LOG = LogFactory.getLog(OpenFileCtx.class);
// Pending writes water mark for dump, 1MB
private static long DUMP_WRITE_WATER_MARK = 1024 * 1024;
public final static int COMMIT_FINISHED = 0;
public final static int COMMIT_WAIT = 1;
public final static int COMMIT_INACTIVE_CTX = 2;
public final static int COMMIT_INACTIVE_WITH_PENDING_WRITE = 3;
public final static int COMMIT_ERROR = 4;
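// COMMIT_FINISHED: data has been synced; COMMIT_WAIT: sequential writes
// before the commit offset are still pending; COMMIT_INACTIVE_CTX /
// COMMIT_INACTIVE_WITH_PENDING_WRITE: the stream was closed without / with
// writes still pending; COMMIT_ERROR: getting the flushed offset or syncing
// the data failed.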
// The stream status. False means the stream is closed.
private volatile boolean activeState;
// The stream write-back status. True means one thread is doing write back.
private volatile boolean asyncStatus;
/**
* The current offset of the file in HDFS. All the content before this offset
* has been written back to HDFS.
*/
private AtomicLong nextOffset;
private final HdfsDataOutputStream fos;
// TODO: make it mutable and update it after each write back to HDFS
private final Nfs3FileAttributes latestAttr;
private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
// Time of the last write, commit request or write-back event. Updated to
// keep the output stream alive.
private long lastAccessTime;
private volatile boolean enabledDump;
private FileOutputStream dumpOut;
private AtomicLong nonSequentialWriteInMemory;
private RandomAccessFile raf;
private final String dumpFilePath;
private Daemon dumpThread;
private void updateLastAccessTime() {
lastAccessTime = System.currentTimeMillis();
}
private boolean checkStreamTimeout(long streamTimeout) {
return System.currentTimeMillis() - lastAccessTime > streamTimeout;
}
public long getNextOffset() {
return nextOffset.get();
}
// Increase or decrease the memory occupation of non-sequential writes
private long updateNonSequentialWriteInMemory(long count) {
long newValue = nonSequentialWriteInMemory.addAndGet(count);
if (LOG.isDebugEnabled()) {
LOG.debug("Update nonSequentialWriteInMemory by " + count + " new value:"
+ newValue);
}
Preconditions.checkState(newValue >= 0,
"nonSequentialWriteInMemory is negative after update with count "
+ count);
return newValue;
}
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
String dumpFilePath) {
this.fos = fos;
this.latestAttr = latestAttr;
// We use ReverseComparatorOnMin as the comparator of the map. In this
// way, we dump the data with the largest offsets first, while the last
// element (smallest offset) is the next one to write back to HDFS.
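// For example, with pending ranges (0,64K), (64K,128K) and (1M,1M+64K),
// iteration starts at (1M,1M+64K), the first candidate to dump, while
// lastEntry() returns (0,64K), the next range to write back.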
pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>(
OffsetRange.ReverseComparatorOnMin);
updateLastAccessTime();
activeState = true;
asyncStatus = false;
dumpOut = null;
raf = null;
nonSequentialWriteInMemory = new AtomicLong(0);
this.dumpFilePath = dumpFilePath;
enabledDump = (dumpFilePath != null);
nextOffset = new AtomicLong();
nextOffset.set(latestAttr.getSize());
try {
assert(nextOffset.get() == this.fos.getPos());
} catch (IOException e) {
// Ignored: getPos is consulted only for this sanity check
}
dumpThread = null;
}
public Nfs3FileAttributes getLatestAttr() {
return latestAttr;
}
// Get flushed offset. Note that flushed data may not be persisted.
private long getFlushedOffset() throws IOException {
return fos.getPos();
}
// Check whether we need to dump the new writes
private void checkDump() {
if (!enabledDump) {
if (LOG.isDebugEnabled()) {
LOG.debug("Do nothing, dump is disabled.");
}
return;
}
if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
return;
}
// wake up the dumper thread to dump the data
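// Double-check under the lock: the unsynchronized read above keeps the
// common path lock-free, while the re-check guards against a racing dumper
// that has already drained the data below the water mark.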
synchronized (this) {
if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
if (LOG.isDebugEnabled()) {
LOG.debug("Asking dumper to dump...");
}
if (dumpThread == null) {
dumpThread = new Daemon(new Dumper());
dumpThread.start();
} else {
this.notifyAll();
}
}
}
}
class Dumper implements Runnable {
/** Dump data into a file */
private void dump() {
// Create the dump output stream the first time
if (dumpOut == null) {
LOG.info("Create dump file:" + dumpFilePath);
File dumpFile = new File(dumpFilePath);
try {
synchronized (this) {
// Create the dump file; it must not already exist
Preconditions.checkState(dumpFile.createNewFile(),
"The dump file should not exist: %s", dumpFilePath);
dumpOut = new FileOutputStream(dumpFile);
}
} catch (IOException e) {
LOG.error("Got failure when creating dump stream " + dumpFilePath, e);
enabledDump = false;
if (dumpOut != null) {
try {
dumpOut.close();
} catch (IOException e1) {
LOG.error("Can't close dump stream " + dumpFilePath, e);
}
}
return;
}
}
// Get raf for the first dump
if (raf == null) {
try {
raf = new RandomAccessFile(dumpFilePath, "r");
} catch (FileNotFoundException e) {
LOG.error("Can't get random access to file " + dumpFilePath);
// Disable dump
enabledDump = false;
return;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Start dump. Before dump, nonSequentialWriteInMemory == "
+ nonSequentialWriteInMemory.get());
}
Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
while (activeState && it.hasNext()
&& nonSequentialWriteInMemory.get() > 0) {
OffsetRange key = it.next();
WriteCtx writeCtx = pendingWrites.get(key);
if (writeCtx == null) {
// This write was just deleted
continue;
}
try {
long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
if (dumpedDataSize > 0) {
updateNonSequentialWriteInMemory(-dumpedDataSize);
}
} catch (IOException e) {
LOG.error("Dump data failed:" + writeCtx + " with error:" + e
+ " OpenFileCtx state:" + activeState);
// Disable dump
enabledDump = false;
return;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("After dump, nonSequentialWriteInMemory == "
+ nonSequentialWriteInMemory.get());
}
}
@Override
public void run() {
while (activeState && enabledDump) {
try {
if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
dump();
}
synchronized (OpenFileCtx.this) {
if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
try {
OpenFileCtx.this.wait();
if (LOG.isDebugEnabled()) {
LOG.debug("Dumper woke up");
}
} catch (InterruptedException e) {
LOG.info("Dumper is interrupted, dumpFilePath= "
+ OpenFileCtx.this.dumpFilePath);
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Dumper checking OpenFileCtx activeState: " + activeState
+ " enabledDump: " + enabledDump);
}
} catch (Throwable t) {
LOG.info("Dumper get Throwable: " + t + ". dumpFilePath: "
+ OpenFileCtx.this.dumpFilePath);
}
}
}
}
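// Repeated requests are detected by looking up the exact (offset,
// offset + count) range in pendingWrites, so only an identical
// retransmission (typically an NFS client retry) can match.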
private WriteCtx checkRepeatedWriteRequest(WRITE3Request request,
Channel channel, int xid) {
OffsetRange range = new OffsetRange(request.getOffset(),
request.getOffset() + request.getCount());
WriteCtx writeCtx = pendingWrites.get(range);
if (writeCtx == null) {
return null;
} else {
if (xid != writeCtx.getXid()) {
LOG.warn("Got a repeated request, same range, with a different xid:"
+ xid + " xid in old request:" + writeCtx.getXid());
//TODO: better handling.
}
return writeCtx;
}
}
public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request,
Channel channel, int xid, AsyncDataService asyncDataService,
IdUserGroup iug) {
if (!activeState) {
LOG.info("OpenFileCtx is inactive, fileId:"
+ request.getHandle().getFileId());
WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils.writeChannel(channel,
response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
xid);
} else {
// Update the write time first
updateLastAccessTime();
// Handle repeated write requests (same xid or not).
// If already replied, send reply again. If not replied, drop the
// repeated request.
WriteCtx existingWriteCtx = checkRepeatedWriteRequest(request, channel,
xid);
if (existingWriteCtx != null) {
if (!existingWriteCtx.getReplied()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Repeated write request which hasn't be served: xid="
+ xid + ", drop it.");
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Repeated write request which is already served: xid="
+ xid + ", resend response.");
}
WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
fileWcc, request.getCount(), request.getStableHow(),
Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
new XDR(), xid, new VerifierNone()), xid);
}
} else {
// not a repeated write request
receivedNewWriteInternal(dfsClient, request, channel, xid,
asyncDataService, iug);
}
}
}
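// alterWriteRequest trims a write that partially overlaps already-written
// data. A worked example: offset=4096, count=8192 and cachedOffset=8192
// give smallerCount=4096; the buffer position moves to 4096 and the
// request is rewritten to cover offsets 8192-12287, i.e. only the truly
// appended data.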
@VisibleForTesting
public static void alterWriteRequest(WRITE3Request request, long cachedOffset) {
long offset = request.getOffset();
int count = request.getCount();
long smallerCount = offset + count - cachedOffset;
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Got overwrite with appended data (%d-%d),"
+ " current offset %d," + " drop the overlapped section (%d-%d)"
+ " and append new data (%d-%d).", offset, (offset + count - 1),
cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
+ count - 1)));
}
ByteBuffer data = request.getData();
Preconditions.checkState(data.position() == 0,
"The write request data has non-zero position");
data.position((int) (cachedOffset - offset));
Preconditions.checkState(data.limit() - data.position() == smallerCount,
"The write request buffer has wrong limit/position regarding count");
request.setOffset(cachedOffset);
request.setCount((int) smallerCount);
}
/**
* Creates and adds a WriteCtx into the pendingWrites map. This is a
* synchronized method to handle concurrent writes.
*
* @return A non-null {@link WriteCtx} instance if the incoming write
* request's offset >= nextOffset (after trimming any overlap with
* already-written data). Otherwise null.
*/
private synchronized WriteCtx addWritesToCache(WRITE3Request request,
Channel channel, int xid) {
long offset = request.getOffset();
int count = request.getCount();
long cachedOffset = nextOffset.get();
int originalCount = WriteCtx.INVALID_ORIGINAL_COUNT;
if (LOG.isDebugEnabled()) {
LOG.debug("requesed offset=" + offset + " and current offset="
+ cachedOffset);
}
// Handle a special case first
if ((offset < cachedOffset) && (offset + count > cachedOffset)) {
// One Linux client behavior: after a file is closed and reopened for
// writing, the client sometimes combines previously written data (which
// could still be in the kernel buffer) with newly appended data in one
// write. This is usually the first write after the file is reopened. In
// this case, we log the event and drop the overlapped section.
LOG.warn(String.format("Got overwrite with appended data (%d-%d),"
+ " current offset %d," + " drop the overlapped section (%d-%d)"
+ " and append new data (%d-%d).", offset, (offset + count - 1),
cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
+ count - 1)));
if (!pendingWrites.isEmpty()) {
LOG.warn("There are other pending writes, fail this jumbo write");
return null;
}
LOG.warn("Modify this write to write only the appended data");
alterWriteRequest(request, cachedOffset);
// Update local variable
originalCount = count;
offset = request.getOffset();
count = request.getCount();
}
// Fail non-append call
if (offset < cachedOffset) {
LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
+ nextOffset + ")");
return null;
} else {
DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP
: WriteCtx.DataState.ALLOW_DUMP;
WriteCtx writeCtx = new WriteCtx(request.getHandle(),
request.getOffset(), request.getCount(), originalCount,
request.getStableHow(), request.getData(), channel, xid, false,
dataState);
if (LOG.isDebugEnabled()) {
LOG.debug("Add new write to the list with nextOffset " + cachedOffset
+ " and requesed offset=" + offset);
}
if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
// update the memory size
updateNonSequentialWriteInMemory(count);
}
// check if there is a WriteCtx with the same range in pendingWrites
WriteCtx oldWriteCtx = checkRepeatedWriteRequest(request, channel, xid);
if (oldWriteCtx == null) {
addWrite(writeCtx);
} else {
LOG.warn("Got a repeated request, same range, with xid:"
+ writeCtx.getXid());
}
return writeCtx;
}
}
/** Process an overwrite write request */
private void processOverWrite(DFSClient dfsClient, WRITE3Request request,
Channel channel, int xid, IdUserGroup iug) {
WccData wccData = new WccData(latestAttr.getWccAttr(), null);
long offset = request.getOffset();
int count = request.getCount();
WriteStableHow stableHow = request.getStableHow();
WRITE3Response response;
long cachedOffset = nextOffset.get();
if (offset + count > cachedOffset) {
LOG.warn("Treat this jumbo write as a real random write, no support.");
response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
WriteStableHow.UNSTABLE, Nfs3Constant.WRITE_COMMIT_VERF);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Process perfectOverWrite");
}
// TODO: let executor handle perfect overwrite
response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
request.getData().array(),
Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
}
updateLastAccessTime();
Nfs3Utils.writeChannel(channel,
response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
xid);
}
/**
* Check if we can start the write (back to HDFS) now. If there is no hole
* for writing, and no other thread is writing (i.e., asyncStatus is
* false), start the write and set asyncStatus to true.
*
* @return True if the new write is sequential and we can start writing
* (including the case that a thread is already writing).
*/
private synchronized boolean checkAndStartWrite(
AsyncDataService asyncDataService, WriteCtx writeCtx) {
if (writeCtx.getOffset() == nextOffset.get()) {
if (!asyncStatus) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trigger the write back task. Current nextOffset: "
+ nextOffset.get());
}
asyncStatus = true;
asyncDataService.execute(new AsyncDataService.WriteBackTask(this));
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("The write back thread is working.");
}
}
return true;
} else {
return false;
}
}
private void receivedNewWriteInternal(DFSClient dfsClient,
WRITE3Request request, Channel channel, int xid,
AsyncDataService asyncDataService, IdUserGroup iug) {
WriteStableHow stableHow = request.getStableHow();
WccAttr preOpAttr = latestAttr.getWccAttr();
int count = request.getCount();
WriteCtx writeCtx = addWritesToCache(request, channel, xid);
if (writeCtx == null) {
// offset < nextOffset
processOverWrite(dfsClient, request, channel, xid, iug);
} else {
// The write is added to pendingWrites.
// Check and start writing back if necessary
boolean startWriting = checkAndStartWrite(asyncDataService, writeCtx);
if (!startWriting) {
// offset > nextOffset. check if we need to dump data
checkDump();
// In tests, we noticed that some Linux clients send a batch (e.g., 1MB)
// of reordered writes and won't send more until they get responses to
// the previous batch. So here we send the response immediately for an
// unstable non-sequential write.
if (request.getStableHow() == WriteStableHow.UNSTABLE) {
if (LOG.isDebugEnabled()) {
LOG.debug("UNSTABLE write request, send response for offset: "
+ writeCtx.getOffset());
}
WccData fileWcc = new WccData(preOpAttr, latestAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils
.writeChannel(channel, response.writeHeaderAndResponse(new XDR(),
xid, new VerifierNone()), xid);
writeCtx.setReplied(true);
}
}
}
}
/**
* Honor two kinds of overwrites: 1) support applications like touch
* (write the same content back to change mtime); 2) the client somehow
* sends the same write again in a different RPC.
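* The requested range is read back from HDFS and compared with the request
* data; only when the content matches is the mtime updated and success
* returned.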
*/
private WRITE3Response processPerfectOverWrite(DFSClient dfsClient,
long offset, int count, WriteStableHow stableHow, byte[] data,
String path, WccData wccData, IdUserGroup iug) {
WRITE3Response response = null;
// Read the content back
byte[] readbuffer = new byte[count];
int readCount = 0;
FSDataInputStream fis = null;
try {
// Sync file data and length to avoid partial read failure
fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
} catch (ClosedChannelException closedException) {
LOG.info("The FSDataOutputStream has been closed. " +
"Continue processing the perfect overwrite.");
} catch (IOException e) {
LOG.info("hsync failed when processing possible perfect overwrite, path="
+ path + " error:" + e);
return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
Nfs3Constant.WRITE_COMMIT_VERF);
}
try {
fis = new FSDataInputStream(dfsClient.open(path));
readCount = fis.read(offset, readbuffer, 0, count);
if (readCount < count) {
LOG.error("Can't read back " + count + " bytes, partial read size:"
+ readCount);
return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
Nfs3Constant.WRITE_COMMIT_VERF);
}
} catch (IOException e) {
LOG.info("Read failed when processing possible perfect overwrite, path="
+ path + " error:" + e);
return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
Nfs3Constant.WRITE_COMMIT_VERF);
} finally {
IOUtils.cleanup(LOG, fis);
}
// Compare with the request
Comparator comparator = new Comparator();
if (comparator.compare(readbuffer, 0, readCount, data, 0, count) != 0) {
LOG.info("Perfect overwrite has different content");
response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
} else {
LOG.info("Perfect overwrite has same content,"
+ " updating the mtime, then return success");
Nfs3FileAttributes postOpAttr = null;
try {
dfsClient.setTimes(path, System.currentTimeMillis(), -1);
postOpAttr = Nfs3Utils.getFileAttr(dfsClient, path, iug);
} catch (IOException e) {
LOG.info("Got error when processing perfect overwrite, path=" + path
+ " error:" + e);
return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
Nfs3Constant.WRITE_COMMIT_VERF);
}
wccData.setPostOpAttr(postOpAttr);
response = new WRITE3Response(Nfs3Status.NFS3_OK, wccData, count,
stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
}
return response;
}
/**
* Return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
* COMMIT_INACTIVE_CTX, COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR
*/
public int checkCommit(long commitOffset) {
return activeState ? checkCommitInternal(commitOffset)
: COMMIT_INACTIVE_CTX;
}
private int checkCommitInternal(long commitOffset) {
if (commitOffset == 0) {
// Commit whole file
commitOffset = nextOffset.get();
}
long flushed = 0;
try {
flushed = getFlushedOffset();
} catch (IOException e) {
LOG.error("Can't get flushed offset, error:" + e);
return COMMIT_ERROR;
}
if (LOG.isDebugEnabled()) {
LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
}
if (flushed < commitOffset) {
// Keep stream active
updateLastAccessTime();
return COMMIT_WAIT;
}
int ret = COMMIT_WAIT;
try {
// Sync file data and length
fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
// Nothing to do for metadata since attr related change is pass-through
ret = COMMIT_FINISHED;
} catch (ClosedChannelException cce) {
if (pendingWrites.isEmpty()) {
ret = COMMIT_INACTIVE_CTX;
} else {
ret = COMMIT_INACTIVE_WITH_PENDING_WRITE;
}
} catch (IOException e) {
LOG.error("Got stream error during data sync:" + e);
// Do nothing. Stream will be closed eventually by StreamMonitor.
ret = COMMIT_ERROR;
}
// Keep stream active
updateLastAccessTime();
return ret;
}
private void addWrite(WriteCtx writeCtx) {
long offset = writeCtx.getOffset();
int count = writeCtx.getCount();
// For the offset range (min, max), min is inclusive, and max is exclusive
pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
}
/**
* Check stream status to decide if it should be closed
* @return true if the stream should be removed; false to keep it
*/
public synchronized boolean streamCleanup(long fileId, long streamTimeout) {
if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) {
throw new InvalidParameterException("StreamTimeout " + streamTimeout
+ "ms is less than MINIMIUM_STREAM_TIMEOUT "
+ WriteManager.MINIMIUM_STREAM_TIMEOUT + "ms");
}
boolean flag = false;
// Check the stream timeout
if (checkStreamTimeout(streamTimeout)) {
if (LOG.isDebugEnabled()) {
LOG.debug("closing stream for fileId:" + fileId);
}
cleanup();
flag = true;
}
return flag;
}
/**
* Get (and remove) the next WriteCtx from {@link #pendingWrites} if possible.
*
* @return Null if {@link #pendingWrites} is empty, or the next WriteCtx's
* offset is larger than nextOffset.
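* As a side effect, asyncStatus is reset to false when there is nothing to
* write, which is why this method and checkAndStartWrite are both
* synchronized on this OpenFileCtx.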
*/
private synchronized WriteCtx offerNextToWrite() {
if (pendingWrites.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("The asyn write task has no pending writes, fileId: "
+ latestAttr.getFileId());
}
this.asyncStatus = false;
} else {
Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
OffsetRange range = lastEntry.getKey();
WriteCtx toWrite = lastEntry.getValue();
if (LOG.isTraceEnabled()) {
LOG.trace("range.getMin()=" + range.getMin() + " nextOffset="
+ nextOffset);
}
long offset = nextOffset.get();
if (range.getMin() > offset) {
if (LOG.isDebugEnabled()) {
LOG.debug("The next sequencial write has not arrived yet");
}
this.asyncStatus = false;
} else if (range.getMin() < offset && range.getMax() > offset) {
// shouldn't happen since we do sync for overlapped concurrent writers
LOG.warn("Got a overlapping write (" + range.getMin() + ","
+ range.getMax() + "), nextOffset=" + offset
+ ". Silently drop it now");
pendingWrites.remove(range);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax()
+ ") from the list");
}
// after writing, remove the WriteCtx from cache
pendingWrites.remove(range);
// update nextOffset
nextOffset.addAndGet(toWrite.getCount());
if (LOG.isDebugEnabled()) {
LOG.debug("Change nextOffset to " + nextOffset.get());
}
return toWrite;
}
}
return null;
}
/** Invoked by AsyncDataService to write back to HDFS */
void executeWriteBack() {
Preconditions.checkState(asyncStatus,
"asyncStatus should be true when executing write back");
try {
while (activeState) {
WriteCtx toWrite = offerNextToWrite();
if (toWrite != null) {
// Do the write
doSingleWrite(toWrite);
updateLastAccessTime();
} else {
break;
}
}
if (!activeState && LOG.isDebugEnabled()) {
LOG.debug("The openFileCtx is not active anymore, fileId: "
+ +latestAttr.getFileId());
}
} finally {
// make sure we reset asyncStatus to false
asyncStatus = false;
}
}
private void doSingleWrite(final WriteCtx writeCtx) {
Channel channel = writeCtx.getChannel();
int xid = writeCtx.getXid();
long offset = writeCtx.getOffset();
int count = writeCtx.getCount();
WriteStableHow stableHow = writeCtx.getStableHow();
FileHandle handle = writeCtx.getHandle();
if (LOG.isDebugEnabled()) {
LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
+ offset + " length:" + count + " stableHow:" + stableHow.getValue());
}
try {
// The write is not protected by lock. asyncStatus is used to make sure
// there is only one thread doing write back at any time
writeCtx.writeData(fos);
long flushedOffset = getFlushedOffset();
if (flushedOffset != (offset + count)) {
throw new IOException("output stream is out of sync, pos="
+ flushedOffset + " and nextOffset should be"
+ (offset + count));
}
// Reduce the memory occupation if the request was allowed to be dumped
if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
synchronized (writeCtx) {
if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
updateNonSequentialWriteInMemory(-count);
if (LOG.isDebugEnabled()) {
LOG.debug("After writing " + handle.getFileId() + " at offset "
+ offset + ", updated the memory count, new value:"
+ nonSequentialWriteInMemory.get());
}
}
}
}
if (!writeCtx.getReplied()) {
WccAttr preOpAttr = latestAttr.getWccAttr();
WccData fileWcc = new WccData(preOpAttr, latestAttr);
if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) {
LOG.warn("Return original count:" + writeCtx.getOriginalCount()
+ " instead of real data count:" + count);
count = writeCtx.getOriginalCount();
}
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
new XDR(), xid, new VerifierNone()), xid);
}
} catch (IOException e) {
LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
+ offset + " and length " + count, e);
if (!writeCtx.getReplied()) {
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
new XDR(), xid, new VerifierNone()), xid);
// Keep stream open. Either client retries or StreamMonitor closes it.
}
LOG.info("Clean up open file context for fileId: "
+ latestAttr.getFileid());
cleanup();
}
}
private synchronized void cleanup() {
if (!activeState) {
LOG.info("Current OpenFileCtx is already inactive, no need to cleanup.");
return;
}
activeState = false;
// stop the dump thread
if (dumpThread != null) {
dumpThread.interrupt();
try {
dumpThread.join(3000);
} catch (InterruptedException e) {
// Give up waiting for the dumper and proceed with cleanup
}
}
// Close stream
try {
if (fos != null) {
fos.close();
}
} catch (IOException e) {
LOG.info("Can't close stream for fileId:" + latestAttr.getFileid()
+ ", error:" + e);
}
// Reply error for pending writes
LOG.info("There are " + pendingWrites.size() + " pending writes.");
WccAttr preOpAttr = latestAttr.getWccAttr();
while (!pendingWrites.isEmpty()) {
OffsetRange key = pendingWrites.firstKey();
LOG.info("Fail pending write: (" + key.getMin() + "," + key.getMax()
+ "), nextOffset=" + nextOffset.get());
WriteCtx writeCtx = pendingWrites.remove(key);
if (!writeCtx.getReplied()) {
WccData fileWcc = new WccData(preOpAttr, latestAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
fileWcc, 0, writeCtx.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils.writeChannel(writeCtx.getChannel(), response
.writeHeaderAndResponse(new XDR(), writeCtx.getXid(),
new VerifierNone()), writeCtx.getXid());
}
}
// Cleanup dump file
if (dumpOut != null) {
try {
dumpOut.close();
} catch (IOException e) {
LOG.error("Failed to close the dump output stream " + dumpFilePath, e);
}
File dumpFile = new File(dumpFilePath);
if (dumpFile.exists() && !dumpFile.delete()) {
LOG.error("Failed to delete dumpfile: " + dumpFile);
}
}
if (raf != null) {
try {
raf.close();
} catch (IOException e) {
LOG.error("Failed to close the dump input file " + dumpFilePath, e);
}
}
}
}