blob: cda52dac89a01d002ffe489f589a1e928d5ec5ff [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.broadcast.input;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputCallback;
import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
public class BroadcastInputManager implements FetchedInputAllocator,
FetchedInputCallback {
private final Configuration conf;
private final TezTaskOutputFiles fileNameAllocator;
private final LocalDirAllocator localDirAllocator;
// Configuration parameters
private final long memoryLimit;
private final long maxSingleShuffleLimit;
private long usedMemory = 0;
public BroadcastInputManager(TezInputContext inputContext, Configuration conf) {
this.conf = conf;
this.fileNameAllocator = new TezTaskOutputFiles(conf,
inputContext.getUniqueIdentifier());
this.localDirAllocator = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
// Setup configuration
final float maxInMemCopyUse = conf.getFloat(
TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT);
if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
throw new IllegalArgumentException("Invalid value for "
+ TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
+ maxInMemCopyUse);
}
// Allow unit tests to fix Runtime memory
this.memoryLimit = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) * maxInMemCopyUse);
final float singleShuffleMemoryLimitPercent = conf.getFloat(
TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
if (singleShuffleMemoryLimitPercent <= 0.0f
|| singleShuffleMemoryLimitPercent > 1.0f) {
throw new IllegalArgumentException("Invalid value for "
+ TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
+ singleShuffleMemoryLimitPercent);
}
this.maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleMemoryLimitPercent);
}
@Override
public synchronized FetchedInput allocate(long size,
InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
if (size > maxSingleShuffleLimit
|| this.usedMemory + size > this.memoryLimit) {
return new DiskFetchedInput(size, inputAttemptIdentifier, this, conf,
localDirAllocator, fileNameAllocator);
} else {
this.usedMemory += size;
return new MemoryFetchedInput(size, inputAttemptIdentifier, this);
}
}
@Override
public void fetchComplete(FetchedInput fetchedInput) {
switch (fetchedInput.getType()) {
// Not tracking anything here.
case DISK:
case MEMORY:
break;
default:
throw new TezUncheckedException("InputType: " + fetchedInput.getType()
+ " not expected for Broadcast fetch");
}
}
@Override
public void fetchFailed(FetchedInput fetchedInput) {
cleanup(fetchedInput);
}
@Override
public void freeResources(FetchedInput fetchedInput) {
cleanup(fetchedInput);
}
private void cleanup(FetchedInput fetchedInput) {
switch (fetchedInput.getType()) {
case DISK:
break;
case MEMORY:
unreserve(fetchedInput.getSize());
break;
default:
throw new TezUncheckedException("InputType: " + fetchedInput.getType()
+ " not expected for Broadcast fetch");
}
}
private synchronized void unreserve(long size) {
this.usedMemory -= size;
}
}