blob: d22053c318bc964ee497fa7189c323c67a4bf1fd [file] [log] [blame]
/**
* Copyright 2005 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.dfs;
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.ipc.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;
import org.apache.commons.logging.*;
import java.io.*;
import java.net.*;
import java.util.*;
/********************************************************
* DFSClient can connect to a Hadoop Filesystem and
* perform basic file tasks. It uses the ClientProtocol
* to communicate with a NameNode daemon, and connects
* directly to DataNodes to read/write block data.
*
* Hadoop DFS users should obtain an instance of
* DistributedFileSystem, which uses DFSClient to handle
* filesystem tasks.
*
* @author Mike Cafarella, Tessa MacDuff
********************************************************/
class DFSClient implements FSConstants {
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.DFSClient");
static int MAX_BLOCK_ACQUIRE_FAILURES = 3;
private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
ClientProtocol namenode;
String localName;
boolean running = true;
Random r = new Random();
String clientName;
Daemon leaseChecker;
private Configuration conf;
private long defaultBlockSize;
private short defaultReplication;
// required for unknown reason to make WritableFactories work distributed
static { new DFSFileInfo(); }
/**
* A map from name -> DFSOutputStream of files that are currently being
* written by this client.
*/
private TreeMap pendingCreates = new TreeMap();
/**
* A class to track the list of DFS clients, so that they can be closed
* on exit.
* @author Owen O'Malley
*/
private static class ClientFinalizer extends Thread {
private List clients = new ArrayList();
public synchronized void addClient(DFSClient client) {
clients.add(client);
}
public synchronized void run() {
Iterator itr = clients.iterator();
while (itr.hasNext()) {
DFSClient client = (DFSClient) itr.next();
if (client.running) {
try {
client.close();
} catch (IOException ie) {
System.err.println("Error closing client");
ie.printStackTrace();
}
}
}
}
}
// add a cleanup thread
private static ClientFinalizer clientFinalizer = new ClientFinalizer();
static {
Runtime.getRuntime().addShutdownHook(clientFinalizer);
}
/**
* Create a new DFSClient connected to the given namenode server.
*/
public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf) {
this.conf = conf;
this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class, nameNodeAddr, conf);
try {
this.localName = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException uhe) {
this.localName = "";
}
String taskId = conf.get("mapred.task.id");
if (taskId != null) {
this.clientName = "DFSClient_" + taskId;
} else {
this.clientName = "DFSClient_" + r.nextInt();
}
defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
defaultReplication = (short) conf.getInt("dfs.replication", 3);
this.leaseChecker = new Daemon(new LeaseChecker());
this.leaseChecker.start();
}
private void checkOpen() throws IOException {
if (!running) {
IOException result = new IOException("Filesystem closed");
throw result;
}
}
/**
* Close the file system, abadoning all of the leases and files being
* created.
*/
public void close() throws IOException {
// synchronize in here so that we don't need to change the API
synchronized (this) {
checkOpen();
synchronized (pendingCreates) {
Iterator file_itr = pendingCreates.keySet().iterator();
while (file_itr.hasNext()) {
String name = (String) file_itr.next();
try {
namenode.abandonFileInProgress(name, clientName);
} catch (IOException ie) {
System.err.println("Exception abandoning create lock on " + name);
ie.printStackTrace();
}
}
pendingCreates.clear();
}
this.running = false;
try {
leaseChecker.join();
} catch (InterruptedException ie) {
}
}
}
/**
* Get the default block size for this cluster
* @return the default block size in bytes
*/
public long getDefaultBlockSize() {
return defaultBlockSize;
}
public long getBlockSize(Path f) throws IOException {
// if we already know the answer, use it.
if (f instanceof DfsPath) {
return ((DfsPath) f).getBlockSize();
}
int retries = 4;
while (true) {
try {
return namenode.getBlockSize(f.toString());
} catch (IOException ie) {
LOG.info("Problem getting block size: " +
StringUtils.stringifyException(ie));
if (--retries == 0) {
throw ie;
}
}
}
}
public short getDefaultReplication() {
return defaultReplication;
}
/**
* Get hints about the location of the indicated block(s). The
* array returned is as long as there are blocks in the indicated
* range. Each block may have one or more locations.
*/
public String[][] getHints(UTF8 src, long start, long len) throws IOException {
return namenode.getHints(src.toString(), start, len);
}
/**
* Create an input stream that obtains a nodelist from the
* namenode, and then reads from all the right places. Creates
* inner subclass of InputStream that does the right out-of-band
* work.
*/
public FSInputStream open(UTF8 src) throws IOException {
checkOpen();
// Get block info from namenode
return new DFSInputStream(src.toString());
}
/**
* Create a new dfs file and return an output stream for writing into it.
*
* @param src stream name
* @param overwrite do not check for file existence if true
* @return output stream
* @throws IOException
*/
public FSOutputStream create( UTF8 src,
boolean overwrite
) throws IOException {
return create( src, overwrite, defaultReplication, defaultBlockSize);
}
/**
* Create a new dfs file with the specified block replication
* and return an output stream for writing into the file.
*
* @param src stream name
* @param overwrite do not check for file existence if true
* @param replication block replication
* @return output stream
* @throws IOException
*/
public FSOutputStream create( UTF8 src,
boolean overwrite,
short replication,
long blockSize
) throws IOException {
checkOpen();
FSOutputStream result = new DFSOutputStream(src, overwrite,
replication, blockSize);
synchronized (pendingCreates) {
pendingCreates.put(src.toString(), result);
}
return result;
}
/**
* Set replication for an existing file.
*
* @see ClientProtocol#setReplication(String, short)
* @param replication
* @throws IOException
* @return true is successful or false if file does not exist
* @author shv
*/
public boolean setReplication(UTF8 src,
short replication
) throws IOException {
return namenode.setReplication(src.toString(), replication);
}
/**
* Make a direct connection to namenode and manipulate structures
* there.
*/
public boolean rename(UTF8 src, UTF8 dst) throws IOException {
checkOpen();
return namenode.rename(src.toString(), dst.toString());
}
/**
* Make a direct connection to namenode and manipulate structures
* there.
*/
public boolean delete(UTF8 src) throws IOException {
checkOpen();
return namenode.delete(src.toString());
}
/**
*/
public boolean exists(UTF8 src) throws IOException {
checkOpen();
return namenode.exists(src.toString());
}
/**
*/
public boolean isDirectory(UTF8 src) throws IOException {
checkOpen();
return namenode.isDir(src.toString());
}
/**
*/
public DFSFileInfo[] listPaths(UTF8 src) throws IOException {
checkOpen();
return namenode.getListing(src.toString());
}
/**
*/
public long totalRawCapacity() throws IOException {
long rawNums[] = namenode.getStats();
return rawNums[0];
}
/**
*/
public long totalRawUsed() throws IOException {
long rawNums[] = namenode.getStats();
return rawNums[1];
}
public DatanodeInfo[] datanodeReport() throws IOException {
return namenode.getDatanodeReport();
}
/**
*/
public boolean mkdirs(UTF8 src) throws IOException {
checkOpen();
return namenode.mkdirs(src.toString());
}
/**
*/
public void lock(UTF8 src, boolean exclusive) throws IOException {
long start = System.currentTimeMillis();
boolean hasLock = false;
while (! hasLock) {
hasLock = namenode.obtainLock(src.toString(), clientName, exclusive);
if (! hasLock) {
try {
Thread.sleep(400);
if (System.currentTimeMillis() - start > 5000) {
LOG.info("Waiting to retry lock for " + (System.currentTimeMillis() - start) + " ms.");
Thread.sleep(2000);
}
} catch (InterruptedException ie) {
}
}
}
}
/**
*
*/
public void release(UTF8 src) throws IOException {
boolean hasReleased = false;
while (! hasReleased) {
hasReleased = namenode.releaseLock(src.toString(), clientName);
if (! hasReleased) {
LOG.info("Could not release. Retrying...");
try {
Thread.sleep(2000);
} catch (InterruptedException ie) {
}
}
}
}
/**
* Pick the best node from which to stream the data.
* That's the local one, if available.
*/
private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException {
if ((nodes == null) ||
(nodes.length - deadNodes.size() < 1)) {
throw new IOException("No live nodes contain current block");
}
DatanodeInfo chosenNode = null;
for (int i = 0; i < nodes.length; i++) {
if (deadNodes.contains(nodes[i])) {
continue;
}
String nodename = nodes[i].getHost();
if (localName.equals(nodename)) {
chosenNode = nodes[i];
break;
}
}
if (chosenNode == null) {
do {
chosenNode = nodes[Math.abs(r.nextInt()) % nodes.length];
} while (deadNodes.contains(chosenNode));
}
return chosenNode;
}
/***************************************************************
* Periodically check in with the namenode and renew all the leases
* when the lease period is half over.
***************************************************************/
class LeaseChecker implements Runnable {
/**
*/
public void run() {
long lastRenewed = 0;
while (running) {
if (System.currentTimeMillis() - lastRenewed > (LEASE_PERIOD / 2)) {
try {
namenode.renewLease(clientName);
lastRenewed = System.currentTimeMillis();
} catch (IOException ie) {
String err = StringUtils.stringifyException(ie);
LOG.warn("Problem renewing lease for " + clientName +
": " + err);
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}
}
/****************************************************************
* DFSInputStream provides bytes from a named file. It handles
* negotiation of the namenode and various datanodes as necessary.
****************************************************************/
class DFSInputStream extends FSInputStream {
private Socket s = null;
boolean closed = false;
private String src;
private DataInputStream blockStream;
private Block blocks[] = null;
private DatanodeInfo nodes[][] = null;
private long pos = 0;
private long filelen = 0;
private long blockEnd = -1;
/**
*/
public DFSInputStream(String src) throws IOException {
this.src = src;
openInfo();
this.blockStream = null;
for (int i = 0; i < blocks.length; i++) {
this.filelen += blocks[i].getNumBytes();
}
}
/**
* Grab the open-file info from namenode
*/
void openInfo() throws IOException {
Block oldBlocks[] = this.blocks;
LocatedBlock results[] = namenode.open(src);
Vector blockV = new Vector();
Vector nodeV = new Vector();
for (int i = 0; i < results.length; i++) {
blockV.add(results[i].getBlock());
nodeV.add(results[i].getLocations());
}
Block newBlocks[] = (Block[]) blockV.toArray(new Block[blockV.size()]);
if (oldBlocks != null) {
for (int i = 0; i < oldBlocks.length; i++) {
if (! oldBlocks[i].equals(newBlocks[i])) {
throw new IOException("Blocklist for " + src + " has changed!");
}
}
if (oldBlocks.length != newBlocks.length) {
throw new IOException("Blocklist for " + src + " now has different length");
}
}
this.blocks = newBlocks;
this.nodes = (DatanodeInfo[][]) nodeV.toArray(new DatanodeInfo[nodeV.size()][]);
}
/**
* Open a DataInputStream to a DataNode so that it can be read from.
* We get block ID and the IDs of the destinations at startup, from the namenode.
*/
private synchronized void blockSeekTo(long target) throws IOException {
if (target >= filelen) {
throw new IOException("Attempted to read past end of file");
}
if (s != null) {
s.close();
s = null;
}
//
// Compute desired block
//
int targetBlock = -1;
long targetBlockStart = 0;
long targetBlockEnd = 0;
for (int i = 0; i < blocks.length; i++) {
long blocklen = blocks[i].getNumBytes();
targetBlockEnd = targetBlockStart + blocklen - 1;
if (target >= targetBlockStart && target <= targetBlockEnd) {
targetBlock = i;
break;
} else {
targetBlockStart = targetBlockEnd + 1;
}
}
if (targetBlock < 0) {
throw new IOException("Impossible situation: could not find target position " + target);
}
long offsetIntoBlock = target - targetBlockStart;
//
// Connect to best DataNode for desired Block, with potential offset
//
int failures = 0;
InetSocketAddress targetAddr = null;
TreeSet deadNodes = new TreeSet();
while (s == null) {
DatanodeInfo chosenNode;
try {
chosenNode = bestNode(nodes[targetBlock], deadNodes);
targetAddr = DataNode.createSocketAddr(chosenNode.getName());
} catch (IOException ie) {
String blockInfo =
blocks[targetBlock]+" file="+src+" offset="+target;
if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) {
throw new IOException("Could not obtain block: " + blockInfo);
}
if (nodes[targetBlock] == null || nodes[targetBlock].length == 0) {
LOG.info("No node available for block: " + blockInfo);
}
LOG.info("Could not obtain block from any node: " + ie);
try {
Thread.sleep(3000);
} catch (InterruptedException iex) {
}
deadNodes.clear();
openInfo();
failures++;
continue;
}
try {
s = new Socket();
s.connect(targetAddr, READ_TIMEOUT);
s.setSoTimeout(READ_TIMEOUT);
//
// Xmit header info to datanode
//
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
out.write(OP_READSKIP_BLOCK);
blocks[targetBlock].write(out);
out.writeLong(offsetIntoBlock);
out.flush();
//
// Get bytes in block, set streams
//
DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
long curBlockSize = in.readLong();
long amtSkipped = in.readLong();
if (curBlockSize != blocks[targetBlock].len) {
throw new IOException("Recorded block size is " + blocks[targetBlock].len + ", but datanode reports size of " + curBlockSize);
}
if (amtSkipped != offsetIntoBlock) {
throw new IOException("Asked for offset of " + offsetIntoBlock + ", but only received offset of " + amtSkipped);
}
this.pos = target;
this.blockEnd = targetBlockEnd;
this.blockStream = in;
} catch (IOException ex) {
// Put chosen node into dead list, continue
LOG.info("Failed to connect to " + targetAddr + ":" + ex);
deadNodes.add(chosenNode);
if (s != null) {
try {
s.close();
} catch (IOException iex) {
}
}
s = null;
}
}
}
/**
* Close it down!
*/
public synchronized void close() throws IOException {
checkOpen();
if (closed) {
throw new IOException("Stream closed");
}
if (s != null) {
blockStream.close();
s.close();
s = null;
}
super.close();
closed = true;
}
/**
* Basic read()
*/
public synchronized int read() throws IOException {
checkOpen();
if (closed) {
throw new IOException("Stream closed");
}
int result = -1;
if (pos < filelen) {
if (pos > blockEnd) {
blockSeekTo(pos);
}
result = blockStream.read();
if (result >= 0) {
pos++;
}
}
return result;
}
/**
* Read the entire buffer.
*/
public synchronized int read(byte buf[], int off, int len) throws IOException {
checkOpen();
if (closed) {
throw new IOException("Stream closed");
}
if (pos < filelen) {
if (pos > blockEnd) {
blockSeekTo(pos);
}
int result = blockStream.read(buf, off, Math.min(len, (int) (blockEnd - pos + 1)));
if (result >= 0) {
pos += result;
}
return result;
}
return -1;
}
/**
* Seek to a new arbitrary location
*/
public synchronized void seek(long targetPos) throws IOException {
if (targetPos >= filelen) {
throw new IOException("Cannot seek after EOF");
}
pos = targetPos;
blockEnd = -1;
}
/**
*/
public synchronized long getPos() throws IOException {
return pos;
}
/**
*/
public synchronized int available() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
return (int) (filelen - pos);
}
/**
* We definitely don't support marks
*/
public boolean markSupported() {
return false;
}
public void mark(int readLimit) {
}
public void reset() throws IOException {
throw new IOException("Mark not supported");
}
}
/****************************************************************
* DFSOutputStream creates files from a stream of bytes.
****************************************************************/
class DFSOutputStream extends FSOutputStream {
private Socket s;
boolean closed = false;
private byte outBuf[] = new byte[BUFFER_SIZE];
private int pos = 0;
private UTF8 src;
private boolean overwrite;
private short replication;
private boolean firstTime = true;
private DataOutputStream blockStream;
private DataInputStream blockReplyStream;
private File backupFile;
private OutputStream backupStream;
private Block block;
private long filePos = 0;
private int bytesWrittenToBlock = 0;
private String datanodeName;
private long blockSize;
/**
* Create a new output stream to the given DataNode.
*/
public DFSOutputStream(UTF8 src, boolean overwrite,
short replication, long blockSize
) throws IOException {
this.src = src;
this.overwrite = overwrite;
this.replication = replication;
this.backupFile = newBackupFile();
this.blockSize = blockSize;
this.backupStream = new FileOutputStream(backupFile);
}
private File newBackupFile() throws IOException {
File result = conf.getFile("dfs.data.dir",
"tmp"+File.separator+
"client-"+Math.abs(r.nextLong()));
result.deleteOnExit();
return result;
}
/**
* Open a DataOutputStream to a DataNode so that it can be written to.
* This happens when a file is created and each time a new block is allocated.
* Must get block ID and the IDs of the destinations from the namenode.
*/
private synchronized void nextBlockOutputStream() throws IOException {
boolean retry = false;
long startTime = System.currentTimeMillis();
do {
retry = false;
LocatedBlock lb;
if (firstTime) {
lb = locateNewBlock();
} else {
lb = locateFollowingBlock(startTime);
}
block = lb.getBlock();
DatanodeInfo nodes[] = lb.getLocations();
//
// Connect to first DataNode in the list. Abort if this fails.
//
InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName());
try {
s = new Socket();
s.connect(target, READ_TIMEOUT);
s.setSoTimeout(replication * READ_TIMEOUT);
datanodeName = nodes[0].getName();
} catch (IOException ie) {
// Connection failed. Let's wait a little bit and retry
try {
if (System.currentTimeMillis() - startTime > 5000) {
LOG.info("Waiting to find target node: " + target);
}
Thread.sleep(6000);
} catch (InterruptedException iex) {
}
if (firstTime) {
namenode.abandonFileInProgress(src.toString(),
clientName);
} else {
namenode.abandonBlock(block, src.toString());
}
retry = true;
continue;
}
//
// Xmit header info to datanode
//
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
out.write(OP_WRITE_BLOCK);
out.writeBoolean(false);
block.write(out);
out.writeInt(nodes.length);
for (int i = 0; i < nodes.length; i++) {
nodes[i].write(out);
}
out.write(CHUNKED_ENCODING);
bytesWrittenToBlock = 0;
blockStream = out;
blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));
} while (retry);
firstTime = false;
}
private LocatedBlock locateNewBlock() throws IOException {
int retries = 3;
while (true) {
while (true) {
try {
return namenode.create(src.toString(), clientName.toString(),
localName, overwrite, replication, blockSize);
} catch (RemoteException e) {
if (--retries == 0 ||
!AlreadyBeingCreatedException.class.getName().
equals(e.getClassName())) {
throw e;
} else {
// because failed tasks take upto LEASE_PERIOD to
// release their pendingCreates files, if the file
// we want to create is already being created,
// wait and try again.
LOG.info(StringUtils.stringifyException(e));
try {
Thread.sleep(LEASE_PERIOD);
} catch (InterruptedException ie) {
}
}
}
}
}
}
private LocatedBlock locateFollowingBlock(long start
) throws IOException {
int retries = 5;
while (true) {
long localstart = System.currentTimeMillis();
while (true) {
try {
return namenode.addBlock(src.toString(),
clientName.toString());
} catch (RemoteException e) {
if (--retries == 0 ||
!NotReplicatedYetException.class.getName().
equals(e.getClassName())) {
throw e;
} else {
LOG.info(StringUtils.stringifyException(e));
if (System.currentTimeMillis() - localstart > 5000) {
LOG.info("Waiting for replication for " +
(System.currentTimeMillis() - localstart)/1000 +
" seconds");
}
try {
Thread.sleep(400);
} catch (InterruptedException ie) {
}
}
}
}
}
}
/**
* We're referring to the file pos here
*/
public synchronized long getPos() throws IOException {
return filePos;
}
/**
* Writes the specified byte to this output stream.
*/
public synchronized void write(int b) throws IOException {
checkOpen();
if (closed) {
throw new IOException("Stream closed");
}
if ((bytesWrittenToBlock + pos == blockSize) ||
(pos >= BUFFER_SIZE)) {
flush();
}
outBuf[pos++] = (byte) b;
filePos++;
}
/**
* Writes the specified bytes to this output stream.
*/
public synchronized void write(byte b[], int off, int len)
throws IOException {
checkOpen();
if (closed) {
throw new IOException("Stream closed");
}
while (len > 0) {
int remaining = BUFFER_SIZE - pos;
int toWrite = Math.min(remaining, len);
System.arraycopy(b, off, outBuf, pos, toWrite);
pos += toWrite;
off += toWrite;
len -= toWrite;
filePos += toWrite;
if ((bytesWrittenToBlock + pos >= blockSize) ||
(pos == BUFFER_SIZE)) {
flush();
}
}
}
/**
* Flush the buffer, getting a stream to a new block if necessary.
*/
public synchronized void flush() throws IOException {
checkOpen();
if (closed) {
throw new IOException("Stream closed");
}
if (bytesWrittenToBlock + pos >= blockSize) {
flushData((int) blockSize - bytesWrittenToBlock);
}
if (bytesWrittenToBlock == blockSize) {
endBlock();
}
flushData(pos);
}
/**
* Actually flush the accumulated bytes to the remote node,
* but no more bytes than the indicated number.
*/
private synchronized void flushData(int maxPos) throws IOException {
int workingPos = Math.min(pos, maxPos);
if (workingPos > 0) {
//
// To the local block backup, write just the bytes
//
backupStream.write(outBuf, 0, workingPos);
//
// Track position
//
bytesWrittenToBlock += workingPos;
System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
pos -= workingPos;
}
}
/**
* We're done writing to the current block.
*/
private synchronized void endBlock() throws IOException {
//
// Done with local copy
//
backupStream.close();
//
// Send it to datanode
//
boolean sentOk = false;
int remainingAttempts =
conf.getInt("dfs.client.block.write.retries", 3);
while (!sentOk) {
nextBlockOutputStream();
InputStream in = new FileInputStream(backupFile);
try {
byte buf[] = new byte[BUFFER_SIZE];
int bytesRead = in.read(buf);
while (bytesRead > 0) {
blockStream.writeLong((long) bytesRead);
blockStream.write(buf, 0, bytesRead);
bytesRead = in.read(buf);
}
internalClose();
sentOk = true;
} catch (IOException ie) {
handleSocketException(ie);
remainingAttempts -= 1;
if (remainingAttempts == 0) {
throw ie;
}
} finally {
in.close();
}
}
//
// Delete local backup, start new one
//
backupFile.delete();
backupFile = newBackupFile();
backupStream = new FileOutputStream(backupFile);
bytesWrittenToBlock = 0;
}
/**
* Close down stream to remote datanode.
*/
private synchronized void internalClose() throws IOException {
try {
blockStream.writeLong(0);
blockStream.flush();
long complete = blockReplyStream.readLong();
if (complete != WRITE_COMPLETE) {
LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);
throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);
}
} catch (IOException ie) {
throw (IOException)
new IOException("failure closing block of file " +
src.toString() + " to node " +
(datanodeName == null ? "?" : datanodeName)
).initCause(ie);
}
LocatedBlock lb = new LocatedBlock();
lb.readFields(blockReplyStream);
namenode.reportWrittenBlock(lb);
s.close();
s = null;
}
private void handleSocketException(IOException ie) throws IOException {
LOG.warn("Error while writing.", ie);
try {
if (s != null) {
s.close();
s = null;
}
} catch (IOException ie2) {
LOG.warn("Error closing socket.", ie2);
}
namenode.abandonBlock(block, src.toString());
}
/**
* Closes this output stream and releases any system
* resources associated with this stream.
*/
public synchronized void close() throws IOException {
checkOpen();
if (closed) {
throw new IOException("Stream closed");
}
try {
flush();
if (filePos == 0 || bytesWrittenToBlock != 0) {
try {
endBlock();
} catch (IOException e) {
namenode.abandonFileInProgress(src.toString(), clientName);
throw e;
}
}
backupStream.close();
backupFile.delete();
if (s != null) {
s.close();
s = null;
}
super.close();
long localstart = System.currentTimeMillis();
boolean fileComplete = false;
while (! fileComplete) {
fileComplete = namenode.complete(src.toString(), clientName.toString());
if (!fileComplete) {
try {
Thread.sleep(400);
if (System.currentTimeMillis() - localstart > 5000) {
LOG.info("Could not complete file, retrying...");
}
} catch (InterruptedException ie) {
}
}
}
closed = true;
} finally {
synchronized (pendingCreates) {
pendingCreates.remove(src.toString());
}
}
}
}
}