blob: 56d2189805f32eafe7085c049be245d40b050241 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.server.buffer;
import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.collect.RangeMap;
import com.google.common.util.concurrent.Uninterruptibles;
import io.prometheus.client.Collector;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.server.DefaultFlushEventHandler;
import org.apache.uniffle.server.ShuffleFlushManager;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerMetrics;
import org.apache.uniffle.server.ShuffleTaskManager;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.server.storage.StorageManagerFactory;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ShuffleBufferManagerTest extends BufferTestBase {
// Buffer manager under test; created in setUp and re-created by tests that need other settings.
private ShuffleBufferManager shuffleBufferManager;
// Mockito mock handed to the buffer manager built in setUp.
private ShuffleFlushManager mockShuffleFlushManager;
// Mockito mock of the server; setUp stubs it to return mockShuffleTaskManager.
private ShuffleServer mockShuffleServer;
// Mockito mock whose getAppReadLock(appId) is stubbed by the individual tests.
private ShuffleTaskManager mockShuffleTaskManager;
// Base server configuration assembled in setUp.
private ShuffleServerConf conf;
/**
 * Rebuilds the shared fixtures before every test: a LOCALFILE storage configuration rooted in
 * the per-test temporary directory, Mockito mocks for the server collaborators, and a fresh
 * {@link ShuffleBufferManager} wired to the mocked task manager.
 *
 * @param tmpDir temporary directory injected by JUnit; its {@code data} child is the storage
 *     base path
 */
@BeforeEach
public void setUp(@TempDir File tmpDir) {
  String storageBasePath = new File(tmpDir, "data").getAbsolutePath();

  ShuffleServerConf serverConf = new ShuffleServerConf();
  serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
  serverConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
  serverConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
  // Buffer capacity of 500 with 20% / 80% low / high watermarks.
  serverConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 500L);
  serverConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 20.0);
  serverConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 80.0);
  conf = serverConf;

  mockShuffleFlushManager = mock(ShuffleFlushManager.class);
  mockShuffleServer = mock(ShuffleServer.class);
  mockShuffleTaskManager = mock(ShuffleTaskManager.class);
  when(mockShuffleServer.getShuffleTaskManager()).thenReturn(mockShuffleTaskManager);

  shuffleBufferManager = new ShuffleBufferManager(conf, mockShuffleFlushManager, false);
  shuffleBufferManager.setShuffleTaskManager(mockShuffleTaskManager);
}
/**
 * Registers two partition ranges for one shuffle and checks that every partition inside a range
 * resolves to the same buffer, and that registering a range again keeps the existing buffer.
 */
@Test
public void registerBufferTest() {
  final String appId = "registerBufferTest";
  final int shuffleId = 1;

  assertEquals(StatusCode.SUCCESS, shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 1));
  assertEquals(StatusCode.SUCCESS, shuffleBufferManager.registerBuffer(appId, shuffleId, 2, 3));

  Map<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>> bufferPool =
      shuffleBufferManager.getBufferPool();
  RangeMap<Integer, ShuffleBuffer> partitionBuffers = bufferPool.get(appId).get(shuffleId);

  // Partitions 0 and 1 belong to the first registered range.
  ShuffleBuffer firstRangeBuffer = partitionBuffers.get(0);
  assertNotNull(firstRangeBuffer);
  assertEquals(firstRangeBuffer, partitionBuffers.get(1));

  // Partitions 2 and 3 belong to the second registered range.
  ShuffleBuffer secondRangeBuffer = partitionBuffers.get(2);
  assertNotNull(secondRangeBuffer);
  assertEquals(secondRangeBuffer, partitionBuffers.get(3));

  // register again
  shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 1);
  assertEquals(firstRangeBuffer, bufferPool.get(appId).get(shuffleId).get(0));
}
/**
 * Caches four 68-length blocks and reads them back with an expected-task bitmap containing only
 * {@code 1}: both the initial read and the follow-up read (starting after the block returned
 * first) must yield exactly one segment at offset 0 with length 68.
 */
@Test
public void getShuffleDataWithExpectedTaskIdsTest() {
  final String appId = "getShuffleDataWithExpectedTaskIdsTest";
  shuffleBufferManager.setShuffleTaskManager(mockShuffleTaskManager);
  ReentrantReadWriteLock appLock = new ReentrantReadWriteLock();
  when(mockShuffleTaskManager.getAppReadLock(appId)).thenReturn(appLock.readLock());
  when(mockShuffleServer.getShuffleTaskManager()).thenReturn(mockShuffleTaskManager);
  shuffleBufferManager.registerBuffer(appId, 1, 0, 1);

  ShufflePartitionedData firstData = createData(0, 1, 68);
  ShufflePartitionedData secondData = createData(0, 2, 68);
  ShufflePartitionedData thirdData = createData(0, 1, 68);
  ShufflePartitionedData fourthData = createData(0, 3, 68);
  shuffleBufferManager.cacheShuffleData(appId, 1, false, firstData);
  shuffleBufferManager.cacheShuffleData(appId, 1, false, secondData);
  shuffleBufferManager.cacheShuffleData(appId, 1, false, thirdData);
  shuffleBufferManager.cacheShuffleData(appId, 1, false, fourthData);

  // case1: all blocks in cached and read multiple times
  ShuffleDataResult firstRead =
      shuffleBufferManager.getShuffleData(
          appId, 1, 0, Constants.INVALID_BLOCK_ID, 60, Roaring64NavigableMap.bitmapOf(1));
  assertEquals(1, firstRead.getBufferSegments().size());
  assertEquals(0, firstRead.getBufferSegments().get(0).getOffset());
  assertEquals(68, firstRead.getBufferSegments().get(0).getLength());

  // 2nd read, continuing after the block returned by the first read
  long lastBlockId = firstRead.getBufferSegments().get(0).getBlockId();
  ShuffleDataResult secondRead =
      shuffleBufferManager.getShuffleData(
          appId, 1, 0, lastBlockId, 60, Roaring64NavigableMap.bitmapOf(1));
  assertEquals(1, secondRead.getBufferSegments().size());
  assertEquals(0, secondRead.getBufferSegments().get(0).getOffset());
  assertEquals(68, secondRead.getBufferSegments().get(0).getLength());
}
/**
 * Walks a buffer through its read states: data is readable while only cached, remains readable
 * once moved to the in-flush map, and reads come back empty after the in-flush map is cleared.
 */
