/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.engine.planner.physical;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.SessionVars;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.logical.ShuffleFileWriteNode;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.tuple.memory.MemoryRowBlock;
import org.apache.tajo.tuple.memory.RowBlock;
import org.apache.tajo.tuple.memory.RowWriter;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.FileUtil;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* <code>HashShuffleFileWriteExec</code> is a physical executor that stores intermediate data in a number of
* file outputs partitioned by shuffle keys. The file outputs are stored on local disks.
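* <p>
* Incoming tuples are buffered per partition in {@link MemoryRowBlock}s. When the total capacity of all
* buffers exceeds the {@code HASH_SHUFFLE_BUFFER_SIZE} session limit, every partition is flushed and the
* buffers are released; when only the used bytes pass 80% of the limit, the buffers are flushed to disk
* but kept for reuse.
* <p>
* A typical driver loop (sketch only; {@code ctx}, {@code writeNode}, and {@code child} are assumed to be
* supplied by the caller):
* <pre>{@code
* HashShuffleFileWriteExec exec = new HashShuffleFileWriteExec(ctx, writeNode, child);
* exec.init();
* exec.next();  // drains the child and writes all shuffle output; returns null when done
* exec.close();
* }</pre>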
*/
public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
private static final Log LOG = LogFactory.getLog(HashShuffleFileWriteExec.class);
private static final int MAXIMUM_INITIAL_BUFFER_SIZE = StorageUnit.MB;
private static final int MINIMUM_INITIAL_BUFFER_SIZE = 4 * StorageUnit.KB;
// If the used buffer size exceeds this fraction of the maximum buffer size, buffers are flushed to local storage
private static final float BUFFER_THRESHOLD_FACTOR = 0.8f;
private final ShuffleFileWriteNode plan;
private final TableMeta meta;
private final Partitioner partitioner;
private final int numShuffleOutputs;
private final int[] shuffleKeyIds;
private final HashShuffleAppenderManager hashShuffleAppenderManager;
private final int maxBufferSize;
private final int bufferThreshold;
private final int initialBufferSize;
private final DataType[] dataTypes;
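// per-partition in-memory row buffers, keyed by partition id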
private final Map<Integer, MemoryRowBlock> partitionMemoryMap;
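// bytes flushed to local storage so far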
private long writtenBytes = 0;
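// bytes currently held in the partition buffers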
private long usedBufferSize = 0;
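// allocated capacity of all partition buffers, including unused headroom from resizing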
private long totalBufferCapacity = 0;
public HashShuffleFileWriteExec(TaskAttemptContext context,
final ShuffleFileWriteNode plan, final PhysicalExec child) throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema(), child);
Preconditions.checkArgument(plan.hasShuffleKeys());
this.plan = plan;
if (plan.hasOptions()) {
this.meta = CatalogUtil.newTableMeta(plan.getStorageType(), plan.getOptions());
} else {
this.meta = CatalogUtil.newTableMeta(plan.getStorageType(), context.getConf());
}
// shuffle partitioning setup
this.numShuffleOutputs = this.plan.getNumOutputs();
int i = 0;
this.shuffleKeyIds = new int[this.plan.getShuffleKeys().length];
for (Column key : this.plan.getShuffleKeys()) {
shuffleKeyIds[i] = inSchema.getColumnId(key.getQualifiedName());
i++;
}
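// tuples are routed to one of numShuffleOutputs outputs by hashing the shuffle key columns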
this.partitioner = new HashPartitioner(shuffleKeyIds, numShuffleOutputs);
this.hashShuffleAppenderManager = context.getHashShuffleAppenderManager();
this.maxBufferSize = context.getQueryContext().getInt(SessionVars.HASH_SHUFFLE_BUFFER_SIZE) * StorageUnit.MB;
this.bufferThreshold = (int) (maxBufferSize * BUFFER_THRESHOLD_FACTOR);
this.dataTypes = SchemaUtil.toDataTypes(outSchema);
if (numShuffleOutputs > 0) {
// divide the total buffer budget across partitions; each initial buffer is clamped to the 4KB ~ 1MB range
this.initialBufferSize = Math.min(MAXIMUM_INITIAL_BUFFER_SIZE,
Math.max(maxBufferSize / numShuffleOutputs, MINIMUM_INITIAL_BUFFER_SIZE));
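// e.g., a 64MB budget split across 256 partitions yields 256KB initial buffers
// (illustrative numbers; the result is always clamped between MINIMUM_ and MAXIMUM_INITIAL_BUFFER_SIZE)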
} else {
this.initialBufferSize = MINIMUM_INITIAL_BUFFER_SIZE;
}
this.partitionMemoryMap = Maps.newHashMap();
}
@Override
public void init() throws IOException {
super.init();
}
@Override
public Tuple next() throws IOException {
try {
Tuple tuple;
int partId;
long numRows = 0;
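// drain the child operator, hashing each tuple into its partition's buffer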
while (!context.isStopped() && (tuple = child.next()) != null) {
partId = partitioner.getPartition(tuple);
MemoryRowBlock rowBlock = partitionMemoryMap.get(partId);
if (rowBlock == null) {
rowBlock = new MemoryRowBlock(dataTypes, initialBufferSize, true, plan.getStorageType());
partitionMemoryMap.put(partId, rowBlock);
totalBufferCapacity += rowBlock.capacity();
}
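// track the capacity and usage deltas across the write; addTuple() may grow the underlying buffer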
RowWriter writer = rowBlock.getWriter();
long prevUsedMem = rowBlock.usedMem();
totalBufferCapacity -= rowBlock.capacity();
writer.addTuple(tuple);
numRows++;
totalBufferCapacity += rowBlock.capacity(); // re-add the capacity, which may have grown on resize
usedBufferSize += (rowBlock.usedMem() - prevUsedMem);
// If the total allocated capacity exceeds maxBufferSize, flush all partitions and release their buffers.
// If only the used bytes pass the threshold, flush but keep the buffers for reuse (see below).
if (totalBufferCapacity > maxBufferSize) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Buffer capacity exceeded with low usage. max: %s, total capacity: %s, used: %s",
FileUtil.humanReadableByteCount(maxBufferSize, false),
FileUtil.humanReadableByteCount(totalBufferCapacity, false),
FileUtil.humanReadableByteCount(usedBufferSize, false)));
}
//flush and release buffer
flushBuffer(partitionMemoryMap, true);
writtenBytes += usedBufferSize;
totalBufferCapacity = usedBufferSize = 0;
} else if (usedBufferSize > bufferThreshold) {
//flush and reuse buffer
flushBuffer(partitionMemoryMap, false);
writtenBytes += usedBufferSize;
usedBufferSize = 0;
}
}
// flush remaining buffers
flushBuffer(partitionMemoryMap, true);
writtenBytes += usedBufferSize;
usedBufferSize = totalBufferCapacity = 0;
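// report total written bytes and rows as the result statistics of this task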
TableStats aggregated = new TableStats();
aggregated.setNumBytes(writtenBytes);
aggregated.setNumRows(numRows);
context.setResultStats(aggregated);
return null;
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
throw new IOException(e);
}
}
/**
* Flushes the buffered partitions to local storage.
*
* @param partitionMemoryMap the per-partition row buffers to flush
* @param releaseBuffer if true, release the buffers after flushing; otherwise keep them for reuse
*/
private void flushBuffer(Map<Integer, MemoryRowBlock> partitionMemoryMap, boolean releaseBuffer)
throws IOException, ExecutionException, InterruptedException {
List<Future<MemoryRowBlock>> resultList = Lists.newArrayList();
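// partitions with no buffered data; their empty buffers are removed and released below when reusing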
ArrayList<Integer> unusedBuffer = Lists.newArrayList();
for (Map.Entry<Integer, MemoryRowBlock> entry : partitionMemoryMap.entrySet()) {
int appendPartId = entry.getKey();
MemoryRowBlock memoryRowBlock = entry.getValue();
if (memoryRowBlock.getMemory().isReadable()) {
//flush and release buffer
resultList.add(hashShuffleAppenderManager.writePartitions(
meta, outSchema, context.getTaskId(), appendPartId, memoryRowBlock, releaseBuffer));
} else {
if (releaseBuffer) {
memoryRowBlock.release();
} else {
unusedBuffer.add(appendPartId);
}
}
}
// wait for all pending flushes to complete
for (Future<MemoryRowBlock> future : resultList) {
future.get();
}
if (releaseBuffer) {
partitionMemoryMap.clear();
} else {
// release the buffers of partitions that had nothing to flush
for (Integer id : unusedBuffer) {
MemoryRowBlock memoryRowBlock = partitionMemoryMap.remove(id);
memoryRowBlock.release();
}
}
}
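// shuffle output is write-once, so rescanning is not supported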
@Override
public void rescan() throws IOException {
throw new TajoRuntimeException(new UnsupportedException());
}
@Override
public void close() throws IOException {
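// release any buffers that were not flushed before close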
if (partitionMemoryMap.size() > 0) {
for (RowBlock rowBlock : partitionMemoryMap.values()) {
rowBlock.release();
}
partitionMemoryMap.clear();
}
progress = 1.0f;
super.close();
}
}