/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.rm;
import org.apache.drill.PlanTestBase;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.cost.NodeResource;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.PlanningSet;
import org.apache.drill.exec.planner.fragment.QueueQueryParallelizer;
import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.planner.fragment.Wrapper;
import org.apache.drill.exec.pop.PopUnitTestBase;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserProtos;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.foreman.rm.EmbeddedQueryQueue;
import com.google.common.collect.Iterables;
import org.apache.drill.test.ClientFixture;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterFixtureBuilder;
import org.junit.AfterClass;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
 * Tests the per-node memory figures produced by {@code adjustMemory} on a
 * {@link QueueQueryParallelizer}.
 *
 * <p>Each memory test generates a JSON physical plan for a SQL statement on a
 * throw-away {@link ClusterFixture}, turns it into a fragment tree, replaces the
 * resulting {@link PlanningSet} with Mockito mocks that report a fixed endpoint
 * assignment, and then checks the memory recorded in the shared
 * {@link NodeResource} map after {@code adjustMemory} has run.
 */
public class TestMemoryCalculator extends PlanTestBase {

  private static final long DEFAULT_SLICE_TARGET = 100000L;
  private static final long DEFAULT_BATCH_SIZE = 16L * 1024 * 1024;

  // Output batch size applied to every plan built through preparePlanningSet().
  // NOTE(review): the expected memory values asserted below were presumably
  // derived with this setting - confirm before changing it.
  private static final long PLANNING_OUTPUT_BATCH_SIZE = 10L;

  // Number of endpoint slots requested for each memory test.
  private static final int ACTIVE_ENDPOINT_COUNT = 2;

  // NOTE: the static initialisers below read state inherited from the test base
  // class (bits[0], getDrillbitContext()) and depend on each other, so their
  // declaration order must be preserved.
  private static final UserSession session = UserSession.Builder.newBuilder()
      .withCredentials(UserBitShared.UserCredentials.newBuilder()
          .setUserName("foo")
          .build())
      .withUserProperties(UserProtos.UserProperties.getDefaultInstance())
      .withOptionManager(bits[0].getContext().getOptionManager())
      .build();

  private static final DrillbitEndpoint N1_EP1 = newDrillbitEndpoint("node1", 30010);
  private static final DrillbitEndpoint N1_EP2 = newDrillbitEndpoint("node2", 30011);
  private static final DrillbitEndpoint N1_EP3 = newDrillbitEndpoint("node3", 30012);
  private static final DrillbitEndpoint N1_EP4 = newDrillbitEndpoint("node4", 30013);
  private static final DrillbitEndpoint[] nodeList = {N1_EP1, N1_EP2, N1_EP3, N1_EP4};

  /**
   * Builds an endpoint descriptor carrying only an address and a control port.
   *
   * @param address address to store in the endpoint
   * @param port control port to store in the endpoint
   * @return the newly built endpoint
   */
  private static DrillbitEndpoint newDrillbitEndpoint(String address, int port) {
    return DrillbitEndpoint.newBuilder().setAddress(address).setControlPort(port).build();
  }

  private static final DrillbitContext drillbitContext = getDrillbitContext();
  private static final QueryContext queryContext = new QueryContext(session, drillbitContext,
      UserBitShared.QueryId.getDefaultInstance());

  /**
   * Closes the query context shared by all tests in this class.
   *
   * @throws Exception if closing the query context fails
   */
  @AfterClass
  public static void close() throws Exception {
    queryContext.close();
  }

  /**
   * Recursively mirrors a tree of {@link Wrapper}s with Mockito mocks.
   *
   * <p>Every mock returns the original wrapper's fragment node, the supplied
   * endpoints and resource map, a width equal to the number of endpoints, the
   * mocked versions of the original dependencies, and reports that endpoint
   * assignment is done. Each mock is also registered in
   * {@code originalToMockWrapper} under the original wrapper's fragment node.
   *
   * @param rootFragment wrapper whose subtree is mirrored
   * @param resourceMap resource map returned by every mock
   * @param endpoints endpoints returned by every mock
   * @param originalToMockWrapper output map from fragment node to its mock wrapper
   * @return the mock standing in for {@code rootFragment}
   */
  private Wrapper mockWrapper(Wrapper rootFragment,
                              Map<DrillbitEndpoint, NodeResource> resourceMap,
                              List<DrillbitEndpoint> endpoints,
                              Map<Fragment, Wrapper> originalToMockWrapper) {
    final Wrapper mockWrapper = mock(Wrapper.class);
    originalToMockWrapper.put(rootFragment.getNode(), mockWrapper);

    List<Wrapper> mockDependencies = new ArrayList<>();
    for (Wrapper dependency : rootFragment.getFragmentDependencies()) {
      mockDependencies.add(mockWrapper(dependency, resourceMap, endpoints, originalToMockWrapper));
    }

    when(mockWrapper.getNode()).thenReturn(rootFragment.getNode());
    when(mockWrapper.getAssignedEndpoints()).thenReturn(endpoints);
    when(mockWrapper.getResourceMap()).thenReturn(resourceMap);
    when(mockWrapper.getWidth()).thenReturn(endpoints.size());
    when(mockWrapper.getFragmentDependencies()).thenReturn(mockDependencies);
    when(mockWrapper.isEndpointsAssignmentDone()).thenReturn(true);
    return mockWrapper;
  }