@Test
public void getShuffleDataTest() {
  final String appId = "getShuffleDataTest";
  shuffleBufferManager.setShuffleTaskManager(mockShuffleTaskManager);
  ReentrantReadWriteLock appLock = new ReentrantReadWriteLock();
  when(mockShuffleTaskManager.getAppReadLock(appId)).thenReturn(appLock.readLock());
  when(mockShuffleServer.getShuffleTaskManager()).thenReturn(mockShuffleTaskManager);

  // Shuffles 1..4 each get one buffer covering partitions 0-1.
  for (int shuffleId = 1; shuffleId <= 4; shuffleId++) {
    shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 1);
  }

  ShufflePartitionedData dataForShuffle1 = createData(0, 68);
  ShufflePartitionedData firstDataForShuffle2 = createData(0, 68);
  ShufflePartitionedData secondDataForShuffle2 = createData(0, 68);
  ShufflePartitionedData dataForShuffle3 = createData(0, 68);
  shuffleBufferManager.cacheShuffleData(appId, 1, false, dataForShuffle1);
  shuffleBufferManager.cacheShuffleData(appId, 2, false, firstDataForShuffle2);
  shuffleBufferManager.cacheShuffleData(appId, 2, false, secondDataForShuffle2);
  shuffleBufferManager.cacheShuffleData(appId, 3, false, dataForShuffle3);

  // validate buffer, no flush happened
  Map<Integer, RangeMap<Integer, ShuffleBuffer>> appBuffers =
      shuffleBufferManager.getBufferPool().get(appId);
  ShuffleBuffer buffer1 = appBuffers.get(1).get(0);
  ShuffleBuffer buffer2 = appBuffers.get(2).get(0);
  ShuffleBuffer buffer3 = appBuffers.get(3).get(0);
  ShuffleBuffer buffer4 = appBuffers.get(4).get(0);
  assertEquals(100, buffer1.getSize());
  assertEquals(200, buffer2.getSize());
  assertEquals(100, buffer3.getSize());

  // validate get shuffle data
  final long firstBlockIdOfShuffle2 = firstDataForShuffle2.getBlockList()[0].getBlockId();
  ShuffleDataResult readResult =
      shuffleBufferManager.getShuffleData(appId, 2, 0, Constants.INVALID_BLOCK_ID, 60);
  assertArrayEquals(
      ByteBufUtils.readBytes(firstDataForShuffle2.getBlockList()[0].getData()),
      readResult.getData());
  readResult = shuffleBufferManager.getShuffleData(appId, 2, 0, firstBlockIdOfShuffle2, 100);
  assertArrayEquals(
      ByteBufUtils.readBytes(secondDataForShuffle2.getBlockList()[0].getData()),
      readResult.getData());

  // flush happen
  ShufflePartitionedData dataForShuffle4 = createData(0, 10);
  shuffleBufferManager.cacheShuffleData(appId, 4, false, dataForShuffle4);
  // according to flush strategy, some buffers should be moved to inFlushMap
  assertEquals(0, buffer1.getBlocks().size());
  assertEquals(1, buffer1.getInFlushBlockMap().size());
  assertEquals(0, buffer2.getBlocks().size());
  assertEquals(1, buffer2.getInFlushBlockMap().size());
  assertEquals(0, buffer3.getBlocks().size());
  assertEquals(1, buffer3.getInFlushBlockMap().size());
  // keep buffer whose size < low watermark
  assertEquals(1, buffer4.getBlocks().size());

  // data in flush buffer now, it also can be got before flush finish
  readResult = shuffleBufferManager.getShuffleData(appId, 2, 0, Constants.INVALID_BLOCK_ID, 60);
  assertArrayEquals(
      ByteBufUtils.readBytes(firstDataForShuffle2.getBlockList()[0].getData()),
      readResult.getData());
  readResult = shuffleBufferManager.getShuffleData(appId, 2, 0, firstBlockIdOfShuffle2, 100);
  assertArrayEquals(
      ByteBufUtils.readBytes(secondDataForShuffle2.getBlockList()[0].getData()),
      readResult.getData());

  // cache data again, it should cause flush
  ShufflePartitionedData moreDataForShuffle1 = createData(0, 10);
  shuffleBufferManager.cacheShuffleData(appId, 1, false, moreDataForShuffle1);
  assertEquals(1, buffer1.getBlocks().size());

  // finish flush
  buffer1.getInFlushBlockMap().clear();
  buffer2.getInFlushBlockMap().clear();
  buffer3.getInFlushBlockMap().clear();

  // empty data return
  readResult = shuffleBufferManager.getShuffleData(appId, 2, 0, Constants.INVALID_BLOCK_ID, 60);
  assertEquals(0, readResult.getDataLength());
  readResult = shuffleBufferManager.getShuffleData(appId, 2, 0, firstBlockIdOfShuffle2, 100);
  assertEquals(0, readResult.getDataLength());
}
/**
 * Verifies the per-(app, shuffle) size bookkeeping exposed by
 * {@code ShuffleBufferManager#getShuffleSizeMap()}: sizes grow as data is cached, drop to zero
 * for shuffles picked by a flush, and the whole app entry disappears on {@code removeBuffer}.
 */
