/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nutch.tools.warc;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.TimeZone;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.NutchWritable;
import org.apache.nutch.parse.ParseSegment;
import org.apache.nutch.protocol.Content;
import org.apache.nutch.tools.WARCUtils;
import org.apache.nutch.util.HadoopFSUtil;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.TimingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.martinkl.warc.WARCRecord;
import com.martinkl.warc.WARCWritable;
import com.martinkl.warc.mapreduce.WARCOutputFormat;
/**
 * MapReduce job to export Nutch segments as WARC files. The file format is
 * documented in the [ISO
 * Standard](http://bibnum.bnf.fr/warc/WARC_ISO_28500_version1_latestdraft.pdf).
 * Generates records of type 'response' if the configuration
 * 'store.http.headers' was set to true during fetching and the HTTP headers
 * were stored verbatim; generates records of type 'resource' otherwise.
 **/
public class WARCExporter extends Configured implements Tool {
private static final Logger LOG = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
private static final String CRLF = "\r\n";
private static final byte[] CRLF_BYTES = { 13, 10 };
public WARCExporter() {
super(null);
}
public WARCExporter(Configuration conf) {
super(conf);
}
public static class WARCMapReduce {
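/**
 * Identity mapper: wraps every segment entry in a {@link NutchWritable} so
 * that the Content and CrawlDatum records of a URL are grouped in the same
 * reduce call.
 */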
public static class WARCMapper extends
Mapper<Text, Writable, Text, NutchWritable> {
@Override
public void map(Text key, Writable value, Context context)
throws IOException, InterruptedException {
context.write(key, new NutchWritable(value));
}
}
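/**
 * Assembles one WARC record per URL from the collected Content and fetch
 * CrawlDatum and writes it out as a {@link WARCWritable}.
 */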
public static class WARCReducer extends
Reducer<Text, NutchWritable, NullWritable, WARCWritable> {
@Override
public void reduce(Text key, Iterable<NutchWritable> values,
Context context) throws IOException, InterruptedException {
Content content = null;
CrawlDatum cd = null;
SimpleDateFormat warcdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'",
Locale.ENGLISH);
// WARC-Date is UTC and the pattern uses a literal 'Z', so do not format with
// the local time zone
warcdf.setTimeZone(TimeZone.getTimeZone("UTC"));
// aggregate the values found
for (NutchWritable val : values) {
final Writable value = val.get(); // unwrap
if (value instanceof Content) {
content = (Content) value;
continue;
}
if (value instanceof CrawlDatum) {
cd = (CrawlDatum) value;
continue;
}
}
// check that we have everything we need
if (content == null) {
LOG.info("Missing content for {}", key);
context.getCounter("WARCExporter", "missing content").increment(1);
return;
}
if (cd == null) {
LOG.info("Missing fetch datum for {}", key);
context.getCounter("WARCExporter", "missing metadata").increment(1);
return;
}
// were the headers stored as is? Can write a response element then
String headersVerbatim = content.getMetadata().get("_response.headers_");
headersVerbatim = WARCUtils.fixHttpHeaders(headersVerbatim, content.getContent().length);
byte[] httpheaders = new byte[0];
if (StringUtils.isNotBlank(headersVerbatim)) {
// check that ends with an empty line
if (!headersVerbatim.endsWith(CRLF + CRLF)) {
headersVerbatim += CRLF + CRLF;
}
httpheaders = headersVerbatim.getBytes();
}
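// build the WARC named fields; the complete record (fields, HTTP headers and
// payload) is serialized and re-parsed below via WARCRecord(DataInput)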
StringBuilder buffer = new StringBuilder();
buffer.append(WARCRecord.WARC_VERSION);
buffer.append(CRLF);
buffer.append("WARC-Record-ID").append(": ").append("<urn:uuid:")
.append(UUID.randomUUID().toString()).append(">").append(CRLF);
// the WARC Content-Length covers both the HTTP headers and the payload
int contentLength = content.getContent().length + httpheaders.length;
buffer.append("Content-Length").append(": ")
.append(Integer.toString(contentLength)).append(CRLF);
Date fetchedDate = new Date(cd.getFetchTime());
buffer.append("WARC-Date").append(": ").append(warcdf.format(fetchedDate))
.append(CRLF);
// check if http headers have been stored verbatim
// if not generate a response instead
String WARCTypeValue = "resource";
if (StringUtils.isNotBlank(headersVerbatim)) {
WARCTypeValue = "response";
}
buffer.append("WARC-Type").append(": ").append(WARCTypeValue)
.append(CRLF);
// "WARC-IP-Address" if present
String ip = content.getMetadata().get("_ip_");
if (StringUtils.isNotBlank(ip)) {
buffer.append("WARC-IP-Address").append(": ").append(ip).append(CRLF);
}
// detect if truncated only for fetch success
String status = CrawlDatum.getStatusName(cd.getStatus());
if (status.equalsIgnoreCase("STATUS_FETCH_SUCCESS")
&& ParseSegment.isTruncated(content)) {
buffer.append("WARC-Truncated").append(": ").append("unspecified")
.append(CRLF);
}
// must be a valid URI
try {
String normalised = key.toString().replaceAll(" ", "%20");
URI uri = URI.create(normalised);
buffer.append("WARC-Target-URI").append(": ")
.append(uri.toASCIIString()).append(CRLF);
} catch (Exception e) {
LOG.error("Invalid URI {} ", key);
context.getCounter("WARCExporter", "invalid URI").increment(1);
return;
}
// provide a ContentType if type response
if (WARCTypeValue.equals("response")) {
buffer.append("Content-Type: application/http; msgtype=response")
.append(CRLF);
}
// finished writing the WARC headers, now let's serialize it
ByteArrayOutputStream bos = new ByteArrayOutputStream();
// store the headers
bos.write(buffer.toString().getBytes("UTF-8"));
bos.write(CRLF_BYTES);
// the http headers
bos.write(httpheaders);
// the binary content itself
if (content.getContent() != null) {
bos.write(content.getContent());
}
bos.write(CRLF_BYTES);
bos.write(CRLF_BYTES);
try {
DataInput in = new DataInputStream(
new ByteArrayInputStream(bos.toByteArray()));
WARCRecord record = new WARCRecord(in);
context.write(NullWritable.get(), new WARCWritable(record));
context.getCounter("WARCExporter", "records generated").increment(1);
} catch (IOException | IllegalStateException exception) {
LOG.error("Exception when generating WARC record for {} : {}", key,
exception.getMessage());
context.getCounter("WARCExporter", "exception").increment(1);
}
}
}
}
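/**
 * Configures and runs the export job: reads the content and fetch status of
 * the given segments and writes the aggregated records as WARC files into
 * the output directory.
 *
 * @param output directory to write the WARC files to
 * @param segments list of segment paths to export
 * @return 0 on success, -1 if the job fails
 * @throws IOException if a segment cannot be added as input path
 */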
public int generateWARC(String output, List<Path> segments) throws IOException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
LOG.info("WARCExporter: starting at {}", sdf.format(start));
final Job job = NutchJob.getInstance(getConf());
job.setJobName("warc-exporter " + output);
for (final Path segment : segments) {
LOG.info("warc-exporter: adding segment: {}", segment);
FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
FileInputFormat.addInputPath(job,
new Path(segment, CrawlDatum.FETCH_DIR_NAME));
}
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setJarByClass(WARCMapReduce.class);
job.setMapperClass(WARCMapReduce.WARCMapper.class);
job.setReducerClass(WARCMapReduce.WARCReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NutchWritable.class);
FileOutputFormat.setOutputPath(job, new Path(output));
// write the output as WARC files using the warc-hadoop output format
job.setOutputFormatClass(WARCOutputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(WARCWritable.class);
try {
boolean success = job.waitForCompletion(true);
if (!success) {
String message = "WARCExporter job did not succeed, job status: "
+ job.getStatus().getState() + ", reason: "
+ job.getStatus().getFailureInfo();
LOG.error(message);
throw new RuntimeException(message);
}
LOG.info(job.getCounters().toString());
long end = System.currentTimeMillis();
LOG.info("WARCExporter: finished at {}, elapsed: {}", sdf.format(end),
TimingUtil.elapsedTime(start, end));
} catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error("WARCExporter job failed: {}", e.getMessage());
return -1;
}
return 0;
}
@Override
public int run(String[] args) throws Exception {
if (args.length < 2) {
System.err.println(
"Usage: WARCExporter <output> (<segment> ... | -dir <segments>)");
return -1;
}
final List<Path> segments = new ArrayList<>();
for (int i = 1; i < args.length; i++) {
if (args[i].equals("-dir")) {
Path dir = new Path(args[++i]);
FileSystem fs = dir.getFileSystem(getConf());
FileStatus[] fstats = fs.listStatus(dir,
HadoopFSUtil.getPassDirectoriesFilter(fs));
Path[] files = HadoopFSUtil.getPaths(fstats);
for (Path p : files) {
segments.add(p);
}
} else {
segments.add(new Path(args[i]));
}
}
return generateWARC(args[0], segments);
}
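/*
 * Example invocation (a sketch; assumes a standard Nutch installation where
 * bin/nutch can launch a tool class by name, and uses placeholder paths):
 *
 *   bin/nutch org.apache.nutch.tools.warc.WARCExporter warc-out -dir crawl/segments
 *
 * or with the segments listed explicitly:
 *
 *   bin/nutch org.apache.nutch.tools.warc.WARCExporter warc-out \
 *       crawl/segments/20240101123456
 */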
public static void main(String[] args) throws Exception {
final int res = ToolRunner.run(NutchConfiguration.create(),
new WARCExporter(), args);
System.exit(res);
}
}