blob: 434cb2fd6db88b72e9d5cd34b9cbb412a7f74a1e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.fragment;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
/**
* Implementation of {@link FragmentParallelizer} where a fragment has zero or
* more endpoints with affinities. The width per node depends on the affinity to
* the endpoint and on the total width (calculated using costs). Depending on
* various factors, endpoints which have no affinity can also be assigned to run
* the fragments.
*/
public class SoftAffinityFragmentParallelizer implements FragmentParallelizer {
  /** Shared instance; this class declares no instance fields. */
  public static final SoftAffinityFragmentParallelizer INSTANCE = new SoftAffinityFragmentParallelizer();

  /** Orders {@link EndpointAffinity} entries from the highest to the lowest affinity value. */
  private static final Ordering<EndpointAffinity> ENDPOINT_AFFINITY_ORDERING =
      Ordering.from(new Comparator<EndpointAffinity>() {
        @Override
        public int compare(EndpointAffinity o1, EndpointAffinity o2) {
          // Sort in descending order of affinity values
          return Double.compare(o2.getAffinity(), o1.getAffinity());
        }
      });

  /**
   * Computes the parallelization width of the given fragment, stores it on the fragment wrapper and assigns that
   * many endpoints to the fragment.
   *
   * @param fragmentWrapper fragment whose width and endpoint assignment are set by this call
   * @param parameters source of the slice target, the global and per node width limits and the affinity factor
   * @param activeEndpoints endpoints that are candidates for running the fragment
   * @throws PhysicalOperatorSetupException if the computed width is less than the number of endpoints whose
   *         assignment is required, or if there is no endpoint at all to assign a slot to
   */
  @Override
  public void parallelizeFragment(final Wrapper fragmentWrapper, final ParallelizationParameters parameters,
      final Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
    // Find the parallelization width of fragment
    final Stats stats = fragmentWrapper.getStats();
    final ParallelizationInfo parallelizationInfo = stats.getParallelizationInfo();

    // 1. Find the parallelization based on cost. Use max cost of all operators in this fragment; this is consistent
    //    with the calculation that ExcessiveExchangeRemover uses.
    int width = (int) Math.ceil(stats.getMaxCost() / parameters.getSliceTarget());

    // 2. Cap the parallelization width by fragment level width limit and system level per query width limit
    width = Math.min(width, Math.min(parallelizationInfo.getMaxWidth(), parameters.getMaxGlobalWidth()));

    // 3. Cap the parallelization width by system level per node width limit. The product is computed in long
    //    arithmetic: as an int multiplication a large per node limit would overflow and wrap around to a bogus
    //    (possibly negative) cap. The cap is clamped at zero so that the narrowing cast below is always exact;
    //    the following steps raise a zero width back to a valid value.
    final long maxWidthAcrossNodes = (long) parameters.getMaxWidthPerNode() * activeEndpoints.size();
    width = (int) Math.min(width, Math.max(0L, maxWidthAcrossNodes));

    // 4. Make sure width is at least the min width enforced by operators
    width = Math.max(parallelizationInfo.getMinWidth(), width);

    // 5. Make sure width is at most the max width enforced by operators
    width = Math.min(parallelizationInfo.getMaxWidth(), width);

    // 6. Finally make sure the width is at least one
    width = Math.max(1, width);

    fragmentWrapper.setWidth(width);

    final List<DrillbitEndpoint> assignedEndpoints = findEndpoints(activeEndpoints,
        parallelizationInfo.getEndpointAffinityMap(), fragmentWrapper.getWidth(), parameters);
    fragmentWrapper.assignEndpoints(assignedEndpoints);
  }

