| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.uniffle.server; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.cache.Cache; |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Queues; |
| import com.google.common.collect.Range; |
| import com.google.common.collect.Sets; |
| import org.apache.commons.collections.CollectionUtils; |
| import org.roaringbitmap.longlong.Roaring64NavigableMap; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.uniffle.common.PartitionRange; |
| import org.apache.uniffle.common.RemoteStorageInfo; |
| import org.apache.uniffle.common.ShuffleDataDistributionType; |
| import org.apache.uniffle.common.ShuffleDataResult; |
| import org.apache.uniffle.common.ShuffleIndexResult; |
| import org.apache.uniffle.common.ShufflePartitionedBlock; |
| import org.apache.uniffle.common.ShufflePartitionedData; |
| import org.apache.uniffle.common.config.RssBaseConf; |
| import org.apache.uniffle.common.exception.FileNotFoundException; |
| import org.apache.uniffle.common.exception.InvalidRequestException; |
| import org.apache.uniffle.common.exception.NoBufferException; |
| import org.apache.uniffle.common.exception.NoBufferForHugePartitionException; |
| import org.apache.uniffle.common.exception.NoRegisterException; |
| import org.apache.uniffle.common.exception.RssException; |
| import org.apache.uniffle.common.rpc.StatusCode; |
| import org.apache.uniffle.common.util.BlockIdLayout; |
| import org.apache.uniffle.common.util.Constants; |
| import org.apache.uniffle.common.util.JavaUtils; |
| import org.apache.uniffle.common.util.RssUtils; |
| import org.apache.uniffle.common.util.ThreadUtils; |
| import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo; |
| import org.apache.uniffle.server.buffer.ShuffleBuffer; |
| import org.apache.uniffle.server.buffer.ShuffleBufferManager; |
| import org.apache.uniffle.server.event.AppPurgeEvent; |
| import org.apache.uniffle.server.event.AppUnregisterPurgeEvent; |
| import org.apache.uniffle.server.event.PurgeEvent; |
| import org.apache.uniffle.server.event.ShufflePurgeEvent; |
| import org.apache.uniffle.server.storage.StorageManager; |
| import org.apache.uniffle.storage.common.Storage; |
| import org.apache.uniffle.storage.common.StorageReadMetrics; |
| import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest; |
| import org.apache.uniffle.storage.util.ShuffleStorageUtils; |
| |
| import static org.apache.uniffle.server.ShuffleServerConf.CLIENT_MAX_CONCURRENCY_LIMITATION_OF_ONE_PARTITION; |
| import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION; |
| |
| public class ShuffleTaskManager { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(ShuffleTaskManager.class); |
| private final ShuffleFlushManager shuffleFlushManager; |
| private final ScheduledExecutorService scheduledExecutorService; |
| private final ScheduledExecutorService expiredAppCleanupExecutorService; |
| private final ScheduledExecutorService leakShuffleDataCheckExecutorService; |
| private ScheduledExecutorService triggerFlushExecutorService; |
| private final TopNShuffleDataSizeOfAppCalcTask topNShuffleDataSizeOfAppCalcTask; |
| private final StorageManager storageManager; |
| private AtomicLong requireBufferId = new AtomicLong(0); |
| private ShuffleServerConf conf; |
| private long appExpiredWithoutHB; |
| private long preAllocationExpired; |
| private long commitCheckIntervalMax; |
| private long leakShuffleDataCheckInterval; |
| private long triggerFlushInterval; |
| // appId -> shuffleId -> blockIds to avoid too many appId |
| // store taskAttemptId info to filter speculation task |
| // Roaring64NavigableMap instance will cost much memory, |
| // merge different blockId of partition to one bitmap can reduce memory cost, |
| // but when get blockId, performance will degrade a little which can be optimized by client |
| // configuration |
| private Map<String, Map<Integer, Roaring64NavigableMap[]>> partitionsToBlockIds; |
| private final ShuffleBufferManager shuffleBufferManager; |
| private Map<String, ShuffleTaskInfo> shuffleTaskInfos = JavaUtils.newConcurrentMap(); |
| private Map<Long, PreAllocatedBufferInfo> requireBufferIds = JavaUtils.newConcurrentMap(); |
| private Thread clearResourceThread; |
| private BlockingQueue<PurgeEvent> expiredAppIdQueue = Queues.newLinkedBlockingQueue(); |
| private final Cache<String, Lock> appLocks; |
| |
| public ShuffleTaskManager( |
| ShuffleServerConf conf, |
| ShuffleFlushManager shuffleFlushManager, |
| ShuffleBufferManager shuffleBufferManager, |
| StorageManager storageManager) { |
| this.conf = conf; |
| this.shuffleFlushManager = shuffleFlushManager; |
| this.partitionsToBlockIds = JavaUtils.newConcurrentMap(); |
| this.shuffleBufferManager = shuffleBufferManager; |
| this.storageManager = storageManager; |
| this.appExpiredWithoutHB = conf.getLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT); |
| this.commitCheckIntervalMax = conf.getLong(ShuffleServerConf.SERVER_COMMIT_CHECK_INTERVAL_MAX); |
| this.preAllocationExpired = conf.getLong(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED); |
| this.leakShuffleDataCheckInterval = |
| conf.getLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL); |
| this.triggerFlushInterval = conf.getLong(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL); |
| // the thread for checking application status |
| this.scheduledExecutorService = |
| ThreadUtils.getDaemonSingleThreadScheduledExecutor("checkResource"); |
| scheduledExecutorService.scheduleAtFixedRate( |
| this::preAllocatedBufferCheck, |
| preAllocationExpired / 2, |
| preAllocationExpired / 2, |
| TimeUnit.MILLISECONDS); |
| this.expiredAppCleanupExecutorService = |
| ThreadUtils.getDaemonSingleThreadScheduledExecutor("expiredAppCleaner"); |
| expiredAppCleanupExecutorService.scheduleAtFixedRate( |
| this::checkResourceStatus, |
| appExpiredWithoutHB / 2, |
| appExpiredWithoutHB / 2, |
| TimeUnit.MILLISECONDS); |
| this.leakShuffleDataCheckExecutorService = |
| ThreadUtils.getDaemonSingleThreadScheduledExecutor("leakShuffleDataChecker"); |
| leakShuffleDataCheckExecutorService.scheduleAtFixedRate( |
| this::checkLeakShuffleData, |
| leakShuffleDataCheckInterval, |
| leakShuffleDataCheckInterval, |
| TimeUnit.MILLISECONDS); |
| if (triggerFlushInterval > 0) { |
| triggerFlushExecutorService = |
| ThreadUtils.getDaemonSingleThreadScheduledExecutor("triggerShuffleBufferManagerFlush"); |
| triggerFlushExecutorService.scheduleWithFixedDelay( |
| this::triggerFlush, |
| triggerFlushInterval / 2, |
| triggerFlushInterval, |
| TimeUnit.MILLISECONDS); |
| } |
| if (shuffleBufferManager != null) { |
| shuffleBufferManager.setShuffleTaskManager(this); |
| } |
| |
| appLocks = |
| CacheBuilder.newBuilder() |
| .expireAfterAccess(3600, TimeUnit.SECONDS) |
| .maximumSize(Integer.MAX_VALUE) |
| .build(); |
| |
| // the thread for clear expired resources |
| Runnable clearResourceRunnable = |
| () -> { |
| while (true) { |
| PurgeEvent event = null; |
| try { |
| event = expiredAppIdQueue.take(); |
| long startTime = System.currentTimeMillis(); |
| if (event instanceof AppPurgeEvent) { |
| removeResources(event.getAppId(), true); |
| double usedTime = |
| (System.currentTimeMillis() - startTime) / Constants.MILLION_SECONDS_PER_SECOND; |
| ShuffleServerMetrics.summaryTotalRemoveResourceTime.observe(usedTime); |
| } |
| if (event instanceof AppUnregisterPurgeEvent) { |
| removeResources(event.getAppId(), false); |
| double usedTime = |
| (System.currentTimeMillis() - startTime) / Constants.MILLION_SECONDS_PER_SECOND; |
| ShuffleServerMetrics.summaryTotalRemoveResourceTime.observe(usedTime); |
| } |
| if (event instanceof ShufflePurgeEvent) { |
| removeResourcesByShuffleIds(event.getAppId(), event.getShuffleIds()); |
| double usedTime = |
| (System.currentTimeMillis() - startTime) / Constants.MILLION_SECONDS_PER_SECOND; |
| ShuffleServerMetrics.summaryTotalRemoveResourceByShuffleIdsTime.observe(usedTime); |
| } |
| } catch (Exception e) { |
| StringBuilder diagnosticMessageBuilder = |
| new StringBuilder( |
| "Exception happened when clearing resource for expired application"); |
| if (event != null) { |
| diagnosticMessageBuilder.append(" for appId: "); |
| diagnosticMessageBuilder.append(event.getAppId()); |
| |
| if (CollectionUtils.isNotEmpty(event.getShuffleIds())) { |
| diagnosticMessageBuilder.append(", shuffleIds: "); |
| diagnosticMessageBuilder.append(event.getShuffleIds()); |
| } |
| } |
| LOG.error("{}", diagnosticMessageBuilder, e); |
| } |
| } |
| }; |
| clearResourceThread = new Thread(clearResourceRunnable); |
| clearResourceThread.setName("clearResourceThread"); |
| clearResourceThread.setDaemon(true); |
| |
| topNShuffleDataSizeOfAppCalcTask = new TopNShuffleDataSizeOfAppCalcTask(this, conf); |
| topNShuffleDataSizeOfAppCalcTask.start(); |
| } |
| |
| private Lock getAppLock(String appId) { |
| try { |
| return appLocks.get(appId, ReentrantLock::new); |
| } catch (ExecutionException e) { |
| LOG.error("Failed to get App lock.", e); |
| throw new RssException(e); |
| } |
| } |
| |
| /** Only for test */ |
| @VisibleForTesting |
| public StatusCode registerShuffle( |
| String appId, |
| int shuffleId, |
| List<PartitionRange> partitionRanges, |
| RemoteStorageInfo remoteStorageInfo, |
| String user) { |
| return registerShuffle( |
| appId, |
| shuffleId, |
| partitionRanges, |
| remoteStorageInfo, |
| user, |
| ShuffleDataDistributionType.NORMAL, |
| -1); |
| } |
| |
| public StatusCode registerShuffle( |
| String appId, |
| int shuffleId, |
| List<PartitionRange> partitionRanges, |
| RemoteStorageInfo remoteStorageInfo, |
| String user, |
| ShuffleDataDistributionType dataDistType, |
| int maxConcurrencyPerPartitionToWrite) { |
| Lock lock = getAppLock(appId); |
| try { |
| lock.lock(); |
| refreshAppId(appId); |
| |
| ShuffleTaskInfo taskInfo = shuffleTaskInfos.get(appId); |
| taskInfo.setUser(user); |
| taskInfo.setSpecification( |
| ShuffleSpecification.builder() |
| .maxConcurrencyPerPartitionToWrite( |
| getMaxConcurrencyWriting(maxConcurrencyPerPartitionToWrite, conf)) |
| .dataDistributionType(dataDistType) |
| .build()); |
| |
| partitionsToBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap()); |
| for (PartitionRange partitionRange : partitionRanges) { |
| shuffleBufferManager.registerBuffer( |
| appId, shuffleId, partitionRange.getStart(), partitionRange.getEnd()); |
| } |
| if (!remoteStorageInfo.isEmpty()) { |
| storageManager.registerRemoteStorage(appId, remoteStorageInfo); |
| } |
| return StatusCode.SUCCESS; |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| @VisibleForTesting |
| protected static int getMaxConcurrencyWriting( |
| int maxConcurrencyPerPartitionToWrite, ShuffleServerConf conf) { |
| if (maxConcurrencyPerPartitionToWrite > 0) { |
| return Math.min( |
| maxConcurrencyPerPartitionToWrite, |
| conf.get(CLIENT_MAX_CONCURRENCY_LIMITATION_OF_ONE_PARTITION)); |
| } |
| return conf.get(SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION); |
| } |
| |
| public StatusCode cacheShuffleData( |
| String appId, int shuffleId, boolean isPreAllocated, ShufflePartitionedData spd) { |
| refreshAppId(appId); |
| return shuffleBufferManager.cacheShuffleData(appId, shuffleId, isPreAllocated, spd); |
| } |
| |
| public PreAllocatedBufferInfo getAndRemovePreAllocatedBuffer(long requireBufferId) { |
| return requireBufferIds.remove(requireBufferId); |
| } |
| |
| public void releasePreAllocatedSize(long requireSize) { |
| shuffleBufferManager.releasePreAllocatedSize(requireSize); |
| } |
| |
| @VisibleForTesting |
| void removeAndReleasePreAllocatedBuffer(long requireBufferId) { |
| PreAllocatedBufferInfo info = getAndRemovePreAllocatedBuffer(requireBufferId); |
| if (info != null) { |
| releasePreAllocatedSize(info.getRequireSize()); |
| } |
| } |
| |
| public StatusCode commitShuffle(String appId, int shuffleId) throws Exception { |
| long start = System.currentTimeMillis(); |
| refreshAppId(appId); |
| Roaring64NavigableMap cachedBlockIds = getCachedBlockIds(appId, shuffleId); |
| Roaring64NavigableMap cloneBlockIds; |
| ShuffleTaskInfo shuffleTaskInfo = |
| shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo(appId)); |
| Object lock = shuffleTaskInfo.getCommitLocks().computeIfAbsent(shuffleId, x -> new Object()); |
| synchronized (lock) { |
| long commitTimeout = conf.get(ShuffleServerConf.SERVER_COMMIT_TIMEOUT); |
| if (System.currentTimeMillis() - start > commitTimeout) { |
| throw new RssException("Shuffle data commit timeout for " + commitTimeout + " ms"); |
| } |
| synchronized (cachedBlockIds) { |
| cloneBlockIds = RssUtils.cloneBitMap(cachedBlockIds); |
| } |
| long expectedCommitted = cloneBlockIds.getLongCardinality(); |
| shuffleBufferManager.commitShuffleTask(appId, shuffleId); |
| Roaring64NavigableMap committedBlockIds; |
| Roaring64NavigableMap cloneCommittedBlockIds; |
| long checkInterval = 1000L; |
| while (true) { |
| committedBlockIds = shuffleFlushManager.getCommittedBlockIds(appId, shuffleId); |
| synchronized (committedBlockIds) { |
| cloneCommittedBlockIds = RssUtils.cloneBitMap(committedBlockIds); |
| } |
| cloneBlockIds.andNot(cloneCommittedBlockIds); |
| if (cloneBlockIds.isEmpty()) { |
| break; |
| } |
| Thread.sleep(checkInterval); |
| if (System.currentTimeMillis() - start > commitTimeout) { |
| throw new RssException("Shuffle data commit timeout for " + commitTimeout + " ms"); |
| } |
| LOG.info( |
| "Checking commit result for appId[" |
| + appId |
| + "], shuffleId[" |
| + shuffleId |
| + "], expect committed[" |
| + expectedCommitted |
| + "], remain[" |
| + cloneBlockIds.getLongCardinality() |
| + "]"); |
| checkInterval = Math.min(checkInterval * 2, commitCheckIntervalMax); |
| } |
| LOG.info( |
| "Finish commit for appId[" |
| + appId |
| + "], shuffleId[" |
| + shuffleId |
| + "] with expectedCommitted[" |
| + expectedCommitted |
| + "], cost " |
| + (System.currentTimeMillis() - start) |
| + " ms to check"); |
| } |
| return StatusCode.SUCCESS; |
| } |
| |
| public void addFinishedBlockIds( |
| String appId, Integer shuffleId, Map<Integer, long[]> partitionToBlockIds, int bitmapNum) { |
| refreshAppId(appId); |
| Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId); |
| if (shuffleIdToPartitions == null) { |
| throw new RssException("appId[" + appId + "] is expired!"); |
| } |
| shuffleIdToPartitions.computeIfAbsent( |
| shuffleId, |
| key -> { |
| Roaring64NavigableMap[] blockIds = new Roaring64NavigableMap[bitmapNum]; |
| for (int i = 0; i < bitmapNum; i++) { |
| blockIds[i] = Roaring64NavigableMap.bitmapOf(); |
| } |
| return blockIds; |
| }); |
| Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId); |
| if (blockIds.length != bitmapNum) { |
| throw new InvalidRequestException( |
| "Request expects " |
| + bitmapNum |
| + " bitmaps, but there are " |
| + blockIds.length |
| + " bitmaps!"); |
| } |
| |
| for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) { |
| Integer partitionId = entry.getKey(); |
| Roaring64NavigableMap bitmap = blockIds[partitionId % bitmapNum]; |
| synchronized (bitmap) { |
| for (long blockId : entry.getValue()) { |
| bitmap.addLong(blockId); |
| } |
| } |
| } |
| } |
| |
| public int updateAndGetCommitCount(String appId, int shuffleId) { |
| ShuffleTaskInfo shuffleTaskInfo = |
| shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo(appId)); |
| AtomicInteger commitNum = |
| shuffleTaskInfo.getCommitCounts().computeIfAbsent(shuffleId, x -> new AtomicInteger(0)); |
| return commitNum.incrementAndGet(); |
| } |
| |
| // Only for tests |
| public void updateCachedBlockIds(String appId, int shuffleId, ShufflePartitionedBlock[] spbs) { |
| updateCachedBlockIds(appId, shuffleId, 0, spbs); |
| } |
| |
| public void updateCachedBlockIds( |
| String appId, int shuffleId, int partitionId, ShufflePartitionedBlock[] spbs) { |
| if (spbs == null || spbs.length == 0) { |
| return; |
| } |
| ShuffleTaskInfo shuffleTaskInfo = |
| shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo(appId)); |
| Roaring64NavigableMap bitmap = |
| shuffleTaskInfo |
| .getCachedBlockIds() |
| .computeIfAbsent(shuffleId, x -> Roaring64NavigableMap.bitmapOf()); |
| |
| long size = 0L; |
| synchronized (bitmap) { |
| for (ShufflePartitionedBlock spb : spbs) { |
| bitmap.addLong(spb.getBlockId()); |
| size += spb.getSize(); |
| } |
| } |
| long partitionSize = shuffleTaskInfo.addPartitionDataSize(shuffleId, partitionId, size); |
| if (shuffleBufferManager.isHugePartition(partitionSize)) { |
| shuffleTaskInfo.markHugePartition(shuffleId, partitionId); |
| } |
| } |
| |
| public Roaring64NavigableMap getCachedBlockIds(String appId, int shuffleId) { |
| Map<Integer, Roaring64NavigableMap> shuffleIdToBlockIds = |
| shuffleTaskInfos.getOrDefault(appId, new ShuffleTaskInfo(appId)).getCachedBlockIds(); |
| Roaring64NavigableMap blockIds = shuffleIdToBlockIds.get(shuffleId); |
| if (blockIds == null) { |
| LOG.warn( |
| "Unexpected value when getCachedBlockIds for appId[" |
| + appId |
| + "], shuffleId[" |
| + shuffleId |
| + "]"); |
| return Roaring64NavigableMap.bitmapOf(); |
| } |
| return blockIds; |
| } |
| |
| public long getPartitionDataSize(String appId, int shuffleId, int partitionId) { |
| ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.get(appId); |
| if (shuffleTaskInfo == null) { |
| return 0L; |
| } |
| return shuffleTaskInfo.getPartitionDataSize(shuffleId, partitionId); |
| } |
| |
| public long requireBuffer( |
| String appId, int shuffleId, List<Integer> partitionIds, int requireSize) { |
| ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.get(appId); |
| if (null == shuffleTaskInfo) { |
| LOG.error("No such app is registered. appId: {}, shuffleId: {}", appId, shuffleId); |
| throw new NoRegisterException("No such app is registered. appId: " + appId); |
| } |
| for (int partitionId : partitionIds) { |
| long partitionUsedDataSize = getPartitionDataSize(appId, shuffleId, partitionId); |
| if (shuffleBufferManager.limitHugePartition( |
| appId, shuffleId, partitionId, partitionUsedDataSize)) { |
| String errorMessage = |
| String.format( |
| "Huge partition is limited to writing. appId: %s, shuffleId: %s, partitionIds: %s, partitionUsedDataSize: %s", |
| appId, shuffleId, partitionIds, partitionUsedDataSize); |
| LOG.error(errorMessage); |
| throw new NoBufferForHugePartitionException(errorMessage); |
| } |
| } |
| return requireBuffer(appId, requireSize); |
| } |
| |
| public long requireBuffer(String appId, int requireSize) { |
| if (shuffleBufferManager.requireMemory(requireSize, true)) { |
| long requireId = requireBufferId.incrementAndGet(); |
| requireBufferIds.put( |
| requireId, |
| new PreAllocatedBufferInfo(appId, requireId, System.currentTimeMillis(), requireSize)); |
| return requireId; |
| } else { |
| LOG.error("Failed to require buffer, require size: {}", requireSize); |
| throw new NoBufferException("No Buffer For Regular Partition, requireSize: " + requireSize); |
| } |
| } |
| |
| public long requireBuffer(int requireSize) { |
| // appId of EMPTY means the client uses the old version that should be upgraded. |
| return requireBuffer("EMPTY", requireSize); |
| } |
| |
| public byte[] getFinishedBlockIds( |
| String appId, Integer shuffleId, Set<Integer> partitions, BlockIdLayout blockIdLayout) |
| throws IOException { |
| refreshAppId(appId); |
| for (int partitionId : partitions) { |
| Map.Entry<Range<Integer>, ShuffleBuffer> entry = |
| shuffleBufferManager.getShuffleBufferEntry(appId, shuffleId, partitionId); |
| if (entry == null) { |
| LOG.error( |
| "The empty shuffle buffer, this should not happen. appId: {}, shuffleId: {}, partition: {}, layout: {}", |
| appId, |
| shuffleId, |
| partitionId, |
| blockIdLayout); |
| continue; |
| } |
| Storage storage = |
| storageManager.selectStorage( |
| new ShuffleDataReadEvent( |
| appId, shuffleId, partitionId, entry.getKey().lowerEndpoint())); |
| // update shuffle's timestamp that was recently read. |
| if (storage != null) { |
| storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId)); |
| } |
| } |
| Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId); |
| if (shuffleIdToPartitions == null) { |
| return null; |
| } |
| |
| Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId); |
| if (blockIds == null) { |
| return new byte[] {}; |
| } |
| Map<Integer, Set<Integer>> bitmapIndexToPartitions = Maps.newHashMap(); |
| for (int partitionId : partitions) { |
| int bitmapIndex = partitionId % blockIds.length; |
| if (bitmapIndexToPartitions.containsKey(bitmapIndex)) { |
| bitmapIndexToPartitions.get(bitmapIndex).add(partitionId); |
| } else { |
| HashSet<Integer> newHashSet = Sets.newHashSet(partitionId); |
| bitmapIndexToPartitions.put(bitmapIndex, newHashSet); |
| } |
| } |
| |
| Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf(); |
| for (Map.Entry<Integer, Set<Integer>> entry : bitmapIndexToPartitions.entrySet()) { |
| Set<Integer> requestPartitions = entry.getValue(); |
| Roaring64NavigableMap bitmap = blockIds[entry.getKey()]; |
| getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout); |
| } |
| return RssUtils.serializeBitMap(res); |
| } |
| |
| // filter the specific partition blockId in the bitmap to the resultBitmap |
| protected Roaring64NavigableMap getBlockIdsByPartitionId( |
| Set<Integer> requestPartitions, |
| Roaring64NavigableMap bitmap, |
| Roaring64NavigableMap resultBitmap, |
| BlockIdLayout blockIdLayout) { |
| bitmap.forEach( |
| blockId -> { |
| int partitionId = blockIdLayout.getPartitionId(blockId); |
| if (requestPartitions.contains(partitionId)) { |
| resultBitmap.addLong(blockId); |
| } |
| }); |
| return resultBitmap; |
| } |
| |
| public ShuffleDataResult getInMemoryShuffleData( |
| String appId, |
| Integer shuffleId, |
| Integer partitionId, |
| long blockId, |
| int readBufferSize, |
| Roaring64NavigableMap expectedTaskIds) { |
| refreshAppId(appId); |
| return shuffleBufferManager.getShuffleData( |
| appId, shuffleId, partitionId, blockId, readBufferSize, expectedTaskIds); |
| } |
| |
| public ShuffleDataResult getShuffleData( |
| String appId, |
| Integer shuffleId, |
| Integer partitionId, |
| int partitionNumPerRange, |
| int partitionNum, |
| String storageType, |
| long offset, |
| int length) { |
| refreshAppId(appId); |
| |
| CreateShuffleReadHandlerRequest request = new CreateShuffleReadHandlerRequest(); |
| request.setAppId(appId); |
| request.setShuffleId(shuffleId); |
| request.setPartitionId(partitionId); |
| request.setPartitionNumPerRange(partitionNumPerRange); |
| request.setPartitionNum(partitionNum); |
| request.setStorageType(storageType); |
| request.setRssBaseConf(conf); |
| int[] range = |
| ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum); |
| Storage storage = |
| storageManager.selectStorage( |
| new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0])); |
| if (storage == null) { |
| throw new FileNotFoundException("No such data stored in current storage manager."); |
| } |
| |
| return storage.getOrCreateReadHandler(request).getShuffleData(offset, length); |
| } |
| |
| public ShuffleIndexResult getShuffleIndex( |
| String appId, |
| Integer shuffleId, |
| Integer partitionId, |
| int partitionNumPerRange, |
| int partitionNum) { |
| refreshAppId(appId); |
| String storageType = conf.get(RssBaseConf.RSS_STORAGE_TYPE).name(); |
| CreateShuffleReadHandlerRequest request = new CreateShuffleReadHandlerRequest(); |
| request.setAppId(appId); |
| request.setShuffleId(shuffleId); |
| request.setPartitionId(partitionId); |
| request.setPartitionNumPerRange(partitionNumPerRange); |
| request.setPartitionNum(partitionNum); |
| request.setStorageType(storageType); |
| request.setRssBaseConf(conf); |
| int[] range = |
| ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum); |
| Storage storage = |
| storageManager.selectStorage( |
| new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0])); |
| if (storage == null) { |
| throw new FileNotFoundException("No such data in current storage manager."); |
| } |
| return storage.getOrCreateReadHandler(request).getShuffleIndex(); |
| } |
| |
| public void checkResourceStatus() { |
| try { |
| Set<String> appNames = Sets.newHashSet(shuffleTaskInfos.keySet()); |
| // remove applications which is timeout according to rss.server.app.expired.withoutHeartbeat |
| for (String appId : appNames) { |
| if (isAppExpired(appId)) { |
| LOG.info( |
| "Detect expired appId[" |
| + appId |
| + "] according " |
| + "to rss.server.app.expired.withoutHeartbeat"); |
| expiredAppIdQueue.add(new AppPurgeEvent(appId, getUserByAppId(appId))); |
| } |
| } |
| ShuffleServerMetrics.gaugeAppNum.set(shuffleTaskInfos.size()); |
| } catch (Exception e) { |
| LOG.warn("Error happened in checkResourceStatus", e); |
| } |
| } |
| |
| private boolean isAppExpired(String appId) { |
| if (shuffleTaskInfos.get(appId) == null) { |
| return true; |
| } |
| return System.currentTimeMillis() - shuffleTaskInfos.get(appId).getCurrentTimes() |
| > appExpiredWithoutHB; |
| } |
| |
| /** |
| * Clear up the partial resources of shuffleIds of App. |
| * |
| * @param appId |
| * @param shuffleIds |
| */ |
| public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds) { |
| if (CollectionUtils.isEmpty(shuffleIds)) { |
| return; |
| } |
| |
| LOG.info("Start remove resource for appId[{}], shuffleIds[{}]", appId, shuffleIds); |
| final long start = System.currentTimeMillis(); |
| final ShuffleTaskInfo taskInfo = shuffleTaskInfos.get(appId); |
| if (taskInfo != null) { |
| for (Integer shuffleId : shuffleIds) { |
| taskInfo.getCachedBlockIds().remove(shuffleId); |
| taskInfo.getCommitCounts().remove(shuffleId); |
| taskInfo.getCommitLocks().remove(shuffleId); |
| } |
| } |
| Optional.ofNullable(partitionsToBlockIds.get(appId)) |
| .ifPresent( |
| x -> { |
| for (Integer shuffleId : shuffleIds) { |
| x.remove(shuffleId); |
| } |
| }); |
| shuffleBufferManager.removeBufferByShuffleId(appId, shuffleIds); |
| shuffleFlushManager.removeResourcesOfShuffleId(appId, shuffleIds); |
| storageManager.removeResources(new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds)); |
| LOG.info( |
| "Finish remove resource for appId[{}], shuffleIds[{}], cost[{}]", |
| appId, |
| shuffleIds, |
| System.currentTimeMillis() - start); |
| } |
| |
| public void checkLeakShuffleData() { |
| LOG.info("Start check leak shuffle data"); |
| try { |
| Set<String> appIds = Sets.newHashSet(shuffleTaskInfos.keySet()); |
| storageManager.checkAndClearLeakedShuffleData(appIds); |
| LOG.info("Finish check leak shuffle data"); |
| } catch (Exception e) { |
| LOG.warn("Error happened in checkLeakShuffleData", e); |
| } |
| } |
| |
| @VisibleForTesting |
| public void removeResources(String appId, boolean checkAppExpired) { |
| Lock lock = getAppLock(appId); |
| try { |
| lock.lock(); |
| LOG.info("Start remove resource for appId[" + appId + "]"); |
| if (checkAppExpired && !isAppExpired(appId)) { |
| LOG.info( |
| "It seems that this appId[{}] has registered a new shuffle, just ignore this AppPurgeEvent event.", |
| appId); |
| return; |
| } |
| final long start = System.currentTimeMillis(); |
| String user = getUserByAppId(appId); |
| ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.remove(appId); |
| if (shuffleTaskInfo == null) { |
| LOG.info("Resource for appId[" + appId + "] had been removed before."); |
| return; |
| } |
| |
| final Map<Integer, Roaring64NavigableMap> shuffleToCachedBlockIds = |
| shuffleTaskInfo.getCachedBlockIds(); |
| partitionsToBlockIds.remove(appId); |
| shuffleBufferManager.removeBuffer(appId); |
| shuffleFlushManager.removeResources(appId); |
| if (!shuffleToCachedBlockIds.isEmpty()) { |
| storageManager.removeResources( |
| new AppPurgeEvent(appId, user, new ArrayList<>(shuffleToCachedBlockIds.keySet()))); |
| } |
| if (shuffleTaskInfo.hasHugePartition()) { |
| ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec(); |
| ShuffleServerMetrics.gaugeHugePartitionNum.dec(shuffleTaskInfo.getHugePartitionSize()); |
| } |
| LOG.info( |
| "Finish remove resource for appId[" |
| + appId |
| + "] cost " |
| + (System.currentTimeMillis() - start) |
| + " ms"); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| public void refreshAppId(String appId) { |
| shuffleTaskInfos |
| .computeIfAbsent( |
| appId, |
| x -> { |
| ShuffleServerMetrics.counterTotalAppNum.inc(); |
| return new ShuffleTaskInfo(appId); |
| }) |
| .setCurrentTimes(System.currentTimeMillis()); |
| } |
| |
| // check pre allocated buffer, release the memory if it expired |
| private void preAllocatedBufferCheck() { |
| try { |
| long current = System.currentTimeMillis(); |
| List<Long> removeIds = Lists.newArrayList(); |
| for (PreAllocatedBufferInfo info : requireBufferIds.values()) { |
| if (current - info.getTimestamp() > preAllocationExpired) { |
| removeIds.add(info.getRequireId()); |
| } |
| } |
| for (Long requireId : removeIds) { |
| PreAllocatedBufferInfo info = requireBufferIds.remove(requireId); |
| if (info != null) { |
| // move release memory code down to here as the requiredBuffer could be consumed during |
| // removing processing. |
| shuffleBufferManager.releaseMemory(info.getRequireSize(), false, true); |
| LOG.warn( |
| "Remove expired preAllocatedBuffer[id={}] that required by app: {}", |
| requireId, |
| info.getAppId()); |
| ShuffleServerMetrics.counterPreAllocatedBufferExpired.inc(); |
| } else { |
| LOG.info("PreAllocatedBuffer[id={}] has already be used", requireId); |
| } |
| } |
| } catch (Exception e) { |
| LOG.warn("Error happened in preAllocatedBufferCheck", e); |
| } |
| } |
| |
| public int getRequireBufferSize(long requireId) { |
| PreAllocatedBufferInfo pabi = requireBufferIds.get(requireId); |
| if (pabi == null) { |
| return 0; |
| } |
| return pabi.getRequireSize(); |
| } |
| |
| public String getUserByAppId(String appId) { |
| return shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo(appId)).getUser(); |
| } |
| |
| @VisibleForTesting |
| public Set<String> getAppIds() { |
| return shuffleTaskInfos.keySet(); |
| } |
| |
| @VisibleForTesting |
| Map<Long, PreAllocatedBufferInfo> getRequireBufferIds() { |
| return requireBufferIds; |
| } |
| |
| @VisibleForTesting |
| public Map<String, Map<Integer, Roaring64NavigableMap[]>> getPartitionsToBlockIds() { |
| return partitionsToBlockIds; |
| } |
| |
| public void removeShuffleDataAsync(String appId, int shuffleId) { |
| expiredAppIdQueue.add( |
| new ShufflePurgeEvent(appId, getUserByAppId(appId), Arrays.asList(shuffleId))); |
| } |
| |
| public void removeShuffleDataAsync(String appId) { |
| expiredAppIdQueue.add(new AppUnregisterPurgeEvent(appId, getUserByAppId(appId))); |
| } |
| |
| @VisibleForTesting |
| void removeShuffleDataSync(String appId, int shuffleId) { |
| removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId)); |
| } |
| |
| public ShuffleDataDistributionType getDataDistributionType(String appId) { |
| return shuffleTaskInfos.get(appId).getDataDistType(); |
| } |
| |
| @VisibleForTesting |
| public ShuffleTaskInfo getShuffleTaskInfo(String appId) { |
| return shuffleTaskInfos.get(appId); |
| } |
| |
| private void triggerFlush() { |
| synchronized (this.shuffleBufferManager) { |
| this.shuffleBufferManager.flushIfNecessary(); |
| } |
| } |
| |
| public Map<String, ShuffleTaskInfo> getShuffleTaskInfos() { |
| return shuffleTaskInfos; |
| } |
| |
| public void stop() { |
| topNShuffleDataSizeOfAppCalcTask.stop(); |
| } |
| |
| public void start() { |
| clearResourceThread.start(); |
| } |
| } |