| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tajo.pullserver; |
| |
| import com.google.common.collect.Lists; |
| import io.netty.bootstrap.ServerBootstrap; |
| import io.netty.buffer.Unpooled; |
| import io.netty.channel.*; |
| import io.netty.channel.group.ChannelGroup; |
| import io.netty.channel.group.DefaultChannelGroup; |
| import io.netty.channel.socket.SocketChannel; |
| import io.netty.channel.socket.nio.NioServerSocketChannel; |
| import io.netty.handler.codec.http.*; |
| import io.netty.handler.ssl.SslHandler; |
| import io.netty.handler.stream.ChunkedWriteHandler; |
| import io.netty.util.CharsetUtil; |
| import io.netty.util.concurrent.GenericFutureListener; |
| import io.netty.util.concurrent.GlobalEventExecutor; |
| import org.apache.commons.codec.binary.Base64; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocalDirAllocator; |
| import org.apache.hadoop.fs.LocalFileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.io.ReadaheadPool; |
| import org.apache.hadoop.metrics2.MetricsSystem; |
| import org.apache.hadoop.metrics2.annotation.Metric; |
| import org.apache.hadoop.metrics2.annotation.Metrics; |
| import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; |
| import org.apache.hadoop.metrics2.lib.MutableCounterInt; |
| import org.apache.hadoop.metrics2.lib.MutableCounterLong; |
| import org.apache.hadoop.metrics2.lib.MutableGaugeInt; |
| import org.apache.hadoop.security.ssl.SSLFactory; |
| import org.apache.hadoop.service.AbstractService; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.tajo.catalog.Schema; |
| import org.apache.tajo.conf.TajoConf; |
| import org.apache.tajo.conf.TajoConf.ConfVars; |
| import org.apache.tajo.pullserver.retriever.FileChunk; |
| import org.apache.tajo.rpc.NettyUtils; |
| import org.apache.tajo.storage.*; |
| import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; |
| import org.apache.tajo.storage.index.bst.BSTIndex; |
| |
| import java.io.*; |
| import java.net.InetSocketAddress; |
| import java.net.URI; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; |
| |
| public class TajoPullServerService extends AbstractService { |
| |
| private static final Log LOG = LogFactory.getLog(TajoPullServerService.class); |
| |
| public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache"; |
| public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true; |
| |
| public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes"; |
| public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024; |
| |
| private int port; |
| private ServerBootstrap selector; |
| private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); |
| private HttpChannelInitializer channelInitializer; |
| private int sslFileBufferSize; |
| |
| private ApplicationId appId; |
| private FileSystem localFS; |
| |
| /** |
| * Should the shuffle use posix_fadvise calls to manage the OS cache during |
| * sendfile |
| */ |
| private boolean manageOsCache; |
| private int readaheadLength; |
| private ReadaheadPool readaheadPool = ReadaheadPool.getInstance(); |
| |
| |
| public static final String PULLSERVER_SERVICEID = "tajo.pullserver"; |
| |
| private static final Map<String,String> userRsrc = |
| new ConcurrentHashMap<String,String>(); |
| private String userName; |
| |
| public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY = |
| "tajo.pullserver.ssl.file.buffer.size"; |
| |
| public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024; |
| |
| private static boolean STANDALONE = false; |
| |
  // Lock-free counters for ProcessingStatus; one shared updater per field
  // instead of per-instance AtomicIntegers (see comment in the static block).
  private static final AtomicIntegerFieldUpdater<ProcessingStatus> SLOW_FILE_UPDATER;
  private static final AtomicIntegerFieldUpdater<ProcessingStatus> REMAIN_FILE_UPDATER;

  static {
    /* AtomicIntegerFieldUpdater can save the memory usage instead of AtomicInteger instance */
    SLOW_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "numSlowFile");
    REMAIN_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "remainFiles");

    // Standalone mode is opted into via the TAJO_PULLSERVER_STANDALONE env var.
    String standalone = System.getenv("TAJO_PULLSERVER_STANDALONE");
    if (!StringUtils.isEmpty(standalone)) {
      STANDALONE = standalone.equalsIgnoreCase("true");
    }
  }
