/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.test;

import java.io.File;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import org.apache.uniffle.client.TestUtils;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.request.RssFinishShuffleRequest;
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.KerberizedHdfsBase;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.CoordinatorServer;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;

import static org.apache.uniffle.test.ShuffleReadWriteBase.mockSSI;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
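
/**
 * End-to-end test: a coordinator and a shuffle server are started against the
 * Kerberized HDFS mini cluster provided by {@link KerberizedHdfsBase}, shuffle
 * data is written through the gRPC client, and the blocks are read back from
 * HDFS and verified.
 */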
public class ShuffleServerWithKerberizedHdfsTest extends KerberizedHdfsBase {
  private static final int COORDINATOR_RPC_PORT = 19999;
  private static final int SHUFFLE_SERVER_PORT = 29999;
  private static final String COORDINATOR_QUORUM = "localhost:" + COORDINATOR_RPC_PORT;
private ShuffleServerGrpcClient shuffleServerClient;
private static CoordinatorServer coordinatorServer;
private static ShuffleServer shuffleServer;
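
  // Shuffle server config: a throwaway temp dir as the local base path, HDFS
  // as the effective storage type, and fast heartbeats so the server registers
  // with the coordinator promptly.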
  private static ShuffleServerConf getShuffleServerConf() throws Exception {
    File dataFolder = Files.createTempDirectory("rssdata").toFile();
    dataFolder.deleteOnExit();
    ShuffleServerConf serverConf = new ShuffleServerConf();
    serverConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT);
    serverConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE_HDFS.name());
    serverConf.setString("rss.storage.basePath", dataFolder.getAbsolutePath());
    serverConf.setString("rss.server.buffer.capacity", "671088640");
    serverConf.setString("rss.server.memory.shuffle.highWaterMark", "50.0");
    serverConf.setString("rss.server.memory.shuffle.lowWaterMark", "0.0");
    serverConf.setString("rss.server.read.buffer.capacity", "335544320");
    serverConf.setString("rss.coordinator.quorum", COORDINATOR_QUORUM);
    serverConf.setString("rss.server.heartbeat.delay", "1000");
    serverConf.setString("rss.server.heartbeat.interval", "1000");
    serverConf.setInteger("rss.jetty.http.port", 18080);
    serverConf.setInteger("rss.jetty.corePool.size", 64);
    serverConf.setInteger("rss.rpc.executor.size", 10);
    serverConf.setString("rss.server.hadoop.dfs.replication", "2");
    serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L * 1024L);
    serverConf.setBoolean("rss.server.health.check.enable", false);
    // Overrides the MEMORY_LOCALFILE_HDFS value set above; the test runs with
    // pure HDFS storage.
    serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
    return serverConf;
  }
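
  // One coordinator and one shuffle server are shared across the whole class;
  // KerberizedHdfsBase.init() brings up the Kerberized HDFS mini cluster first.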
  @BeforeAll
  public static void setup() throws Exception {
    testRunner = ShuffleServerWithKerberizedHdfsTest.class;
    KerberizedHdfsBase.init();

    CoordinatorConf coordinatorConf = new CoordinatorConf();
    coordinatorConf.setInteger(CoordinatorConf.RPC_SERVER_PORT, COORDINATOR_RPC_PORT);
    coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT, 19998);
    coordinatorConf.setInteger(CoordinatorConf.RPC_EXECUTOR_SIZE, 10);
    coordinatorServer = new CoordinatorServer(coordinatorConf);
    coordinatorServer.start();

    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
    shuffleServer = new ShuffleServer(shuffleServerConf);
    shuffleServer.start();
  }
@AfterAll
public static void afterAll() throws Exception {
if (coordinatorServer != null) {
coordinatorServer.stopServer();
}
if (shuffleServer != null) {
shuffleServer.stopServer();
}
}
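
  // Each test runs with a freshly initialized Hadoop (Kerberos) security
  // context and its own gRPC client against the local shuffle server.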
@BeforeEach
public void beforeEach() throws Exception {
initHadoopSecurityContext();
shuffleServerClient = new ShuffleServerGrpcClient("localhost", SHUFFLE_SERVER_PORT);
}
@AfterEach
public void clearEach() throws Exception {
removeHadoopSecurityContext();
shuffleServerClient.close();
}
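
  // Flattens a Hadoop Configuration into a plain map so the Kerberos/HDFS
  // settings can travel to the shuffle server inside RemoteStorageInfo.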
private Map<String, String> conf2Map(Configuration conf) {
Map<String, String> confMap = new HashMap<>();
for (Map.Entry<String, String> entry : conf) {
confMap.put(entry.getKey(), entry.getValue());
}
return confMap;
}
private Map<Integer, List<ShuffleBlockInfo>> createTestData(
Roaring64NavigableMap[] bitmaps,
Map<Long, byte[]> expectedData) {
for (int i = 0; i < 4; i++) {
bitmaps[i] = Roaring64NavigableMap.bitmapOf();
}
List<ShuffleBlockInfo> blocks1 = ShuffleReadWriteBase.createShuffleBlockList(
0, 0, 0, 3, 25, bitmaps[0], expectedData, mockSSI);
List<ShuffleBlockInfo> blocks2 = ShuffleReadWriteBase.createShuffleBlockList(
0, 1, 1, 5, 25, bitmaps[1], expectedData, mockSSI);
List<ShuffleBlockInfo> blocks3 = ShuffleReadWriteBase.createShuffleBlockList(
0, 2, 2, 4, 25, bitmaps[2], expectedData, mockSSI);
List<ShuffleBlockInfo> blocks4 = ShuffleReadWriteBase.createShuffleBlockList(
0, 3, 3, 1, 25, bitmaps[3], expectedData, mockSSI);
Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
partitionToBlocks.put(0, blocks1);
partitionToBlocks.put(1, blocks2);
partitionToBlocks.put(2, blocks3);
partitionToBlocks.put(3, blocks4);
return partitionToBlocks;
}
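
  // Full round trip against Kerberized HDFS: register two partition ranges,
  // send blocks in three rounds (commit + finishShuffle after each), then read
  // every partition back and validate the payloads.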
@Test
public void hdfsWriteReadTest() throws Exception {
String alexDir = kerberizedHdfs.getSchemeAndAuthorityPrefix() + "/alex/";
String user = "alex";
String appId = "app_hdfs_read_write";
String dataBasePath = alexDir + "rss/test";
RemoteStorageInfo remoteStorageInfo = new RemoteStorageInfo(
dataBasePath,
conf2Map(kerberizedHdfs.getConf())
);
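
    // Register shuffle 0 in two partition ranges ([0,1] and [2,3]) under user
    // "alex", pointing the server at the Kerberized HDFS remote storage.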
RssRegisterShuffleRequest rrsr = new RssRegisterShuffleRequest(
appId,
0,
Lists.newArrayList(new PartitionRange(0, 1)),
remoteStorageInfo,
user
);
shuffleServerClient.registerShuffle(rrsr);
rrsr = new RssRegisterShuffleRequest(
appId,
0,
Lists.newArrayList(new PartitionRange(2, 3)),
remoteStorageInfo,
user
);
shuffleServerClient.registerShuffle(rrsr);
Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[4];
Map<Long, byte[]> expectedData = Maps.newHashMap();
Map<Integer, List<ShuffleBlockInfo>> dataBlocks = createTestData(bitmaps, expectedData);
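
    // First round: send partitions 0 and 1; the request retries up to 3 times
    // with a 1000 ms interval.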
Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
partitionToBlocks.put(0, dataBlocks.get(0));
partitionToBlocks.put(1, dataBlocks.get(1));
Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap();
shuffleToBlocks.put(0, partitionToBlocks);
RssSendShuffleDataRequest rssdr = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
shuffleServerClient.sendShuffleData(rssdr);
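    // All eight blocks are buffered in server memory at this point: 456 bytes
    // is 8 blocks * (25-byte payload plus what appears to be a 32-byte
    // per-block overhead), and the pre-allocated quota has been released.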
assertEquals(456, shuffleServer.getShuffleBufferManager().getUsedMemory());
assertEquals(0, shuffleServer.getShuffleBufferManager().getPreAllocatedSize());
RssSendCommitRequest rscr = new RssSendCommitRequest(appId, 0);
shuffleServerClient.sendCommit(rscr);
RssFinishShuffleRequest rfsr = new RssFinishShuffleRequest(appId, 0);
    ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(
        StorageType.HDFS.name(),
        appId,
        0,    // shuffleId
        0,    // partitionId
        100,  // indexReadLimit
        2,    // partitionNumPerRange
        10,   // partitionNum
        1000, // readBufferSize
        dataBasePath,
        bitmaps[0],                        // expected block ids
        Roaring64NavigableMap.bitmapOf(0), // task ids to read
        Lists.newArrayList(),
        new Configuration(),
        new DefaultIdHelper()
    );
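    // Nothing is readable from HDFS yet at this point, so the first read
    // returns no data.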
assertNull(readClient.readShuffleBlockData());
shuffleServerClient.finishShuffle(rfsr);
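
    // Second and third rounds: send partition 2, then partition 3, each
    // followed by commit + finishShuffle so the data lands on HDFS.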
partitionToBlocks.clear();
partitionToBlocks.put(2, dataBlocks.get(2));
shuffleToBlocks.clear();
shuffleToBlocks.put(0, partitionToBlocks);
rssdr = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
shuffleServerClient.sendShuffleData(rssdr);
assertEquals(0, shuffleServer.getShuffleBufferManager().getPreAllocatedSize());
rscr = new RssSendCommitRequest(appId, 0);
shuffleServerClient.sendCommit(rscr);
rfsr = new RssFinishShuffleRequest(appId, 0);
shuffleServerClient.finishShuffle(rfsr);
partitionToBlocks.clear();
partitionToBlocks.put(3, dataBlocks.get(3));
shuffleToBlocks.clear();
shuffleToBlocks.put(0, partitionToBlocks);
rssdr = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
shuffleServerClient.sendShuffleData(rssdr);
rscr = new RssSendCommitRequest(appId, 0);
shuffleServerClient.sendCommit(rscr);
rfsr = new RssFinishShuffleRequest(appId, 0);
shuffleServerClient.finishShuffle(rfsr);
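
    // Read each of the four partitions back and verify the payloads.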
readClient = new ShuffleReadClientImpl(
StorageType.HDFS.name(),
appId,
0,
0,
100,
2,
10,
1000,
        dataBasePath,
        bitmaps[0],
Roaring64NavigableMap.bitmapOf(0),
Lists.newArrayList(),
new Configuration(),
new DefaultIdHelper()
);
validateResult(readClient, expectedData, bitmaps[0]);
readClient = new ShuffleReadClientImpl(
StorageType.HDFS.name(),
appId,
0,
1,
100,
2,
10,
1000,
dataBasePath,
bitmaps[1],
Roaring64NavigableMap.bitmapOf(1),
Lists.newArrayList(),
new Configuration(),
new DefaultIdHelper()
);
validateResult(readClient, expectedData, bitmaps[1]);
readClient = new ShuffleReadClientImpl(
StorageType.HDFS.name(),
appId,
0,
2,
100,
2,
10,
1000,
dataBasePath,
bitmaps[2],
Roaring64NavigableMap.bitmapOf(2),
Lists.newArrayList(),
new Configuration(),
new DefaultIdHelper()
);
validateResult(readClient, expectedData, bitmaps[2]);
readClient = new ShuffleReadClientImpl(
StorageType.HDFS.name(),
appId,
0,
3,
100,
2,
10,
1000,
dataBasePath,
bitmaps[3],
Roaring64NavigableMap.bitmapOf(3),
Lists.newArrayList(),
new Configuration(),
new DefaultIdHelper()
);
validateResult(readClient, expectedData, bitmaps[3]);
}
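
  // Drains the read client, matching each returned block's bytes against the
  // expected payloads; the set of matched block ids must equal the expected
  // bitmap.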
protected void validateResult(ShuffleReadClientImpl readClient, Map<Long, byte[]> expectedData,
Roaring64NavigableMap blockIdBitmap) {
CompressedShuffleBlock csb = readClient.readShuffleBlockData();
Roaring64NavigableMap matched = Roaring64NavigableMap.bitmapOf();
while (csb != null && csb.getByteBuffer() != null) {
for (Map.Entry<Long, byte[]> entry : expectedData.entrySet()) {
if (TestUtils.compareByte(entry.getValue(), csb.getByteBuffer())) {
matched.addLong(entry.getKey());
break;
}
}
csb = readClient.readShuffleBlockData();
}
    assertEquals(blockIdBitmap, matched);
}
}