@Test
public void shuffleIdToSizeTest() {
  String appId1 = "shuffleIdToSizeTest1";
  String appId2 = "shuffleIdToSizeTest2";
  ReentrantReadWriteLock rwLock1 = new ReentrantReadWriteLock();
  when(mockShuffleTaskManager.getAppReadLock(appId1)).thenReturn(rwLock1.readLock());
  ReentrantReadWriteLock rwLock2 = new ReentrantReadWriteLock();
  when(mockShuffleTaskManager.getAppReadLock(appId2)).thenReturn(rwLock2.readLock());
  shuffleBufferManager.registerBuffer(appId1, 1, 0, 0);
  shuffleBufferManager.registerBuffer(appId1, 2, 0, 0);
  shuffleBufferManager.registerBuffer(appId2, 1, 0, 0);
  shuffleBufferManager.registerBuffer(appId2, 2, 0, 0);

  ShufflePartitionedData spd1 = createData(0, 67);
  ShufflePartitionedData spd2 = createData(0, 68);
  ShufflePartitionedData spd3 = createData(0, 68);
  ShufflePartitionedData spd4 = createData(0, 68);
  shuffleBufferManager.cacheShuffleData(appId1, 1, false, spd1);
  shuffleBufferManager.cacheShuffleData(appId1, 2, false, spd2);
  shuffleBufferManager.cacheShuffleData(appId1, 2, false, spd3);
  shuffleBufferManager.cacheShuffleData(appId2, 1, false, spd4);

  // validate metadata of shuffle size
  Map<String, Map<Integer, AtomicLong>> shuffleSizeMap = shuffleBufferManager.getShuffleSizeMap();
  assertEquals(99, shuffleSizeMap.get(appId1).get(1).get());
  assertEquals(200, shuffleSizeMap.get(appId1).get(2).get());
  assertEquals(100, shuffleSizeMap.get(appId2).get(1).get());

  ShufflePartitionedData spd5 = createData(0, 68);
  shuffleBufferManager.cacheShuffleData(appId2, 2, false, spd5);
  // flush happen
  assertEquals(99, shuffleSizeMap.get(appId1).get(1).get());
  assertEquals(0, shuffleSizeMap.get(appId1).get(2).get());
  assertEquals(0, shuffleSizeMap.get(appId2).get(1).get());
  // Previously (appId2, shuffle 1) was asserted twice and (appId2, shuffle 2) never; the 400
  // released below equals 200 + 100 + 100, i.e. shuffle 2 of appId2 is part of this flush too.
  assertEquals(0, shuffleSizeMap.get(appId2).get(2).get());

  // Simulate completion of the flush by releasing the flushed memory.
  shuffleBufferManager.releaseMemory(400, true, false);
  ShufflePartitionedData spd6 = createData(0, 300);
  shuffleBufferManager.cacheShuffleData(appId1, 1, false, spd6);
  // flush happen
  assertEquals(0, shuffleSizeMap.get(appId1).get(1).get());

  shuffleBufferManager.releaseMemory(463, true, false);
  shuffleBufferManager.cacheShuffleData(appId1, 1, false, spd1);
  shuffleBufferManager.cacheShuffleData(appId1, 2, false, spd2);
  shuffleBufferManager.cacheShuffleData(appId2, 1, false, spd4);
  // Removing an app drops its size entry and leaves other apps untouched.
  shuffleBufferManager.removeBuffer(appId1);
  assertNull(shuffleSizeMap.get(appId1));
  assertEquals(100, shuffleSizeMap.get(appId2).get(1).get());
}
/**
 * Exercises {@code cacheShuffleData} end to end against the mocked flush manager: unregistered
 * targets are rejected with NO_REGISTER, used memory tracks cached blocks, crossing the high
 * watermark queues a flush event, and a full buffer answers NO_BUFFER. The total-partition gauge
 * is checked relative to its value at the start of the test.
 */
@Test
public void cacheShuffleDataTest() {
  final String appId = "cacheShuffleDataTest";
  shuffleBufferManager.setShuffleTaskManager(mockShuffleTaskManager);
  ReentrantReadWriteLock appLock = new ReentrantReadWriteLock();
  when(mockShuffleTaskManager.getAppReadLock(appId)).thenReturn(appLock.readLock());
  when(mockShuffleServer.getShuffleTaskManager()).thenReturn(mockShuffleTaskManager);
  final int initialPartitionNum = (int) ShuffleServerMetrics.gaugeTotalPartitionNum.get();
  final int shuffleId = 1;

  // Nothing registered yet.
  StatusCode status =
      shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 16));
  assertEquals(StatusCode.NO_REGISTER, status);

  // A different shuffle id is registered.
  shuffleBufferManager.registerBuffer(appId, shuffleId + 1, 0, 1);
  assertEquals(initialPartitionNum + 1, (int) ShuffleServerMetrics.gaugeTotalPartitionNum.get());
  status = shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 16));
  assertEquals(StatusCode.NO_REGISTER, status);

  // Right shuffle id, but a range that does not contain partition 0.
  shuffleBufferManager.registerBuffer(appId, shuffleId, 100, 101);
  assertEquals(initialPartitionNum + 2, (int) ShuffleServerMetrics.gaugeTotalPartitionNum.get());
  status = shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 16));
  assertEquals(StatusCode.NO_REGISTER, status);

  // Matching registration: the write is accepted.
  shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 1);
  assertEquals(initialPartitionNum + 3, (int) ShuffleServerMetrics.gaugeTotalPartitionNum.get());
  status = shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 16));
  assertEquals(StatusCode.SUCCESS, status);

  Map<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>> bufferPool =
      shuffleBufferManager.getBufferPool();
  ShuffleBuffer partitionBuffer = bufferPool.get(appId).get(shuffleId).get(0);
  assertEquals(48, partitionBuffer.getSize());
  assertEquals(48, shuffleBufferManager.getUsedMemory());

  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 16));
  assertEquals(96, partitionBuffer.getSize());
  assertEquals(96, shuffleBufferManager.getUsedMemory());

  // reach high water lever, flush
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 273));
  assertEquals(0, partitionBuffer.getSize());
  assertEquals(401, shuffleBufferManager.getUsedMemory());
  assertEquals(401, shuffleBufferManager.getInFlushSize());
  verify(mockShuffleFlushManager, times(1)).addToFlushQueue(any());

  // now buffer should be full
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 100));
  verify(mockShuffleFlushManager, times(1)).addToFlushQueue(any());
  status = shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 1));
  assertEquals(StatusCode.NO_BUFFER, status);

  // size won't be reduced which should be processed by flushManager, reset buffer size to 0
  shuffleBufferManager.resetSize();
  shuffleBufferManager.removeBuffer(appId);
  assertEquals(initialPartitionNum, (int) ShuffleServerMetrics.gaugeTotalPartitionNum.get());

  // Second round: two single-partition buffers for one shuffle plus one for another shuffle.
  shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 0);
  shuffleBufferManager.registerBuffer(appId, shuffleId, 1, 1);
  shuffleBufferManager.registerBuffer(appId, 2, 0, 0);
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 200));
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(1, 200));
  shuffleBufferManager.cacheShuffleData(appId, 2, false, createData(0, 32));

  ShuffleBuffer partition0Buffer = bufferPool.get(appId).get(shuffleId).get(0);
  ShuffleBuffer partition1Buffer = bufferPool.get(appId).get(shuffleId).get(1);
  ShuffleBuffer otherShuffleBuffer = bufferPool.get(appId).get(2).get(0);
  assertEquals(0, partition0Buffer.getSize());
  assertEquals(0, partition1Buffer.getSize());
  assertEquals(64, otherShuffleBuffer.getSize());
  assertEquals(528, shuffleBufferManager.getUsedMemory());
  assertEquals(464, shuffleBufferManager.getInFlushSize());
  verify(mockShuffleFlushManager, times(3)).addToFlushQueue(any());
}
/**
 * Checks the interplay between pre-allocated memory and cached data: pre-allocated writes do not
 * change used memory, non-pre-allocated writes are refused with NO_BUFFER once everything is
 * pre-allocated, and a large pre-allocated write followed by a manual {@code flushIfNecessary}
 * queues exactly one flush event on the mocked flush manager.
 */
