/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nutch.tools.proxy;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.HashMap;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.StringUtils;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.protocol.Content;
import org.apache.nutch.protocol.ProtocolStatus;
import org.mortbay.jetty.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
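/*
* Minimal usage sketch (not part of the original code): it assumes the Jetty 6
* (org.mortbay) Server API used by this testbed package, NutchConfiguration,
* and a hypothetical segment path; real wiring (connectors, handler lists)
* may differ.
*
* Server server = new Server(8181);
* Configuration conf = NutchConfiguration.create();
* server.setHandler(new SegmentHandler(conf,
* new Path("crawl/segments/20100101000000")));
* server.start();
*/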
/**
* Serves pre-fetched content from a Nutch segment over HTTP, so that a crawl
* can be replayed against the testbed proxy instead of the live web.
* XXX should turn this into a plugin?
*/
public class SegmentHandler extends AbstractTestbedHandler {
private static final Logger LOG = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
private Segment seg;
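// Translation table from Nutch ProtocolStatus codes to HTTP status codes;
// statuses without a sensible HTTP equivalent map to 400 Bad Request.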
private static final HashMap<Integer, Integer> protoCodes = new HashMap<Integer, Integer>();
static {
protoCodes.put(ProtocolStatus.ACCESS_DENIED,
HttpServletResponse.SC_UNAUTHORIZED);
protoCodes.put(ProtocolStatus.BLOCKED,
HttpServletResponse.SC_SERVICE_UNAVAILABLE);
protoCodes.put(ProtocolStatus.EXCEPTION,
HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
protoCodes.put(ProtocolStatus.FAILED, HttpServletResponse.SC_BAD_REQUEST);
protoCodes.put(ProtocolStatus.GONE, HttpServletResponse.SC_GONE);
protoCodes.put(ProtocolStatus.MOVED,
HttpServletResponse.SC_MOVED_PERMANENTLY);
protoCodes.put(ProtocolStatus.NOTFETCHING,
HttpServletResponse.SC_BAD_REQUEST);
protoCodes.put(ProtocolStatus.NOTFOUND, HttpServletResponse.SC_NOT_FOUND);
protoCodes.put(ProtocolStatus.NOTMODIFIED,
HttpServletResponse.SC_NOT_MODIFIED);
protoCodes.put(ProtocolStatus.PROTO_NOT_FOUND,
HttpServletResponse.SC_BAD_REQUEST);
protoCodes.put(ProtocolStatus.REDIR_EXCEEDED,
HttpServletResponse.SC_BAD_REQUEST);
protoCodes.put(ProtocolStatus.RETRY, HttpServletResponse.SC_BAD_REQUEST);
protoCodes.put(ProtocolStatus.ROBOTS_DENIED,
HttpServletResponse.SC_FORBIDDEN);
protoCodes.put(ProtocolStatus.SUCCESS, HttpServletResponse.SC_OK);
protoCodes.put(ProtocolStatus.TEMP_MOVED,
HttpServletResponse.SC_MOVED_TEMPORARILY);
protoCodes.put(ProtocolStatus.WOULDBLOCK,
HttpServletResponse.SC_BAD_REQUEST);
}
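/** Matches the "part-*" MapFile directories produced by a Hadoop job. */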
private static class SegmentPathFilter implements PathFilter {
public static final SegmentPathFilter INSTANCE = new SegmentPathFilter();
@Override
public boolean accept(Path p) {
return p.getName().startsWith("part-");
}
}
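/**
* Read-only view of a segment directory that lazily opens MapFile readers
* on first access and keeps them cached for the handler's lifetime.
*/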
private static class Segment implements Closeable {
private static final Partitioner<Text, Writable> PARTITIONER = new HashPartitioner<Text, Writable>();
private final Path segmentDir;
private final Configuration conf;
// Guards for the lazily initialized reader arrays below; parseText and
// parseData are never populated by this handler but are closed defensively.
private final Object cLock = new Object();
private final Object crawlLock = new Object();
private MapFile.Reader[] content;
private MapFile.Reader[] parseText;
private MapFile.Reader[] parseData;
private MapFile.Reader[] crawl;
public Segment(Path segmentDir, Configuration conf) {
this.segmentDir = segmentDir;
this.conf = conf;
}
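/** Returns the CrawlDatum stored under crawl_fetch for this URL, or null if absent. */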
public CrawlDatum getCrawlDatum(Text url) throws IOException {
synchronized (crawlLock) {
if (crawl == null)
crawl = getReaders(CrawlDatum.FETCH_DIR_NAME);
}
return (CrawlDatum) getEntry(crawl, url, new CrawlDatum());
}
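/** Returns the raw fetched Content for this URL, or null if absent. */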
public Content getContent(Text url) throws IOException {
synchronized (cLock) {
if (content == null)
content = getReaders(Content.DIR_NAME);
}
return (Content) getEntry(content, url, new Content());
}
/** Opens MapFile readers for one segment subdirectory (e.g. "content" or "crawl_fetch"). */
private MapFile.Reader[] getReaders(String subDir) throws IOException {
Path dir = new Path(segmentDir, subDir);
FileSystem fs = dir.getFileSystem(conf);
Path[] names = FileUtil.stat2Paths(fs.listStatus(dir,
SegmentPathFilter.INSTANCE));
// sort names, so that hash partitioning works
Arrays.sort(names);
MapFile.Reader[] parts = new MapFile.Reader[names.length];
for (int i = 0; i < names.length; i++) {
parts[i] = new MapFile.Reader(names[i], conf);
}
return parts;
}
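// Picks the part file with the same hash partitioner that was used when the
// segment was written, then does a MapFile lookup of the key there.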
private Writable getEntry(MapFile.Reader[] readers, Text url, Writable entry)
throws IOException {
return MapFileOutputFormat.getEntry(readers, PARTITIONER, url, entry);
}
public void close() throws IOException {
if (content != null) {
closeReaders(content);
}
if (parseText != null) {
closeReaders(parseText);
}
if (parseData != null) {
closeReaders(parseData);
}
if (crawl != null) {
closeReaders(crawl);
}
}
private void closeReaders(MapFile.Reader[] readers) throws IOException {
for (int i = 0; i < readers.length; i++) {
readers[i].close();
}
}
}
public SegmentHandler(Configuration conf, Path name) throws Exception {
seg = new Segment(name, conf);
}
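/**
* Replays a stored page: looks the request URI up in the segment, maps its
* ProtocolStatus to an HTTP status, re-emits stored headers and writes the
* raw content; unknown URIs are reported via the "Res" testbed header.
*/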
@Override
public void handle(Request req, HttpServletResponse res, String target,
int dispatch) throws IOException, ServletException {
try {
String uri = req.getUri().toString();
LOG.info("URI: {}", uri);
addMyHeader(res, "URI", uri);
Text url = new Text(uri);
CrawlDatum cd = seg.getCrawlDatum(url);
if (cd != null) {
addMyHeader(res, "Res", "found");
LOG.info("-got " + cd.toString());
ProtocolStatus ps = (ProtocolStatus) cd.getMetaData().get(
Nutch.WRITABLE_PROTO_STATUS_KEY);
if (ps != null) {
Integer trCode = protoCodes.get(ps.getCode());
if (trCode != null) {
res.setStatus(trCode.intValue());
} else {
res.setStatus(HttpServletResponse.SC_OK);
}
addMyHeader(res, "ProtocolStatus", ps.toString());
} else {
res.setStatus(HttpServletResponse.SC_OK);
}
Content c = seg.getContent(url);
if (c == null) { // missing content
req.setHandled(true);
res.addHeader("X-Handled-By", getClass().getSimpleName());
return;
}
byte[] data = c.getContent();
LOG.debug("-data len=" + data.length);
Metadata meta = c.getMetadata();
String[] names = meta.names();
LOG.debug("- " + names.length + " meta");
for (int i = 0; i < names.length; i++) {
boolean my = true;
char ch = names[i].charAt(0);
if (Character.isLetter(ch) && Character.isUpperCase(ch)) {
// pretty good chance it's a standard header
my = false;
}
String[] values = meta.getValues(names[i]);
for (int k = 0; k < values.length; k++) {
if (my) {
addMyHeader(res, names[i], values[k]);
} else {
res.addHeader(names[i], values[k]);
}
}
}
req.setHandled(true);
res.addHeader("X-Handled-By", getClass().getSimpleName());
res.setContentType(meta.get(Metadata.CONTENT_TYPE));
res.setContentLength(data.length);
OutputStream os = res.getOutputStream();
os.write(data, 0, data.length);
res.flushBuffer();
} else {
addMyHeader(res, "Res", "not found");
LOG.info(" -not found " + url);
}
} catch (Exception e) {
LOG.warn(StringUtils.stringifyException(e));
addMyHeader(res, "Res", "Exception: " + StringUtils.stringifyException(e));
}
}
}