| |
  /**
   * Hadoop metrics for the pull server. The instance doubles as a Netty
   * write-completion listener: each finished transfer bumps the success or
   * failure counter and decrements the open-connection gauge.
   */
  @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
  static class ShuffleMetrics implements GenericFutureListener<ChannelFuture> {
    // Total bytes scheduled for output (incremented optimistically at send time).
    @Metric({"OutputBytes","PullServer output in bytes"})
    MutableCounterLong shuffleOutputBytes;
    @Metric({"Failed","# of failed shuffle outputs"})
    MutableCounterInt shuffleOutputsFailed;
    @Metric({"Succeeded","# of succeeded shuffle outputs"})
    MutableCounterInt shuffleOutputsOK;
    @Metric({"Connections","# of current shuffle connections"})
    MutableGaugeInt shuffleConnections;

    /** Invoked by Netty when a scheduled file write completes or fails. */
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
      if (future.isSuccess()) {
        shuffleOutputsOK.incr();
      } else {
        shuffleOutputsFailed.incr();
      }
      shuffleConnections.decr();
    }
  }
| |
| final ShuffleMetrics metrics; |
| |
  /** Creates the service and registers its shuffle metrics with {@code ms}. */
  TajoPullServerService(MetricsSystem ms) {
    super("httpshuffle");
    metrics = ms.register(new ShuffleMetrics());
  }
| |
  /** Default constructor wired to the process-wide default metrics system. */
  @SuppressWarnings("UnusedDeclaration")
  public TajoPullServerService() {
    this(DefaultMetricsSystem.instance());
  }
| |
  /**
   * Registers an application with this pull server, recording its owning user.
   *
   * @param user   owner of the application, keyed by appId in {@code userRsrc}
   * @param appId  application identifier
   * @param secret currently unused by this implementation
   */
  public void initApp(String user, ApplicationId appId, ByteBuffer secret) {
    // TODO these bytes should be versioned
    // TODO: Once SHuffle is out of NM, this can use MR APIs
    this.appId = appId;
    this.userName = user;
    userRsrc.put(appId.toString(), user);
  }
| |
  /** Unregisters an application previously added via {@link #initApp}. */
  public void stopApp(ApplicationId appId) {
    userRsrc.remove(appId.toString());
  }
| |
  /**
   * Configures the service: OS-cache/readahead knobs, the Netty server
   * bootstrap, the local file system, and pins the effective pull-server port
   * into the configuration before delegating to {@code super.init(conf)}.
   */
  @Override
  public void init(Configuration conf) {
    try {
      manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
          DEFAULT_SHUFFLE_MANAGE_OS_CACHE);

      readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
          DEFAULT_SHUFFLE_READAHEAD_BYTES);

      // Worker threads default to twice the available cores.
      int workerNum = conf.getInt("tajo.shuffle.rpc.server.worker-thread-num",
          Runtime.getRuntime().availableProcessors() * 2);

      selector = NettyUtils.createServerBootstrap("TajoPullServerService", workerNum)
          .option(ChannelOption.TCP_NODELAY, true)
          .childOption(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
          .childOption(ChannelOption.TCP_NODELAY, true);

      localFS = new LocalFileSystem();

      // Materialize the port (explicit value or the default) into conf so that
      // serviceInit() and other readers see a concrete value.
      conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname
          , conf.getInt(TajoConf.ConfVars.PULLSERVER_PORT.varname, TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal));
      super.init(conf);
      LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength);
    } catch (Throwable t) {
      // NOTE(review): every failure is logged and swallowed here, which can
      // leave the service half-initialized without notifying the caller.
      LOG.error(t, t);
    }
  }
