blob: 697ccf12427aa32496f85db92702cf686b4f1606 [file] [log] [blame]
/*
* Tencent is pleased to support the open source community by making
* Firestorm-Spark remote shuffle server available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://opensource.org/licenses/Apache-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.rss.server;
import com.tencent.rss.common.ShufflePartitionedBlock;
import com.tencent.rss.common.ShufflePartitionedData;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Supplier;
/**
 * In-memory accumulator of {@link ShufflePartitionedBlock}s that can be drained into a
 * {@link ShuffleDataFlushEvent}.
 *
 * <p>Mutations ({@link #append} and {@link #toFlushEvent}) and the size queries
 * ({@link #getSize} and {@link #isFull}) are guarded by this instance's monitor.
 * {@link #getBlocks} is not guarded and exposes the live internal list.
 *
 * <p>The capacity is only a threshold reported by {@link #isFull}; {@link #append} never
 * rejects data because of it.
 */
public class ShuffleBuffer {

  /** Threshold that {@link #isFull} compares the accumulated size against. */
  private final int capacity;

  /** Blocks appended since construction or since the last {@link #toFlushEvent} drain. */
  private final List<ShufflePartitionedBlock> blocks;

  /** Sum of {@code getSize()} over all currently buffered blocks. */
  private long size;

  /**
   * Creates an empty buffer.
   *
   * @param capacity size threshold above which {@link #isFull} reports {@code true}
   */
  public ShuffleBuffer(int capacity) {
    this.capacity = capacity;
    this.size = 0;
    this.blocks = new LinkedList<>();
  }

  /**
   * Appends every block of {@code data} to this buffer.
   *
   * @param data the partitioned data whose block list is appended
   * @return the total size of the blocks added by this call
   */
  public int append(ShufflePartitionedData data) {
    int mSize = 0;
    synchronized (this) {
      for (ShufflePartitionedBlock block : data.getBlockList()) {
        blocks.add(block);
        mSize += block.getSize();
      }
      // Add the per-call total exactly once. Adding the running total inside the loop
      // would count earlier blocks of the same call repeatedly and inflate the size.
      size += mSize;
    }
    return mSize;
  }

  /**
   * Drains the buffer into a flush event and resets the buffer to empty.
   *
   * <p>The event id is taken from {@code ShuffleFlushManager.ATOMIC_EVENT_ID}. The event
   * receives its own copy of the block list and the accumulated size at the time of the call.
   *
   * @param appId application id passed through to the event
   * @param shuffleId shuffle id passed through to the event
   * @param startPartition start partition passed through to the event
   * @param endPartition end partition passed through to the event
   * @param isValid validity supplier passed through to the event
   * @return the flush event, or {@code null} if the buffer holds no blocks
   */
  public synchronized ShuffleDataFlushEvent toFlushEvent(
      String appId, int shuffleId, int startPartition, int endPartition, Supplier<Boolean> isValid) {
    if (blocks.isEmpty()) {
      return null;
    }
    // buffer will be cleared, and new list must be created for async flush
    List<ShufflePartitionedBlock> spBlocks = new LinkedList<>(blocks);
    ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(
        ShuffleFlushManager.ATOMIC_EVENT_ID.getAndIncrement(),
        appId,
        shuffleId,
        startPartition,
        endPartition,
        size,
        spBlocks,
        isValid);
    blocks.clear();
    size = 0;
    return event;
  }

  /**
   * Returns the live internal block list (not a copy, and without taking the lock).
   *
   * <p>NOTE(review): callers that iterate or modify the returned list while other threads
   * call {@link #append} or {@link #toFlushEvent} are not protected by this class.
   *
   * @return the internal list of buffered blocks
   */
  public List<ShufflePartitionedBlock> getBlocks() {
    return blocks;
  }

  /**
   * Returns the accumulated size of the buffered blocks.
   *
   * <p>Synchronized so the value is read under the same monitor that guards its writes.
   *
   * @return the sum of the sizes of all currently buffered blocks
   */
  public synchronized long getSize() {
    return size;
  }

  /**
   * Reports whether the accumulated size is strictly greater than the capacity.
   *
   * @return {@code true} if the buffered size exceeds the capacity
   */
  public synchronized boolean isFull() {
    return size > capacity;
  }
}