/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.server;

import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.RangeMap;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.server.buffer.ShuffleBuffer;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;

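/**
 * Manages asynchronous flushing of in-memory shuffle data to persistent storage. Flush events are
 * queued, dispatched to a thread pool, and written through {@link ShuffleWriteHandler}s. Events
 * whose storage is temporarily unwritable are parked in a pending queue and retried until they
 * succeed or time out.
 *
 * <p>Typical interaction (sketch): the buffer manager builds a {@link ShuffleDataFlushEvent} for a
 * full buffer and hands it to {@link #addToFlushQueue}; once the write succeeds, the flushed block
 * ids become visible through {@link #getCommittedBlockIds}.
 */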
public class ShuffleFlushManager {

private static final Logger LOG = LoggerFactory.getLogger(ShuffleFlushManager.class);
public static final AtomicLong ATOMIC_EVENT_ID = new AtomicLong(0);
private final ShuffleServer shuffleServer;
private final BlockingQueue<ShuffleDataFlushEvent> flushQueue = Queues.newLinkedBlockingQueue();
private final ThreadPoolExecutor threadPoolExecutor;
private final List<String> storageBasePaths;
private final String shuffleServerId;
private final String storageType;
private final int storageDataReplica;
private final ShuffleServerConf shuffleServerConf;
private Configuration hadoopConf;
// appId -> shuffleId -> partitionId -> handlers
private Map<String, Map<Integer, RangeMap<Integer, ShuffleWriteHandler>>> handlers = Maps.newConcurrentMap();
// appId -> shuffleId -> committed shuffle blockIds
private Map<String, Map<Integer, Roaring64NavigableMap>> committedBlockIds = Maps.newConcurrentMap();
private final int retryMax;
private final StorageManager storageManager;
private final BlockingQueue<PendingShuffleFlushEvent> pendingEvents = Queues.newLinkedBlockingQueue();
private final long pendingEventTimeoutSec;
private int processPendingEventIndex = 0;
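
  /**
   * Creates the manager and starts two daemon threads: one that drains the flush queue and submits
   * each event to the flush thread pool, and one that periodically re-examines pending events whose
   * storage was not writable when they were first seen.
   */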
public ShuffleFlushManager(ShuffleServerConf shuffleServerConf, String shuffleServerId, ShuffleServer shuffleServer,
StorageManager storageManager) {
this.shuffleServerId = shuffleServerId;
this.shuffleServer = shuffleServer;
this.shuffleServerConf = shuffleServerConf;
this.storageManager = storageManager;
initHadoopConf();
retryMax = shuffleServerConf.getInteger(ShuffleServerConf.SERVER_WRITE_RETRY_MAX);
storageType = shuffleServerConf.get(RssBaseConf.RSS_STORAGE_TYPE);
storageDataReplica = shuffleServerConf.get(RssBaseConf.RSS_STORAGE_DATA_REPLICA);
int waitQueueSize = shuffleServerConf.getInteger(
ShuffleServerConf.SERVER_FLUSH_THREAD_POOL_QUEUE_SIZE);
BlockingQueue<Runnable> waitQueue = Queues.newLinkedBlockingQueue(waitQueueSize);
int poolSize = shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_THREAD_POOL_SIZE);
long keepAliveTime = shuffleServerConf.getLong(ShuffleServerConf.SERVER_FLUSH_THREAD_ALIVE);
threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize, keepAliveTime, TimeUnit.SECONDS, waitQueue,
ThreadUtils.getThreadFactory("FlushEventThreadPool"));
storageBasePaths = shuffleServerConf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH);
pendingEventTimeoutSec = shuffleServerConf.getLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC);
    // the dispatcher thread: takes flush events off the queue and hands them to the thread pool
Runnable processEventRunnable = () -> {
while (true) {
try {
ShuffleDataFlushEvent event = flushQueue.take();
threadPoolExecutor.execute(() -> {
try {
ShuffleServerMetrics.gaugeEventQueueSize.set(flushQueue.size());
ShuffleServerMetrics.gaugeWriteHandler.inc();
flushToFile(event);
ShuffleServerMetrics.gaugeWriteHandler.dec();
} catch (Exception e) {
LOG.error("Exception happened when flush data for " + event, e);
}
});
} catch (Exception e) {
LOG.error("Exception happened when process event.", e);
}
}
};
Thread processEventThread = new Thread(processEventRunnable);
processEventThread.setName("ProcessEventThread");
processEventThread.setDaemon(true);
processEventThread.start();
    // todo: extract a class named Service and support a stop method
Thread thread = new Thread("PendingEventProcessThread") {
@Override
public void run() {
        for (;;) {
try {
processPendingEvents();
processPendingEventIndex = (processPendingEventIndex + 1) % 1000;
if (processPendingEventIndex == 0) {
// todo: get sleep interval from configuration
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
}
} catch (Exception e) {
            LOG.error(getName() + " encountered an exception: ", e);
}
}
}
};
thread.setDaemon(true);
thread.start();
}
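
  /** Enqueues a flush event for asynchronous processing; the event is discarded with a warning if the queue rejects it. */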
public void addToFlushQueue(ShuffleDataFlushEvent event) {
if (!flushQueue.offer(event)) {
LOG.warn("Flush queue is full, discard event: " + event);
}
}
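
  /**
   * Flushes a single event to its selected storage, retrying up to {@code retryMax} times. Events
   * whose storage cannot currently be written are moved to the pending queue instead; on success,
   * the flushed block ids are recorded as committed.
   */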
private void flushToFile(ShuffleDataFlushEvent event) {
Storage storage = storageManager.selectStorage(event);
if (storage != null && !storage.canWrite()) {
addPendingEvents(event);
return;
}
long start = System.currentTimeMillis();
List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
boolean writeSuccess = false;
try {
      // storage may be null if the application cache was already cleared
if (storage != null) {
if (blocks == null || blocks.isEmpty()) {
LOG.info("There is no block to be flushed: " + event);
} else if (!event.isValid()) {
          // mark as success to avoid logging an error for an already-removed app
writeSuccess = true;
LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
} else {
String user = StringUtils.defaultString(
shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
StringUtils.EMPTY
);
ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(new CreateShuffleWriteHandlerRequest(
storageType,
event.getAppId(),
event.getShuffleId(),
event.getStartPartition(),
event.getEndPartition(),
storageBasePaths.toArray(new String[storageBasePaths.size()]),
shuffleServerId,
hadoopConf,
storageDataReplica,
user)
);
do {
if (event.getRetryTimes() > retryMax) {
LOG.error("Failed to write data for " + event + " in " + retryMax + " times, shuffle data will be lost");
ShuffleServerMetrics.incStorageFailedCounter(storage.getStorageHost());
break;
}
if (!event.isValid()) {
LOG.warn("AppId {} was removed already, event {} should be dropped, may leak one handler",
event.getAppId(), event);
              // mark as success to avoid logging an error for an already-removed app
writeSuccess = true;
break;
}
writeSuccess = storageManager.write(storage, handler, event);
if (writeSuccess) {
updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), blocks);
ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
break;
} else {
event.increaseRetryTimes();
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
}
} while (event.getRetryTimes() <= retryMax);
}
}
} catch (Exception e) {
      // just log the error; rethrowing would kill the flush thread
      LOG.error("Exception happened when flushing shuffle data for " + event, e);
} finally {
cleanupFlushEventData(event);
if (shuffleServer != null) {
long duration = System.currentTimeMillis() - start;
if (writeSuccess) {
LOG.debug("Flush to file success in " + duration + " ms and release " + event.getSize() + " bytes");
} else {
LOG.error("Flush to file for " + event + " failed in "
+ duration + " ms and release " + event.getSize() + " bytes");
}
}
}
}
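
  /** Records the block ids of successfully flushed blocks in a per-app, per-shuffle bitmap. */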
private void updateCommittedBlockIds(String appId, int shuffleId, List<ShufflePartitionedBlock> blocks) {
    if (blocks == null || blocks.isEmpty()) {
      return;
    }
    Map<Integer, Roaring64NavigableMap> shuffleToBlockIds =
        committedBlockIds.computeIfAbsent(appId, key -> Maps.newConcurrentMap());
    Roaring64NavigableMap bitmap =
        shuffleToBlockIds.computeIfAbsent(shuffleId, key -> Roaring64NavigableMap.bitmapOf());
synchronized (bitmap) {
for (ShufflePartitionedBlock spb : blocks) {
bitmap.addLong(spb.getBlockId());
}
}
}
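
  /** Returns the committed block ids for the given app and shuffle, or an empty bitmap if none were recorded. */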
public Roaring64NavigableMap getCommittedBlockIds(String appId, Integer shuffleId) {
Map<Integer, Roaring64NavigableMap> shuffleIdToBlockIds = committedBlockIds.get(appId);
if (shuffleIdToBlockIds == null) {
LOG.warn("Unexpected value when getCommittedBlockIds for appId[" + appId + "]");
return Roaring64NavigableMap.bitmapOf();
}
Roaring64NavigableMap blockIds = shuffleIdToBlockIds.get(shuffleId);
if (blockIds == null) {
LOG.warn("Unexpected value when getCommittedBlockIds for appId[" + appId + "], shuffleId[" + shuffleId + "]");
return Roaring64NavigableMap.bitmapOf();
}
return blockIds;
}
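
  /** Drops the cached write handlers and committed block ids of the given application. */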
public void removeResources(String appId) {
handlers.remove(appId);
committedBlockIds.remove(appId);
}
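
  /** Builds the Hadoop configuration from all server properties carrying the PREFIX_HADOOP_CONF prefix. */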
protected void initHadoopConf() {
hadoopConf = new Configuration();
for (String key : shuffleServerConf.getKeySet()) {
if (key.startsWith(ShuffleServerConf.PREFIX_HADOOP_CONF)) {
String value = shuffleServerConf.getString(key, "");
String hadoopKey = key.substring(ShuffleServerConf.PREFIX_HADOOP_CONF.length() + 1);
LOG.info("Update hadoop configuration:" + hadoopKey + "=" + value);
hadoopConf.set(hadoopKey, value);
}
}
}

  public int getEventNumInFlush() {
    return flushQueue.size();
  }

  public Configuration getHadoopConf() {
    return hadoopConf;
  }

  @VisibleForTesting
  protected Map<String, Map<Integer, RangeMap<Integer, ShuffleWriteHandler>>> getHandlers() {
    return handlers;
  }
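
  /**
   * Takes one event from the pending queue and either re-queues it for flushing (its storage is
   * writable again), drops it (its storage is gone or the event timed out), or parks it back in
   * the pending queue.
   */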
@VisibleForTesting
void processPendingEvents() throws Exception {
PendingShuffleFlushEvent event = pendingEvents.take();
Storage storage = storageManager.selectStorage(event.getEvent());
if (storage == null) {
dropPendingEvent(event);
LOG.error("Flush event cannot be flushed because of application related was cleared, {}", event.getEvent());
return;
}
if (System.currentTimeMillis() - event.getCreateTimeStamp() > pendingEventTimeoutSec * 1000L) {
dropPendingEvent(event);
LOG.error("Flush event cannot be flushed for {} sec, the event {} is dropped",
pendingEventTimeoutSec, event.getEvent());
return;
}
    // if the storage is writable again, move the event back to the flush queue, where it will be flushed and released
if (storage.canWrite()) {
addToFlushQueue(event.getEvent());
return;
}
addPendingEventsInternal(event);
}
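
  /** Releases the resources tied to a processed event: its slot in the in-flush buffer and its reserved memory. */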
private void cleanupFlushEventData(ShuffleDataFlushEvent event) {
ShuffleBuffer shuffleBuffer = event.getShuffleBuffer();
if (shuffleBuffer != null) {
shuffleBuffer.clearInFlushBuffer(event.getEventId());
}
if (shuffleServer != null) {
shuffleServer.getShuffleBufferManager().releaseMemory(
event.getSize(), true, false);
}
}
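
  /** Counts a dropped event in the metrics and releases its resources. */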
private void dropPendingEvent(PendingShuffleFlushEvent event) {
ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
cleanupFlushEventData(event.getEvent());
}
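
  /** Wraps the event with its creation timestamp and parks it in the pending queue. */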
@VisibleForTesting
void addPendingEvents(ShuffleDataFlushEvent event) {
addPendingEventsInternal(new PendingShuffleFlushEvent(event));
}

  @VisibleForTesting
int getPendingEventsSize() {
return pendingEvents.size();
}
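
  /** Offers the event to the unbounded pending queue, logging an error if the offer unexpectedly fails. */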
private void addPendingEventsInternal(PendingShuffleFlushEvent event) {
boolean pendingEventsResult = pendingEvents.offer(event);
ShuffleDataFlushEvent flushEvent = event.getEvent();
if (!pendingEventsResult) {
LOG.error("Post pendingEvent queue fail!! App: " + flushEvent.getAppId() + " Shuffle "
+ flushEvent.getShuffleId() + " Partition " + flushEvent.getStartPartition());
}
}
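
  /** A flush event waiting for its storage to become writable, stamped with its creation time for timeout checks. */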
  private static class PendingShuffleFlushEvent {

    private final ShuffleDataFlushEvent event;
    private final long createTimeStamp = System.currentTimeMillis();

    PendingShuffleFlushEvent(ShuffleDataFlushEvent event) {
      this.event = event;
    }

    public ShuffleDataFlushEvent getEvent() {
      return event;
    }

    public long getCreateTimeStamp() {
      return createTimeStamp;
    }
  }
}