| |
| // TODO change AbstractService to throw InterruptedException |
  /**
   * Starts the HTTP shuffle server: clones the bootstrap prepared in
   * {@link #init}, installs the channel pipeline, binds the configured port
   * (writing the actually-bound port back into the configuration), and in
   * standalone mode persists that port to a file for discovery.
   *
   * @throws IllegalArgumentException if {@code conf} is not a TajoConf
   */
  @Override
  public void serviceInit(Configuration conf) throws Exception {
    if (!(conf instanceof TajoConf)) {
      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
    }

    ServerBootstrap bootstrap = selector.clone();
    TajoConf tajoConf = (TajoConf)conf;
    try {
      channelInitializer = new HttpChannelInitializer(tajoConf);
    } catch (Exception ex) {
      throw new RuntimeException(ex);
    }
    bootstrap.childHandler(channelInitializer)
      .channel(NioServerSocketChannel.class);

    port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
        ConfVars.PULLSERVER_PORT.defaultIntVal);
    ChannelFuture future = bootstrap.bind(new InetSocketAddress(port))
        .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE)
        .syncUninterruptibly();

    accepted.add(future.channel());
    // Re-read the bound port: configuring port 0 picks an ephemeral one.
    port = ((InetSocketAddress)future.channel().localAddress()).getPort();
    conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
    LOG.info(getName() + " listening on port " + port);

    sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
        DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);


    if (STANDALONE) {
      // Publish the bound port through a well-known file (see getPullServerPortFile).
      File pullServerPortFile = getPullServerPortFile();
      if (pullServerPortFile.exists()) {
        pullServerPortFile.delete();
      }
      pullServerPortFile.getParentFile().mkdirs();
      LOG.info("Write PullServerPort to " + pullServerPortFile);
      FileOutputStream out = null;
      try {
        out = new FileOutputStream(pullServerPortFile);
        out.write(("" + port).getBytes());
      } catch (Exception e) {
        // A standalone pull server is useless if peers cannot discover its port.
        LOG.fatal("PullServer exists cause can't write PullServer port to " + pullServerPortFile +
            ", " + e.getMessage(), e);
        System.exit(-1);
      } finally {
        IOUtils.closeStream(out);
      }
    }
    super.serviceInit(conf);
    LOG.info("TajoPullServerService started: port=" + port);
  }
| |
  /** @return true when running as a standalone process (TAJO_PULLSERVER_STANDALONE env var). */
  public static boolean isStandalone() {
    return STANDALONE;
  }
| |
| private static File getPullServerPortFile() { |
| String pullServerPortInfoFile = System.getenv("TAJO_PID_DIR"); |
| if (StringUtils.isEmpty(pullServerPortInfoFile)) { |
| pullServerPortInfoFile = "/tmp"; |
| } |
| return new File(pullServerPortInfoFile + "/pullserver.port"); |
| } |
| |
| // TODO change to get port from master or tajoConf |
| public static int readPullServerPort() { |
| FileInputStream in = null; |
| try { |
| File pullServerPortFile = getPullServerPortFile(); |
| |
| if (!pullServerPortFile.exists() || pullServerPortFile.isDirectory()) { |
| return -1; |
| } |
| in = new FileInputStream(pullServerPortFile); |
| byte[] buf = new byte[1024]; |
| int readBytes = in.read(buf); |
| return Integer.parseInt(new String(buf, 0, readBytes)); |
| } catch (IOException e) { |
| LOG.fatal(e.getMessage(), e); |
| return -1; |
| } finally { |
| IOUtils.closeStream(in); |
| } |
| } |
| |
  /** @return the port this server is bound to (set during serviceInit). */
  public int getPort() {
    return port;
  }
| |
  /**
   * Stops the service: closes all tracked channels, gracefully shuts down both
   * Netty event-loop groups, destroys the SSL factory via the channel
   * initializer, and closes the local file system. Errors are logged but never
   * propagated so that {@code super.stop()} always runs.
   */
  @Override
  public void stop() {
    try {
      accepted.close();
      if (selector != null) {
        if (selector.group() != null) {
          selector.group().shutdownGracefully();
        }
        if (selector.childGroup() != null) {
          selector.childGroup().shutdownGracefully();
        }
      }

      if (channelInitializer != null) {
        channelInitializer.destroy();
      }

      // NOTE(review): localFS is only assigned in init(); stopping before a
      // successful init() NPEs here (swallowed by the catch below).
      localFS.close();
    } catch (Throwable t) {
      LOG.error(t, t);
    } finally {
      super.stop();
    }
  }