  /**
   * Creates a mock {@link PlanningSet} backed by mocked wrappers.
   *
   * <p>The mock's root wrapper is the mocked mirror of the real root wrapper, and
   * {@code get(Fragment)} looks the fragment up among the mocks created for the
   * tree (returning {@code null} for a fragment that was not part of it).
   *
   * @param planningSet real planning set whose wrapper tree is mirrored
   * @param resourceMap resource map returned by every mocked wrapper
   * @param endpoints endpoints returned by every mocked wrapper
   * @return the mocked planning set
   */
  private PlanningSet mockPlanningSet(PlanningSet planningSet,
                                      Map<DrillbitEndpoint, NodeResource> resourceMap,
                                      List<DrillbitEndpoint> endpoints) {
    Map<Fragment, Wrapper> fragmentToMockWrapper = new HashMap<>();
    Wrapper mockRoot = mockWrapper(planningSet.getRootWrapper(), resourceMap,
        endpoints, fragmentToMockWrapper);

    PlanningSet mockPlanningSet = mock(PlanningSet.class);
    when(mockPlanningSet.getRootWrapper()).thenReturn(mockRoot);
    when(mockPlanningSet.get(any(Fragment.class)))
        .thenAnswer(invocation -> fragmentToMockWrapper.get(invocation.getArgument(0)));
    return mockPlanningSet;
  }

  /**
   * Returns the JSON plan for {@code query} using the default batch size and
   * slice target.
   */
  private String getPlanForQuery(String query) throws Exception {
    return getPlanForQuery(query, DEFAULT_BATCH_SIZE);
  }

  /**
   * Returns the JSON plan for {@code query} using the given output batch size
   * and the default slice target.
   */
  private String getPlanForQuery(String query, long outputBatchSize) throws Exception {
    return getPlanForQuery(query, outputBatchSize, DEFAULT_SLICE_TARGET);
  }

  /**
   * Starts a fresh cluster with the given option defaults, asks it for the JSON
   * explain output of {@code query}, and shuts the cluster down again.
   *
   * @param query SQL statement to plan
   * @param outputBatchSize default for {@link ExecConstants#OUTPUT_BATCH_SIZE}
   * @param sliceTarget default for {@link ExecConstants#SLICE_TARGET}
   * @return the plan as a JSON string
   * @throws Exception if the cluster cannot be started or the query cannot be planned
   */
  private String getPlanForQuery(String query, long outputBatchSize,
                                 long sliceTarget) throws Exception {
    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
        .setOptionDefault(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize)
        .setOptionDefault(ExecConstants.SLICE_TARGET, sliceTarget);

    try (ClusterFixture cluster = builder.build();
         ClientFixture client = cluster.clientFixture()) {
      return client.queryBuilder()
          .sql(query)
          .explainJson();
    }
  }

  /**
   * Picks endpoints round-robin from {@code nodeList}.
   *
   * <p>Exactly {@code totalMinorFragments} slots are visited; a slot whose
   * endpoint is contained in {@code notIn} is skipped without being replaced, so
   * the returned list can be shorter than {@code totalMinorFragments}.
   *
   * @param totalMinorFragments number of round-robin slots to visit
   * @param notIn endpoints that must not appear in the result
   * @return the selected endpoints, in visiting order
   */
  private List<DrillbitEndpoint> getEndpoints(int totalMinorFragments,
                                              Set<DrillbitEndpoint> notIn) {
    List<DrillbitEndpoint> endpoints = new ArrayList<>();
    Iterator<DrillbitEndpoint> drillbits = Iterables.cycle(nodeList).iterator();

    for (int slot = 0; slot < totalMinorFragments; slot++) {
      DrillbitEndpoint dbit = drillbits.next();
      if (!notIn.contains(dbit)) {
        endpoints.add(dbit);
      }
    }
    return endpoints;
  }

  /**
   * Collects the given wrappers into a new mutable {@link HashSet}.
   */
  private Set<Wrapper> createSet(Wrapper... wrappers) {
    Set<Wrapper> setOfWrappers = new HashSet<>();
    for (Wrapper wrapper : wrappers) {
      setOfWrappers.add(wrapper);
    }
    return setOfWrappers;
  }

  /**
   * Parses a JSON plan string into its root {@link Fragment} using the plan
   * reader of {@code context}.
   */
  private Fragment getRootFragmentFromPlan(DrillbitContext context,
                                           String plan) throws Exception {
    final PhysicalPlanReader planReader = context.getPlanReader();
    return PopUnitTestBase.getRootFragmentFromPlanString(planReader, plan);
  }

