/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.worker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.physical.PhysicalExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.engine.query.TaskRequest;
import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.plan.function.python.TajoScriptEngine;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.plan.logical.SortNode;
import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
import org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType;
import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.NetUtils;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import static org.apache.tajo.ResourceProtos.*;
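
/**
 * A worker-side {@link Task} implementation. A task attempt deserializes its logical plan,
 * optionally fetches intermediate (shuffle) data produced by other tasks, drives a physical
 * executor over its assigned fragments, and reports progress and completion back to the
 * QueryMaster.
 */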
public class TaskImpl implements Task {
private static final Log LOG = LogFactory.getLog(TaskImpl.class);
private static final float FETCHER_PROGRESS = 0.5f;
private final TajoConf systemConf;
private final QueryContext queryContext;
private final ExecutionBlockContext executionBlockContext;
private final TaskRequest request;
private final Map<String, TableDesc> descs;
private final TableStats inputStats;
private final Path taskDir;
private final TaskAttemptContext context;
private List<Fetcher> fetcherRunners;
private LogicalNode plan;
private PhysicalExec executor;
private boolean interQuery;
private Path inputTableBaseDir;
private long startTime;
private long endTime;
private List<FileChunk> localChunks;
// TODO - to be refactored
private ShuffleType shuffleType = null;
private Schema finalSchema = null;
private TupleComparator sortComp = null;
public TaskImpl(final TaskRequest request,
final ExecutionBlockContext executionBlockContext) throws IOException {
this.request = request;
this.executionBlockContext = executionBlockContext;
this.systemConf = executionBlockContext.getConf();
this.queryContext = request.getQueryContext(systemConf);
this.inputStats = new TableStats();
this.fetcherRunners = Lists.newArrayList();
this.descs = Maps.newHashMap();
Path baseDirPath = executionBlockContext.createBaseDir();
LOG.info("Task basedir is created (" + baseDirPath +")");
TaskAttemptId taskAttemptId = request.getId();
this.taskDir = StorageUtil.concatPath(baseDirPath,
taskAttemptId.getTaskId().getId() + "_" + taskAttemptId.getId());
this.context = new TaskAttemptContext(queryContext, executionBlockContext, taskAttemptId,
request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
this.context.setDataChannel(request.getDataChannel());
this.context.setEnforcer(request.getEnforcer());
this.context.setState(TaskAttemptState.TA_PENDING);
}
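
  /**
   * Deserializes the logical plan and prepares it for execution: table descriptors are
   * collected for every scan node, a final schema and tuple comparator are prepared when a
   * range shuffle is required, and for non-intermediate queries the final output path under
   * the staging directory is configured.
   */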
public void initPlan() throws IOException {
plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan());
updateDescsForScanNodes(NodeType.SCAN);
updateDescsForScanNodes(NodeType.PARTITIONS_SCAN);
updateDescsForScanNodes(NodeType.INDEX_SCAN);
interQuery = request.getProto().getInterQuery();
if (interQuery) {
context.setInterQuery();
this.shuffleType = context.getDataChannel().getShuffleType();
if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
}
} else {
Path outFilePath = ((FileTablespace) TablespaceManager.get(queryContext.getStagingDir().toUri()))
.getAppenderFilePath(getId(), queryContext.getStagingDir());
LOG.info("Output File Path: " + outFilePath);
context.setOutputPath(outFilePath);
}
this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>());
LOG.info("==================================");
LOG.info("* Stage " + request.getId() + " is initialized");
LOG.info("* InterQuery: " + interQuery
+ (interQuery ? ", Use " + this.shuffleType + " shuffle" : "") +
", Fragments (num: " + request.getFragments().size() + ")" +
", Fetches (total:" + request.getFetches().size() + ") :");
    if (LOG.isDebugEnabled()) {
for (FetchImpl f : request.getFetches()) {
LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs());
}
}
LOG.info("* Local task dir: " + taskDir);
    if (LOG.isDebugEnabled()) {
LOG.debug("* plan:\n");
LOG.debug(plan.toString());
}
LOG.info("==================================");
}
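
  /**
   * Collects the table descriptor of every scan node of the given type so that fetched
   * intermediate data can later be localized with the correct table meta.
   */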
private void updateDescsForScanNodes(NodeType nodeType) {
assert nodeType == NodeType.SCAN || nodeType == NodeType.PARTITIONS_SCAN || nodeType == NodeType.INDEX_SCAN;
LogicalNode[] scanNodes = PlannerUtil.findAllNodes(plan, nodeType);
if (scanNodes != null) {
for (LogicalNode node : scanNodes) {
ScanNode scanNode = (ScanNode) node;
descs.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
}
}
}
private void startScriptExecutors() throws IOException {
for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
executor.start(systemConf);
}
}
private void stopScriptExecutors() {
for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
executor.shutdown();
}
}
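
  /**
   * Prepares the local working state of this attempt: the task directory, one directory per
   * input table for fetched data (when a fetch phase exists), and the fetchers themselves.
   */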
@Override
public void init() throws IOException {
LOG.info("Initializing: " + getId());
initPlan();
startScriptExecutors();
if (context.getState() == TaskAttemptState.TA_PENDING) {
      // initialize the task's temporary directory
FileSystem localFS = executionBlockContext.getLocalFS();
localFS.mkdirs(taskDir);
if (request.getFetches().size() > 0) {
inputTableBaseDir = localFS.makeQualified(
executionBlockContext.getLocalDirAllocator().getLocalPathForWrite(
getTaskAttemptDir(context.getTaskId()).toString(), systemConf));
localFS.mkdirs(inputTableBaseDir);
Path tableDir;
for (String inputTable : context.getInputTables()) {
tableDir = new Path(inputTableBaseDir, inputTable);
if (!localFS.exists(tableDir)) {
LOG.info("the directory is created " + tableDir.toUri());
localFS.mkdirs(tableDir);
}
}
}
// for localizing the intermediate data
fetcherRunners.addAll(getFetchRunners(context, request.getFetches()));
}
}
private TaskAttemptId getId() {
return context.getTaskId();
}
  @Override
  public String toString() {
return "TaskId: " + this.getId() + " Status: " + context.getState();
}
@Override
public boolean isStopped() {
return context.isStopped();
}
@Override
public TaskAttemptContext getTaskContext() {
return context;
}
@Override
public ExecutionBlockContext getExecutionBlockContext() {
return executionBlockContext;
}
@Override
public boolean hasFetchPhase() {
return fetcherRunners.size() > 0;
}
@Override
public void fetch(ExecutorService fetcherExecutor) {
for (Fetcher f : fetcherRunners) {
fetcherExecutor.submit(new FetchRunner(context, f));
}
}
@Override
public void kill() {
stopScriptExecutors();
context.setState(TaskAttemptState.TA_KILLED);
context.stop();
}
@Override
public void abort() {
stopScriptExecutors();
context.setState(TaskAttemptState.TA_FAILED);
context.stop();
}
@Override
public TaskStatusProto getReport() {
TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort());
builder.setId(context.getTaskId().getProto())
.setProgress(context.getProgress())
.setState(context.getState());
builder.setInputStats(reloadInputStats());
if (context.getResultStats() != null) {
builder.setResultStats(context.getResultStats().getProto());
}
return builder.build();
}
@Override
public boolean isProgressChanged() {
return context.isProgressChanged();
}
@Override
public void updateProgress() {
    if (context != null && context.isStopped()) {
return;
}
if (executor != null && context.getProgress() < 1.0f) {
context.setExecutorProgress(executor.getProgress());
}
}
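
  // Snapshots the executor's current input statistics. Synchronized on inputStats because
  // periodic status reports and the final completion report may call this concurrently.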
private CatalogProtos.TableStatsProto reloadInputStats() {
synchronized(inputStats) {
if (this.executor == null) {
return inputStats.getProto();
}
TableStats executorInputStats = this.executor.getInputStats();
if (executorInputStats != null) {
inputStats.setValues(executorInputStats);
}
return inputStats.getProto();
}
}
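
  /**
   * Builds the completion report sent to the QueryMaster, including input/result statistics
   * and, for shuffles, one shuffle file output per partition with its output volume.
   */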
private TaskCompletionReport getTaskCompletionReport() {
TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder();
builder.setId(context.getTaskId().getProto());
builder.setInputStats(reloadInputStats());
if (context.hasResultStats()) {
builder.setResultStats(context.getResultStats().getProto());
} else {
builder.setResultStats(new TableStats().getProto());
}
if (!context.getPartitions().isEmpty()) {
builder.addAllPartitions(context.getPartitions());
}
Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs();
    while (it.hasNext()) {
      Entry<Integer, String> entry = it.next();
      ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
      part.setPartId(entry.getKey());
      // Set the output volume recorded for this partition, if any
      if (context.getPartitionOutputVolume() != null) {
        for (Entry<Integer, Long> e : context.getPartitionOutputVolume().entrySet()) {
          if (entry.getKey().equals(e.getKey())) {
            part.setVolume(e.getValue().longValue());
            break;
          }
        }
      }
      builder.addShuffleFileOutputs(part.build());
    }
return builder.build();
}
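
  /**
   * Blocks until every fetcher has counted down the fetch latch, then registers the fetched
   * files of each input table as fragments. Broadcast tables are excluded from this
   * per-task localization.
   */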
private void waitForFetch() throws InterruptedException, IOException {
context.getFetchLatch().await();
LOG.info(context.getTaskId() + " All fetches are done!");
Collection<String> inputs = Lists.newArrayList(context.getInputTables());
// Get all broadcasted tables
Set<String> broadcastTableNames = new HashSet<String>();
List<EnforceProperty> broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST);
if (broadcasts != null) {
for (EnforceProperty eachBroadcast : broadcasts) {
broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName());
}
}
// localize the fetched data and skip the broadcast table
for (String inputTable: inputs) {
if (broadcastTableNames.contains(inputTable)) {
continue;
}
File tableDir = new File(context.getFetchIn(), inputTable);
FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
context.updateAssignedFragments(inputTable, frags);
}
}
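
  /**
   * The main execution path of a task attempt: wait for the fetch phase (if any), run the
   * physical executor until it is exhausted, and finally report success, kill, or failure to
   * the QueryMaster along with the worker's task counters.
   */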
@Override
public void run() throws Exception {
startTime = System.currentTimeMillis();
Throwable error = null;
try {
      if (!context.isStopped()) {
        context.setState(TajoProtos.TaskAttemptState.TA_RUNNING);
        if (context.hasFetchPhase()) {
          // If the fetch phase is still in progress, the task must wait until it completes.
waitForFetch();
context.setFetcherProgress(FETCHER_PROGRESS);
updateProgress();
}
this.executor = executionBlockContext.getTQueryEngine().createPlan(context, plan);
this.executor.init();
while(!context.isStopped() && executor.next() != null) {
}
}
} catch (Throwable e) {
      error = e;
LOG.error(e.getMessage(), e);
stopScriptExecutors();
context.stop();
} finally {
if (executor != null) {
try {
executor.close();
reloadInputStats();
} catch (IOException e) {
          LOG.error(e.getMessage(), e);
}
this.executor = null;
}
executionBlockContext.completedTasksNum.incrementAndGet();
context.getHashShuffleAppenderManager().finalizeTask(getId());
QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub();
if (context.isStopped()) {
context.setExecutorProgress(0.0f);
if (context.getState() == TaskAttemptState.TA_KILLED) {
queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
executionBlockContext.killedTasksNum.incrementAndGet();
} else {
context.setState(TaskAttemptState.TA_FAILED);
TaskFatalErrorReport.Builder errorBuilder =
TaskFatalErrorReport.newBuilder()
.setId(getId().getProto());
if (error != null) {
if (error.getMessage() == null) {
errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
} else {
errorBuilder.setErrorMessage(error.getMessage());
}
errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
}
queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
executionBlockContext.failedTasksNum.incrementAndGet();
}
} else {
// if successful
context.stop();
context.setProgress(1.0f);
context.setState(TaskAttemptState.TA_SUCCEEDED);
executionBlockContext.succeededTasksNum.incrementAndGet();
TaskCompletionReport report = getTaskCompletionReport();
queryMasterStub.done(null, report, NullCallback.get());
}
endTime = System.currentTimeMillis();
LOG.info(context.getTaskId() + " completed. " +
"Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
+ ", killed: " + executionBlockContext.killedTasksNum.intValue()
+ ", failed: " + executionBlockContext.failedTasksNum.intValue());
}
}
@Override
public void cleanup() {
    // the task history is kept in memory while the stage is running
TaskHistory taskHistory = createTaskHistory();
executionBlockContext.addTaskHistory(getId().getTaskId(), taskHistory);
executionBlockContext.getTasks().remove(getId());
fetcherRunners.clear();
fetcherRunners = null;
try {
      if (executor != null) {
executor.close();
executor = null;
}
} catch (IOException e) {
LOG.fatal(e.getMessage(), e);
}
executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory);
stopScriptExecutors();
}
@Override
public TaskHistory createTaskHistory() {
TaskHistory taskHistory = null;
try {
taskHistory = new TaskHistory(context.getTaskId(), context.getState(), context.getProgress(),
startTime, endTime, reloadInputStats());
if (context.getOutputPath() != null) {
taskHistory.setOutputPath(context.getOutputPath().toString());
}
if (context.getWorkDir() != null) {
taskHistory.setWorkingPath(context.getWorkDir().toString());
}
if (context.getResultStats() != null) {
taskHistory.setOutputStats(context.getResultStats().getProto());
}
if (hasFetchPhase()) {
taskHistory.setTotalFetchCount(fetcherRunners.size());
int i = 0;
FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder();
for (Fetcher fetcher : fetcherRunners) {
builder.setStartTime(fetcher.getStartTime());
builder.setFinishTime(fetcher.getFinishTime());
builder.setFileLength(fetcher.getFileLen());
builder.setMessageReceivedCount(fetcher.getMessageReceiveCount());
builder.setState(fetcher.getState());
taskHistory.addFetcherHistory(builder.build());
if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++;
}
taskHistory.setFinishedFetchCount(i);
}
} catch (Exception e) {
LOG.warn(e.getMessage(), e);
}
return taskHistory;
}
public List<Fetcher> getFetchers() {
return fetcherRunners;
}
  @Override
  public int hashCode() {
return context.hashCode();
}
  @Override
  public boolean equals(Object obj) {
if (obj instanceof TaskImpl) {
TaskImpl other = (TaskImpl) obj;
return this.context.equals(other.context);
}
return false;
}
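
  /**
   * Turns the fetched files under the given directory into file fragments on the local file
   * system, skipping empty files, and appends fragments for chunks that were served directly
   * from local disk ("pseudo-fetched" chunks) instead of over the network.
   */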
private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
throws IOException {
Configuration c = new Configuration(systemConf);
c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
FileSystem fs = FileSystem.get(c);
Path tablePath = new Path(file.getAbsolutePath());
List<FileFragment> listTablets = new ArrayList<FileFragment>();
FileFragment tablet;
FileStatus[] fileLists = fs.listStatus(tablePath);
for (FileStatus f : fileLists) {
if (f.getLen() == 0) {
continue;
}
      tablet = new FileFragment(name, f.getPath(), 0L, f.getLen());
listTablets.add(tablet);
}
    // Special treatment for locally pseudo-fetched chunks
synchronized (localChunks) {
for (FileChunk chunk : localChunks) {
if (name.equals(chunk.getEbId())) {
tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length());
listTablets.add(tablet);
}
}
}
FileFragment[] tablets = new FileFragment[listTablets.size()];
listTablets.toArray(tablets);
return tablets;
}
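
  /**
   * A runnable wrapper around a single {@link Fetcher}. It retries a failed fetch up to
   * maxRetryNum times with an exponentially growing wait (capped at 10 seconds) and releases
   * the fetch latch when it finishes or gives up.
   */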
private class FetchRunner implements Runnable {
private final TaskAttemptContext ctx;
private final Fetcher fetcher;
private int maxRetryNum;
public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
this.ctx = ctx;
this.fetcher = fetcher;
this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM);
}
@Override
public void run() {
int retryNum = 0;
      int retryWaitTime = 1000; // milliseconds
try { // for releasing fetch latch
while(!context.isStopped() && retryNum < maxRetryNum) {
if (retryNum > 0) {
try {
Thread.sleep(retryWaitTime);
retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2); // max 10 seconds
} catch (InterruptedException e) {
LOG.error(e);
}
LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
}
try {
FileChunk fetched = fetcher.get();
if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null
&& fetched.getFile() != null) {
              if (!fetched.fromRemote()) {
localChunks.add(fetched);
LOG.info("Add a new FileChunk to local chunk list");
}
break;
}
} catch (Throwable e) {
LOG.error("Fetch failed: " + fetcher.getURI(), e);
}
retryNum++;
}
} finally {
        if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) {
fetcherFinished(ctx);
} else {
if (retryNum == maxRetryNum) {
LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")");
}
stopScriptExecutors();
context.stop(); // retry task
ctx.getFetchLatch().countDown();
}
}
}
}
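
  /**
   * Maps fetch completion onto the first half of the overall task progress: with
   * FETCHER_PROGRESS at 0.5, a task whose fetches are all done reports 0.5 before the
   * executor contributes the rest.
   */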
@VisibleForTesting
public static float adjustFetchProcess(int totalFetcher, int remainFetcher) {
if (totalFetcher > 0) {
return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS;
} else {
return 0.0f;
}
}
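
  // Called whenever a fetcher finishes successfully; counts down the fetch latch and
  // translates the remaining fetcher count into the current fetch-phase progress.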
private synchronized void fetcherFinished(TaskAttemptContext ctx) {
int fetcherSize = fetcherRunners.size();
    if (fetcherSize == 0) {
return;
}
ctx.getFetchLatch().countDown();
int remainFetcher = (int) ctx.getFetchLatch().getCount();
if (remainFetcher == 0) {
context.setFetcherProgress(FETCHER_PROGRESS);
} else {
context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher));
}
}
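
  /**
   * Builds one {@link Fetcher} per fetch URI. When a URI points at this worker's own pull
   * server, the chunk is short-circuited: it is resolved directly on local disk instead of
   * being downloaded, and marked as not coming from a remote host.
   */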
private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
List<FetchImpl> fetches) throws IOException {
if (fetches.size() > 0) {
Path inputDir = executionBlockContext.getLocalDirAllocator().
getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
int i = 0;
int localStoreChunkCount = 0;
File storeDir;
File defaultStoreFile;
FileChunk storeChunk = null;
List<Fetcher> runnerList = Lists.newArrayList();
for (FetchImpl f : fetches) {
storeDir = new File(inputDir.toString(), f.getName());
if (!storeDir.exists()) {
if (!storeDir.mkdirs()) throw new IOException("Failed to create " + storeDir);
}
for (URI uri : f.getURIs()) {
defaultStoreFile = new File(storeDir, "in_" + i);
InetAddress address = InetAddress.getByName(uri.getHost());
WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
storeChunk = getLocalStoredFileChunk(uri, systemConf);
            // When a range request is out of range, storeChunk will be null. This is a normal
            // case, so we simply skip this URI without creating a storeChunk.
if (storeChunk == null || storeChunk.length() == 0) {
continue;
}
if (storeChunk.getFile() != null && storeChunk.startOffset() > -1) {
storeChunk.setFromRemote(false);
localStoreChunkCount++;
} else {
storeChunk = new FileChunk(defaultStoreFile, 0, -1);
storeChunk.setFromRemote(true);
}
} else {
storeChunk = new FileChunk(defaultStoreFile, 0, -1);
storeChunk.setFromRemote(true);
}
          // If the intermediate data is actually fetched from a remote host, storeChunk represents
          // a complete file. Otherwise, it may represent either a complete file or only a part of one.
storeChunk.setEbId(f.getName());
Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
runnerList.add(fetcher);
i++;
if (LOG.isDebugEnabled()) {
LOG.debug("Create a new Fetcher with storeChunk:" + storeChunk.toString());
}
}
}
ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
LOG.info("Create shuffle Fetchers local:" + localStoreChunkCount +
", remote:" + (runnerList.size() - localStoreChunkCount));
return runnerList;
} else {
return Lists.newArrayList();
}
}
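
  /**
   * Resolves a fetch URI against the local disk. The pull-server query parameters (p, qid,
   * type, sid, ta, offset, length) identify the shuffle output; depending on the shuffle type
   * ("r" for range, "h" for hash, "s" for scattered hash) the matching output file is located
   * and returned as a {@link FileChunk}, or null if it does not exist or is out of range.
   */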
private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
    // Parse the fetch URI into key-value parameters
final Map<String, List<String>> params = TajoPullServerService.decodeParams(fetchURI.toString());
String partId = params.get("p").get(0);
String queryId = params.get("qid").get(0);
String shuffleType = params.get("type").get(0);
String sid = params.get("sid").get(0);
final List<String> taskIdList = params.get("ta");
final List<String> offsetList = params.get("offset");
final List<String> lengthList = params.get("length");
long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
if (LOG.isDebugEnabled()) {
LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
+ ", taskIds=" + taskIdList);
}
// The working directory of Tajo worker for each query, including stage
Path queryBaseDir = TajoPullServerService.getBaseOutputDir(queryId, sid);
List<String> taskIds = TajoPullServerService.splitMaps(taskIdList);
FileChunk chunk;
// If the stage requires a range shuffle
if (shuffleType.equals("r")) {
Path outputPath = StorageUtil.concatPath(queryBaseDir, taskIds.get(0), "output");
if (!executionBlockContext.getLocalDirAllocator().ifExists(outputPath.toString(), conf)) {
LOG.warn("Range shuffle - file not exist. " + outputPath);
return null;
}
Path path = executionBlockContext.getLocalFS().makeQualified(
executionBlockContext.getLocalDirAllocator().getLocalPathToRead(outputPath.toString(), conf));
String startKey = params.get("start").get(0);
String endKey = params.get("end").get(0);
boolean last = params.get("final") != null;
try {
chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last);
} catch (Throwable t) {
LOG.error("getFileChunks() throws exception");
return null;
}
// If the stage requires a hash shuffle or a scattered hash shuffle
} else if (shuffleType.equals("h") || shuffleType.equals("s")) {
int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath.toString(), conf)) {
LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
return null;
}
Path path = executionBlockContext.getLocalFS().makeQualified(
executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath.toString(), conf));
File file = new File(path.toUri());
long startPos = (offset >= 0 && length >= 0) ? offset : 0;
long readLen = (offset >= 0 && length >= 0) ? length : file.length();
if (startPos >= file.length()) {
LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
return null;
}
chunk = new FileChunk(file, startPos, readLen);
} else {
LOG.error("Unknown shuffle type");
return null;
}
return chunk;
}
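
  /**
   * Returns the local input directory of a task attempt, laid out as
   * {@code <execution block base input dir>/<task id>/<attempt id>}.
   */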
  public static Path getTaskAttemptDir(TaskAttemptId taskAttemptId) {
    return StorageUtil.concatPath(
        ExecutionBlockContext.getBaseInputDir(taskAttemptId.getTaskId().getExecutionBlockId()),
        String.valueOf(taskAttemptId.getTaskId().getId()),
        String.valueOf(taskAttemptId.getId()));
  }
}