| |
  /**
   * Builds the Netty pipeline for each accepted connection:
   * [ssl?] -&gt; HTTP codec -&gt; aggregator -&gt; chunked writer -&gt; PullServer.
   * SSL is installed only when SHUFFLE_SSL_ENABLED_KEY is set in the conf.
   */
  class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {

    // Single @Sharable request handler reused on every channel.
    final PullServer PullServer;
    private SSLFactory sslFactory;

    public HttpChannelInitializer(TajoConf conf) throws Exception {
      PullServer = new PullServer(conf);
      if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
          ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
        sslFactory.init();
      }
    }

    /** Releases SSL resources; invoked from {@link TajoPullServerService#stop()}. */
    public void destroy() {
      if (sslFactory != null) {
        sslFactory.destroy();
      }
    }

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
      ChannelPipeline pipeline = channel.pipeline();
      // SSL must sit first in the pipeline so all traffic is decrypted before
      // the HTTP codec sees it.
      if (sslFactory != null) {
        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
      }

      int maxChunkSize = getConfig().getInt(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.varname,
          ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.defaultIntVal);
      pipeline.addLast("codec", new HttpServerCodec(4096, 8192, maxChunkSize));
      pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
      pipeline.addLast("chunking", new ChunkedWriteHandler());
      pipeline.addLast("shuffle", PullServer);
      // TODO factor security manager into pipeline
      // TODO factor out encode/decode to permit binary shuffle
      // TODO factor out decode of index to permit alt. models
    }
  }
| |
| |
| Map<String, ProcessingStatus> processingStatusMap = new ConcurrentHashMap<String, ProcessingStatus>(); |
| |
| public void completeFileChunk(FileRegion filePart, |
| String requestUri, |
| long startTime) { |
| ProcessingStatus status = processingStatusMap.get(requestUri); |
| if (status != null) { |
| status.decrementRemainFiles(filePart, startTime); |
| } |
| } |
| |
| class ProcessingStatus { |
| String requestUri; |
| int numFiles; |
| long startTime; |
| long makeFileListTime; |
| long minTime = Long.MAX_VALUE; |
| long maxTime; |
| volatile int numSlowFile; |
| volatile int remainFiles; |
| |
| public ProcessingStatus(String requestUri) { |
| this.requestUri = requestUri; |
| this.startTime = System.currentTimeMillis(); |
| } |
| |
| public void setNumFiles(int numFiles) { |
| this.numFiles = numFiles; |
| this.remainFiles = numFiles; |
| } |
| |
| public void decrementRemainFiles(FileRegion filePart, long fileStartTime) { |
| long fileSendTime = System.currentTimeMillis() - fileStartTime; |
| |
| if (fileSendTime > maxTime) { |
| maxTime = fileSendTime; |
| } |
| if (fileSendTime < minTime) { |
| minTime = fileSendTime; |
| } |
| |
| if (fileSendTime > 20 * 1000) { |
| LOG.warn("Sending data takes too long. " + fileSendTime + "ms elapsed, " + |
| "length:" + (filePart.count() - filePart.position()) + ", URI:" + requestUri); |
| SLOW_FILE_UPDATER.compareAndSet(this, numSlowFile, numSlowFile + 1); |
| } |
| |
| REMAIN_FILE_UPDATER.compareAndSet(this, remainFiles, remainFiles - 1); |
| if (REMAIN_FILE_UPDATER.get(this) <= 0) { |
| processingStatusMap.remove(requestUri); |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, " |
| + "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, " |
| + "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile); |
| } |
| } |
| } |
| } |
| |
| @ChannelHandler.Sharable |
| class PullServer extends SimpleChannelInboundHandler<FullHttpRequest> { |
| |
| private final TajoConf conf; |
| // private final IndexCache indexCache; |
| private final LocalDirAllocator lDirAlloc = |
| new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); |
| |
    /**
     * @param conf cluster configuration used to resolve local shuffle dirs
     * @throws IOException if the worker temporal directories cannot be read
     */
    public PullServer(TajoConf conf) throws IOException {
      this.conf = conf;

      // init local temporal dir
      lDirAlloc.getAllLocalPathsToRead(".", conf);
    }
| |
    /** Adds every new channel to the shared group so stop() can close them all. */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
      accepted.add(ctx.channel());

      if(LOG.isDebugEnabled()) {
        LOG.debug(String.format("Current number of shuffle connections (%d)", accepted.size()));
      }
      super.channelRegistered(ctx);
    }
