/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* The MIT License (MIT)
* Copyright (c) 2014 Martin Kleppmann
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package org.apache.hadoop.hbase.test.util.warc;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Writes {@link WARCRecord}s to a WARC file, using Hadoop's filesystem APIs. (This means you
 * can write to HDFS, S3, or any other filesystem supported by Hadoop.) This implementation is
 * not tied to the MapReduce APIs -- that link is provided by the mapred
 * {@code com.martinkl.warc.mapred.WARCOutputFormat} and the mapreduce
 * {@code com.martinkl.warc.mapreduce.WARCOutputFormat} in the upstream project.
 *
 * <p>WARCFileWriter keeps track of how much data it has written (after optional gzip
 * compression); when the file grows beyond a configurable threshold, it is automatically
 * closed and a new segment is started. A segment number is appended to the filename for that
 * purpose. The segment number always starts at 00000, and by default a new segment is started
 * when the file size exceeds 1 GB. To change the target size for a segment, set the
 * {@code warc.output.segment.size} key in the Hadoop configuration to the desired number of
 * bytes. (Files may actually be a bit larger than this threshold, since we finish writing the
 * current record before opening a new file.)
*/
public class WARCFileWriter {
private static final Logger logger = LoggerFactory.getLogger(WARCFileWriter.class);
public static final long DEFAULT_MAX_SEGMENT_SIZE = 1000000000L; // 1 GB
private final Configuration conf;
private final CompressionCodec codec;
private final Path workOutputPath;
private final Progressable progress;
private final String extensionFormat;
private final long maxSegmentSize;
  private long segmentsCreated = 0;
  private long segmentsAttempted = 0;
  private long bytesWritten = 0;
private CountingOutputStream byteStream;
private DataOutputStream dataStream;
/**
* Creates a WARC file, and opens it for writing. If a file with the same name already
* exists, an attempt number in the filename is incremented until we find a file that
* doesn't already exist.
*
* @param conf The Hadoop configuration.
* @param codec If null, the file is uncompressed. If non-null, this compression codec
* will be used. The codec's default file extension is appended to the filename.
* @param workOutputPath The directory and filename prefix to which the data should be
* written. We append a segment number and filename extensions to it.
*/
public WARCFileWriter(Configuration conf, CompressionCodec codec, Path workOutputPath)
throws IOException {
this(conf, codec, workOutputPath, null);
}
/**
   * Creates a WARC file, and opens it for writing. If a file with the same name already
   * exists and {@code progress} is non-null (i.e. the mapred API is in use), it is
   * *overwritten*. Note that this is different behaviour from the other constructor.
   * Yes, this sucks. It will probably change in a future version.
*
* @param conf The Hadoop configuration.
* @param codec If null, the file is uncompressed. If non-null, this compression codec
* will be used. The codec's default file extension is appended to the filename.
* @param workOutputPath The directory and filename prefix to which the data should be
* written. We append a segment number and filename extensions to it.
* @param progress An object used by the mapred API for tracking a task's progress.
*/
public WARCFileWriter(Configuration conf, CompressionCodec codec, Path workOutputPath,
Progressable progress) throws IOException {
this.conf = conf;
this.codec = codec;
this.workOutputPath = workOutputPath;
this.progress = progress;
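    // Filename suffix for each segment attempt, e.g. ".seg-00000.attempt-00000.warc.gz"
    // when a gzip codec is in use.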
this.extensionFormat = ".seg-%05d.attempt-%05d.warc" +
(codec == null ? "" : codec.getDefaultExtension());
this.maxSegmentSize = conf.getLong("warc.output.segment.size", DEFAULT_MAX_SEGMENT_SIZE);
createSegment();
}
/**
* Instantiates a Hadoop codec for compressing and decompressing Gzip files. This is the
* most common compression applied to WARC files.
*
* @param conf The Hadoop configuration.
*/
public static CompressionCodec getGzipCodec(Configuration conf) {
try {
return (CompressionCodec) ReflectionUtils.newInstance(
conf.getClassByName("org.apache.hadoop.io.compress.GzipCodec")
.asSubclass(CompressionCodec.class),
conf);
} catch (ClassNotFoundException e) {
logger.warn("GzipCodec could not be instantiated", e);
return null;
}
}
  /**
   * Creates an output segment file and sets up the output streams to point at it.
   * If the file already exists, retries with a different filename. This is a bit nasty --
   * after all, {@link org.apache.hadoop.mapreduce.lib.output.FileOutputFormat}'s work
   * directory concept is supposed to prevent filename clashes -- but it looks like Amazon
   * Elastic MapReduce prevents use of per-task work directories if the output of a job is
   * on S3.
   *
   * TODO: Investigate this and find a better solution.
   */
private void createSegment() throws IOException {
segmentsAttempted = 0;
bytesWritten = 0;
boolean success = false;
while (!success) {
Path path = workOutputPath.suffix(String.format(extensionFormat, segmentsCreated,
segmentsAttempted));
FileSystem fs = path.getFileSystem(conf);
try {
// The o.a.h.mapred OutputFormats overwrite existing files, whereas
// the o.a.h.mapreduce OutputFormats don't overwrite. Bizarre...
// Here, overwrite if progress != null, i.e. if using mapred API.
FSDataOutputStream fsStream = (progress == null) ? fs.create(path, false) :
fs.create(path, progress);
byteStream = new CountingOutputStream(new BufferedOutputStream(fsStream));
dataStream = new DataOutputStream(codec == null ? byteStream :
codec.createOutputStream(byteStream));
segmentsCreated++;
logger.info("Writing to output file: {}", path);
success = true;
} catch (IOException e) {
        if (e.getMessage() != null && e.getMessage().startsWith("File already exists")) {
logger.warn("Tried to create file {} but it already exists; retrying.", path);
segmentsAttempted++; // retry
} else {
throw e;
}
}
}
}
/**
* Appends a {@link WARCRecord} to the file, in WARC/1.0 format.
* @param record The record to be written.
*/
public void write(WARCRecord record) throws IOException {
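    // Roll over to a new segment before writing if the size threshold has been exceeded.
    // The record is then written whole, which is why a segment can end up slightly larger
    // than maxSegmentSize.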
if (bytesWritten > maxSegmentSize) {
dataStream.close();
createSegment();
}
record.write(dataStream);
}
/**
* Appends a {@link WARCRecord} wrapped in a {@link WARCWritable} to the file.
* @param record The wrapper around the record to be written.
*/
public void write(WARCWritable record) throws IOException {
if (record.getRecord() != null) {
write(record.getRecord());
}
}
/**
* Flushes any buffered data and closes the file.
*/
public void close() throws IOException {
dataStream.close();
}
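  /**
   * A pass-through stream that counts the bytes written through it, updating
   * {@code bytesWritten} so that segment rollover can be triggered. It sits below the
   * compression codec (when one is used) in the stream stack, so the count measures the
   * on-disk size rather than the uncompressed size.
   */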
private class CountingOutputStream extends FilterOutputStream {
public CountingOutputStream(OutputStream out) {
super(out);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
bytesWritten += len;
}
@Override
public void write(int b) throws IOException {
out.write(b);
bytesWritten++;
}
// Overriding close() because FilterOutputStream's close() method pre-JDK8 has bad behavior:
// it silently ignores any exception thrown by flush(). Instead, just close the delegate
// stream. It should flush itself if necessary. (Thanks to the Guava project for noticing
// this.)
@Override
public void close() throws IOException {
out.close();
}
}
}