blob: b41bbc98d8133bb0245c140d391103702953363e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nutch.util;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.Tool;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.protocol.Protocol;
import org.apache.nutch.protocol.ProtocolFactory;
import org.apache.nutch.protocol.ProtocolOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Scaffolding class for the various Checker implementations. Can process cmdline input, stdin and TCP connections.
*
* @author Jurian Broertjes
*/
public abstract class AbstractChecker extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected boolean keepClientCnxOpen = false;
protected int tcpPort = -1;
protected boolean stdin = true;
protected String usage;
// Actual function for the processing of a single input
protected abstract int process(String line, StringBuilder output) throws Exception;
protected int parseArgs(String[] args, int i) {
if (args[i].equals("-listen")) {
tcpPort = Integer.parseInt(args[++i]);
return 2;
} else if (args[i].equals("-keepClientCnxOpen")) {
keepClientCnxOpen = true;
return 1;
} else if (args[i].equals("-stdin")) {
stdin = true;
return 1;
}
return 0;
}
protected int run() throws Exception {
// In listening mode?
if (tcpPort != -1) {
processTCP(tcpPort);
return 0;
} else if (stdin) {
return processStdin();
}
// Nothing to do?
return -1;
}
// Process single input and return
protected int processSingle(String input) throws Exception {
StringBuilder output = new StringBuilder();
int ret = process(input, output);
System.out.println(output);
return ret;
}
// Read from stdin
protected int processStdin() throws Exception {
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
String line;
while ((line = in.readLine()) != null) {
StringBuilder output = new StringBuilder();
@SuppressWarnings("unused")
int ret = process(line, output);
System.out.println(output);
}
return 0;
}
// Open TCP socket and process input
@SuppressWarnings("resource")
protected void processTCP(int tcpPort) throws Exception {
ServerSocket server = null;
try {
server = new ServerSocket();
server.bind(new InetSocketAddress(tcpPort));
LOG.info(server.toString());
} catch (Exception e) {
LOG.error("Could not listen on port " + tcpPort, e);
System.exit(-1);
}
while(true){
Worker worker;
try {
worker = new Worker(server.accept());
Thread thread = new Thread(worker);
thread.start();
} catch (Exception e) {
LOG.error("Accept failed: " + tcpPort, e);
System.exit(-1);
}
}
}
private class Worker implements Runnable {
private Socket client;
Worker(Socket client) {
this.client = client;
LOG.info(client.toString());
}
public void run() {
// Setup streams
BufferedReader in = null;
OutputStream out = null;
try {
in = new BufferedReader(new InputStreamReader(client.getInputStream()));
out = client.getOutputStream();
} catch (IOException e) {
LOG.error("Failed initializing streams: ", e);
return;
}
// Listen for work
if (keepClientCnxOpen) {
try {
while (readWrite(in, out)) {} // keep connection open until it closes
} catch(Exception e) {
LOG.error("Read/Write failed: ", e);
}
} else {
try {
readWrite(in, out);
} catch(Exception e) {
LOG.error("Read/Write failed: ", e);
}
}
try { // close ourselves
client.close();
} catch (Exception e){
LOG.error(e.toString());
}
}
protected boolean readWrite(BufferedReader in, OutputStream out) throws Exception {
String line = in.readLine();
if (line == null) {
// End of stream
return false;
}
if (line.trim().length() > 1) {
// The actual work
StringBuilder output = new StringBuilder();
process(line, output);
output.append("\n");
out.write(output.toString().getBytes(StandardCharsets.UTF_8));
}
return true;
}
}
protected ProtocolOutput getProtocolOutput(String url, CrawlDatum datum) throws Exception {
ProtocolFactory factory = new ProtocolFactory(getConf());
Protocol protocol = factory.getProtocol(url);
Text turl = new Text(url);
return protocol.getProtocolOutput(turl, datum);
}
}