| |
| @Override |
| public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) |
| throws Exception { |
| |
| if (request.getMethod() != HttpMethod.GET) { |
| sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED); |
| return; |
| } |
| |
| ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString()); |
| processingStatusMap.put(request.getUri().toString(), processingStatus); |
| |
| // Parsing the URL into key-values |
| Map<String, List<String>> params = null; |
| try { |
| params = decodeParams(request.getUri()); |
| } catch (Throwable e) { |
| sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST); |
| } |
| |
| String partId = params.get("p").get(0); |
| String queryId = params.get("qid").get(0); |
| String shuffleType = params.get("type").get(0); |
| String sid = params.get("sid").get(0); |
| |
| final List<String> taskIdList = params.get("ta"); |
| final List<String> offsetList = params.get("offset"); |
| final List<String> lengthList = params.get("length"); |
| |
| long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L; |
| long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L; |
| |
| List<String> taskIds = splitMaps(taskIdList); |
| |
| Path queryBaseDir = getBaseOutputDir(queryId, sid); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId |
| + ", taskIds=" + taskIdList); |
| |
| // the working dir of tajo worker for each query |
| LOG.debug("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir); |
| } |
| |
| final List<FileChunk> chunks = Lists.newArrayList(); |
| |
| // if a stage requires a range shuffle |
| if (shuffleType.equals("r")) { |
| Path outputPath = StorageUtil.concatPath(queryBaseDir, taskIds.get(0), "output"); |
| if (!lDirAlloc.ifExists(outputPath.toString(), conf)) { |
| LOG.warn(outputPath + "does not exist."); |
| sendError(ctx, HttpResponseStatus.NO_CONTENT); |
| return; |
| } |
| Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf)); |
| String startKey = params.get("start").get(0); |
| String endKey = params.get("end").get(0); |
| boolean last = params.get("final") != null; |
| |
| FileChunk chunk; |
| try { |
| chunk = getFileChunks(path, startKey, endKey, last); |
| } catch (Throwable t) { |
| LOG.error("ERROR Request: " + request.getUri(), t); |
| sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST); |
| return; |
| } |
| if (chunk != null) { |
| chunks.add(chunk); |
| } |
| |
| // if a stage requires a hash shuffle or a scattered hash shuffle |
| } else if (shuffleType.equals("h") || shuffleType.equals("s")) { |
| int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf); |
| Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId); |
| if (!lDirAlloc.ifExists(partPath.toString(), conf)) { |
| LOG.warn("Partition shuffle file not exists: " + partPath); |
| sendError(ctx, HttpResponseStatus.NO_CONTENT); |
| return; |
| } |
| |
| Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath.toString(), conf)); |
| |
| File file = new File(path.toUri()); |
| long startPos = (offset >= 0 && length >= 0) ? offset : 0; |
| long readLen = (offset >= 0 && length >= 0) ? length : file.length(); |
| |
| if (startPos >= file.length()) { |
| String errorMessage = "Start pos[" + startPos + "] great than file length [" + file.length() + "]"; |
| LOG.error(errorMessage); |
| sendError(ctx, errorMessage, HttpResponseStatus.BAD_REQUEST); |
| return; |
| } |
| LOG.info("RequestURL: " + request.getUri() + ", fileLen=" + file.length()); |
| FileChunk chunk = new FileChunk(file, startPos, readLen); |
| chunks.add(chunk); |
| } else { |
| LOG.error("Unknown shuffle type: " + shuffleType); |
| sendError(ctx, "Unknown shuffle type:" + shuffleType, HttpResponseStatus.BAD_REQUEST); |
| return; |
| } |
| |
| processingStatus.setNumFiles(chunks.size()); |
| processingStatus.makeFileListTime = System.currentTimeMillis() - processingStatus.startTime; |
| // Write the content. |
| if (chunks.size() == 0) { |
| HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT); |
| |
| if (!HttpHeaders.isKeepAlive(request)) { |
| ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); |
| } else { |
| response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); |
| ctx.writeAndFlush(response); |
| } |
| } else { |
| FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]); |
| ChannelFuture writeFuture = null; |
| HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); |
| long totalSize = 0; |
| for (FileChunk chunk : file) { |
| totalSize += chunk.length(); |
| } |
| HttpHeaders.setContentLength(response, totalSize); |
| |
| if (HttpHeaders.isKeepAlive(request)) { |
| response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); |
| } |
| // Write the initial line and the header. |
| writeFuture = ctx.write(response); |
| |
| for (FileChunk chunk : file) { |
| writeFuture = sendFile(ctx, chunk, request.getUri().toString()); |
| if (writeFuture == null) { |
| sendError(ctx, HttpResponseStatus.NOT_FOUND); |
| return; |
| } |
| } |
| if (ctx.pipeline().get(SslHandler.class) == null) { |
| writeFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); |
| } else { |
| ctx.flush(); |
| } |
| |
| // Decide whether to close the connection or not. |
| if (!HttpHeaders.isKeepAlive(request)) { |
| // Close the connection when the whole content is written out. |
| writeFuture.addListener(ChannelFutureListener.CLOSE); |
| } |
| } |
| } |
| |
| private ChannelFuture sendFile(ChannelHandlerContext ctx, |
| FileChunk file, |
| String requestUri) throws IOException { |
| long startTime = System.currentTimeMillis(); |
| RandomAccessFile spill = null; |
| ChannelFuture writeFuture; |
| try { |
| spill = new RandomAccessFile(file.getFile(), "r"); |
| if (ctx.pipeline().get(SslHandler.class) == null) { |
| final FadvisedFileRegion filePart = new FadvisedFileRegion(spill, |
| file.startOffset(), file.length(), manageOsCache, readaheadLength, |
| readaheadPool, file.getFile().getAbsolutePath()); |
| writeFuture = ctx.write(filePart); |
| writeFuture.addListener(new FileCloseListener(filePart, requestUri, startTime, TajoPullServerService.this)); |
| } else { |
| // HTTPS cannot be done with zero copy. |
| final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, |
| file.startOffset(), file.length(), sslFileBufferSize, |
| manageOsCache, readaheadLength, readaheadPool, |
| file.getFile().getAbsolutePath()); |
| writeFuture = ctx.write(new HttpChunkedInput(chunk)); |
| } |
| } catch (FileNotFoundException e) { |
| LOG.info(file.getFile() + " not found"); |
| return null; |
| } catch (Throwable e) { |
| if (spill != null) { |
| //should close a opening file |
| spill.close(); |
| } |
| return null; |
| } |
| metrics.shuffleConnections.incr(); |
| metrics.shuffleOutputBytes.incr(file.length()); // optimistic |
| return writeFuture; |
| } |
| |
    /** Sends an error response with the given status and an empty message body. */
    private void sendError(ChannelHandlerContext ctx,
                           HttpResponseStatus status) {
      sendError(ctx, "", status);
    }