@Test
public void cacheShuffleDataWithPreAllocationTest() {
  String appId = "cacheShuffleDataWithPreAllocationTest";
  shuffleBufferManager.setShuffleTaskManager(mockShuffleTaskManager);
  ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  when(mockShuffleTaskManager.getAppReadLock(appId)).thenReturn(rwLock.readLock());
  when(mockShuffleServer.getShuffleTaskManager()).thenReturn(mockShuffleTaskManager);
  int shuffleId = 1;
  shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 1);

  // pre allocate memory
  shuffleBufferManager.requireMemory(48, true);
  assertEquals(48, shuffleBufferManager.getUsedMemory());
  assertEquals(48, shuffleBufferManager.getPreAllocatedSize());
  // receive data with preAllocation
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, true, createData(0, 16));
  assertEquals(48, shuffleBufferManager.getUsedMemory());
  assertEquals(48, shuffleBufferManager.getPreAllocatedSize());
  // release memory
  shuffleBufferManager.releaseMemory(48, false, true);
  assertEquals(0, shuffleBufferManager.getUsedMemory());
  assertEquals(0, shuffleBufferManager.getPreAllocatedSize());

  // receive data without preAllocation
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 17));
  assertEquals(49, shuffleBufferManager.getUsedMemory());
  assertEquals(0, shuffleBufferManager.getPreAllocatedSize());
  // release memory
  shuffleBufferManager.releaseMemory(49, false, false);
  assertEquals(0, shuffleBufferManager.getUsedMemory());
  assertEquals(0, shuffleBufferManager.getPreAllocatedSize());

  // release memory with preAllocation
  shuffleBufferManager.requireMemory(16, true);
  shuffleBufferManager.releaseMemory(16, false, true);
  assertEquals(0, shuffleBufferManager.getUsedMemory());
  assertEquals(0, shuffleBufferManager.getPreAllocatedSize());

  // pre allocate all memory
  shuffleBufferManager.requireMemory(500, true);
  assertEquals(500, shuffleBufferManager.getUsedMemory());
  assertEquals(500, shuffleBufferManager.getPreAllocatedSize());

  // no buffer if data without pre allocation
  StatusCode sc =
      shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(1, 16));
  assertEquals(StatusCode.NO_BUFFER, sc);

  // actual data size < spillThreshold, won't flush
  sc = shuffleBufferManager.cacheShuffleData(appId, shuffleId, true, createData(1, 16));
  shuffleBufferManager.releasePreAllocatedSize(48);
  assertEquals(StatusCode.SUCCESS, sc);
  assertEquals(500, shuffleBufferManager.getUsedMemory());
  assertEquals(452, shuffleBufferManager.getPreAllocatedSize());

  // actual data size > highWaterMark, flush
  // Capture the status of THIS write; previously the result was dropped and the assertion
  // below re-checked the stale status of the earlier 16-length write.
  sc = shuffleBufferManager.cacheShuffleData(appId, shuffleId, true, createData(0, 400));
  shuffleBufferManager.releasePreAllocatedSize(432);
  // trigger flush manually
  shuffleBufferManager.flushIfNecessary();
  assertEquals(StatusCode.SUCCESS, sc);
  assertEquals(500, shuffleBufferManager.getUsedMemory());
  assertEquals(20, shuffleBufferManager.getPreAllocatedSize());
  verify(mockShuffleFlushManager, times(1)).addToFlushQueue(any());
}
/**
 * Runs the buffer manager against a real {@link ShuffleFlushManager} and checks that used and
 * in-flush memory return to zero once flushes are committed, and that removing one app only
 * releases that app's memory.
 *
 * @throws Exception if waiting for the flush to be committed fails
 */
