blob: 129b508e903a477663ae3cf5d559b410aecc7a61 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.io;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Formatter;
import org.apache.commons.io.output.CountingOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.util.FileSystemUtil;
/**
 * An output stream that writes to an underlying filesystem, opening
 * a new file after a specified number of bytes have been written to the
 * current one.
 */
public class SplittingOutputStream extends OutputStream {

  public static final Log LOG = LogFactory.getLog(
      SplittingOutputStream.class.getName());

  // Stream clients write to; identical to countingFilterStream unless a
  // codec wraps it in a compressing stream.
  private OutputStream writeStream;

  // Counts the raw bytes actually hitting the filesystem (post-compression),
  // which is what the split cutoff is measured against.
  private CountingOutputStream countingFilterStream;

  private Configuration conf;
  private Path destDir;
  private String filePrefix;
  private long cutoffBytes;      // split threshold in bytes; 0 disables splitting.
  private CompressionCodec codec;
  private int fileNum;           // zero-padded index appended to the next filename.

  /**
   * Create a new SplittingOutputStream.
   * @param conf the Configuration to use to interface with HDFS
   * @param destDir the directory where the files will go (should already
   *     exist).
   * @param filePrefix the first part of the filename, which will be appended
   *     by a number. This file will be placed inside destDir.
   * @param cutoff the approximate number of bytes to use per file; values
   *     less than or equal to zero disable splitting.
   * @param codec if non-null, output files are compressed with this codec
   *     and carry its default extension (e.g. ".gz").
   * @throws IOException if the first output file cannot be opened.
   */
  public SplittingOutputStream(final Configuration conf, final Path destDir,
      final String filePrefix, final long cutoff, final CompressionCodec codec)
      throws IOException {

    this.conf = conf;
    this.destDir = destDir;
    this.filePrefix = filePrefix;
    this.cutoffBytes = cutoff;
    if (this.cutoffBytes < 0) {
      this.cutoffBytes = 0; // splitting disabled.
    }
    this.codec = codec;
    this.fileNum = 0;

    openNextFile();
  }

  /**
   * Initialize the OutputStream to the next file to write to.
   * @throws IOException if the destination file already exists or cannot
   *     be created.
   */
  private void openNextFile() throws IOException {
    // Zero-padded sequence number keeps output files lexicographically ordered.
    String filename = filePrefix + String.format("%05d", this.fileNum++);
    if (codec != null) {
      filename = filename + codec.getDefaultExtension();
    }
    Path destFile = new Path(destDir, filename);
    FileSystem fs = destFile.getFileSystem(conf);
    LOG.debug("Opening next output file: " + destFile);
    if (fs.exists(destFile)) {
      Path canonicalDest = fs.makeQualified(destFile);
      throw new IOException("Destination file " + canonicalDest
          + " already exists");
    }

    // overwrite=false: a file racing into existence after the exists() check
    // above still fails loudly instead of being silently clobbered.
    OutputStream fsOut = fs.create(destFile, false);

    // Count how many actual bytes hit HDFS.
    this.countingFilterStream = new CountingOutputStream(fsOut);

    if (codec != null) {
      // Wrap that in a compressing stream.
      this.writeStream = codec.createOutputStream(this.countingFilterStream);
    } else {
      // Write to the counting stream directly.
      this.writeStream = this.countingFilterStream;
    }
  }

  /**
   * @return true if allowSplit() would actually cause a split.
   */
  public boolean wouldSplit() {
    return this.cutoffBytes > 0
        && this.countingFilterStream.getByteCount() >= this.cutoffBytes;
  }

  /** If we've written more to the disk than the user's split size,
   * open the next file.
   */
  private void checkForNextFile() throws IOException {
    if (wouldSplit()) {
      LOG.debug("Starting new split");
      // Flush-then-close finalizes the current part (including any codec
      // trailer) before the next part file is opened.
      this.writeStream.flush();
      this.writeStream.close();
      openNextFile();
    }
  }

  /** Defines a point in the stream when it is acceptable to split to a new
      file; e.g., the end of a record.
   */
  public void allowSplit() throws IOException {
    checkForNextFile();
  }

  @Override
  public void close() throws IOException {
    this.writeStream.close();
  }

  @Override
  public void flush() throws IOException {
    this.writeStream.flush();
  }

  @Override
  public void write(byte [] b) throws IOException {
    this.writeStream.write(b);
  }

  @Override
  public void write(byte [] b, int off, int len) throws IOException {
    this.writeStream.write(b, off, len);
  }

  @Override
  public void write(int b) throws IOException {
    this.writeStream.write(b);
  }
}