blob: 4a05239aaa150702f3fed2e1b108c8ac92889c1d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.shuffle.handle;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.PartitionDataReplicaRequirementTracking;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.proto.RssProtos;
/** This class holds the dynamic partition assignment for partition reassign mechanism. */
public class MutableShuffleHandleInfo extends ShuffleHandleInfoBase {
private static final Logger LOGGER = LoggerFactory.getLogger(MutableShuffleHandleInfo.class);
/**
* partitionId -> replica -> assigned servers.
*
* <p>The first index of list<ShuffleServerInfo> is the initial static assignment server.
*
* <p>The remaining indexes are the replacement servers if exists.
*/
private Map<Integer, Map<Integer, List<ShuffleServerInfo>>> partitionReplicaAssignedServers;
private Map<String, Set<ShuffleServerInfo>> excludedServerToReplacements;
public MutableShuffleHandleInfo(
int shuffleId,
Map<Integer, List<ShuffleServerInfo>> partitionToServers,
RemoteStorageInfo storageInfo) {
super(shuffleId, storageInfo);
this.excludedServerToReplacements = new HashMap<>();
this.partitionReplicaAssignedServers = toPartitionReplicaMapping(partitionToServers);
}
@VisibleForTesting
protected MutableShuffleHandleInfo(
int shuffleId,
RemoteStorageInfo storageInfo,
Map<Integer, Map<Integer, List<ShuffleServerInfo>>> partitionReplicaAssignedServers) {
super(shuffleId, storageInfo);
this.excludedServerToReplacements = new HashMap<>();
this.partitionReplicaAssignedServers = partitionReplicaAssignedServers;
}
public MutableShuffleHandleInfo(int shuffleId, RemoteStorageInfo storageInfo) {
super(shuffleId, storageInfo);
}
private Map<Integer, Map<Integer, List<ShuffleServerInfo>>> toPartitionReplicaMapping(
Map<Integer, List<ShuffleServerInfo>> partitionToServers) {
Map<Integer, Map<Integer, List<ShuffleServerInfo>>> partitionReplicaAssignedServers =
new HashMap<>();
for (Map.Entry<Integer, List<ShuffleServerInfo>> partitionEntry :
partitionToServers.entrySet()) {
int partitionId = partitionEntry.getKey();
Map<Integer, List<ShuffleServerInfo>> replicaMapping =
partitionReplicaAssignedServers.computeIfAbsent(partitionId, x -> new HashMap<>());
List<ShuffleServerInfo> replicaServers = partitionEntry.getValue();
for (int i = 0; i < replicaServers.size(); i++) {
int replicaIdx = i;
replicaMapping
.computeIfAbsent(replicaIdx, x -> new ArrayList<>())
.add(replicaServers.get(i));
}
}
return partitionReplicaAssignedServers;
}
public Set<ShuffleServerInfo> getReplacements(String faultyServerId) {
return excludedServerToReplacements.get(faultyServerId);
}
public Set<ShuffleServerInfo> updateAssignment(
int partitionId, String receivingFailureServerId, Set<ShuffleServerInfo> replacements) {
if (replacements == null || StringUtils.isEmpty(receivingFailureServerId)) {
return Collections.emptySet();
}
excludedServerToReplacements.put(receivingFailureServerId, replacements);
Set<ShuffleServerInfo> updatedServers = new HashSet<>();
Map<Integer, List<ShuffleServerInfo>> replicaServers =
partitionReplicaAssignedServers.get(partitionId);
for (Map.Entry<Integer, List<ShuffleServerInfo>> serverEntry : replicaServers.entrySet()) {
List<ShuffleServerInfo> servers = serverEntry.getValue();
if (servers.stream()
.map(x -> x.getId())
.collect(Collectors.toSet())
.contains(receivingFailureServerId)) {
Set<ShuffleServerInfo> tempSet = new HashSet<>();
tempSet.addAll(replacements);
tempSet.removeAll(servers);
if (CollectionUtils.isNotEmpty(tempSet)) {
updatedServers.addAll(tempSet);
servers.addAll(tempSet);
}
}
}
return updatedServers;
}
@Override
public Set<ShuffleServerInfo> getServers() {
return partitionReplicaAssignedServers.values().stream()
.flatMap(x -> x.values().stream().flatMap(k -> k.stream()))
.collect(Collectors.toSet());
}
@Override
public Map<Integer, List<ShuffleServerInfo>> getAvailablePartitionServersForWriter() {
Map<Integer, List<ShuffleServerInfo>> assignment = new HashMap<>();
for (Map.Entry<Integer, Map<Integer, List<ShuffleServerInfo>>> entry :
partitionReplicaAssignedServers.entrySet()) {
int partitionId = entry.getKey();
Map<Integer, List<ShuffleServerInfo>> replicaServers = entry.getValue();
for (Map.Entry<Integer, List<ShuffleServerInfo>> replicaServerEntry :
replicaServers.entrySet()) {
ShuffleServerInfo candidate;
int candidateSize = replicaServerEntry.getValue().size();
candidate = replicaServerEntry.getValue().get(candidateSize - 1);
assignment.computeIfAbsent(partitionId, x -> new ArrayList<>()).add(candidate);
}
}
return assignment;
}
@Override
public Map<Integer, List<ShuffleServerInfo>> getAllPartitionServersForReader() {
Map<Integer, List<ShuffleServerInfo>> assignment = new HashMap<>();
for (Map.Entry<Integer, Map<Integer, List<ShuffleServerInfo>>> entry :
partitionReplicaAssignedServers.entrySet()) {
int partitionId = entry.getKey();
Map<Integer, List<ShuffleServerInfo>> replicaServers = entry.getValue();
for (Map.Entry<Integer, List<ShuffleServerInfo>> replicaServerEntry :
replicaServers.entrySet()) {
assignment
.computeIfAbsent(partitionId, x -> new ArrayList<>())
.addAll(replicaServerEntry.getValue());
}
}
return assignment;
}
@Override
public PartitionDataReplicaRequirementTracking createPartitionReplicaTracking() {
PartitionDataReplicaRequirementTracking replicaRequirement =
new PartitionDataReplicaRequirementTracking(shuffleId, partitionReplicaAssignedServers);
return replicaRequirement;
}
public Set<String> listExcludedServers() {
return excludedServerToReplacements.keySet();
}
public void checkPartitionReassignServerNum(
Set<Integer> partitionIds, int legalReassignServerNum) {
for (int partitionId : partitionIds) {
Map<Integer, List<ShuffleServerInfo>> replicas =
partitionReplicaAssignedServers.get(partitionId);
for (List<ShuffleServerInfo> servers : replicas.values()) {
if (servers.size() - 1 > legalReassignServerNum) {
throw new RssException(
"Illegal reassignment servers for partitionId: "
+ partitionId
+ " that exceeding the max legal reassign server num: "
+ legalReassignServerNum);
}
}
}
}
public static RssProtos.MutableShuffleHandleInfo toProto(MutableShuffleHandleInfo handleInfo) {
synchronized (handleInfo) {
Map<Integer, RssProtos.PartitionReplicaServers> partitionToServers = new HashMap<>();
for (Map.Entry<Integer, Map<Integer, List<ShuffleServerInfo>>> entry :
handleInfo.partitionReplicaAssignedServers.entrySet()) {
int partitionId = entry.getKey();
Map<Integer, RssProtos.ReplicaServersItem> replicaServersProto = new HashMap<>();
Map<Integer, List<ShuffleServerInfo>> replicaServers = entry.getValue();
for (Map.Entry<Integer, List<ShuffleServerInfo>> replicaServerEntry :
replicaServers.entrySet()) {
RssProtos.ReplicaServersItem item =
RssProtos.ReplicaServersItem.newBuilder()
.addAllServerId(ShuffleServerInfo.toProto(replicaServerEntry.getValue()))
.build();
replicaServersProto.put(replicaServerEntry.getKey(), item);
}
RssProtos.PartitionReplicaServers partitionReplicaServerProto =
RssProtos.PartitionReplicaServers.newBuilder()
.putAllReplicaServers(replicaServersProto)
.build();
partitionToServers.put(partitionId, partitionReplicaServerProto);
}
RssProtos.MutableShuffleHandleInfo handleProto =
RssProtos.MutableShuffleHandleInfo.newBuilder()
.setShuffleId(handleInfo.shuffleId)
.setRemoteStorageInfo(
RssProtos.RemoteStorageInfo.newBuilder()
.setPath(handleInfo.remoteStorage.getPath())
.putAllConfItems(handleInfo.remoteStorage.getConfItems())
.build())
.putAllPartitionToServers(partitionToServers)
.build();
return handleProto;
}
}
public static MutableShuffleHandleInfo fromProto(RssProtos.MutableShuffleHandleInfo handleProto) {
if (handleProto == null) {
return null;
}
Map<Integer, Map<Integer, List<ShuffleServerInfo>>> partitionToServers = new HashMap<>();
for (Map.Entry<Integer, RssProtos.PartitionReplicaServers> entry :
handleProto.getPartitionToServersMap().entrySet()) {
Map<Integer, List<ShuffleServerInfo>> replicaServers =
partitionToServers.computeIfAbsent(entry.getKey(), x -> new HashMap<>());
for (Map.Entry<Integer, RssProtos.ReplicaServersItem> serverEntry :
entry.getValue().getReplicaServersMap().entrySet()) {
int replicaIdx = serverEntry.getKey();
List<ShuffleServerInfo> shuffleServerInfos =
ShuffleServerInfo.fromProto(serverEntry.getValue().getServerIdList());
replicaServers.put(replicaIdx, shuffleServerInfos);
}
}
RemoteStorageInfo remoteStorageInfo =
new RemoteStorageInfo(
handleProto.getRemoteStorageInfo().getPath(),
handleProto.getRemoteStorageInfo().getConfItemsMap());
MutableShuffleHandleInfo handle =
new MutableShuffleHandleInfo(handleProto.getShuffleId(), remoteStorageInfo);
handle.partitionReplicaAssignedServers = partitionToServers;
return handle;
}
}