@Test
public void bufferSizeTest() throws Exception {
  // Dedicated server mock for the real flush manager; named differently from the
  // mockShuffleServer field so the field is no longer shadowed.
  ShuffleServer flushServer = mock(ShuffleServer.class);
  StorageManager storageManager = StorageManagerFactory.getInstance().createStorageManager(conf);
  ShuffleFlushManager shuffleFlushManager =
      new ShuffleFlushManager(conf, flushServer, storageManager);
  shuffleBufferManager = new ShuffleBufferManager(conf, shuffleFlushManager, false);
  when(flushServer.getShuffleFlushManager()).thenReturn(shuffleFlushManager);
  when(flushServer.getShuffleBufferManager()).thenReturn(shuffleBufferManager);
  // Stub the task manager once; the earlier version stubbed it three times, the first time
  // with a throwaway mock that was overridden before any use.
  when(flushServer.getShuffleTaskManager()).thenReturn(mockShuffleTaskManager);

  String appId = "bufferSizeTest";
  shuffleBufferManager.setShuffleTaskManager(mockShuffleTaskManager);
  ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  when(mockShuffleTaskManager.getAppReadLock(appId)).thenReturn(rwLock.readLock());

  int shuffleId = 1;
  shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 1);
  shuffleBufferManager.registerBuffer(appId, shuffleId, 2, 3);
  shuffleBufferManager.registerBuffer(appId, shuffleId, 4, 5);
  shuffleBufferManager.registerBuffer(appId, shuffleId, 6, 7);
  shuffleBufferManager.registerBuffer(appId, shuffleId, 8, 9);

  // Three blocks into one buffer; the third write is followed by a wait for the flush.
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 16));
  assertEquals(48, shuffleBufferManager.getUsedMemory());
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 16));
  assertEquals(96, shuffleBufferManager.getUsedMemory());
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 300));
  waitForFlush(shuffleFlushManager, appId, shuffleId, 3);
  assertEquals(0, shuffleBufferManager.getUsedMemory());
  assertEquals(0, shuffleBufferManager.getInFlushSize());

  // One block per registered range; the fifth write is followed by a wait for the flush.
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 64));
  assertEquals(96, shuffleBufferManager.getUsedMemory());
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(2, 64));
  assertEquals(192, shuffleBufferManager.getUsedMemory());
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(4, 64));
  assertEquals(288, shuffleBufferManager.getUsedMemory());
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(6, 64));
  assertEquals(384, shuffleBufferManager.getUsedMemory());
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(8, 64));
  waitForFlush(shuffleFlushManager, appId, shuffleId, 5);
  assertEquals(0, shuffleBufferManager.getUsedMemory());
  assertEquals(0, shuffleBufferManager.getInFlushSize());

  // Removing one app must only give back that app's memory.
  shuffleBufferManager.registerBuffer("bufferSizeTest1", shuffleId, 0, 1);
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 32));
  assertEquals(64, shuffleBufferManager.getUsedMemory());
  shuffleBufferManager.cacheShuffleData("bufferSizeTest1", shuffleId, false, createData(0, 32));
  assertEquals(128, shuffleBufferManager.getUsedMemory());
  assertEquals(2, shuffleBufferManager.getBufferPool().keySet().size());
  shuffleBufferManager.removeBuffer(appId);
  assertEquals(64, shuffleBufferManager.getUsedMemory());
  assertEquals(1, shuffleBufferManager.getBufferPool().keySet().size());
}
/**
 * Uses a real flush manager and task manager with single-buffer flush disabled and a huge
 * partition size threshold of 100: a partition below the threshold is not limited, and once its
 * accumulated size passes the threshold {@code limitHugePartition} reports true and the cached
 * blocks end up committed by the flush manager.
 *
 * @param tmpDir temporary directory injected by JUnit; its {@code data} child is the storage
 *     base path
 * @throws Exception if waiting for the flush to be committed fails
 */
@Test
public void flushSingleBufferForHugePartitionTest(@TempDir File tmpDir) throws Exception {
  String storageBasePath = new File(tmpDir, "data").getAbsolutePath();

  ShuffleServerConf hugePartitionConf = new ShuffleServerConf();
  hugePartitionConf.setString(
      ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
  hugePartitionConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
  hugePartitionConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 20.0);
  hugePartitionConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 80.0);
  hugePartitionConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
  hugePartitionConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 200L);
  hugePartitionConf.set(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO, 0.1);
  hugePartitionConf.set(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD, 100L);
  hugePartitionConf.set(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED, false);
  hugePartitionConf.setSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD, 64L);

  // Real flush / task managers, tied together through a mocked server.
  ShuffleServer server = mock(ShuffleServer.class);
  StorageManager storageManager =
      StorageManagerFactory.getInstance().createStorageManager(hugePartitionConf);
  ShuffleFlushManager flushManager =
      new ShuffleFlushManager(hugePartitionConf, server, storageManager);
  shuffleBufferManager = new ShuffleBufferManager(hugePartitionConf, flushManager, false);
  ShuffleTaskManager taskManager =
      new ShuffleTaskManager(
          hugePartitionConf, flushManager, shuffleBufferManager, storageManager);
  when(server.getShuffleFlushManager()).thenReturn(flushManager);
  when(server.getShuffleBufferManager()).thenReturn(shuffleBufferManager);
  when(server.getShuffleTaskManager()).thenReturn(taskManager);

  final String appId = "flushSingleBufferForHugePartitionTest_appId";
  final int shuffleId = 1;
  taskManager.registerShuffle(
      appId, shuffleId, Arrays.asList(new PartitionRange(0, 0)), new RemoteStorageInfo(""), "");

  // case1: its partition is not huge partition
  shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 0);
  ShufflePartitionedData smallBlock = createData(0, 1);
  taskManager.cacheShuffleData(appId, shuffleId, false, smallBlock);
  taskManager.updateCachedBlockIds(appId, shuffleId, 0, smallBlock.getBlockList());
  assertEquals(1 + 32, shuffleBufferManager.getUsedMemory());
  long partitionSize = taskManager.getPartitionDataSize(appId, shuffleId, 0);
  assertEquals(1 + 32, partitionSize);
  boolean limitedWhileSmall =
      shuffleBufferManager.limitHugePartition(
          appId, shuffleId, 0, taskManager.getPartitionDataSize(appId, shuffleId, 0));
  assertFalse(limitedWhileSmall);

  // case2: its partition is huge partition, its buffer will be flushed to DISK directly
  ShufflePartitionedData thresholdCrossingBlock = createData(0, 36);
  taskManager.cacheShuffleData(appId, shuffleId, false, thresholdCrossingBlock);
  taskManager.updateCachedBlockIds(
      appId, shuffleId, 0, thresholdCrossingBlock.getBlockList());
  assertEquals(33 + 36 + 32, shuffleBufferManager.getUsedMemory());
  boolean limitedWhenHuge =
      shuffleBufferManager.limitHugePartition(
          appId, shuffleId, 0, taskManager.getPartitionDataSize(appId, shuffleId, 0));
  assertTrue(limitedWhenHuge);

  ShufflePartitionedData trailingBlock = createData(0, 1);
  taskManager.cacheShuffleData(appId, shuffleId, false, trailingBlock);
  taskManager.updateCachedBlockIds(appId, shuffleId, 0, trailingBlock.getBlockList());
  // All three blocks must be committed.
  waitForFlush(flushManager, appId, shuffleId, 3);
}
/**
 * Enables single-buffer flush with a threshold of 128 against a real {@link ShuffleFlushManager}
 * and checks that used and in-flush memory return to zero after each round of writes has been
 * committed.
 *
 * @param tmpDir temporary directory injected by JUnit; its {@code data} child is the storage
 *     base path
 * @throws Exception if waiting for the flush to be committed fails
 */
