blob: 369e17f193f90cd2c75b7f440cbad85c5d66e05e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.yarn;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.exception.InvalidURLException;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.pullserver.PullServerConstants;
import org.apache.tajo.pullserver.PullServerUtil;
import org.apache.tajo.pullserver.PullServerUtil.PullServerParams;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.pullserver.retriever.IndexCacheKey;
import org.apache.tajo.storage.index.bst.BSTIndex;
import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
import org.apache.tajo.util.SizeOf;
import org.apache.tajo.util.TajoIdUtils;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.*;
import org.jboss.netty.handler.codec.http.HttpHeaders.Names;
import org.jboss.netty.handler.codec.http.HttpHeaders.Values;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.jboss.netty.util.CharsetUtil;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.*;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
/**
 * YARN auxiliary service that hosts Tajo's HTTP pull server inside a NodeManager.
 *
 * <p>The server listens on {@link ConfVars#PULLSERVER_PORT} and handles:
 * <ul>
 *   <li>{@code DELETE} requests, which clear cached index readers of an execution block;</li>
 *   <li>{@code GET} chunk requests, which stream intermediate data file chunks;</li>
 *   <li>{@code GET} meta requests, which return JSON metadata for the requested data.</li>
 * </ul>
 * {@code PullServerUtil.isChunkRequest} decides which kind a GET request is.
 * Any other method is answered with 405.
 */
public class TajoPullServerService extends AuxiliaryService {

  private static final Log LOG = LogFactory.getLog(TajoPullServerService.class);

  private TajoConf tajoConf;

  private int port;
  private ChannelFactory selector;
  // Holds the bound server channel and every accepted client channel so they can be closed together.
  private final ChannelGroup accepted = new DefaultChannelGroup("Pull server group");
  private HttpChannelInitializer channelInitializer;
  private int sslFileBufferSize;
  private int maxUrlLength;

  private ApplicationId appId;
  private FileSystem localFS;

  /**
   * Should the shuffle use posix_fadvise calls to manage the OS cache during
   * sendfile
   */
  private boolean manageOsCache;
  private int readaheadLength;
  private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();

  // Application id (string form) -> submitting user, maintained by initializeApplication/stopApplication.
  private static final Map<String,String> userRsrc =
      new ConcurrentHashMap<>();
  private String userName;

  private LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache = null;
  private int lowCacheHitCheckThreshold;