| |
    /**
     * Sends a plain-text error response with the given status and message,
     * then closes the connection once the write completes.
     */
    private void sendError(ChannelHandlerContext ctx, String message,
                           HttpResponseStatus status) {
      FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
          Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
      response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");

      // Close the connection as soon as the error message is sent.
      ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }
| |
| @Override |
| public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) |
| throws Exception { |
| LOG.error(cause.getMessage(), cause); |
| if (ctx.channel().isOpen()) { |
| ctx.channel().close(); |
| } |
| } |
| } |
| |
  /**
   * Locates the byte range of a sorted task output file that covers the
   * requested key range, using the task's BST index.
   *
   * @param outDir   task output directory containing "index" and "output"
   * @param startKey Base64-encoded row-store start key
   * @param endKey   Base64-encoded row-store end key
   * @param last     true when this request should include the tail of the data
   * @return a FileChunk for the matching byte range, or null when the index is
   *         empty or the requested range lies outside the indexed data
   * @throws IOException if the index cannot be read or searched
   * @throws IllegalArgumentException if a key cannot be decoded into a tuple
   */
  public static FileChunk getFileChunks(Path outDir,
                                        String startKey,
                                        String endKey,
                                        boolean last) throws IOException {
    BSTIndex index = new BSTIndex(new TajoConf());
    try (BSTIndex.BSTIndexReader idxReader = index.getIndexReader(new Path(outDir, "index"))) {
      Schema keySchema = idxReader.getKeySchema();
      TupleComparator comparator = idxReader.getComparator();

      if (LOG.isDebugEnabled()) {
        LOG.debug("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + ")");
      }

      File data = new File(URI.create(outDir.toUri() + "/output"));
      byte[] startBytes = Base64.decodeBase64(startKey);
      byte[] endBytes = Base64.decodeBase64(endKey);

      RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
      Tuple start;
      Tuple end;
      try {
        start = decoder.toTuple(startBytes);
      } catch (Throwable t) {
        throw new IllegalArgumentException("StartKey: " + startKey
            + ", decoded byte size: " + startBytes.length, t);
      }

      try {
        end = decoder.toTuple(endBytes);
      } catch (Throwable t) {
        throw new IllegalArgumentException("EndKey: " + endKey
            + ", decoded byte size: " + endBytes.length, t);
      }

      LOG.info("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end=" + end +
          (last ? ", last=true" : "") + ")");

      if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
        LOG.info("There is no contents");
        return null;
      }

      // The whole request range falls outside the indexed key range.
      if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
          comparator.compare(idxReader.getLastKey(), start) < 0) {
        LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
            "], but request start:" + start + ", end: " + end);
        return null;
      }

      long startOffset;
      long endOffset;
      try {
        idxReader.init();
        startOffset = idxReader.find(start);
      } catch (IOException ioe) {
        LOG.error("State Dump (the requested range: "
            + "[" + start + ", " + end + ")" + ", idx min: "
            + idxReader.getFirstKey() + ", idx max: "
            + idxReader.getLastKey());
        throw ioe;
      }
      try {
        endOffset = idxReader.find(end);
        // No exact match for the end key: fall back to the nearest entry.
        if (endOffset == -1) {
          endOffset = idxReader.find(end, true);
        }
      } catch (IOException ioe) {
        LOG.error("State Dump (the requested range: "
            + "[" + start + ", " + end + ")" + ", idx min: "
            + idxReader.getFirstKey() + ", idx max: "
            + idxReader.getLastKey());
        throw ioe;
      }

      // if startOffset == -1 then case 2-1 or case 3
      if (startOffset == -1) { // this is a hack
        // if case 2-1 or case 3
        try {
          startOffset = idxReader.find(start, true);
        } catch (IOException ioe) {
          LOG.error("State Dump (the requested range: "
              + "[" + start + ", " + end + ")" + ", idx min: "
              + idxReader.getFirstKey() + ", idx max: "
              + idxReader.getLastKey());
          throw ioe;
        }
      }

      if (startOffset == -1) {
        throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
            "State Dump (the requested range: "
            + "[" + start + ", " + end + ")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
            + idxReader.getLastKey());
      }

      // if greater than indexed values
      if (last || (endOffset == -1
          && comparator.compare(idxReader.getLastKey(), end) < 0)) {
        endOffset = data.length();
      }

      FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);

      if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
      return chunk;
    }
  }