@Test
public void flushSingleBufferTest(@TempDir File tmpDir) throws Exception {
  ShuffleServerConf shuffleConf = new ShuffleServerConf();
  File dataDir = new File(tmpDir, "data");
  shuffleConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
  shuffleConf.set(
      ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(dataDir.getAbsolutePath()));
  shuffleConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 200L);
  shuffleConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 20.0);
  shuffleConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 80.0);
  shuffleConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
  shuffleConf.setBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED, true);
  shuffleConf.setSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD, 128L);

  // Dedicated server mock for the real flush manager; named differently from the
  // mockShuffleServer field so the field is no longer shadowed.
  ShuffleServer flushServer = mock(ShuffleServer.class);
  StorageManager storageManager =
      StorageManagerFactory.getInstance().createStorageManager(shuffleConf);
  ShuffleFlushManager shuffleFlushManager =
      new ShuffleFlushManager(shuffleConf, flushServer, storageManager);
  shuffleBufferManager = new ShuffleBufferManager(shuffleConf, shuffleFlushManager, false);
  when(flushServer.getShuffleFlushManager()).thenReturn(shuffleFlushManager);
  when(flushServer.getShuffleBufferManager()).thenReturn(shuffleBufferManager);
  // Stub the task manager once; the earlier version first stubbed a throwaway mock and then
  // immediately overrode it.
  when(flushServer.getShuffleTaskManager()).thenReturn(mockShuffleTaskManager);

  // NOTE(review): the appId literal matches the one in bufferSizeTest and looks copy-pasted;
  // left unchanged because this test builds its own managers.
  String appId = "bufferSizeTest";
  shuffleBufferManager.setShuffleTaskManager(mockShuffleTaskManager);
  ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  when(mockShuffleTaskManager.getAppReadLock(appId)).thenReturn(rwLock.readLock());

  int shuffleId = 1;
  shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 1);
  shuffleBufferManager.registerBuffer(appId, shuffleId, 2, 3);

  // Round 1: one block in each registered range, then wait for both to be committed.
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 64));
  assertEquals(96, shuffleBufferManager.getUsedMemory());
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(2, 64));
  waitForFlush(shuffleFlushManager, appId, shuffleId, 2);
  assertEquals(0, shuffleBufferManager.getUsedMemory());
  assertEquals(0, shuffleBufferManager.getInFlushSize());

  // Round 2: two blocks for partitions 0 and 1, which share the 0-1 range's buffer.
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 32));
  assertEquals(64, shuffleBufferManager.getUsedMemory());
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(1, 48));
  waitForFlush(shuffleFlushManager, appId, shuffleId, 4);
  assertEquals(0, shuffleBufferManager.getUsedMemory());
  assertEquals(0, shuffleBufferManager.getInFlushSize());
}
/**
 * Checks the per-shuffle flush threshold (64 here, buffer capacity 180) with a real
 * {@link ShuffleFlushManager}: shuffles above the threshold are flushed, smaller ones stay in
 * memory until they grow, and when every shuffle is small some are flushed anyway.
 *
 * <p>The asynchronous flush is awaited by polling instead of fixed sleeps, so the test neither
 * depends on the flush finishing within a hard-coded delay nor waits longer than necessary.
 *
 * @throws Exception if setting up the storage or flush manager fails
 */