  /**
   * Plans {@code sql}, lets {@code parallelizer} prepare the fragment tree and
   * wraps the result in a mocked planning set.
   *
   * @param activeEndpoints endpoints every mocked wrapper reports as assigned
   * @param sliceTarget slice target used while generating the plan
   * @param resources resource map every mocked wrapper reports
   * @param sql SQL statement to plan
   * @param parallelizer parallelizer used to prepare the fragment tree
   * @return the mocked planning set
   * @throws Exception if planning or plan parsing fails
   */
  private PlanningSet preparePlanningSet(List<DrillbitEndpoint> activeEndpoints, long sliceTarget,
                                         Map<DrillbitEndpoint, NodeResource> resources, String sql,
                                         SimpleParallelizer parallelizer) throws Exception {
    String plan = getPlanForQuery(sql, PLANNING_OUTPUT_BATCH_SIZE, sliceTarget);
    Fragment rootFragment = getRootFragmentFromPlan(drillbitContext, plan);
    return mockPlanningSet(parallelizer.prepareFragmentTree(rootFragment), resources, activeEndpoints);
  }

  /**
   * Shared body of the memory tests: selects the active endpoints, creates one
   * fresh {@link NodeResource} per endpoint, builds the mocked planning set for
   * {@code sql} and runs {@code adjustMemory} on its root wrapper.
   *
   * @param sql SQL statement to plan
   * @param sliceTarget slice target used while generating the plan
   * @return the per-endpoint resources after {@code adjustMemory} has run
   * @throws Exception if planning or memory adjustment fails
   */
  private Map<DrillbitEndpoint, NodeResource> adjustMemoryFor(String sql, long sliceTarget) throws Exception {
    List<DrillbitEndpoint> activeEndpoints = getEndpoints(ACTIVE_ENDPOINT_COUNT, new HashSet<>());
    Map<DrillbitEndpoint, NodeResource> resources = activeEndpoints.stream()
        .collect(Collectors.toMap(x -> x,
            x -> NodeResource.create()));

    SimpleParallelizer parallelizer = new QueueQueryParallelizer(false, queryContext);
    PlanningSet planningSet = preparePlanningSet(activeEndpoints, sliceTarget, resources, sql, parallelizer);
    parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), activeEndpoints);
    return resources;
  }

  /**
   * Asserts that every entry of {@code resources} reports exactly
   * {@code expectedMemory}; the failure message names the offending node and the
   * value it actually reported.
   */
  private static void assertMemoryOnEveryNode(Map<DrillbitEndpoint, NodeResource> resources,
                                              long expectedMemory) {
    for (Map.Entry<DrillbitEndpoint, NodeResource> entry : resources.entrySet()) {
      assertTrue("memory requirement is different on " + entry.getKey().getAddress()
              + ": expected " + expectedMemory + " but was " + entry.getValue().getMemory(),
          entry.getValue().getMemory() == expectedMemory);
    }
  }

  /** Checks the per-node memory for a plain {@code SELECT *} over a parquet file. */
  @Test
  public void TestSingleMajorFragmentWithProjectAndScan() throws Exception {
    String sql = "SELECT * from cp.`tpch/nation.parquet`";
    assertMemoryOnEveryNode(adjustMemoryFor(sql, DEFAULT_SLICE_TARGET), 20);
  }

  /** Checks the per-node memory for a {@code GROUP BY} aggregation query. */
  @Test
  public void TestSingleMajorFragmentWithGroupByProjectAndScan() throws Exception {
    String sql = "SELECT dept_id, count(*) from cp.`tpch/lineitem.parquet` group by dept_id";
    assertMemoryOnEveryNode(adjustMemoryFor(sql, DEFAULT_SLICE_TARGET), 529570);
  }

  /** Checks the per-node memory for an {@code ORDER BY} query planned with a slice target of 2. */
  @Test
  public void TestTwoMajorFragmentWithSortyProjectAndScan() throws Exception {
    String sql = "SELECT * from cp.`tpch/lineitem.parquet` order by dept_id";
    assertMemoryOnEveryNode(adjustMemoryFor(sql, 2), 481490);
  }

  /**
   * Smoke test: runs a query on a cluster started with
   * {@link EmbeddedQueryQueue#ENABLED} set to {@code true}. There are no
   * assertions; the test passes when the query runs without throwing.
   *
   * <p>NOTE(review): despite the method name, the property enabled here is the
   * embedded queue's - confirm whether a ZK-based queue was intended.
   */
  @Test
  public void TestZKBasedQueue() throws Exception {
    String sql = "select * from cp.`employee.json`";
    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
        .configProperty(EmbeddedQueryQueue.ENABLED, true);

    try (ClusterFixture cluster = builder.build();
         ClientFixture client = cluster.clientFixture()) {
      client
          .queryBuilder()
          .sql(sql)
          .run();
    }
  }
}