blob: 761f2ceaaa8bd27d207db77f078a462bc36ffc5d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3.INode.FileType;
import org.apache.hadoop.util.Progressable;
@InterfaceAudience.Private
@InterfaceStability.Unstable
class S3OutputStream extends OutputStream {
private Configuration conf;
private int bufferSize;
private FileSystemStore store;
private Path path;
private long blockSize;
private File backupFile;
private OutputStream backupStream;
private Random r = new Random();
private boolean closed;
private int pos = 0;
private long filePos = 0;
private int bytesWrittenToBlock = 0;
private byte[] outBuf;
private List<Block> blocks = new ArrayList<Block>();
private Block nextBlock;
private static final Log LOG =
LogFactory.getLog(S3OutputStream.class.getName());
public S3OutputStream(Configuration conf, FileSystemStore store,
Path path, long blockSize, Progressable progress,
int buffersize) throws IOException {
this.conf = conf;
this.store = store;
this.path = path;
this.blockSize = blockSize;
this.backupFile = newBackupFile();
this.backupStream = new FileOutputStream(backupFile);
this.bufferSize = buffersize;
this.outBuf = new byte[bufferSize];
}
private File newBackupFile() throws IOException {
File dir = new File(conf.get("fs.s3.buffer.dir"));
if (!dir.exists() && !dir.mkdirs()) {
throw new IOException("Cannot create S3 buffer directory: " + dir);
}
File result = File.createTempFile("output-", ".tmp", dir);
result.deleteOnExit();
return result;
}
public long getPos() throws IOException {
return filePos;
}
@Override
public synchronized void write(int b) throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
if ((bytesWrittenToBlock + pos == blockSize) || (pos >= bufferSize)) {
flush();
}
outBuf[pos++] = (byte) b;
filePos++;
}
@Override
public synchronized void write(byte b[], int off, int len) throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
while (len > 0) {
int remaining = bufferSize - pos;
int toWrite = Math.min(remaining, len);
System.arraycopy(b, off, outBuf, pos, toWrite);
pos += toWrite;
off += toWrite;
len -= toWrite;
filePos += toWrite;
if ((bytesWrittenToBlock + pos >= blockSize) || (pos == bufferSize)) {
flush();
}
}
}
@Override
public synchronized void flush() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
if (bytesWrittenToBlock + pos >= blockSize) {
flushData((int) blockSize - bytesWrittenToBlock);
}
if (bytesWrittenToBlock == blockSize) {
endBlock();
}
flushData(pos);
}
private synchronized void flushData(int maxPos) throws IOException {
int workingPos = Math.min(pos, maxPos);
if (workingPos > 0) {
//
// To the local block backup, write just the bytes
//
backupStream.write(outBuf, 0, workingPos);
//
// Track position
//
bytesWrittenToBlock += workingPos;
System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
pos -= workingPos;
}
}
private synchronized void endBlock() throws IOException {
//
// Done with local copy
//
backupStream.close();
//
// Send it to S3
//
// TODO: Use passed in Progressable to report progress.
nextBlockOutputStream();
store.storeBlock(nextBlock, backupFile);
internalClose();
//
// Delete local backup, start new one
//
boolean b = backupFile.delete();
if (!b) {
LOG.warn("Ignoring failed delete");
}
backupFile = newBackupFile();
backupStream = new FileOutputStream(backupFile);
bytesWrittenToBlock = 0;
}
private synchronized void nextBlockOutputStream() throws IOException {
long blockId = r.nextLong();
while (store.blockExists(blockId)) {
blockId = r.nextLong();
}
nextBlock = new Block(blockId, bytesWrittenToBlock);
blocks.add(nextBlock);
bytesWrittenToBlock = 0;
}
private synchronized void internalClose() throws IOException {
INode inode = new INode(FileType.FILE, blocks.toArray(new Block[blocks
.size()]));
store.storeINode(path, inode);
}
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
flush();
if (filePos == 0 || bytesWrittenToBlock != 0) {
endBlock();
}
backupStream.close();
boolean b = backupFile.delete();
if (!b) {
LOG.warn("Ignoring failed delete");
}
super.close();
closed = true;
}
}