@Test
public void shuffleFlushThreshold() throws Exception {
  ShuffleServerConf serverConf = new ShuffleServerConf();
  serverConf.addAll(conf);
  serverConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 180L);
  serverConf.set(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD, 64L);
  // NOTE(review): storage and flush managers are built from the base conf rather than
  // serverConf; kept as-is — confirm this is intentional.
  StorageManager storageManager = StorageManagerFactory.getInstance().createStorageManager(conf);
  ShuffleFlushManager shuffleFlushManager =
      new ShuffleFlushManager(conf, mockShuffleServer, storageManager);
  shuffleBufferManager = new ShuffleBufferManager(serverConf, shuffleFlushManager, false);

  String appId = "shuffleFlushTest";
  shuffleBufferManager.setShuffleTaskManager(mockShuffleTaskManager);
  ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  when(mockShuffleTaskManager.getAppReadLock(appId)).thenReturn(rwLock.readLock());
  when(mockShuffleServer.getShuffleTaskManager()).thenReturn(mockShuffleTaskManager);

  int shuffleId = 0;
  int smallShuffleId = 1;
  int smallShuffleIdTwo = 2;
  shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 1);
  shuffleBufferManager.registerBuffer(appId, shuffleId, 2, 3);
  shuffleBufferManager.registerBuffer(appId, smallShuffleId, 0, 1);
  shuffleBufferManager.registerBuffer(appId, smallShuffleIdTwo, 0, 1);

  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 64));
  assertEquals(96, shuffleBufferManager.getUsedMemory());
  shuffleBufferManager.cacheShuffleData(appId, smallShuffleId, false, createData(0, 31));
  assertEquals(96 + 63, shuffleBufferManager.getUsedMemory());
  Awaitility.await()
      .atMost(Duration.ofSeconds(5))
      .until(
          () ->
              shuffleBufferManager.getUsedMemory() == 63
                  && shuffleBufferManager.getInFlushSize() == 0);
  // small shuffle id is kept in memory
  assertEquals(63, shuffleBufferManager.getUsedMemory());
  assertEquals(0, shuffleBufferManager.getInFlushSize());

  // more data will trigger the flush
  shuffleBufferManager.cacheShuffleData(appId, smallShuffleId, false, createData(0, 31));
  shuffleBufferManager.cacheShuffleData(appId, smallShuffleId, false, createData(0, 31));
  assertEquals(63 * 3, shuffleBufferManager.getUsedMemory());
  Awaitility.await()
      .atMost(Duration.ofSeconds(5))
      .until(
          () ->
              shuffleBufferManager.getUsedMemory() == 0
                  && shuffleBufferManager.getInFlushSize() == 0);
  assertEquals(0, shuffleBufferManager.getUsedMemory());
  assertEquals(0, shuffleBufferManager.getInFlushSize());

  // all the small data in shuffle server, which could be extremely rare
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 22));
  shuffleBufferManager.cacheShuffleData(appId, smallShuffleId, false, createData(0, 21));
  shuffleBufferManager.cacheShuffleData(appId, smallShuffleIdTwo, false, createData(0, 20));
  assertEquals(54 + 53 + 52, shuffleBufferManager.getUsedMemory());
  Awaitility.await()
      .atMost(Duration.ofSeconds(5))
      .until(
          () ->
              shuffleBufferManager.getUsedMemory() == 52
                  && shuffleBufferManager.getInFlushSize() == 0);
  assertEquals(52, shuffleBufferManager.getUsedMemory());
  assertEquals(0, shuffleBufferManager.getInFlushSize());
}
/**
 * Blocks until at least {@code expectedBlockNum} block ids are reported as committed for the
 * given app and shuffle, then waits until the buffer manager reports zero used memory.
 *
 * <p>The committed count is polled up to 11 times with a 500 ms pause between polls. The test
 * fails with "Flush data time out" only when the last poll is still short of the target.
 *
 * @param shuffleFlushManager flush manager queried for committed block ids
 * @param appId application id to query
 * @param shuffleId shuffle id to query
 * @param expectedBlockNum minimum number of committed block ids to wait for
 * @throws Exception if the thread is interrupted while sleeping between polls
 */
private void waitForFlush(
    ShuffleFlushManager shuffleFlushManager, String appId, int shuffleId, int expectedBlockNum)
    throws Exception {
  final int maxRetries = 10;
  final long pollIntervalMs = 500;
  int retry = 0;
  // Count a retry (and possibly time out) only when the latest poll actually fell short, so a
  // flush that completes on the very last poll is not misreported as a timeout.
  while (shuffleFlushManager.getCommittedBlockIds(appId, shuffleId).getLongCardinality()
      < expectedBlockNum) {
    retry++;
    if (retry > maxRetries) {
      fail("Flush data time out");
    }
    Thread.sleep(pollIntervalMs);
  }
  // Need to wait for `event.doCleanup` to be executed
  // to ensure the correctness of subsequent checks of
  // `shuffleBufferManager.getUsedMemory()` and `shuffleBufferManager.getInFlushSize()`.
  Awaitility.await()
      .atMost(Duration.ofSeconds(5))
      .until(() -> shuffleBufferManager.getUsedMemory() == 0);
}
/**
 * Checks that {@code getCapacity()} and {@code getReadCapacity()} equal the JVM max memory
 * multiplied by the corresponding ratio, first with the default ratios and then with ratios set
 * explicitly on the configuration.
 */
@Test
public void bufferManagerInitTest() {
  final long jvmMaxMemory = Runtime.getRuntime().maxMemory();
  ShuffleServerConf conf = new ShuffleServerConf();

  // Phase 1: nothing overridden, so the option defaults are expected to apply.
  shuffleBufferManager = new ShuffleBufferManager(conf, mockShuffleFlushManager, false);
  double defaultWriteRatio = ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO.defaultValue();
  double defaultReadRatio = ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO.defaultValue();
  long expectedDefaultCapacity = (long) (jvmMaxMemory * defaultWriteRatio);
  long expectedDefaultReadCapacity = (long) (jvmMaxMemory * defaultReadRatio);
  assertEquals(expectedDefaultCapacity, shuffleBufferManager.getCapacity());
  assertEquals(expectedDefaultReadCapacity, shuffleBufferManager.getReadCapacity());

  // Phase 2: explicit ratios must be honoured by a freshly built manager.
  double customWriteRatio = 0.6;
  double customReadRatio = 0.1;
  conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO, customWriteRatio);
  conf.set(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO, customReadRatio);
  shuffleBufferManager = new ShuffleBufferManager(conf, mockShuffleFlushManager, false);
  long expectedCustomCapacity = (long) (jvmMaxMemory * customWriteRatio);
  long expectedCustomReadCapacity = (long) (jvmMaxMemory * customReadRatio);
  assertEquals(expectedCustomCapacity, shuffleBufferManager.getCapacity());
  assertEquals(expectedCustomReadCapacity, shuffleBufferManager.getReadCapacity());
}
/**
 * Verifies that used memory and in-flush size both return to zero after flush events are
 * processed with the storage type set to LOCALFILE_HDFS and no remote storage registered.
 *
 * @param tmpDir temporary directory supplied by JUnit; a {@code data} sub-directory is used as
 *     the storage base path
 * @throws Exception if waiting for the flush events or the executor is interrupted
 */
