/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.storage;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.storage.rawfile.DirectRawFileWriter;
import org.apache.tajo.tuple.memory.MemoryRowBlock;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.Pair;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
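/**
 * Manages hash shuffle output files on a worker. For each (execution block, partition) pair it
 * lazily creates a {@link HashShuffleAppenderWrapper} backed by a {@link DirectRawFileWriter},
 * and it dedicates one single-threaded executor to each local temporal directory so that writes
 * to the same volume are serialized while different volumes are written in parallel.
 */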
public class HashShuffleAppenderManager {
private static final Log LOG = LogFactory.getLog(HashShuffleAppenderManager.class);
private ConcurrentMap<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap = Maps.newConcurrentMap();
private ConcurrentMap<Integer, ExecutorService> executors = Maps.newConcurrentMap(); // for parallel writing
private List<String> temporalPaths = Lists.newArrayList();
private TajoConf systemConf;
private FileSystem defaultFS;
private FileSystem localFS;
private LocalDirAllocator lDirAllocator;
private int pageSize;
public HashShuffleAppenderManager(TajoConf systemConf) throws IOException {
this.systemConf = systemConf;
// initialize LocalDirAllocator
lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
// initialize DFS and LocalFileSystems
defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
localFS = FileSystem.getLocal(systemConf);
pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * StorageUnit.MB;
Iterable<Path> allLocalPath = lDirAllocator.getAllLocalPathsToRead(".", systemConf);
// create one single-threaded writer executor per temporal directory for asynchronous hash shuffle writes
for (Path path : allLocalPath) {
temporalPaths.add(localFS.makeQualified(path).toString());
executors.put(temporalPaths.size() - 1, Executors.newSingleThreadExecutor());
}
}
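/**
 * Returns the index (volume id) of the temporal directory that contains the given path.
 * Throws an IndexOutOfBoundsException if the path does not belong to any temporal directory.
 */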
protected int getVolumeId(Path path) {
int i = 0;
for (String rootPath : temporalPaths) {
if (path.toString().startsWith(rootPath)) {
break;
}
i++;
}
Preconditions.checkPositionIndex(i, temporalPaths.size() - 1);
return i;
}
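/**
 * Returns the appender for the given execution block and partition, creating the shuffle data
 * file and its {@link HashShuffleAppenderWrapper} on first access.
 */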
public synchronized HashShuffleAppenderWrapper getAppender(MemoryRowBlock memoryRowBlock, ExecutionBlockId ebId,
int partId, TableMeta meta, Schema outSchema)
throws IOException {
Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId);
if (partitionAppenderMap == null) {
partitionAppenderMap = Maps.newConcurrentMap();
appenderMap.put(ebId, partitionAppenderMap);
}
PartitionAppenderMeta partitionAppenderMeta = partitionAppenderMap.get(partId);
if (partitionAppenderMeta == null) {
Path dataFile = getDataFile(ebId, partId);
FileSystem fs = dataFile.getFileSystem(systemConf);
if (fs.exists(dataFile)) {
FileStatus status = fs.getFileStatus(dataFile);
LOG.info("File " + dataFile + " already exists, size=" + status.getLen());
}
if (!fs.exists(dataFile.getParent())) {
fs.mkdirs(dataFile.getParent());
}
DirectRawFileWriter appender =
new DirectRawFileWriter(systemConf, null, outSchema, meta, dataFile, memoryRowBlock);
appender.enableStats();
appender.init();
partitionAppenderMeta = new PartitionAppenderMeta();
partitionAppenderMeta.partId = partId;
partitionAppenderMeta.dataFile = dataFile;
partitionAppenderMeta.appender =
new HashShuffleAppenderWrapper(ebId, partId, pageSize, appender, getVolumeId(dataFile));
partitionAppenderMeta.appender.init();
partitionAppenderMap.put(partId, partitionAppenderMeta);
if (LOG.isDebugEnabled()) {
LOG.debug("Create Hash shuffle file(partId=" + partId + "): " + dataFile);
}
}
return partitionAppenderMeta.appender;
}
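/**
 * Maps a partition id to a parent directory bucket so that shuffle files are spread over
 * {@link ConfVars#SHUFFLE_HASH_PARENT_DIRS} directories rather than a single one.
 */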
public static int getPartParentId(int partId, TajoConf tajoConf) {
return partId % tajoConf.getIntVar(ConfVars.SHUFFLE_HASH_PARENT_DIRS);
}
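/**
 * Builds the local path of the shuffle data file for the given execution block and partition,
 * i.e. {@code <temporal dir>/<queryId>/output/<ebId>/hash-shuffle/<parent dir>/<partId>},
 * where the temporal directory is chosen by the {@link LocalDirAllocator}.
 */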
private Path getDataFile(ExecutionBlockId ebId, int partId) throws IOException {
try {
// base directory of this execution block's hash shuffle output
String executionBlockBaseDir = ebId.getQueryId().toString() + "/output/" + ebId.getId() + "/hash-shuffle";
Path baseDirPath = lDirAllocator.getLocalPathForWrite(executionBlockBaseDir, systemConf);
//LOG.info(ebId + "'s basedir is created (" + baseDirPath + ")");
// If an EB has many partitions, too many shuffle files would pile up in a single directory,
// so bucket them under parent directories chosen by getPartParentId().
return localFS.makeQualified(
StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId));
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new IOException(e);
}
}
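/**
 * Closes every appender of the given execution block and returns the collected intermediate
 * shuffle descriptors, or {@code null} if the execution block produced no hash shuffle output.
 */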
public List<HashShuffleIntermediate> close(ExecutionBlockId ebId) throws IOException {
Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.remove(ebId);
if (partitionAppenderMap == null) {
LOG.info("Close HashShuffleAppenderWrapper:" + ebId + ", not a hash shuffle");
return null;
}
// Collect intermediate shuffle data descriptors to be reported to the QueryMaster.
List<HashShuffleIntermediate> intermediateEntries = Lists.newArrayList();
for (PartitionAppenderMeta eachMeta : partitionAppenderMap.values()) {
try {
eachMeta.appender.close();
HashShuffleIntermediate intermediate =
new HashShuffleIntermediate(eachMeta.partId, eachMeta.appender.getOffset(),
eachMeta.appender.getPages(),
eachMeta.appender.getMergedTupleIndexes());
intermediateEntries.add(intermediate);
} catch (IOException e) {
LOG.error(e.getMessage(), e);
throw e;
}
}
LOG.info("Close HashShuffleAppenderWrapper:" + ebId + ", intermediates=" + intermediateEntries.size());
return intermediateEntries;
}
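/**
 * Notifies every appender of the task's execution block that the given task attempt has finished.
 */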
public void finalizeTask(TaskAttemptId taskId) {
Map<Integer, PartitionAppenderMeta> partitionAppenderMap =
appenderMap.get(taskId.getTaskId().getExecutionBlockId());
if (partitionAppenderMap == null) {
return;
}
for (PartitionAppenderMeta eachAppender: partitionAppenderMap.values()) {
eachAppender.appender.taskFinished(taskId);
}
}
/**
 * Asynchronously writes the given row block to the hash shuffle output of the specified
 * partition. The write is submitted to the single-threaded executor bound to the volume that
 * holds the partition's data file, and the row block is released or cleared after the write,
 * depending on {@code release}.
 */
public Future<MemoryRowBlock> writePartitions(TableMeta meta, Schema schema, final TaskAttemptId taskId, int partId,
final MemoryRowBlock rowBlock,
final boolean release) throws IOException {
final HashShuffleAppenderWrapper appender =
getAppender(rowBlock, taskId.getTaskId().getExecutionBlockId(), partId, meta, schema);
ExecutorService executor = executors.get(appender.getVolumeId());
return executor.submit(new Callable<MemoryRowBlock>() {
@Override
public MemoryRowBlock call() throws Exception {
appender.writeRowBlock(taskId, rowBlock);
if (release) rowBlock.release();
else rowBlock.clear();
return rowBlock;
}
});
}
public void shutdown() {
for (ExecutorService service : executors.values()) {
service.shutdownNow();
}
}
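/**
 * Summary of one partition's hash shuffle output: the partition id, the total written volume,
 * the list of page (offset, length) pairs, and the merged tuple index ranges keyed by page
 * start offset.
 */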
public static class HashShuffleIntermediate {
private int partId;
private long volume;
//[<page start offset,<task start, task end>>]
private Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes;
//[<page start offset, length>]
private List<Pair<Long, Integer>> pages = Lists.newArrayList();
public HashShuffleIntermediate(int partId, long volume,
List<Pair<Long, Integer>> pages,
Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes) {
this.partId = partId;
this.volume = volume;
this.failureTskTupleIndexes = failureTskTupleIndexes;
this.pages = pages;
}
public int getPartId() {
return partId;
}
public long getVolume() {
return volume;
}
public Collection<Pair<Long, Pair<Integer, Integer>>> getFailureTskTupleIndexes() {
return failureTskTupleIndexes;
}
public List<Pair<Long, Integer>> getPages() {
return pages;
}
}
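/**
 * Per-partition bookkeeping: the partition id, its appender wrapper, and the shuffle data file path.
 */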
static class PartitionAppenderMeta {
int partId;
HashShuffleAppenderWrapper appender;
Path dataFile;
public int getPartId() {
return partId;
}
public HashShuffleAppenderWrapper getAppender() {
return appender;
}
public Path getDataFile() {
return dataFile;
}
}
}