blob: 7abfce6b8b0a9bff853c9a39062ee8cbc5ed9311 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.common.shuffle.impl;
import java.io.IOException;
import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.tez.common.RssTezConfig;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type;
import org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput;
import org.apache.tez.runtime.library.common.shuffle.RemoteFetchedInput;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
import static org.apache.tez.common.RssTezConfig.RSS_REMOTE_SPILL_STORAGE_PATH;
/**
 * {@link SimpleFetchedInputAllocator} variant used by Uniffle (RSS) shuffle reads in Tez.
 *
 * <p>A fetched input is kept in memory when it fits both the per-input limit and the remaining
 * in-memory budget. Otherwise it is spilled, either to a remote Hadoop filesystem (when {@link
 * RssTezConfig#RSS_REDUCE_REMOTE_SPILL_ENABLED} is true) or to the local task directories. The
 * budget and per-input limit are fixed in the constructor from the supplied memory values and the
 * Tez shuffle configuration.
 *
 * <p>Allocation and release entry points synchronize on this instance.
 */
@Private
public class RssSimpleFetchedInputAllocator extends SimpleFetchedInputAllocator {

  private static final Logger LOG = LoggerFactory.getLogger(RssSimpleFetchedInputAllocator.class);

  // In order to be compatible with the Tez IFile file format, the decoded data needs to be
  // extended with the corresponding HEADER and CHECKSUM, which together occupy 8 bytes. Spilled
  // inputs are therefore sized actualSize + IFILE_HEAD_CHECKSUM_LEN.
  private static final int IFILE_HEAD_CHECKSUM_LEN = 8;

  private final Configuration conf;
  private final TezTaskOutputFiles fileNameAllocator;
  private final LocalDirAllocator localDirAllocator;

  // Configuration parameters
  // Total bytes that may be held by in-memory inputs at once.
  private final long memoryLimit;
  // Largest single input (bytes) that may be held in memory.
  private final long maxSingleShuffleLimit;
  private final long maxAvailableTaskMemory;
  private final long initialMemoryAvailable;
  private final String srcNameTrimmed;
  // Bytes currently reserved by in-memory inputs; only mutated while holding this instance's lock.
  private volatile long usedMemory = 0;
  private final String uniqueIdentifier;
  private final String appAttemptId;
  private final boolean remoteSpillEnable;
  // Only initialized when remote spill is enabled (or injected via setRemoteFS in tests).
  private FileSystem remoteFS;
  private String remoteSpillBasePath;

  /**
   * Creates the allocator and derives its memory limits and spill target from {@code conf}.
   *
   * @param srcNameTrimmed source name used as a prefix in log messages
   * @param uniqueIdentifier identifier passed to the local and remote spill file naming
   * @param dagID DAG id used for local spill file naming
   * @param conf Tez runtime configuration
   * @param maxTaskAvailableMemory fallback for {@code Constants.TEZ_RUNTIME_TASK_MEMORY}
   *     (capped at {@link Integer#MAX_VALUE})
   * @param memoryAvailable upper bound on the in-memory budget
   * @param appAttemptId application attempt id passed to remote spill inputs
   * @throws RssException if a shuffle memory percentage is out of range, if remote spill is
   *     enabled without a spill path, or if the remote filesystem cannot be created
   */
  public RssSimpleFetchedInputAllocator(
      String srcNameTrimmed,
      String uniqueIdentifier,
      int dagID,
      Configuration conf,
      long maxTaskAvailableMemory,
      long memoryAvailable,
      String appAttemptId) {
    super(srcNameTrimmed, uniqueIdentifier, dagID, conf, maxTaskAvailableMemory, memoryAvailable);
    this.srcNameTrimmed = srcNameTrimmed;
    this.conf = conf;
    this.maxAvailableTaskMemory = maxTaskAvailableMemory;
    this.initialMemoryAvailable = memoryAvailable;
    this.uniqueIdentifier = uniqueIdentifier;
    this.appAttemptId = appAttemptId;
    this.fileNameAllocator = new TezTaskOutputFiles(conf, uniqueIdentifier, dagID);
    this.localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);

    // Reuse the static helper (including its validation of the fetch-buffer percentage) so the
    // constructor and getInitialMemoryReq always compute the same request.
    final long memReq = getInitialMemoryReq(conf, maxAvailableTaskMemory);
    // Never budget more than the memory actually made available to this allocator.
    this.memoryLimit = Math.min(memReq, initialMemoryAvailable);