| |
| public static List<String> splitMaps(List<String> mapq) { |
| if (null == mapq) { |
| return null; |
| } |
| final List<String> ret = new ArrayList<String>(); |
| for (String s : mapq) { |
| Collections.addAll(ret, s.split(",")); |
| } |
| return ret; |
| } |
| |
  /**
   * Decodes the query string of a shuffle request URI and validates that the
   * required parameters (type, qid, sid, p) are present.
   *
   * @throws IllegalArgumentException when a required parameter is missing or
   *         fails the cardinality check below
   */
  public static Map<String, List<String>> decodeParams(String uri) {
    final Map<String, List<String>> params = new QueryStringDecoder(uri).parameters();
    final List<String> types = params.get("type");
    final List<String> qids = params.get("qid");
    final List<String> ebIds = params.get("sid");
    final List<String> partIds = params.get("p");

    if (types == null || ebIds == null || qids == null || partIds == null) {
      throw new IllegalArgumentException("invalid params. required :" + params);
    }

    // NOTE(review): by operator precedence this reads
    // (qids.size() != 1 && types.size() != 1) || ebIds.size() != 1 --
    // each of the three was likely meant to be checked individually (and
    // partIds is not size-checked at all). Left unchanged to avoid rejecting
    // requests the server currently accepts; confirm the intended rule.
    if (qids.size() != 1 && types.size() != 1 || ebIds.size() != 1) {
      throw new IllegalArgumentException("invalid params. required :" + params);
    }

    return params;
  }
| |
| public static Path getBaseOutputDir(String queryId, String executionBlockSequenceId) { |
| Path workDir = |
| StorageUtil.concatPath( |
| queryId, |
| "output", |
| executionBlockSequenceId); |
| return workDir; |
| } |
| |
| public static Path getBaseInputDir(String queryId, String executionBlockId) { |
| Path workDir = |
| StorageUtil.concatPath( |
| queryId, |
| "in", |
| executionBlockId); |
| return workDir; |
| } |
| } |