  /**
   * Assigns endpoints based on the given endpoint list, affinity map and width.
   *
   * <p>Endpoints with affinity are picked first, round robin in descending order of affinity, until the number of
   * slots reserved for them is filled. Any remaining slots are filled round robin from the shuffled list of active
   * endpoints that have no affinity entry; if there is no such endpoint, the endpoints with affinity are reused.
   *
   * @param activeEndpoints endpoints that are candidates for running the fragment
   * @param endpointAffinityMap affinity of the fragment to individual endpoints; may be empty
   * @param width number of endpoints to return
   * @param parameters source of the affinity factor
   * @return list of exactly {@code width} endpoints; the same endpoint may appear more than once
   * @throws PhysicalOperatorSetupException if {@code width} is less than the number of endpoints whose assignment
   *         is required, or if slots remain to be filled and there is no endpoint to fill them with
   */
  private List<DrillbitEndpoint> findEndpoints(final Collection<DrillbitEndpoint> activeEndpoints,
      final Map<DrillbitEndpoint, EndpointAffinity> endpointAffinityMap, final int width,
      final ParallelizationParameters parameters)
      throws PhysicalOperatorSetupException {

    final List<DrillbitEndpoint> endpoints = Lists.newArrayList();

    if (endpointAffinityMap.size() > 0) {
      // Get EndpointAffinity list sorted in descending order of affinity values
      List<EndpointAffinity> sortedAffinityList =
          ENDPOINT_AFFINITY_ORDERING.immutableSortedCopy(endpointAffinityMap.values());

      // Find the number of mandatory nodes (nodes with +infinity affinity).
      int numRequiredNodes = 0;
      for (EndpointAffinity ep : sortedAffinityList) {
        if (ep.isAssignmentRequired()) {
          numRequiredNodes++;
        } else {
          // As the list is sorted in descending order of affinities, we don't need to go beyond the first
          // occurrence of a non-mandatory node
          break;
        }
      }

      if (width < numRequiredNodes) {
        throw new PhysicalOperatorSetupException("Can not parallelize the fragment as the parallelization width (" + width + ") is " +
            "less than the number of mandatory nodes (" + numRequiredNodes + " nodes with +INFINITE affinity).");
      }

      // Find the maximum number of slots which should go to endpoints with affinity (See DRILL-825 for details)
      int affinedSlots =
          Math.max(1, (int) (Math.ceil(parameters.getAffinityFactor() * width / activeEndpoints.size()) * sortedAffinityList.size()));

      // Make sure affined slots is at least the number of mandatory nodes
      affinedSlots = Math.max(affinedSlots, numRequiredNodes);

      // Cap the affined slots to max parallelization width
      affinedSlots = Math.min(affinedSlots, width);

      Iterator<EndpointAffinity> affinedEPItr = Iterators.cycle(sortedAffinityList);

      // Keep adding until we have selected "affinedSlots" number of endpoints.
      while (endpoints.size() < affinedSlots) {
        EndpointAffinity ea = affinedEPItr.next();
        endpoints.add(ea.getEndpoint());
      }
    }

    // add remaining endpoints if required
    if (endpoints.size() < width) {
      // Get a list of endpoints that are not part of the affinity endpoint list
      List<DrillbitEndpoint> endpointsWithNoAffinity;
      final Set<DrillbitEndpoint> endpointsWithAffinity = endpointAffinityMap.keySet();

      if (endpointAffinityMap.size() > 0) {
        endpointsWithNoAffinity = Lists.newArrayList();
        for (DrillbitEndpoint ep : activeEndpoints) {
          if (!endpointsWithAffinity.contains(ep)) {
            endpointsWithNoAffinity.add(ep);
          }
        }
      } else {
        // Need to create a mutable copy instead of an immutable copy, because we need to shuffle the list (next
        // statement) and Collections.shuffle() doesn't support an immutable list as input.
        endpointsWithNoAffinity = Lists.newArrayList(activeEndpoints);
      }

      // round robin with random start.
      Collections.shuffle(endpointsWithNoAffinity, ThreadLocalRandom.current());

      // Fall back to the endpoints with affinity when every active endpoint already has an affinity entry.
      final Collection<DrillbitEndpoint> remainingCandidates =
          endpointsWithNoAffinity.size() > 0 ? endpointsWithNoAffinity : endpointsWithAffinity;

      // Cycling over an empty collection would fail with an uninformative NoSuchElementException on next();
      // report the condition as a setup failure instead.
      if (remainingCandidates.isEmpty()) {
        throw new PhysicalOperatorSetupException("Can not parallelize the fragment as there are no endpoints " +
            "available to fill the remaining " + (width - endpoints.size()) + " of " + width + " slots.");
      }

      Iterator<DrillbitEndpoint> otherEPItr = Iterators.cycle(remainingCandidates);
      while (endpoints.size() < width) {
        endpoints.add(otherEPItr.next());
      }
    }

    return endpoints;
  }
}