blob: c4e74507eaffac22c4e00bd3addf7a0a6127739b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.runtime.executor.data;
import org.apache.nemo.conf.JobConf;
import org.apache.reef.tang.annotations.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
/**
* A class to restrict parallel connection per runtime edge.
* Executors can suffer from performance degradation and network-related exceptions when there are massive connections,
* especially under low network bandwidth or high volume of data.
*/
public final class BlockTransferThrottler {
private static final Logger LOG = LoggerFactory.getLogger(BlockTransferThrottler.class.getName());
private final Map<String, Integer> runtimeEdgeIdToNumCurrentConnections = new HashMap<>();
private final Map<String, Queue<CompletableFuture<Void>>> runtimeEdgeIdToPendingConnections = new HashMap<>();
private final int maxNum;
@Inject
private BlockTransferThrottler(@Parameter(JobConf.MaxNumDownloadsForARuntimeEdge.class) final int maxNum) {
this.maxNum = maxNum;
}
/**
* Request a permission to make a connection.
*
* @param runtimeEdgeId the corresponding runtime edge id.
* @return a future that will be completed when the connection is granted.
*/
public synchronized CompletableFuture<Void> requestTransferPermission(final String runtimeEdgeId) {
runtimeEdgeIdToNumCurrentConnections.putIfAbsent(runtimeEdgeId, 0);
runtimeEdgeIdToPendingConnections.computeIfAbsent(runtimeEdgeId, id -> new ArrayDeque<>());
final int currentOutstandingConnections = runtimeEdgeIdToNumCurrentConnections.get(runtimeEdgeId);
if (currentOutstandingConnections < maxNum) {
// grant immediately
runtimeEdgeIdToNumCurrentConnections.put(runtimeEdgeId, currentOutstandingConnections + 1);
return CompletableFuture.completedFuture(null);
} else {
// add to pending queue
final CompletableFuture<Void> future = new CompletableFuture<>();
runtimeEdgeIdToPendingConnections.get(runtimeEdgeId).add(future);
return future;
}
}
/**
* Indicates the transfer has finished.
*
* @param runtimeEdgeId the corresponding runtime edge id.
*/
public synchronized void onTransferFinished(final String runtimeEdgeId) {
final Queue<CompletableFuture<Void>> pendingConnections = runtimeEdgeIdToPendingConnections.get(runtimeEdgeId);
if (pendingConnections.size() == 0) {
// Just decrease the number of current connections.
// Since we have no pending connections, we leave pendingConnections queue untouched.
final int numCurrentConnections = runtimeEdgeIdToNumCurrentConnections.get(runtimeEdgeId);
runtimeEdgeIdToNumCurrentConnections.put(runtimeEdgeId, numCurrentConnections - 1);
} else {
// Since pendingConnections has at least one element, the poll method invocation will immediately return.
// One connection is completed, and another connection kicks in; the number of current connection stays same
final CompletableFuture<Void> nextFuture = pendingConnections.poll();
nextFuture.complete(null);
}
}
}