blob: 2cdef3eb5d5b040f222269650bd4f4148a693fb6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.io;
import org.apache.flink.annotation.Public;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * The locatable input split assigner assigns to each host splits that are local, before assigning
 * splits that are not local. When there are several splits on the given host, the assigner prefers
 * to assign the splits whose first replica is on the host.
 *
 * <p>Each split is wrapped exactly once; the same wrapper instance is registered in the global
 * queue of unassigned splits and in the queue of every host that the split lists. The wrapper's
 * assigned flag is what prevents a split from being handed out through more than one queue.
 */
@Public
public final class LocatableInputSplitAssigner implements InputSplitAssigner {

    private static final Logger LOG = LoggerFactory.getLogger(LocatableInputSplitAssigner.class);

    // unassigned input splits
    private final BlockingQueue<AssigningInputSplit> unassigned = new LinkedBlockingQueue<>();

    // lower-cased host name -> splits that list this host, ordered by replica index
    private final Map<String, BlockingQueue<AssigningInputSplit>> splitHostMap = new HashMap<>();

    private final AtomicInteger localAssignments = new AtomicInteger();

    private final AtomicInteger remoteAssignments = new AtomicInteger();

    // --------------------------------------------------------------------------------------------

    /**
     * Creates an assigner for the given splits.
     *
     * @param splits the splits to hand out
     */
    public LocatableInputSplitAssigner(Collection<LocatableInputSplit> splits) {
        this(splits.toArray(new LocatableInputSplit[splits.size()]));
    }

    /**
     * Creates an assigner for the given splits and builds the per-host queues.
     *
     * @param splits the splits to hand out
     */
    public LocatableInputSplitAssigner(LocatableInputSplit[] splits) {
        int maxReplicaSize = 0;
        for (LocatableInputSplit locatableInputSplit : splits) {
            AssigningInputSplit split = new AssigningInputSplit(locatableInputSplit);
            unassigned.add(split);
            maxReplicaSize = Math.max(maxReplicaSize, split.getHostNames().length);
        }

        // Fill the host queues replica index by replica index, so that for every host the splits
        // whose first replica is on that host come before splits that list it at a later position.
        for (int i = 0; i < maxReplicaSize; i++) {
            for (AssigningInputSplit split : unassigned) {
                final String[] hostNames = split.getHostNames();
                if (hostNames.length <= i) {
                    continue;
                }
                final String hostName = hostNames[i];
                if (hostName == null) {
                    continue;
                }
                // Use the same fixed locale as getNextInputSplit so that stored keys and lookup
                // keys are lower-cased identically regardless of the JVM default locale.
                final String host = NetUtils.getHostnameFromFQDN(hostName.toLowerCase(Locale.US));
                splitHostMap.computeIfAbsent(host, k -> new LinkedBlockingQueue<>()).add(split);
            }
        }
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Returns the next split for the given host, preferring a split that lists this host.
     *
     * <p>NOTE(review): the lookup key is only lower-cased, whereas the keys stored by the
     * constructor are additionally passed through {@code NetUtils.getHostnameFromFQDN};
     * presumably callers pass a host name that is already in that form - confirm.
     *
     * @param host the requesting host, may be {@code null}, in which case only the global queue
     *     is consulted
     * @param taskId the id of the requesting task (not used by this assigner)
     * @return the next split, or {@code null} if no unassigned split is left
     */
    @Override
    public LocatableInputSplit getNextInputSplit(String host, int taskId) {
        if (host != null) {
            host = host.toLowerCase(Locale.US);
            final BlockingQueue<AssigningInputSplit> inputSplitQueue = splitHostMap.get(host);
            if (inputSplitQueue != null) {
                final LocatableInputSplit localSplit = takeUnAssignedSplit(inputSplitQueue);
                if (localSplit != null) {
                    localAssignments.incrementAndGet();
                    LOG.info("Assigning local split to host {}", host);
                    return localSplit;
                }
            }
        }

        // No host given, or nothing left in this host's queue: fall back to the global queue.
        final LocatableInputSplit split = takeUnAssignedSplit(unassigned);
        if (split != null) {
            remoteAssignments.incrementAndGet();
            LOG.info("Assigning remote split to host {}", host);
        }
        return split;
    }

    /**
     * Records that the given splits have already been assigned, so that this assigner does not
     * hand them out again.
     *
     * @param taskId the id of the task the splits were assigned to (not used by this assigner)
     * @param inputSplits the splits that are already assigned
     * @throws FlinkRuntimeException if one of the splits is not in the queue of unassigned splits
     */
    @Override
    public void inputSplitsAssigned(int taskId, List<InputSplit> inputSplits) {
        for (InputSplit inputSplit : inputSplits) {
            boolean found = false;
            for (AssigningInputSplit split : unassigned) {
                if (split.getSplit().equals(inputSplit)) {
                    // The same wrapper is also referenced from the per-host queues; without
                    // marking it, a later host-local request could return this split again.
                    split.setAssigned();
                    unassigned.remove(split);
                    found = true;
                    break;
                }
            }
            if (!found) {
                throw new FlinkRuntimeException(
                        "InputSplit not found for " + inputSplit.getSplitNumber());
            }
        }
    }

    /**
     * Returns how many splits were handed out from the requesting host's own queue.
     *
     * @return the number of local assignments
     */
    public int getNumberOfLocalAssignments() {
        return localAssignments.get();
    }

    /**
     * Returns how many splits were handed out from the global queue.
     *
     * @return the number of remote assignments
     */
    public int getNumberOfRemoteAssignments() {
        return remoteAssignments.get();
    }

    /**
     * Polls the queue until it yields a split that has not been assigned yet, marks that split as
     * assigned and returns it. Entries that were already assigned are dropped from the queue.
     *
     * @param queue the queue to take from
     * @return the claimed split, or {@code null} if the queue ran empty
     */
    private LocatableInputSplit takeUnAssignedSplit(BlockingQueue<AssigningInputSplit> queue) {
        AssigningInputSplit split = queue.poll();
        while (split != null) {
            if (!split.isAssigned()) {
                // The flag is re-checked while holding the split's monitor so that two callers
                // polling the same wrapper from different queues cannot both claim it.
                synchronized (split) {
                    if (!split.isAssigned()) {
                        split.setAssigned();
                        return split.getSplit();
                    }
                }
            }
            split = queue.poll();
        }
        return null;
    }

    /** Wraps a split together with a flag that records whether it has been assigned. */
    private static final class AssigningInputSplit {

        private final LocatableInputSplit split;

        // volatile: read without holding the monitor in takeUnAssignedSplit
        private volatile boolean isAssigned = false;

        public AssigningInputSplit(LocatableInputSplit split) {
            this.split = split;
        }

        public String[] getHostNames() {
            return split.getHostnames();
        }

        public LocatableInputSplit getSplit() {
            return split;
        }

        public boolean isAssigned() {
            return isAssigned;
        }

        public void setAssigned() {
            this.isAssigned = true;
        }
    }
}