blob: 8fadd8939aff99546d14007f9a238e85529db5e3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.shuffle.common.impl;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputCallback;
import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
/**
 * A {@link FetchedInputAllocator} that decides, for each fetched input, whether
 * its data is buffered in memory ({@link MemoryFetchedInput}) or written to a
 * local directory ({@link DiskFetchedInput}).
 * <p>
 * Usage: optionally call {@link #getInitialMemoryReq(Configuration, long)} to
 * compute how much memory to request, then construct an instance with the
 * memory actually granted. The constructor reads the shuffle buffer settings
 * from the supplied {@link Configuration}.
 * <p>
 * Only in-memory allocations are accounted against the memory limit; they are
 * released through {@link #fetchFailed(FetchedInput)} or
 * {@link #freeResources(FetchedInput)}. Disk allocations are not tracked.
 * Methods that read or modify the accounting are {@code synchronized}.
 */
@Private
public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
    FetchedInputCallback {

  private static final Log LOG = LogFactory.getLog(SimpleFetchedInputAllocator.class);

  private final Configuration conf;
  private final TezTaskOutputFiles fileNameAllocator;
  private final LocalDirAllocator localDirAllocator;

  // Configuration parameters
  /** Maximum total bytes that outstanding in-memory inputs may reserve. */
  private final long memoryLimit;
  /** Largest single input (in bytes) that may be placed in memory. */
  private final long maxSingleShuffleLimit;
  private final long maxAvailableTaskMemory;
  private final long initialMemoryAvailable;

  /** Bytes currently reserved by in-memory inputs; modified only under {@code this}. */
  private volatile long usedMemory = 0;

  /**
   * Creates an allocator whose in-memory budget is the smaller of the requested
   * amount (see {@link #getInitialMemoryReq(Configuration, long)}) and
   * {@code memoryAvailable}.
   *
   * @param uniqueIdentifier identifier handed to {@link TezTaskOutputFiles},
   *          which names on-disk inputs
   * @param conf configuration holding the shuffle buffer settings
   * @param maxTaskAvailableMemory maximum memory available to the task; used
   *          (capped at {@link Integer#MAX_VALUE}) as the task memory when
   *          {@link Constants#TEZ_RUNTIME_TASK_MEMORY} is not set
   * @param memoryAvailable memory actually granted; the in-memory limit never
   *          exceeds this value
   * @throws IllegalArgumentException if
   *           {@link TezJobConfig#TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT} is
   *           not in [0, 1] or
   *           {@link TezJobConfig#TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT} is
   *           not in (0, 1] (NaN is rejected for both)
   */
  public SimpleFetchedInputAllocator(String uniqueIdentifier, Configuration conf,
      long maxTaskAvailableMemory, long memoryAvailable) {
    this.conf = conf;
    this.maxAvailableTaskMemory = maxTaskAvailableMemory;
    this.initialMemoryAvailable = memoryAvailable;

    this.fileNameAllocator = new TezTaskOutputFiles(conf,
        uniqueIdentifier);
    this.localDirAllocator = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);

    // Setup configuration. Reuse the static computation so the amount requested
    // up front and the amount computed here can never drift apart.
    long memReq = getInitialMemoryReq(conf, maxAvailableTaskMemory);
    // Never plan to use more than was actually granted.
    this.memoryLimit = Math.min(memReq, initialMemoryAvailable);
    LOG.info("RequestedMem=" + memReq + ", Allocated: " + this.memoryLimit);

    final float singleShuffleMemoryLimitPercent = conf.getFloat(
        TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
    // Valid range is (0, 1]. Written in negated form so NaN (for which every
    // comparison is false) is rejected instead of silently accepted.
    if (!(singleShuffleMemoryLimitPercent > 0.0f
        && singleShuffleMemoryLimitPercent <= 1.0f)) {
      throw new IllegalArgumentException("Invalid value for "
          + TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
          + singleShuffleMemoryLimitPercent);
    }
    // NOTE(review): capped at Integer.MAX_VALUE on the assumption that
    // MemoryFetchedInput buffers an input in a single Java array, which cannot
    // exceed that size. Larger inputs are sent to disk instead. Confirm against
    // MemoryFetchedInput.
    this.maxSingleShuffleLimit = Math.min(
        (long) (memoryLimit * singleShuffleMemoryLimitPercent), Integer.MAX_VALUE);
    LOG.info("SimpleInputManager -> " + "MemoryLimit: " +
        this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
  }

  /**
   * Computes the memory to request for buffering fetched inputs.
   * <p>
   * This is the task memory multiplied by
   * {@link TezJobConfig#TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT}. The task
   * memory is {@link Constants#TEZ_RUNTIME_TASK_MEMORY}, or, if that is unset,
   * {@code maxAvailableTaskMemory} capped at {@link Integer#MAX_VALUE}.
   *
   * @param conf configuration holding the shuffle buffer settings
   * @param maxAvailableTaskMemory maximum memory available to the task
   * @return requested number of bytes
   * @throws IllegalArgumentException if the buffer percent is not in [0, 1] or is NaN
   */
  @Private
  public static long getInitialMemoryReq(Configuration conf, long maxAvailableTaskMemory) {
    final float maxInMemCopyUse = conf.getFloat(
        TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT);
    // Valid range is [0, 1]; negated form also rejects NaN.
    if (!(maxInMemCopyUse >= 0.0f && maxInMemCopyUse <= 1.0f)) {
      throw new IllegalArgumentException("Invalid value for "
          + TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
          + maxInMemCopyUse);
    }
    return (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
        Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
  }

  /**
   * Allocates storage for one fetched input.
   * <p>
   * The input goes to memory (and is reserved against the limit) only when it
   * is no larger than the per-input limit and fits in the remaining memory
   * budget; otherwise it goes to local disk.
   */
  @Override
  public synchronized FetchedInput allocate(long actualSize, long compressedSize,
      InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
    boolean fitsInMemory = actualSize <= maxSingleShuffleLimit
        && this.usedMemory + actualSize <= this.memoryLimit;
    if (!fitsInMemory) {
      return new DiskFetchedInput(actualSize, compressedSize,
          inputAttemptIdentifier, this, conf, localDirAllocator,
          fileNameAllocator);
    }
    this.usedMemory += actualSize;
    LOG.info("Used memory after allocating " + actualSize + " : " + usedMemory);
    return new MemoryFetchedInput(actualSize, compressedSize, inputAttemptIdentifier, this);
  }

  /**
   * Accepts a completed fetch. Memory stays reserved until
   * {@link #freeResources(FetchedInput)} is called.
   *
   * @throws TezUncheckedException if the input is neither DISK nor MEMORY
   */
  @Override
  public synchronized void fetchComplete(FetchedInput fetchedInput) {
    switch (fetchedInput.getType()) {
    // Not tracking anything here.
    case DISK:
    case MEMORY:
      break;
    default:
      throw new TezUncheckedException("InputType: " + fetchedInput.getType()
          + " not expected for Broadcast fetch");
    }
  }

  /** Releases any memory reserved for an input whose fetch failed. */
  @Override
  public synchronized void fetchFailed(FetchedInput fetchedInput) {
    cleanup(fetchedInput);
  }

  /** Releases any memory reserved for an input that is no longer needed. */
  @Override
  public synchronized void freeResources(FetchedInput fetchedInput) {
    cleanup(fetchedInput);
  }

  /**
   * Returns a MEMORY input's reservation to the budget. DISK inputs are not
   * accounted, so nothing is done for them.
   *
   * @throws TezUncheckedException if the input is neither DISK nor MEMORY
   */
  private void cleanup(FetchedInput fetchedInput) {
    switch (fetchedInput.getType()) {
    case DISK:
      break;
    case MEMORY:
      unreserve(fetchedInput.getActualSize());
      break;
    default:
      throw new TezUncheckedException("InputType: " + fetchedInput.getType()
          + " not expected for Broadcast fetch");
    }
  }

  /** Subtracts {@code size} bytes from the in-memory reservation. */
  private synchronized void unreserve(long size) {
    this.usedMemory -= size;
    LOG.info("Used memory after freeing " + size + " : " + usedMemory);
  }
}