@Test
public void flushBufferTestWhenNotSelectedStorage(@TempDir File tmpDir) throws Exception {
  // In this test, rss.server.single.buffer.flush.threshold and
  // rss.server.flush.cold.storage.threshold.size are 16.
  // When cacheShuffleData is called with 64 bytes, the data will be flushed to HDFS storage,
  // but no remote storage is registered.
  // storageManager.selectStorage will then return null; when no storage can be selected,
  // the resources must not leak.
  ShuffleServerConf shuffleConf = new ShuffleServerConf();
  File dataDir = new File(tmpDir, "data");
  shuffleConf.set(
      ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(dataDir.getAbsolutePath()));
  shuffleConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 200L);
  shuffleConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 20.0);
  shuffleConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 80.0);
  shuffleConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
  shuffleConf.setBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED, true);
  shuffleConf.setSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD, 16L);
  shuffleConf.setSizeAsBytes(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 16L);
  // The storage type is set exactly once; an earlier LOCALFILE assignment was always
  // overwritten by this value and has been removed.
  shuffleConf.setString(
      ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE_HDFS.name());

  ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
  StorageManager storageManager =
      StorageManagerFactory.getInstance().createStorageManager(shuffleConf);
  ShuffleFlushManager shuffleFlushManager =
      new ShuffleFlushManager(shuffleConf, mockShuffleServer, storageManager);
  shuffleBufferManager = new ShuffleBufferManager(shuffleConf, shuffleFlushManager, false);
  when(mockShuffleServer.getShuffleFlushManager()).thenReturn(shuffleFlushManager);
  when(mockShuffleServer.getShuffleBufferManager()).thenReturn(shuffleBufferManager);

  String appId = "bufferSizeTest";
  shuffleBufferManager.setShuffleTaskManager(mockShuffleTaskManager);
  ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  when(mockShuffleTaskManager.getAppReadLock(appId)).thenReturn(rwLock.readLock());
  when(mockShuffleServer.getShuffleTaskManager()).thenReturn(mockShuffleTaskManager);

  int shuffleId = 1;
  shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 1);
  shuffleBufferManager.registerBuffer(appId, shuffleId, 2, 3);
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 64));
  shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(2, 64));

  // wait flush event drained
  Awaitility.await()
      .atMost(Duration.ofSeconds(5))
      .until(() -> shuffleFlushManager.getEventNumInFlush() == 0);

  // make sure all cleanup tasks are done.
  DefaultFlushEventHandler flushEventHandler =
      (DefaultFlushEventHandler) shuffleFlushManager.getEventHandler();
  ThreadPoolExecutor executor =
      ((ThreadPoolExecutor) flushEventHandler.getFallbackThreadPoolExecutor());
  executor.shutdown();
  // shutdown() only stops new submissions and does not wait for queued or running tasks, so
  // block until the pool has terminated before asserting on the released memory.
  if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
    fail("Fallback thread pool did not terminate in time");
  }

  assertEquals(0, shuffleBufferManager.getUsedMemory());
  assertEquals(0, shuffleBufferManager.getInFlushSize());
}
/**
 * Caches one block per configured block-size bucket (in ascending bucket order), asserting each
 * call returns {@code StatusCode.SUCCESS}, and then inspects the samples collected from
 * {@code ShuffleServerMetrics.appHistogramWriteBlockSize}.
 */
@Test
public void blockSizeMetricsTest() {
  String appId = "blockSizeMetricsTest";
  shuffleBufferManager.setShuffleTaskManager(mockShuffleTaskManager);
  ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  when(mockShuffleTaskManager.getAppReadLock(appId)).thenReturn(rwLock.readLock());
  when(mockShuffleServer.getShuffleTaskManager()).thenReturn(mockShuffleTaskManager);
  int shuffleId = 1;
  shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 1);

  // Sort the default bucket boundaries once; both phases below walk them in ascending order.
  double[] sortedBuckets =
      Arrays.stream(
              ConfigUtils.convertBytesStringToDoubleArray(
                  new ShuffleServerConf()
                      .get(ShuffleServerConf.APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_BUCKETS)))
          .sorted()
          .toArray();

  // cache shuffle block data, and record metrics
  for (double bucket : sortedBuckets) {
    StatusCode sc =
        shuffleBufferManager.cacheShuffleData(
            appId, shuffleId, true, createData(0, (int) bucket));
    assertEquals(StatusCode.SUCCESS, sc);
  }

  // check metrics values
  List<Collector.MetricFamilySamples> samples =
      ShuffleServerMetrics.appHistogramWriteBlockSize.collect();
  // JUnit's assertEquals takes (expected, actual); keep that order so failure messages are right.
  assertEquals(1, samples.size());
  int expectedCount = 1;
  for (double bucket : sortedBuckets) {
    for (Collector.MetricFamilySamples.Sample s : samples.get(0).samples) {
      // NOTE(review): `bucket` is a double and is boxed to Double here. If labelValues holds
      // label strings (as it appears to in the Prometheus client), this never matches and the
      // assertion below is dead. Matching on the formatted bucket label and using a per-bucket
      // expected count is probably what was intended — confirm before changing, since enabling
      // the check may alter the test outcome.
      if (s.labelValues.contains(bucket)) {
        assertEquals(expectedCount, s.value);
      }
    }
  }
}
}