/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.datacollection.collector.servlet;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
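/**
 * Servlet endpoint that receives batches of chunks posted by Chukwa agents,
 * hands them to the configured ChukwaWriter, and acknowledges each committed
 * chunk. A GET request serves a small status page (see doGet).
 */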
@Deprecated
public class ServletCollector extends HttpServlet {
static final boolean FANCY_DIAGNOSTICS = false;
public static final String PATH = "chukwa";
/**
* If a chunk is committed, the ack will start with the following string.
*/
public static final String ACK_PREFIX = "ok: ";
transient ChukwaWriter writer = null;
private static final long serialVersionUID = 6286162898591407111L;
transient Logger log = Logger.getLogger(ServletCollector.class);
boolean COMPRESS;
String CODEC_NAME;
transient CompressionCodec codec;
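// Allows the writer to be set up externally before init() runs; in that case
// init() skips writer construction.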
public void setWriter(ChukwaWriter w) {
writer = w;
}
public ChukwaWriter getWriter() {
return writer;
}
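// Collector statistics: the connection and chunk counters are reset once a
// minute by the timer started in init(); lifetimechunks accumulates forever.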
long statTime = 0L;
int numberHTTPConnection = 0;
int numberchunks = 0;
long lifetimechunks = 0;
transient Configuration conf;
public ServletCollector(Configuration c) {
conf = c;
}
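/**
 * Starts the statistics timer, instantiates the writer class named by
 * chukwaCollector.writerClass (defaulting to SeqFileWriter), and loads the
 * optional network compression codec.
 */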
public void init(ServletConfig servletConf) throws ServletException {
log.info("initing servletCollector");
if (servletConf == null) {
log.fatal("no servlet config");
return;
}
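// Log and reset the per-minute connection/chunk counters every 60 seconds.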
Timer statTimer = new Timer();
statTimer.schedule(new TimerTask() {
public void run() {
log.info("stats:ServletCollector,numberHTTPConnection:"
+ numberHTTPConnection + ",numberchunks:" + numberchunks);
statTime = System.currentTimeMillis();
numberHTTPConnection = 0;
numberchunks = 0;
}
}, (1000), (60 * 1000));
if (writer != null) {
log.info("writer set up statically, no need for Collector.init() to do it");
return;
}
try {
String writerClassName = conf.get("chukwaCollector.writerClass",
SeqFileWriter.class.getCanonicalName());
Class<?> writerClass = Class.forName(writerClassName);
if (writerClass != null
&& ChukwaWriter.class.isAssignableFrom(writerClass)) {
writer = (ChukwaWriter) writerClass.newInstance();
}
} catch (Exception e) {
log.warn("failed to use user-chosen writer class, defaulting to SeqFileWriter", e);
}
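// If chukwaAgent.output.compress is set, posted data is expected to be
// compressed with the codec named by chukwaAgent.output.compression.type.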
COMPRESS = conf.getBoolean("chukwaAgent.output.compress", false);
if (COMPRESS) {
CODEC_NAME = conf.get("chukwaAgent.output.compression.type", "org.apache.hadoop.io.compress.DefaultCodec");
Class<?> codecClass = null;
try {
codecClass = Class.forName(CODEC_NAME);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
log.info("codec " + CODEC_NAME + " loaded for network compression");
} catch (ClassNotFoundException e) {
log.warn("failed to create codec " + CODEC_NAME + ". Network compression won't be enabled.", e);
COMPRESS = false;
}
}
// Fall back to SeqFileWriter if writer construction above failed or never happened.
try {
if (writer == null) {
writer = new SeqFileWriter();
}
writer.init(conf);
} catch (Throwable e) {
log.warn("Exception trying to initialize SeqFileWriter",e);
}
}
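// Disallow HTTP TRACE requests.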
@Override
protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
}
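/**
 * Handles one posted batch. The (optionally compressed) request body is a
 * 4-byte chunk count followed by that many serialized ChunkImpl records.
 * Chunks are handed to the writer; on a successful commit each chunk is
 * acknowledged with a line starting with ACK_PREFIX, while a failed commit
 * returns HTTP 503.
 */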
protected void accept(HttpServletRequest req, HttpServletResponse resp)
throws ServletException {
numberHTTPConnection++;
final long currentTime = System.currentTimeMillis();
try {
log.debug("new post from " + req.getRemoteHost() + " at " + currentTime);
InputStream in = req.getInputStream();
ServletOutputStream l_out = resp.getOutputStream();
DataInputStream di = null;
boolean compressNetwork = COMPRESS;
if (compressNetwork) {
InputStream cin = codec.createInputStream(in);
di = new DataInputStream(cin);
} else {
di = new DataInputStream(in);
}
final int numEvents = di.readInt();
// log.info("saw " + numEvents+ " in request");
List<Chunk> events = new LinkedList<Chunk>();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < numEvents; i++) {
ChunkImpl logEvent = ChunkImpl.read(di);
events.add(logEvent);
}
int responseStatus = HttpServletResponse.SC_OK;
// write new data to data sync file
if (writer != null) {
ChukwaWriter.CommitStatus result = writer.add(events);
// this is where we ACK this connection
if(result == ChukwaWriter.COMMIT_OK) {
// only count the chunks if result is commit or commit pending
numberchunks += events.size();
lifetimechunks += events.size();
for(Chunk receivedChunk: events) {
sb.append(ACK_PREFIX);
sb.append(receivedChunk.getData().length);
sb.append(" bytes ending at offset ");
sb.append(receivedChunk.getSeqID() - 1).append("\n");
}
} else if(result instanceof ChukwaWriter.COMMIT_PENDING) {
// only count the chunks if result is commit or commit pending
numberchunks += events.size();
lifetimechunks += events.size();
for(String s: ((ChukwaWriter.COMMIT_PENDING) result).pendingEntries)
sb.append(s);
} else if(result == ChukwaWriter.COMMIT_FAIL) {
sb.append("Commit failed");
responseStatus = HttpServletResponse.SC_SERVICE_UNAVAILABLE;
}
l_out.print(sb.toString());
} else {
l_out.println("can't write: no writer");
}
resp.setStatus(responseStatus);
} catch (Throwable e) {
log.warn("Exception talking to " + req.getRemoteHost() + " at t="
+ currentTime, e);
throw new ServletException(e);
}
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
accept(req, resp);
}
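/**
 * Status page: a request with a "ping" parameter returns plain-text collector
 * statistics; any other GET returns a minimal HTML banner.
 */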
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
log.info("new GET from " + req.getRemoteHost() + " at " + System.currentTimeMillis());
PrintStream out = new PrintStream(resp.getOutputStream(), true, "UTF-8");
resp.setStatus(HttpServletResponse.SC_OK);
String pingAtt = req.getParameter("ping");
if (pingAtt != null) {
out.println("Date:" + statTime);
out.println("Now:" + System.currentTimeMillis());
out.println("numberHTTPConnection in time window:"
+ numberHTTPConnection);
out.println("numberchunks in time window:" + numberchunks);
out.println("lifetimechunks:" + lifetimechunks);
} else {
out.println("<html><body><h2>Chukwa servlet running</h2>");
out.println("</body></html>");
}
}
@Override
public String getServletInfo() {
return "Chukwa Servlet Collector";
}
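// Close the writer when the servlet is shut down.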
@Override
public void destroy() {
try {
if (writer != null) {
writer.close();
}
} catch (WriterException e) {
log.warn("Exception during close", e);
}
super.destroy();
}
}