  /**
   * Pull server output metrics. An instance is registered as a listener on every file write
   * future (see {@code sendFile}). Each completed transfer therefore updates the success/failure
   * counters and decrements the connection gauge that {@code sendFile} incremented.
   */
  @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
  static class ShuffleMetrics implements ChannelFutureListener {
    @Metric({"OutputBytes","PullServer output in bytes"})
    MutableCounterLong shuffleOutputBytes;
    @Metric({"Failed","# of failed shuffle outputs"})
    MutableCounterInt shuffleOutputsFailed;
    @Metric({"Succeeded","# of succeeded shuffle outputs"})
    MutableCounterInt shuffleOutputsOK;
    @Metric({"Connections","# of current shuffle connections"})
    MutableGaugeInt shuffleConnections;

    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
      if (future.isSuccess()) {
        shuffleOutputsOK.incr();
      } else {
        shuffleOutputsFailed.incr();
      }
      shuffleConnections.decr();
    }
  }

  final ShuffleMetrics metrics;

  TajoPullServerService(MetricsSystem ms) {
    super(PullServerConstants.PULLSERVER_SERVICE_NAME);
    metrics = ms.register(new ShuffleMetrics());
  }

  @SuppressWarnings("UnusedDeclaration")
  public TajoPullServerService() {
    this(DefaultMetricsSystem.instance());
  }

  /**
   * Records the user that owns a newly started application.
   */
  @Override
  public void initializeApplication(ApplicationInitializationContext context) {
    // TODO these bytes should be versioned
    String user = context.getUser();
    // Named to avoid shadowing the appId field.
    ApplicationId applicationId = context.getApplicationId();
    userRsrc.put(applicationId.toString(), user);
  }

  /**
   * Forgets the user mapping of a finished application.
   */
  @Override
  public void stopApplication(ApplicationTerminationContext context) {
    userRsrc.remove(context.getApplicationId().toString());
  }

  // TODO change AbstractService to throw InterruptedException
  /**
   * Reads the configuration, binds the Netty server socket, and builds the index reader cache.
   * If the configured port is 0, the actually bound port is written back into the
   * configuration under {@link ConfVars#PULLSERVER_PORT}.
   */
  @Override
  public void serviceInit(Configuration conf) throws Exception {
    tajoConf = new TajoConf(conf);

    manageOsCache = tajoConf.getBoolean(PullServerConstants.SHUFFLE_MANAGE_OS_CACHE,
        PullServerConstants.DEFAULT_SHUFFLE_MANAGE_OS_CACHE);

    readaheadLength = tajoConf.getInt(PullServerConstants.SHUFFLE_READAHEAD_BYTES,
        PullServerConstants.DEFAULT_SHUFFLE_READAHEAD_BYTES);

    int workerNum = tajoConf.getIntVar(ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM);

    ThreadFactory bossFactory = new ThreadFactoryBuilder()
        .setNameFormat("TajoPullServerService Netty Boss #%d")
        .build();
    ThreadFactory workerFactory = new ThreadFactoryBuilder()
        .setNameFormat("TajoPullServerService Netty Worker #%d")
        .build();
    selector = new NioServerSocketChannelFactory(
        Executors.newCachedThreadPool(bossFactory),
        Executors.newCachedThreadPool(workerFactory),
        workerNum);

    localFS = new LocalFileSystem();

    maxUrlLength = tajoConf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);

    LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength);

    ServerBootstrap bootstrap = new ServerBootstrap(selector);
    try {
      channelInitializer = new HttpChannelInitializer(tajoConf);
    } catch (Exception ex) {
      throw new RuntimeException(ex);
    }
    bootstrap.setPipelineFactory(channelInitializer);

    port = tajoConf.getIntVar(ConfVars.PULLSERVER_PORT);
    Channel ch = bootstrap.bind(new InetSocketAddress(port));

    accepted.add(ch);
    // Read back the bound port (relevant when the configured port is 0, i.e. ephemeral).
    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
    tajoConf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
    LOG.info(getName() + " listening on port " + port);

    sslFileBufferSize = tajoConf.getInt(PullServerConstants.SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
        PullServerConstants.DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);

    int cacheSize = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_SIZE);
    int cacheTimeout = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_TIMEOUT);

    indexReaderCache = CacheBuilder.newBuilder()
        .maximumSize(cacheSize)
        .expireAfterWrite(cacheTimeout, TimeUnit.MINUTES)
        .removalListener(removalListener)
        .build(
            new CacheLoader<IndexCacheKey, BSTIndexReader>() {
              @Override
              public BSTIndexReader load(IndexCacheKey key) throws Exception {
                return new BSTIndex(tajoConf).getIndexReader(new Path(key.getPath(), "index"));
              }
            }
        );
    lowCacheHitCheckThreshold = (int) (cacheSize * 0.1f);

    super.serviceInit(tajoConf);
    LOG.info("TajoPullServerService started: port=" + port);
  }

  /**
   * Closes all channels, releases Netty and SSL resources, and drops every cached index reader.
   * The null checks make stop safe even when {@link #serviceInit} failed part-way.
   */
  @Override
  public void serviceStop() throws Exception {
    // TODO: check this wait
    accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
    if (selector != null) {
      // Shuts down the boss/worker executors owned by the channel factory.
      selector.releaseExternalResources();
    }

    if (channelInitializer != null) {
      channelInitializer.destroy();
    }

    if (localFS != null) {
      localFS.close();
    }
    if (indexReaderCache != null) {
      indexReaderCache.invalidateAll();
    }

    super.serviceStop();
  }

  @VisibleForTesting
  public int getPort() {
    return port;
  }

  @Override
  public ByteBuffer getMetaData() {
    try {
      return serializeMetaData(port);
    } catch (IOException e) {
      LOG.error("Error during getMeta", e);
      // TODO add API to AuxiliaryServices to report failures
      return null;
    }
  }

  /**
   * Serialize the shuffle port into a ByteBuffer for use later on.
   * @param port the port to be sent to the ApplicationMaster
   * @return the serialized form of the port, positioned at 0 with the 4 port bytes remaining.
   */
  public static ByteBuffer serializeMetaData(int port) throws IOException {
    //TODO these bytes should be versioned
    ByteBuffer buffer = ByteBuffer.allocate(SizeOf.SIZE_OF_INT);
    buffer.putInt(port);
    // Without flip() the buffer would have position == limit, i.e. nothing left to read.
    buffer.flip();
    return buffer;
  }

  /**
   * Builds the per-connection Netty pipeline. The stages are: optional SSL, HTTP codec, HTTP
   * chunk aggregator, chunked writer, and the single shared {@link PullServer} handler.
   */
  class HttpChannelInitializer implements ChannelPipelineFactory {

    final PullServer PullServer;
    private SSLFactory sslFactory;

    public HttpChannelInitializer(TajoConf conf) throws Exception {
      PullServer = new PullServer(conf);
      if (conf.getBoolVar(ConfVars.SHUFFLE_SSL_ENABLED_KEY)) {
        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
        sslFactory.init();
      }
    }

    /** Releases the SSL factory, if SSL was enabled. */
    public void destroy() {
      if (sslFactory != null) {
        sslFactory.destroy();
      }
    }

    @Override
    public ChannelPipeline getPipeline() throws Exception {
      ChannelPipeline pipeline = Channels.pipeline();
      if (sslFactory != null) {
        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
      }

      int maxChunkSize = getConfig().getInt(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.varname,
          ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.defaultIntVal);
      pipeline.addLast("codec", new HttpServerCodec(maxUrlLength, 8192, maxChunkSize));
      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
      pipeline.addLast("chunking", new ChunkedWriteHandler());
      pipeline.addLast("shuffle", PullServer);
      return pipeline;
      // TODO factor security manager into pipeline
      // TODO factor out encode/decode to permit binary shuffle
      // TODO factor out decode of index to permit alt. models
    }
  }

  /**
   * HTTP handler that serves DELETE (index cache clearing) and GET (chunk / meta) requests.
   * It is {@code @Sharable}: one instance is added to every connection's pipeline.
   */
  @ChannelHandler.Sharable
  class PullServer extends SimpleChannelUpstreamHandler {

    private final TajoConf conf;
    private final LocalDirAllocator lDirAlloc =
        new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
    private final Gson gson = new Gson();

    public PullServer(TajoConf conf) throws IOException {
      this.conf = conf;

      // init local temporal dir
      lDirAlloc.getAllLocalPathsToRead(".", conf);
    }

    @Override
    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) throws Exception {
      accepted.add(evt.getChannel());

      if(LOG.isDebugEnabled()) {
        LOG.debug(String.format("Current number of shuffle connections (%d)", accepted.size()));
      }
      super.channelOpen(ctx, evt);
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
        throws Exception {

      HttpRequest request = (HttpRequest) evt.getMessage();
      Channel ch = evt.getChannel();

      if (request.getMethod() == HttpMethod.DELETE) {
        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
        ch.write(response).addListener(ChannelFutureListener.CLOSE);

        clearIndexCache(request.getUri());
        return;
      } else if (request.getMethod() != HttpMethod.GET) {
        sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
        return;
      }

      // Parsing the URL into key-values
      try {
        final PullServerParams params = new PullServerParams(request.getUri());
        if (PullServerUtil.isChunkRequest(params.requestType())) {
          handleChunkRequest(ctx, request, params);
        } else {
          handleMetaRequest(ctx, request, params);
        }
      } catch (Throwable e) {
        // Keep the stack trace; the client only receives the exception message.
        LOG.error("Failed to handle request " + request.getUri(), e);
        sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
      }
    }

    /**
     * Upon a request from TajoWorker, this method clears index cache for fetching data of an execution block.
     * It is called whenever an execution block is completed.
     *
     * @param uri query URI which indicates the execution block id, expected as {@code ebid=<id>}
     * @throws IOException
     * @throws InvalidURLException
     * @throws IllegalArgumentException if the URI is not of the form {@code ebid=<id>}
     */
    public void clearIndexCache(String uri)
        throws IOException, InvalidURLException {
      // Simply parse the given uri
      String[] tokens = uri.split("=");
      if (tokens.length != 2 || !tokens[0].equals("ebid")) {
        throw new IllegalArgumentException("invalid params: " + uri);
      }
      ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(tokens[1]);
      String queryId = ebId.getQueryId().toString();
      String ebSeqId = Integer.toString(ebId.getId());
      List<IndexCacheKey> removed = new ArrayList<>();
      // Force-close and evict the live cache entries of this execution block.
      synchronized (indexReaderCache) {
        for (Entry<IndexCacheKey, BSTIndexReader> e : indexReaderCache.asMap().entrySet()) {
          IndexCacheKey key = e.getKey();
          if (key.getQueryId().equals(queryId) && key.getEbSeqId().equals(ebSeqId)) {
            e.getValue().forceClose();
            removed.add(e.getKey());
          }
        }
        indexReaderCache.invalidateAll(removed);
      }
      removed.clear();
      // Also force-close readers of this execution block that were parked by the removal listener.
      synchronized (waitForRemove) {
        for (Entry<IndexCacheKey, BSTIndexReader> e : waitForRemove.entrySet()) {
          IndexCacheKey key = e.getKey();
          if (key.getQueryId().equals(queryId) && key.getEbSeqId().equals(ebSeqId)) {
            e.getValue().forceClose();
            removed.add(e.getKey());
          }
        }
        for (IndexCacheKey eachKey : removed) {
          waitForRemove.remove(eachKey);
        }
      }
    }

    /**
     * Answers a meta request with a JSON array produced by {@code PullServerUtil.getJsonMeta}.
     * A missing file yields 204. I/O errors and invalid arguments yield 400.
     */
    private void handleMetaRequest(ChannelHandlerContext ctx, HttpRequest request, final PullServerParams params)
        throws IOException, ExecutionException {
      final List<String> jsonMetas;
      try {
        jsonMetas = PullServerUtil.getJsonMeta(conf, lDirAlloc, localFS, params, gson, indexReaderCache,
            lowCacheHitCheckThreshold);
      } catch (FileNotFoundException e) {
        sendError(ctx, e.getMessage(), HttpResponseStatus.NO_CONTENT);
        return;
      } catch (IOException | IllegalArgumentException e) { // IOException, EOFException, IllegalArgumentException
        sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
        return;
      } catch (ExecutionException e) {
        // There are some problems in index cache
        throw new TajoInternalError(e.getCause());
      }

      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
      response.setContent(ChannelBuffers.copiedBuffer(gson.toJson(jsonMetas), CharsetUtil.UTF_8));
      response.setHeader(Names.CONTENT_TYPE, "application/json; charset=UTF-8");
      HttpHeaders.setContentLength(response, response.getContent().readableBytes());
      if (HttpHeaders.isKeepAlive(request)) {
        response.setHeader(Names.CONNECTION, Values.KEEP_ALIVE);
      }
      ChannelFuture writeFuture = ctx.getChannel().write(response);

      // Decide whether to close the connection or not.
      if (!HttpHeaders.isKeepAlive(request)) {
        // Close the connection when the whole content is written out.
        writeFuture.addListener(ChannelFutureListener.CLOSE);
      }
    }

    /**
     * Answers a chunk request by streaming every file chunk from
     * {@code PullServerUtil.getFileChunks}, one after another, in a single 200 response.
     * The per-chunk lengths go in the {@code PullServerConstants.CHUNK_LENGTH_HEADER_NAME} header.
     * An empty chunk list yields 204.
     */
    private void handleChunkRequest(ChannelHandlerContext ctx, HttpRequest request, final PullServerParams params)
        throws IOException {
      final List<FileChunk> chunks;
      try {
        chunks = PullServerUtil.getFileChunks(conf, lDirAlloc, localFS, params, indexReaderCache,
            lowCacheHitCheckThreshold);
      } catch (FileNotFoundException e) {
        sendError(ctx, e.getMessage(), HttpResponseStatus.NO_CONTENT);
        return;
      } catch (IOException | IllegalArgumentException e) { // IOException, EOFException, IllegalArgumentException
        sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
        return;
      } catch (ExecutionException e) {
        // There are some problems in index cache
        throw new TajoInternalError(e.getCause());
      }

      // Write the content.
      final Channel ch = ctx.getChannel();
      if (chunks.size() == 0) {
        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.NO_CONTENT);

        if (!HttpHeaders.isKeepAlive(request)) {
          ch.write(response).addListener(ChannelFutureListener.CLOSE);
        } else {
          response.setHeader(Names.CONNECTION, Values.KEEP_ALIVE);
          ch.write(response);
        }
      } else {
        FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
        ChannelFuture writeFuture = null;
        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
        long totalSize = 0;
        StringBuilder sb = new StringBuilder();
        for (FileChunk chunk : file) {
          totalSize += chunk.length();
          sb.append(Long.toString(chunk.length())).append(",");
        }
        sb.deleteCharAt(sb.length() - 1);
        HttpHeaders.addHeader(response, PullServerConstants.CHUNK_LENGTH_HEADER_NAME, sb.toString());
        HttpHeaders.setContentLength(response, totalSize);

        if (HttpHeaders.isKeepAlive(request)) {
          response.setHeader(Names.CONNECTION, Values.KEEP_ALIVE);
        }
        // Write the initial line and the header.
        writeFuture = ch.write(response);

        for (FileChunk chunk : file) {
          writeFuture = sendFile(ctx, chunk);
          if (writeFuture == null) {
            // The 200 status line and Content-Length are already on the wire, so a second HTTP
            // response would be read as body bytes. Close the connection so the client sees a
            // truncated transfer instead.
            LOG.warn("Aborting response for " + request.getUri() + " after a chunk could not be sent");
            ch.close();
            return;
          }
        }

        // Decide whether to close the connection or not.
        if (!HttpHeaders.isKeepAlive(request)) {
          // Close the connection when the whole content is written out.
          writeFuture.addListener(ChannelFutureListener.CLOSE);
        }
      }
    }

    /**
     * Writes one file chunk to the channel. A zero-copy file region is used without SSL and a
     * chunked file with SSL.
     *
     * @return the write future, or {@code null} if the file could not be opened or written
     */
    private ChannelFuture sendFile(ChannelHandlerContext ctx,
                                   FileChunk file) throws IOException {
      Channel ch = ctx.getChannel();
      RandomAccessFile spill = null;
      ChannelFuture writeFuture;
      try {
        spill = new RandomAccessFile(file.getFile(), "r");
        if (ctx.getPipeline().get(SslHandler.class) == null) {
          final FadvisedFileRegion filePart = new FadvisedFileRegion(spill,
              file.startOffset(), file.length(), manageOsCache, readaheadLength,
              readaheadPool, file.getFile().getAbsolutePath());
          writeFuture = ch.write(filePart);
          writeFuture.addListener(new FileCloseListener(filePart));
        } else {
          // HTTPS cannot be done with zero copy.
          final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
              file.startOffset(), file.length(), sslFileBufferSize,
              manageOsCache, readaheadLength, readaheadPool,
              file.getFile().getAbsolutePath());
          writeFuture = ch.write(chunk);
        }
      } catch (FileNotFoundException e) {
        LOG.fatal(file.getFile() + " not found");
        return null;
      } catch (Throwable e) {
        LOG.fatal("error while sending a file: ", e);
        if (spill != null) {
          //should close a opening file
          LOG.warn("Close the file " + file.getFile().getAbsolutePath());
          spill.close();
        }
        return null;
      }
      metrics.shuffleConnections.incr();
      metrics.shuffleOutputBytes.incr(file.length()); // optimistic
      // Balances the gauge increment above and records success/failure once the write completes.
      // Registered after incr() because an already-completed future notifies immediately.
      writeFuture.addListener(metrics);
      return writeFuture;
    }

    private void sendError(ChannelHandlerContext ctx,
                           HttpResponseStatus status) {
      sendError(ctx, "", status);
    }

    /**
     * Sends a plain-text error response and closes the connection once it is written.
     *
     * @param message response body; {@code null} (e.g. from {@code Throwable#getMessage()}) is sent as empty
     */
    private void sendError(ChannelHandlerContext ctx, String message,
                           HttpResponseStatus status) {
      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
      response.setHeader(Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
      // Put shuffle version into http header
      // ChannelBuffers.copiedBuffer rejects a null string, so null messages are normalized to "".
      String body = message == null ? "" : message;
      ChannelBuffer content = ChannelBuffers.copiedBuffer(body, CharsetUtil.UTF_8);
      response.setContent(content);
      response.setHeader(Names.CONTENT_LENGTH, content.writerIndex());

      // Close the connection as soon as the error message is sent.
      ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
        throws Exception {
      Channel ch = e.getChannel();
      Throwable cause = e.getCause();
      LOG.error(cause.getMessage(), cause);
      if (ch.isOpen()) {
        ch.close();
      }
    }
  }

  // Temporal space to wait for the completion of all index lookup operations
  private final ConcurrentHashMap<IndexCacheKey, BSTIndexReader> waitForRemove = new ConcurrentHashMap<>();

  // RemovalListener is triggered when an item is removed from the index reader cache.
  // It closes index readers when they are not used anymore.
  // If they are still being used, they are moved to waitForRemove map to wait for other operations' completion.
  private final RemovalListener<IndexCacheKey, BSTIndexReader> removalListener = (removal) -> {
    BSTIndexReader reader = removal.getValue();
    if (reader.getReferenceNum() == 0) {
      try {
        reader.close(); // tear down properly
      } catch (IOException e) {
        // Guava logs and swallows exceptions thrown by a RemovalListener, so rethrowing would only
        // skip the cleanup below. Log here and still drop any parked entry for this key.
        LOG.warn("Failed to close index reader for " + removal.getKey(), e);
      }
      waitForRemove.remove(removal.getKey());
    } else {
      waitForRemove.put(removal.getKey(), reader);
    }
  };
}