blob: 6b1399c513a6fc2ccdf6304d69acca6eb7e2d96d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.fragment;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableList;
import org.apache.drill.categories.PlannerTest;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.test.BaseTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.Collections;
import java.util.List;
import static java.lang.Integer.MAX_VALUE;
import static org.apache.drill.exec.ExecConstants.SLICE_TARGET_DEFAULT;
import static org.apache.drill.exec.planner.fragment.HardAffinityFragmentParallelizer.INSTANCE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@Category(PlannerTest.class)
public class TestHardAffinityFragmentParallelizer extends BaseTest {
// Create a set of test endpoints
private static final DrillbitEndpoint N1_EP1 = newDrillbitEndpoint("node1", 30010);
private static final DrillbitEndpoint N1_EP2 = newDrillbitEndpoint("node1", 30011);
private static final DrillbitEndpoint N2_EP1 = newDrillbitEndpoint("node2", 30010);
private static final DrillbitEndpoint N2_EP2 = newDrillbitEndpoint("node2", 30011);
private static final DrillbitEndpoint N3_EP1 = newDrillbitEndpoint("node3", 30010);
private static final DrillbitEndpoint N3_EP2 = newDrillbitEndpoint("node3", 30011);
private static final DrillbitEndpoint N4_EP2 = newDrillbitEndpoint("node4", 30011);
private static final DrillbitEndpoint newDrillbitEndpoint(String address, int port) {
return DrillbitEndpoint.newBuilder().setAddress(address).setControlPort(port).build();
}
private static final ParallelizationParameters newParameters(final long threshold, final int maxWidthPerNode,
final int maxGlobalWidth) {
return new ParallelizationParameters() {
@Override
public long getSliceTarget() {
return threshold;
}
@Override
public int getMaxWidthPerNode() {
return maxWidthPerNode;
}
@Override
public int getMaxGlobalWidth() {
return maxGlobalWidth;
}
/**
* {@link HardAffinityFragmentParallelizer} doesn't use affinity factor.
* @return
*/
@Override
public double getAffinityFactor() {
return 0.0f;
}
};
}
private final Wrapper newWrapper(double cost, int minWidth, int maxWidth, List<EndpointAffinity> epAffs) {
final Fragment fragment = mock(Fragment.class);
final PhysicalOperator root = mock(PhysicalOperator.class);
when(fragment.getRoot()).thenReturn(root);
final Wrapper fragmentWrapper = new Wrapper(fragment, 1);
final Stats stats = fragmentWrapper.getStats();
stats.setDistributionAffinity(DistributionAffinity.HARD);
stats.addCost(cost);
stats.addMinWidth(minWidth);
stats.addMaxWidth(maxWidth);
stats.addEndpointAffinities(epAffs);
return fragmentWrapper;
}
@Test
public void simpleCase1() throws Exception {
final Wrapper wrapper = newWrapper(200, 1, 20, Collections.singletonList(new EndpointAffinity(N1_EP1, 1.0, true, MAX_VALUE)));
INSTANCE.parallelizeFragment(wrapper, newParameters(SLICE_TARGET_DEFAULT, 5, 20), null);
// Expect the fragment parallelization to be just one because:
// The cost (200) is below the threshold (SLICE_TARGET_DEFAULT) (which gives width of 200/10000 = ~1) and
assertEquals(1, wrapper.getWidth());
final List<DrillbitEndpoint> assignedEps = wrapper.getAssignedEndpoints();
assertEquals(1, assignedEps.size());
assertEquals(N1_EP1, assignedEps.get(0));
}
@Test
public void simpleCase2() throws Exception {
// Set the slice target to 1
final Wrapper wrapper = newWrapper(200, 1, 20, Collections.singletonList(new EndpointAffinity(N1_EP1, 1.0, true, MAX_VALUE)));
INSTANCE.parallelizeFragment(wrapper, newParameters(1, 5, 20), null);
// Expect the fragment parallelization to be 5:
// 1. the cost (200) is above the threshold (SLICE_TARGET_DEFAULT) (which gives 200/1=200 width) and
// 2. Max width per node is 5 (limits the width 200 to 5)
assertEquals(5, wrapper.getWidth());
final List<DrillbitEndpoint> assignedEps = wrapper.getAssignedEndpoints();
assertEquals(5, assignedEps.size());
for (DrillbitEndpoint ep : assignedEps) {
assertEquals(N1_EP1, ep);
}
}
@Test
public void multiNodeCluster1() throws Exception {
final Wrapper wrapper = newWrapper(200, 1, 20,
ImmutableList.of(
new EndpointAffinity(N1_EP1, 0.15, true, MAX_VALUE),
new EndpointAffinity(N1_EP2, 0.15, true, MAX_VALUE),
new EndpointAffinity(N2_EP1, 0.10, true, MAX_VALUE),
new EndpointAffinity(N3_EP2, 0.20, true, MAX_VALUE),
new EndpointAffinity(N4_EP2, 0.20, true, MAX_VALUE)
));
INSTANCE.parallelizeFragment(wrapper, newParameters(SLICE_TARGET_DEFAULT, 5, 20), null);
// Expect the fragment parallelization to be 5 because:
// 1. The cost (200) is below the threshold (SLICE_TARGET_DEFAULT) (which gives width of 200/10000 = ~1) and
// 2. Number of mandoatory node assignments are 5 which overrides the cost based width of 1.
assertEquals(5, wrapper.getWidth());
// As there are 5 required eps and the width is 5, everyone gets assigned 1.
final List<DrillbitEndpoint> assignedEps = wrapper.getAssignedEndpoints();
assertEquals(5, assignedEps.size());
assertTrue(assignedEps.contains(N1_EP1));
assertTrue(assignedEps.contains(N1_EP2));
assertTrue(assignedEps.contains(N2_EP1));
assertTrue(assignedEps.contains(N3_EP2));
assertTrue(assignedEps.contains(N4_EP2));
}
@Test
public void multiNodeCluster2() throws Exception {
final Wrapper wrapper = newWrapper(200, 1, 20,
ImmutableList.of(
new EndpointAffinity(N1_EP2, 0.15, true, MAX_VALUE),
new EndpointAffinity(N2_EP2, 0.15, true, MAX_VALUE),
new EndpointAffinity(N3_EP1, 0.10, true, MAX_VALUE),
new EndpointAffinity(N4_EP2, 0.20, true, MAX_VALUE),
new EndpointAffinity(N1_EP1, 0.20, true, MAX_VALUE)
));
INSTANCE.parallelizeFragment(wrapper, newParameters(1, 5, 20), null);
// Expect the fragment parallelization to be 20 because:
// 1. the cost (200) is above the threshold (SLICE_TARGET_DEFAULT) (which gives 200/1=200 width) and
// 2. Number of mandatory node assignments are 5 (current width 200 satisfies the requirement)
// 3. max fragment width is 20 which limits the width
assertEquals(20, wrapper.getWidth());
final List<DrillbitEndpoint> assignedEps = wrapper.getAssignedEndpoints();
assertEquals(20, assignedEps.size());
final HashMultiset<DrillbitEndpoint> counts = HashMultiset.create();
for(final DrillbitEndpoint ep : assignedEps) {
counts.add(ep);
}
// Each node gets at max 5.
assertTrue(counts.count(N1_EP2) <= 5);
assertTrue(counts.count(N2_EP2) <= 5);
assertTrue(counts.count(N3_EP1) <= 5);
assertTrue(counts.count(N4_EP2) <= 5);
assertTrue(counts.count(N1_EP1) <= 5);
}
@Test
public void multiNodeClusterNegative1() throws Exception {
final Wrapper wrapper = newWrapper(200, 1, 20,
ImmutableList.of(
new EndpointAffinity(N1_EP2, 0.15, true, MAX_VALUE),
new EndpointAffinity(N2_EP2, 0.15, true, MAX_VALUE),
new EndpointAffinity(N3_EP1, 0.10, true, MAX_VALUE),
new EndpointAffinity(N4_EP2, 0.20, true, MAX_VALUE),
new EndpointAffinity(N1_EP1, 0.20, true, MAX_VALUE)
));
try {
INSTANCE.parallelizeFragment(wrapper, newParameters(1, 2, 2), null);
fail("Expected an exception, because max global query width (2) is less than the number of mandatory nodes (5)");
} catch (Exception e) {
// ok
}
}
@Test
public void multiNodeClusterNegative2() throws Exception {
final Wrapper wrapper = newWrapper(200, 1, 3,
ImmutableList.of(
new EndpointAffinity(N1_EP2, 0.15, true, MAX_VALUE),
new EndpointAffinity(N2_EP2, 0.15, true, MAX_VALUE),
new EndpointAffinity(N3_EP1, 0.10, true, MAX_VALUE),
new EndpointAffinity(N4_EP2, 0.20, true, MAX_VALUE),
new EndpointAffinity(N1_EP1, 0.20, true, MAX_VALUE)
));
try {
INSTANCE.parallelizeFragment(wrapper, newParameters(1, 2, 2), null);
fail("Expected an exception, because max fragment width (3) is less than the number of mandatory nodes (5)");
} catch (Exception e) {
// ok
}
}
}