    final float singleShuffleMemoryLimitPercent =
        conf.getFloat(
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT_DEFAULT);
    if (singleShuffleMemoryLimitPercent <= 0.0f || singleShuffleMemoryLimitPercent > 1.0f) {
      throw new RssException(
          "Invalid value for "
              + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT
              + ": "
              + singleShuffleMemoryLimitPercent);
    }
    this.maxSingleShuffleLimit =
        (long) Math.min((memoryLimit * singleShuffleMemoryLimitPercent), Integer.MAX_VALUE);

    this.remoteSpillEnable = conf.getBoolean(RssTezConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED, false);
    if (remoteSpillEnable) {
      this.remoteSpillBasePath = conf.get(RSS_REMOTE_SPILL_STORAGE_PATH);
      if (StringUtils.isBlank(remoteSpillBasePath)) {
        throw new RssException("You must set remote spill path!");
      }
      this.remoteFS = createRemoteFileSystem(conf, remoteSpillBasePath);
    }

    LOG.info(
        "{}: RequestedMemory={}, AssignedMemory={}, maxSingleShuffleLimit={}",
        srcNameTrimmed,
        memReq,
        memoryLimit,
        maxSingleShuffleLimit);
  }

  /**
   * Builds the filesystem used for remote spills.
   *
   * <p>Starts from a copy of {@code conf}, applies the key/value pairs parsed from {@link
   * RssTezConfig#RSS_REMOTE_STORAGE_CONF}, then sets the spill replication and block-write retry
   * count (these two override any value coming from the remote storage conf).
   *
   * @throws RssException wrapping the underlying failure if the filesystem cannot be created
   */
  private static FileSystem createRemoteFileSystem(Configuration conf, String basePath) {
    Map<String, String> remoteStorageConfMap =
        RemoteStorageInfo.parseRemoteStorageConf(conf.get(RssTezConfig.RSS_REMOTE_STORAGE_CONF));
    Configuration remoteConf = new Configuration(conf);
    for (Map.Entry<String, String> entry : remoteStorageConfMap.entrySet()) {
      remoteConf.set(entry.getKey(), entry.getValue());
    }
    remoteConf.setInt(
        "dfs.replication",
        conf.getInt(
            RssTezConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION,
            RssTezConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION_DEFAULT));
    remoteConf.setInt(
        "dfs.client.block.write.retries",
        conf.getInt(
            RssTezConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES,
            RssTezConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES_DEFAULT));
    try {
      return HadoopFilesystemProvider.getFilesystem(new Path(basePath), remoteConf);
    } catch (Exception e) {
      // Keep the cause so the real filesystem failure is visible to whoever handles this.
      throw new RssException("Cannot init remoteFS on path:" + basePath, e);
    }
  }

  /**
   * Computes how much memory this allocator would like for in-memory fetch buffers.
   *
   * <p>The result is {@code Constants.TEZ_RUNTIME_TASK_MEMORY} (defaulting to {@code
   * min(maxAvailableTaskMemory, Integer.MAX_VALUE)}) multiplied by {@code
   * TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT}.
   *
   * @param conf Tez runtime configuration
   * @param maxAvailableTaskMemory fallback task memory when the task-memory key is unset
   * @return requested memory in bytes
   * @throws RssException if the fetch-buffer percentage is outside [0, 1]
   */
  @Private
  public static long getInitialMemoryReq(Configuration conf, long maxAvailableTaskMemory) {
    final float maxInMemCopyUse =
        conf.getFloat(
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT);
    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
      throw new RssException(
          "Invalid value for "
              + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT
              + ": "
              + maxInMemCopyUse);
    }
    return (long)
        (conf.getLong(
                Constants.TEZ_RUNTIME_TASK_MEMORY,
                Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE))
            * maxInMemCopyUse);
  }

  /**
   * Allocates storage for one fetched input.
   *
   * <p>If {@code actualSize} exceeds the per-input limit or would push reserved memory past the
   * budget, a spill input of {@code actualSize + IFILE_HEAD_CHECKSUM_LEN} bytes is returned
   * (remote or local disk, see class doc). Otherwise {@code actualSize} bytes are reserved and a
   * {@link MemoryFetchedInput} is returned.
   *
   * @param actualSize decoded size of the input in bytes
   * @param compressedSize unused by this allocator
   * @param inputAttemptIdentifier identifier of the input being fetched
   * @return the allocated input
   * @throws IOException if creating a spill input fails
   */
  @Override
  public synchronized FetchedInput allocate(
      long actualSize, long compressedSize, InputAttemptIdentifier inputAttemptIdentifier)
      throws IOException {
    if (actualSize > maxSingleShuffleLimit || usedMemory + actualSize > memoryLimit) {
      LOG.info("Allocate {}, length:{}", spillInputTypeName(), actualSize);
      return newSpillInput(actualSize, inputAttemptIdentifier);
    }
    usedMemory += actualSize;
    if (LOG.isDebugEnabled()) {
      LOG.debug(
          "{}: Used memory after allocating {} : {}", srcNameTrimmed, actualSize, usedMemory);
    }
    return new MemoryFetchedInput(actualSize, inputAttemptIdentifier, this);
  }

  /**
   * Allocates storage of the requested type.
   *
   * <p>{@link Type#DISK} always yields a spill input regardless of size. Every other type is
   * delegated to {@link #allocate(long, long, InputAttemptIdentifier)}.
   */
  @Override
  public synchronized FetchedInput allocateType(
      Type type,
      long actualSize,
      long compressedSize,
      InputAttemptIdentifier inputAttemptIdentifier)
      throws IOException {
    switch (type) {
      case DISK:
        // Callers are not expected to request DISK explicitly for this allocator; if they do,
        // spill unconditionally.
        LOG.info("AllocateType {}, compressedSize:{}", spillInputTypeName(), compressedSize);
        return newSpillInput(actualSize, inputAttemptIdentifier);
      default:
        return allocate(actualSize, compressedSize, inputAttemptIdentifier);
    }
  }

  /** Name of the spill input class chosen by the current configuration (used in log output). */
  private String spillInputTypeName() {
    return remoteSpillEnable ? "RemoteFetchedInput" : "DiskFetchedInput";
  }

  /**
   * Creates a spill-backed input sized for the decoded payload plus the IFile header/checksum:
   * a {@link RemoteFetchedInput} when remote spill is enabled, otherwise a {@link
   * DiskFetchedInput} in the local task directories.
   */
  private FetchedInput newSpillInput(long actualSize, InputAttemptIdentifier inputAttemptIdentifier)
      throws IOException {
    final long spillSize = actualSize + IFILE_HEAD_CHECKSUM_LEN;
    if (remoteSpillEnable) {
      return new RemoteFetchedInput(
          spillSize,
          inputAttemptIdentifier,
          this,
          remoteFS,
          remoteSpillBasePath,
          uniqueIdentifier,
          appAttemptId);
    }
    return new DiskFetchedInput(
        spillSize, inputAttemptIdentifier, this, conf, localDirAllocator, fileNameAllocator);
  }

  /**
   * Called when a fetch completes. Nothing is tracked for DISK, DISK_DIRECT or MEMORY inputs.
   *
   * @throws RssException for any other input type
   */
  @Override
  public synchronized void fetchComplete(FetchedInput fetchedInput) {
    switch (fetchedInput.getType()) {
        // Not tracking anything here.
      case DISK:
      case DISK_DIRECT:
      case MEMORY:
        break;
      default:
        throw new RssException(
            "InputType: " + fetchedInput.getType() + " not expected for Broadcast fetch");
    }
  }

  /** Called when a fetch fails; releases any memory reserved for the input. */
  @Override
  public synchronized void fetchFailed(FetchedInput fetchedInput) {
    cleanup(fetchedInput);
  }

  /** Releases any memory reserved for the input. */
  @Override
  public synchronized void freeResources(FetchedInput fetchedInput) {
    cleanup(fetchedInput);
  }

  /**
   * Returns the reservation held by a MEMORY input to the budget; DISK inputs hold none.
   *
   * @throws RssException for any other input type
   */
  private void cleanup(FetchedInput fetchedInput) {
    switch (fetchedInput.getType()) {
      case DISK:
        break;
      case MEMORY:
        unreserve(((MemoryFetchedInput) fetchedInput).getSize());
        break;
      default:
        throw new RssException(
            "InputType: " + fetchedInput.getType() + " not expected for Broadcast fetch");
    }
  }

  /** Decrements the reserved in-memory byte count by {@code size}. */
  private synchronized void unreserve(long size) {
    this.usedMemory -= size;
    if (LOG.isDebugEnabled()) {
      LOG.debug("{}: Used memory after freeing {}: {}", srcNameTrimmed, size, usedMemory);
    }
  }

  /** Replaces the filesystem used for remote spills (test hook). */
  @VisibleForTesting
  public void setRemoteFS(FileSystem remoteFS) {
    this.remoteFS = remoteFS;
  }
}