blob: c19a911f5b0118768174dd5a157a763690b12d52 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.test;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.util.ClientType;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.CoordinatorServer;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
 * Integration test for tag-based shuffle server assignment. It exercises the server-side conf
 * {@code org.apache.uniffle.server.ShuffleServerConf.Tags} together with the client-side conf
 * {@code RssClientConfig.RSS_CLIENT_ASSIGNMENT_TAGS}.
 *
 * <p>{@link #setupServers()} starts six shuffle servers: two without custom tags, two tagged
 * {@code fixed} and two tagged {@code elastic}. {@link #testTags()} then requests assignments
 * with different tag sets and checks which servers (identified by RPC port) are returned.
 */
public class AssignmentWithTagsTest extends CoordinatorTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(AssignmentWithTagsTest.class);

  /** Number of shuffle servers started for each of the three tag sets. */
  private static final int SERVERS_PER_TAG_SET = 2;

  /** Total number of shuffle servers that must register before the test may run. */
  private static final int EXPECTED_SERVER_NUM = 3 * SERVERS_PER_TAG_SET;

  /** Upper bound for waiting on shuffle server registration. */
  private static final long REGISTER_TIMEOUT_MS = 1000 * 5;

  /** Pause between two registration checks, so the wait loop does not spin on a CPU core. */
  private static final long REGISTER_POLL_INTERVAL_MS = 100;

  /** Prefix of the error message expected when no shuffle server satisfies the tags. */
  private static final String ASSIGNMENT_ERROR_PREFIX = "Error happened when getShuffleAssignments with";

  // KV: tag -> rpc ports of the shuffle servers that were started with this tag
  private static Map<String, List<Integer>> tagOfShufflePorts = new HashMap<>();

  /**
   * Asks the OS for {@code num} distinct free ports by binding ephemeral server sockets.
   *
   * <p>All sockets are held open until every port has been collected, so the same port cannot be
   * handed out twice. They are closed before returning, which means another process may still
   * grab a port before the caller binds it; this is accepted for test purposes.
   *
   * @param num number of ports to find
   * @return the ports that were free at the time of the call
   * @throws IOException if a socket cannot be opened or closed
   */
  private static List<Integer> findAvailablePorts(int num) throws IOException {
    List<ServerSocket> sockets = new ArrayList<>();
    List<Integer> ports = new ArrayList<>();
    try {
      for (int i = 0; i < num; i++) {
        ServerSocket socket = new ServerSocket(0);
        sockets.add(socket);
        ports.add(socket.getLocalPort());
      }
    } catch (IOException | RuntimeException openFailure) {
      // Do not leak the sockets that were already opened before the failure.
      IOException closeFailure = closeAll(sockets);
      if (closeFailure != null) {
        openFailure.addSuppressed(closeFailure);
      }
      throw openFailure;
    }
    IOException failure = closeAll(sockets);
    if (failure != null) {
      throw failure;
    }
    return ports;
  }

  /**
   * Closes every socket, even when closing one of them fails.
   *
   * @param sockets sockets to close
   * @return the first close failure (later ones attached as suppressed), or {@code null}
   */
  private static IOException closeAll(List<ServerSocket> sockets) {
    IOException firstFailure = null;
    for (ServerSocket socket : sockets) {
      try {
        socket.close();
      } catch (IOException e) {
        if (firstFailure == null) {
          firstFailure = e;
        } else {
          firstFailure.addSuppressed(e);
        }
      }
    }
    return firstFailure;
  }

  /** Records that the shuffle server listening on {@code rpcPort} is reachable via {@code tag}. */
  private static void recordServerPort(String tag, int rpcPort) {
    tagOfShufflePorts.computeIfAbsent(tag, key -> new ArrayList<>()).add(rpcPort);
  }

  /**
   * Creates a shuffle server with local-file storage in a fresh temp directory, configures the
   * given tags and two free ports, records its RPC port in {@link #tagOfShufflePorts} and starts
   * it.
   *
   * @param tags custom tags for the server; may be empty
   * @throws Exception if the ports cannot be found or the server fails to start
   */
  private static void createAndStartShuffleServerWithTags(Set<String> tags) throws Exception {
    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
    shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000);

    File tmpDir = Files.createTempDir();
    tmpDir.deleteOnExit();
    File dataDir1 = new File(tmpDir, "data1");
    File dataDir2 = new File(tmpDir, "data2");
    String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
    shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name());
    shuffleServerConf.setString("rss.storage.basePath", basePath);
    shuffleServerConf.setString("rss.server.tags", StringUtils.join(tags, ","));

    List<Integer> ports = findAvailablePorts(2);
    int rpcPort = ports.get(0);
    int httpPort = ports.get(1);
    shuffleServerConf.setInteger("rss.rpc.server.port", rpcPort);
    shuffleServerConf.setInteger("rss.jetty.http.port", httpPort);

    for (String tag : tags) {
      recordServerPort(tag, rpcPort);
    }
    // Every server is also recorded under the version tag, whatever custom tags it has.
    // NOTE(review): presumably each shuffle server carries this tag implicitly - confirm.
    recordServerPort(Constants.SHUFFLE_SERVER_VERSION, rpcPort);

    LOG.info("Shuffle server data dir: {}, rpc port: {}, http port: {}", dataDir1 + "," + dataDir2,
        rpcPort, httpPort);

    ShuffleServer server = new ShuffleServer(shuffleServerConf);
    // Added to the inherited list before start(); presumably the base class uses it for teardown.
    shuffleServers.add(server);
    server.start();
  }

  /**
   * Starts the coordinator(s) and six shuffle servers (untagged, {@code fixed}, {@code elastic};
   * two of each), then waits until all of them are registered at the first coordinator.
   *
   * @throws Exception if a server fails to start, or if not all shuffle servers have registered
   *     within {@link #REGISTER_TIMEOUT_MS}
   */
  @BeforeAll
  public static void setupServers() throws Exception {
    CoordinatorConf coordinatorConf = getCoordinatorConf();
    createCoordinatorServer(coordinatorConf);
    for (CoordinatorServer coordinator : coordinators) {
      coordinator.start();
    }

    for (int i = 0; i < SERVERS_PER_TAG_SET; i++) {
      createAndStartShuffleServerWithTags(Sets.newHashSet());
    }
    for (int i = 0; i < SERVERS_PER_TAG_SET; i++) {
      createAndStartShuffleServerWithTags(Sets.newHashSet("fixed"));
    }
    for (int i = 0; i < SERVERS_PER_TAG_SET; i++) {
      createAndStartShuffleServerWithTags(Sets.newHashSet("elastic"));
    }

    // Wait all shuffle servers registering to coordinator
    long deadlineMs = System.currentTimeMillis() + REGISTER_TIMEOUT_MS;
    while (coordinators.get(0).getClusterManager().getNodesNum() != EXPECTED_SERVER_NUM) {
      if (System.currentTimeMillis() > deadlineMs) {
        throw new IllegalStateException("Timeout of waiting shuffle servers registry, timeout: 5s.");
      }
      // Sleep between checks instead of busy-spinning while the servers are registering.
      Thread.sleep(REGISTER_POLL_INTERVAL_MS);
    }
  }

  /**
   * Requests a shuffle assignment for {@code appId} restricted to {@code requiredTags} and
   * flattens the result into the ports of all assigned shuffle servers.
   */
  private static List<Integer> requestAssignedPorts(
      ShuffleWriteClientImpl client, String appId, Set<String> requiredTags) throws Exception {
    ShuffleAssignmentsInfo assignmentsInfo =
        client.getShuffleAssignments(appId, 1, 1, 1, requiredTags, 1);
    return assignmentsInfo
        .getPartitionToServers()
        .values()
        .stream()
        .flatMap(servers -> servers.stream())
        .map(server -> server.getPort())
        .collect(Collectors.toList());
  }

  /**
   * Asserts that requesting an assignment with {@code requiredTags} fails with an exception whose
   * message starts with {@link #ASSIGNMENT_ERROR_PREFIX}.
   */
  private static void assertAssignmentFails(
      ShuffleWriteClientImpl client, String appId, Set<String> requiredTags) {
    try {
      client.getShuffleAssignments(appId, 1, 1, 1, requiredTags, 1);
    } catch (Exception e) {
      String message = e.getMessage();
      // Null-safe check: an exception without a message must fail the assertion, not raise an NPE.
      assertTrue(message != null && message.startsWith(ASSIGNMENT_ERROR_PREFIX),
          "Unexpected error message: " + message);
      return;
    }
    fail("Expected the assignment request of " + appId + " with tags " + requiredTags + " to fail");
  }

  /**
   * Checks assignment results for single, multiple, unknown and unsatisfiable tag sets.
   *
   * @throws Exception if an assignment request that is expected to succeed fails
   */
  @Test
  public void testTags() throws Exception {
    ShuffleWriteClientImpl shuffleWriteClient = new ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
        1, 1, 1, true, 1, 1);
    shuffleWriteClient.registerCoordinators(COORDINATOR_QUORUM);

    // Case1: only set the single default shuffle version tag
    List<Integer> assignedServerPorts = requestAssignedPorts(
        shuffleWriteClient, "app-1", Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION));
    assertEquals(1, assignedServerPorts.size());
    assertTrue(tagOfShufflePorts.get(Constants.SHUFFLE_SERVER_VERSION).contains(assignedServerPorts.get(0)));

    // Case2: set a single tag that no shuffle server was started with
    assertAssignmentFails(shuffleWriteClient, "app-2", Sets.newHashSet("non-exist"));

    // Case3: set the single fixed tag
    assignedServerPorts = requestAssignedPorts(shuffleWriteClient, "app-3", Sets.newHashSet("fixed"));
    assertEquals(1, assignedServerPorts.size());
    assertTrue(tagOfShufflePorts.get("fixed").contains(assignedServerPorts.get(0)));

    // Case4: set multiple tags that are all present on some server
    assignedServerPorts = requestAssignedPorts(
        shuffleWriteClient, "app-4", Sets.newHashSet("fixed", Constants.SHUFFLE_SERVER_VERSION));
    assertEquals(1, assignedServerPorts.size());
    assertTrue(tagOfShufflePorts.get("fixed").contains(assignedServerPorts.get(0)));

    // Case5: set multiple tags that no single server has (none was started with both
    // "fixed" and "elastic"), so the request is expected to fail
    assertAssignmentFails(shuffleWriteClient, "app-5",
        Sets.newHashSet("fixed", "elastic", Constants.SHUFFLE_SERVER_VERSION));
  }
}