blob: d3f7c86f2d566b9d8328dcd5568e42eb1fbc97fc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.twill.internal.appmaster;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.internal.json.ResourceReportAdapter;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferOutputStream;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpContentCompressor;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Webservice that the Application Master will register back to the resource manager
 * for clients to track application progress. Currently used purely for getting a
 * breakdown of resource usage as a {@link org.apache.twill.api.ResourceReport}.
 *
 * <p>On start-up the service binds an HTTP server to an ephemeral port on the host given through
 * {@link #setHost(String)}. A {@code GET} on {@link #PATH} is answered with the current resource report
 * serialized as JSON; every other request is answered with a 404. The connection is closed after each response.
 */
public final class TrackerService extends AbstractIdleService {

  // TODO: This is temporary. When more REST APIs are supported, this will be moved.
  public static final String PATH = "/resources";

  private static final Logger LOG = LoggerFactory.getLogger(TrackerService.class);

  private static final int NUM_BOSS_THREADS = 1;

  /** Maximum time, in seconds, to wait for all channels to close. */
  private static final int CLOSE_CHANNEL_TIMEOUT = 5;

  /** Maximum aggregated request content length, in bytes, handed to the {@link HttpChunkAggregator}. */
  private static final int MAX_INPUT_SIZE = 100 * 1024 * 1024;

  private final Supplier<ResourceReport> resourceReport;
  private final ChannelGroup channelGroup;

  private String host;
  private ServerBootstrap bootstrap;
  private InetSocketAddress bindAddress;
  private URL url;

  /**
   * Initialize the service.
   *
   * @param resourceReport live report that the service will return to clients.
   */
  TrackerService(Supplier<ResourceReport> resourceReport) {
    this.channelGroup = new DefaultChannelGroup("appMasterTracker");
    this.resourceReport = resourceReport;
  }

  /**
   * Sets the hostname which the tracker service will bind to. This method must be called before starting this
   * tracker service.
   */
  void setHost(String host) {
    this.host = host;
  }

  /**
   * Returns the address this tracker service is bound to, or {@code null} if the service has not been
   * started successfully yet.
   */
  InetSocketAddress getBindAddress() {
    return bindAddress;
  }

  /**
   * @return tracker url, or {@code null} if the service has not been started successfully yet.
   */
  URL getUrl() {
    return url;
  }

  /**
   * Creates the Netty server, binds it to an ephemeral port on the configured host and records the
   * resulting bind address and tracker URL.
   *
   * @throws IllegalStateException if no host was set through {@link #setHost(String)}
   * @throws Exception if the server cannot be bound or the tracker URL cannot be constructed; all Netty
   *                   resources created by this call are released before the exception propagates
   */
  @Override
  protected void startUp() throws Exception {
    // Fail fast with a clear message before any thread pool gets created.
    if (host == null) {
      throw new IllegalStateException("Host must be set through setHost() before starting the tracker service");
    }

    Executor bossThreads = Executors.newFixedThreadPool(NUM_BOSS_THREADS,
                                                        new ThreadFactoryBuilder()
                                                          .setDaemon(true)
                                                          .setNameFormat("boss-thread")
                                                          .build());

    Executor workerThreads = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
                                                             .setDaemon(true)
                                                             .setNameFormat("worker-thread#%d")
                                                             .build());

    ChannelFactory factory = new NioServerSocketChannelFactory(bossThreads, workerThreads);

    bootstrap = new ServerBootstrap(factory);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
      @Override
      public ChannelPipeline getPipeline() {
        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addLast("decoder", new HttpRequestDecoder());
        pipeline.addLast("aggregator", new HttpChunkAggregator(MAX_INPUT_SIZE));
        pipeline.addLast("encoder", new HttpResponseEncoder());
        pipeline.addLast("compressor", new HttpContentCompressor());
        pipeline.addLast("handler", new ReportHandler());
        return pipeline;
      }
    });

    boolean started = false;
    try {
      Channel channel = bootstrap.bind(new InetSocketAddress(host, 0));
      // Register the server channel first so that it gets closed if anything below fails.
      channelGroup.add(channel);
      bindAddress = (InetSocketAddress) channel.getLocalAddress();
      url = URI.create(String.format("http://%s:%d", host, bindAddress.getPort())).toURL();
      started = true;
    } finally {
      // shutDown() is not invoked when startUp() fails, hence the thread pools and any bound channel
      // have to be released here; otherwise they would leak.
      if (!started) {
        releaseAfterFailedStart();
      }
    }

    LOG.info("Tracker service started at {}", url);
  }

  /**
   * Closes all tracked channels, waiting up to {@link #CLOSE_CHANNEL_TIMEOUT} seconds, and then releases
   * the resources held by the Netty bootstrap.
   */
  @Override
  protected void shutDown() throws Exception {
    try {
      if (!channelGroup.close().await(CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS)) {
        LOG.warn("Timeout when closing all channels.");
      }
    } finally {
      bootstrap.releaseExternalResources();
    }
    LOG.info("Tracker service stopped at {}", url);
  }

  /**
   * Best-effort cleanup used when {@link #startUp()} fails after the bootstrap has been created.
   */
  private void releaseAfterFailedStart() {
    try {
      // Uninterruptible wait so that an interrupt cannot mask the original start-up failure.
      channelGroup.close().awaitUninterruptibly(CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS);
    } finally {
      bootstrap.releaseExternalResources();
    }
  }

  /**
   * Handler to return resources used by this application master, which will be available through
   * the host and port set when this application master registered itself to the resource manager.
   */
  final class ReportHandler extends SimpleChannelUpstreamHandler {

    private final ResourceReportAdapter reportAdapter;

    public ReportHandler() {
      this.reportAdapter = ResourceReportAdapter.create();
    }

    /**
     * Registers every accepted channel with the service's channel group so that connections still open
     * at shutdown are closed before the Netty resources are released.
     */
    @Override
    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
      channelGroup.add(e.getChannel());
      super.channelOpen(ctx, e);
    }

    /**
     * Answers a valid request with the resource report and anything else with a 404.
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
      HttpRequest request = (HttpRequest) e.getMessage();
      if (isValid(request)) {
        writeResponse(e);
      } else {
        write404(e);
      }
    }

    // only accepts GET on /resources for now
    private boolean isValid(HttpRequest request) {
      return HttpMethod.GET.equals(request.getMethod()) && PATH.equals(request.getUri());
    }

    /**
     * Sends an empty 404 response and closes the connection.
     */
    private void write404(MessageEvent e) {
      writeAndClose(e, new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND));
    }

    /**
     * Serializes the current resource report as UTF-8 JSON, sends it with a 200 status and closes the connection.
     */
    private void writeResponse(MessageEvent e) {
      HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
      response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=UTF-8");

      ChannelBuffer content = ChannelBuffers.dynamicBuffer();
      Writer writer = new OutputStreamWriter(new ChannelBufferOutputStream(content), CharsetUtil.UTF_8);
      reportAdapter.toJson(resourceReport.get(), writer);
      try {
        // Closing flushes whatever the writer still buffers into the channel buffer.
        writer.close();
      } catch (IOException e1) {
        LOG.error("error writing resource report", e1);
      }

      response.setContent(content);
      writeAndClose(e, response);
    }

    /**
     * Writes the response to the channel the event came from and closes that channel once the write completes.
     */
    private void writeAndClose(MessageEvent e, HttpResponse response) {
      ChannelFuture future = e.getChannel().write(response);
      future.addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Logs the failure and closes the channel it occurred on.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
      LOG.warn("Exception caught in tracker service, closing channel", e.getCause());
      e.getChannel().close();
    }
  }
}