/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.storage;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.util.Pair;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
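
/**
 * Manages a {@link HashShuffleAppender} per (execution block, partition) pair.
 * Appenders are created lazily on first access and are closed together when an
 * execution block finishes, yielding its intermediate shuffle metadata.
 */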
public class HashShuffleAppenderManager {
  private static final Log LOG = LogFactory.getLog(HashShuffleAppenderManager.class);

  private Map<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap =
      new ConcurrentHashMap<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>>();
  private TajoConf systemConf;
  private FileSystem defaultFS;
  private FileSystem localFS;
  private LocalDirAllocator lDirAllocator;
  private int pageSize;

  public HashShuffleAppenderManager(TajoConf systemConf) throws IOException {
    this.systemConf = systemConf;

    // initialize LocalDirAllocator
    lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);

    // initialize DFS and LocalFileSystems
    defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
    localFS = FileSystem.getLocal(systemConf);

    pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024;
  }
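
  /**
   * Returns the shuffle appender for the given partition of the execution block,
   * creating the data file and its underlying FileAppender on first access.
   */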
  public HashShuffleAppender getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId,
                                         TableMeta meta, Schema outSchema) throws IOException {
    synchronized (appenderMap) {
      Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId);
      if (partitionAppenderMap == null) {
        partitionAppenderMap = new ConcurrentHashMap<Integer, PartitionAppenderMeta>();
        appenderMap.put(ebId, partitionAppenderMap);
      }

      PartitionAppenderMeta partitionAppenderMeta = partitionAppenderMap.get(partId);
      if (partitionAppenderMeta == null) {
        Path dataFile = getDataFile(ebId, partId);
        FileSystem fs = dataFile.getFileSystem(systemConf);
        if (fs.exists(dataFile)) {
          FileStatus status = fs.getFileStatus(dataFile);
          LOG.info("File " + dataFile + " already exists, size=" + status.getLen());
        }
        if (!fs.exists(dataFile.getParent())) {
          fs.mkdirs(dataFile.getParent());
        }

        FileAppender appender = (FileAppender) StorageManagerFactory.getStorageManager(
            tajoConf).getAppender(meta, outSchema, dataFile);
        appender.enableStats();
        appender.init();

        partitionAppenderMeta = new PartitionAppenderMeta();
        partitionAppenderMeta.partId = partId;
        partitionAppenderMeta.dataFile = dataFile;
        partitionAppenderMeta.appender = new HashShuffleAppender(ebId, partId, pageSize, appender);
        partitionAppenderMeta.appender.init();
        partitionAppenderMap.put(partId, partitionAppenderMeta);

        LOG.info("Create Hash shuffle file(partId=" + partId + "): " + dataFile);
      }

      return partitionAppenderMeta.appender;
    }
  }
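
  /**
   * Maps a partition id to one of HASH_SHUFFLE_PARENT_DIRS parent directories so that
   * shuffle files are spread over several subdirectories rather than a single one.
   */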
  public static int getPartParentId(int partId, TajoConf tajoConf) {
    return partId % tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS);
  }
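
  /**
   * Builds the local path of the shuffle data file for the given execution block and
   * partition under a worker temporal directory.
   */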
  private Path getDataFile(ExecutionBlockId ebId, int partId) throws IOException {
    try {
      // the base dir of an execution block's output
      String executionBlockBaseDir = ebId.getQueryId().toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle";
      Path baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(executionBlockBaseDir, systemConf));
      //LOG.info(ebId + "'s basedir is created (" + baseDirPath + ")");

      // If an EB has many partitions, too many shuffle files would pile up in a single directory,
      // so spread them over the parent subdirectories chosen by getPartParentId().
      return StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId +
          "." + RawFile.FILE_EXTENSION);
    } catch (Exception e) {
      LOG.error(e.getMessage(), e);
      throw new IOException(e);
    }
  }
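
  /**
   * Closes every appender of the given execution block and returns the collected
   * intermediate shuffle descriptions, or null if the block produced no hash shuffle output.
   */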
  public List<HashShuffleIntermediate> close(ExecutionBlockId ebId) throws IOException {
    Map<Integer, PartitionAppenderMeta> partitionAppenderMap = null;
    synchronized (appenderMap) {
      partitionAppenderMap = appenderMap.remove(ebId);
    }

    if (partitionAppenderMap == null) {
      LOG.info("Close HashShuffleAppender:" + ebId + ", not a hash shuffle");
      return null;
    }

    // Send Intermediate data to QueryMaster.
    List<HashShuffleIntermediate> intermEntries = new ArrayList<HashShuffleIntermediate>();
    for (PartitionAppenderMeta eachMeta : partitionAppenderMap.values()) {
      try {
        eachMeta.appender.close();
        HashShuffleIntermediate intermediate =
            new HashShuffleIntermediate(eachMeta.partId, eachMeta.appender.getOffset(),
                eachMeta.appender.getPages(),
                eachMeta.appender.getMergedTupleIndexes());
        intermEntries.add(intermediate);
      } catch (IOException e) {
        LOG.error(e.getMessage(), e);
        throw e;
      }
    }

    LOG.info("Close HashShuffleAppender:" + ebId + ", intermediates=" + intermEntries.size());
    return intermEntries;
  }
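
  /**
   * Notifies every appender of the task's execution block that the task attempt has finished.
   */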
  public void finalizeTask(QueryUnitAttemptId taskId) {
    synchronized (appenderMap) {
      Map<Integer, PartitionAppenderMeta> partitionAppenderMap =
          appenderMap.get(taskId.getQueryUnitId().getExecutionBlockId());
      if (partitionAppenderMap == null) {
        return;
      }

      for (PartitionAppenderMeta eachAppender : partitionAppenderMap.values()) {
        eachAppender.appender.taskFinished(taskId);
      }
    }
  }
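
  /**
   * Describes one hash shuffle partition's output: its total volume, the offsets and
   * lengths of its pages, and the per-task tuple index ranges kept for failure handling.
   */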
  public static class HashShuffleIntermediate {
    private int partId;

    private long volume;

    //[<page start offset, <task start, task end>>]
    private Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes;

    //[<page start offset, length>]
    private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();

    public HashShuffleIntermediate(int partId, long volume,
                                   List<Pair<Long, Integer>> pages,
                                   Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes) {
      this.partId = partId;
      this.volume = volume;
      this.failureTskTupleIndexes = failureTskTupleIndexes;
      this.pages = pages;
    }

    public int getPartId() {
      return partId;
    }

    public long getVolume() {
      return volume;
    }

    public Collection<Pair<Long, Pair<Integer, Integer>>> getFailureTskTupleIndexes() {
      return failureTskTupleIndexes;
    }

    public List<Pair<Long, Integer>> getPages() {
      return pages;
    }
  }
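
  /** Associates a partition id with its appender and data file. */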
  static class PartitionAppenderMeta {
    int partId;
    HashShuffleAppender appender;
    Path dataFile;

    public int getPartId() {
      return partId;
    }

    public HashShuffleAppender getAppender() {
      return appender;
    }

    public Path getDataFile() {
      return dataFile;
    }
  }
}