| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.uniffle.server; |
| |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import com.google.common.collect.Sets; |
| import org.roaringbitmap.longlong.Roaring64NavigableMap; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.uniffle.common.ShuffleDataDistributionType; |
| import org.apache.uniffle.common.util.JavaUtils; |
| |
| /** |
| * ShuffleTaskInfo contains the information of submitting the shuffle, the information of the cache |
| * block, user and timestamp corresponding to the app |
| */ |
| public class ShuffleTaskInfo { |
| private static final Logger LOGGER = LoggerFactory.getLogger(ShuffleTaskInfo.class); |
| |
| private final String appId; |
| private Long currentTimes; |
| /** shuffleId -> commit count */ |
| private Map<Integer, AtomicInteger> commitCounts; |
| |
| private Map<Integer, Object> commitLocks; |
| /** shuffleId -> blockIds */ |
| private Map<Integer, Roaring64NavigableMap> cachedBlockIds; |
| |
| private AtomicReference<String> user; |
| |
| private final AtomicLong totalDataSize = new AtomicLong(0); |
| private final AtomicLong inMemoryDataSize = new AtomicLong(0); |
| private final AtomicLong onLocalFileDataSize = new AtomicLong(0); |
| private final AtomicLong onHadoopDataSize = new AtomicLong(0); |
| |
| /** shuffleId -> partitionId -> partition shuffle data size */ |
| private Map<Integer, Map<Integer, Long>> partitionDataSizes; |
| /** shuffleId -> huge partitionIds set */ |
| private final Map<Integer, Set<Integer>> hugePartitionTags; |
| |
| private final AtomicBoolean existHugePartition; |
| |
| private final AtomicReference<ShuffleSpecification> specification; |
| |
| public ShuffleTaskInfo(String appId) { |
| this.appId = appId; |
| this.currentTimes = System.currentTimeMillis(); |
| this.commitCounts = JavaUtils.newConcurrentMap(); |
| this.commitLocks = JavaUtils.newConcurrentMap(); |
| this.cachedBlockIds = JavaUtils.newConcurrentMap(); |
| this.user = new AtomicReference<>(); |
| this.partitionDataSizes = JavaUtils.newConcurrentMap(); |
| this.hugePartitionTags = JavaUtils.newConcurrentMap(); |
| this.existHugePartition = new AtomicBoolean(false); |
| this.specification = new AtomicReference<>(); |
| } |
| |
| public Long getCurrentTimes() { |
| return currentTimes; |
| } |
| |
| public void setCurrentTimes(Long currentTimes) { |
| this.currentTimes = currentTimes; |
| } |
| |
| public Map<Integer, AtomicInteger> getCommitCounts() { |
| return commitCounts; |
| } |
| |
| public Map<Integer, Object> getCommitLocks() { |
| return commitLocks; |
| } |
| |
| public Map<Integer, Roaring64NavigableMap> getCachedBlockIds() { |
| return cachedBlockIds; |
| } |
| |
| public String getUser() { |
| return user.get(); |
| } |
| |
| public void setUser(String user) { |
| this.user.set(user); |
| } |
| |
| public int getMaxConcurrencyPerPartitionToWrite() { |
| return specification.get().getMaxConcurrencyPerPartitionToWrite(); |
| } |
| |
| public ShuffleDataDistributionType getDataDistType() { |
| return specification.get().getDistributionType(); |
| } |
| |
| public void setSpecification(ShuffleSpecification specification) { |
| this.specification.set(specification); |
| } |
| |
| public long addPartitionDataSize(int shuffleId, int partitionId, long delta) { |
| totalDataSize.addAndGet(delta); |
| inMemoryDataSize.addAndGet(delta); |
| partitionDataSizes.computeIfAbsent(shuffleId, key -> JavaUtils.newConcurrentMap()); |
| Map<Integer, Long> partitions = partitionDataSizes.get(shuffleId); |
| partitions.putIfAbsent(partitionId, 0L); |
| return partitions.computeIfPresent(partitionId, (k, v) -> v + delta); |
| } |
| |
| public long getTotalDataSize() { |
| return totalDataSize.get(); |
| } |
| |
| public long getInMemoryDataSize() { |
| return inMemoryDataSize.get(); |
| } |
| |
| public long addOnLocalFileDataSize(long delta) { |
| inMemoryDataSize.addAndGet(-delta); |
| return onLocalFileDataSize.addAndGet(delta); |
| } |
| |
| public long getOnLocalFileDataSize() { |
| return onLocalFileDataSize.get(); |
| } |
| |
| public long addOnHadoopDataSize(long delta) { |
| inMemoryDataSize.addAndGet(-delta); |
| return onHadoopDataSize.addAndGet(delta); |
| } |
| |
| public long getOnHadoopDataSize() { |
| return onHadoopDataSize.get(); |
| } |
| |
| public long getPartitionDataSize(int shuffleId, int partitionId) { |
| Map<Integer, Long> partitions = partitionDataSizes.get(shuffleId); |
| if (partitions == null) { |
| return 0; |
| } |
| Long size = partitions.get(partitionId); |
| if (size == null) { |
| return 0L; |
| } |
| return size; |
| } |
| |
| public boolean hasHugePartition() { |
| return existHugePartition.get(); |
| } |
| |
| public int getHugePartitionSize() { |
| return hugePartitionTags.values().stream().map(x -> x.size()).reduce((x, y) -> x + y).orElse(0); |
| } |
| |
| public void markHugePartition(int shuffleId, int partitionId) { |
| if (!existHugePartition.get()) { |
| boolean markedWithCAS = existHugePartition.compareAndSet(false, true); |
| if (markedWithCAS) { |
| ShuffleServerMetrics.gaugeAppWithHugePartitionNum.inc(); |
| ShuffleServerMetrics.counterTotalAppWithHugePartitionNum.inc(); |
| } |
| } |
| |
| Set<Integer> partitions = |
| hugePartitionTags.computeIfAbsent(shuffleId, key -> Sets.newConcurrentHashSet()); |
| if (partitions.add(partitionId)) { |
| ShuffleServerMetrics.counterTotalHugePartitionNum.inc(); |
| ShuffleServerMetrics.gaugeHugePartitionNum.inc(); |
| LOGGER.warn( |
| "Huge partition occurs, appId: {}, shuffleId: {}, partitionId: {}", |
| appId, |
| shuffleId, |
| partitionId); |
| } |
| } |
| |
| public Set<Integer> getShuffleIds() { |
| return partitionDataSizes.keySet(); |
| } |
| |
| @Override |
| public String toString() { |
| return "ShuffleTaskInfo{" |
| + "appId='" |
| + appId |
| + '\'' |
| + ", totalDataSize=" |
| + totalDataSize |
| + ", inMemoryDataSize=" |
| + inMemoryDataSize |
| + ", onLocalFileDataSize=" |
| + onLocalFileDataSize |
| + ", onHadoopDataSize=" |
| + onHadoopDataSize |
| + ", partitionDataSizes=" |
| + partitionDataSizes |
| + '}'; |
| } |
| } |