blob: 3271e117dc9c71051d1d76763f4dc08cfc27770b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.fragment;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
/**
* Implementation of {@link FragmentParallelizer} where the fragment is required to
* run on a given set of endpoints. The width per node depends on the
* affinity of the endpoint and the total width (calculated using costs).
*/
public class HardAffinityFragmentParallelizer implements FragmentParallelizer {
  private static final Logger logger = LoggerFactory.getLogger(HardAffinityFragmentParallelizer.class);

  /** The single shared instance; the constructor is private. */
  public static final HardAffinityFragmentParallelizer INSTANCE = new HardAffinityFragmentParallelizer();

  /** Line separator used to split the multi-line diagnostic logged when slot assignment stalls. */
  private static final String EOL = System.getProperty("line.separator");

  private HardAffinityFragmentParallelizer() { /* singleton */}

  /**
   * Computes the width of the given fragment and assigns it to endpoints, considering only the endpoints
   * in the fragment's affinity map whose affinity is marked as assignment-required.
   *
   * <p>The width starts from {@code ceil(maxCost / sliceTarget)}, is raised to at least the number of mandatory
   * endpoints, and is then capped in turn by the fragment max width, the global max width, the per-node limit
   * times the number of mandatory endpoints, and the sum of the per-endpoint max widths. Every mandatory
   * endpoint receives one slot; remaining slots are handed out in rounds, proportionally to each endpoint's
   * affinity, without exceeding the per-node limit or the endpoint's own max width.
   *
   * @param fragmentWrapper fragment whose stats are read and on which the width and endpoint assignment are set
   * @param parameters supplies the slice target, the max width per node and the global max width
   * @param activeEndpoints not consulted by this implementation; only endpoints from the affinity map are used
   * @throws PhysicalOperatorSetupException if the number of mandatory endpoints exceeds the fragment or global
   *         width limit, or if a full round over the endpoints fails to place any of the remaining slots
   */
  @Override
  public void parallelizeFragment(final Wrapper fragmentWrapper, final ParallelizationParameters parameters,
      final Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
    final Stats stats = fragmentWrapper.getStats();
    final ParallelizationInfo pInfo = stats.getParallelizationInfo();

    int totalMaxWidth = 0;

    // Go through the affinity map and extract the endpoints that have a mandatory assignment requirement.
    final Map<DrillbitEndpoint, EndpointAffinity> endpointPool = Maps.newHashMap();
    for (Entry<DrillbitEndpoint, EndpointAffinity> entry : pInfo.getEndpointAffinityMap().entrySet()) {
      if (entry.getValue().isAssignmentRequired()) {
        endpointPool.put(entry.getKey(), entry.getValue());

        // Limit the max width of the endpoint to the allowed max width per node.
        totalMaxWidth += Math.min(parameters.getMaxWidthPerNode(), entry.getValue().getMaxWidth());
        if (totalMaxWidth < 0) {
          // If the total width overflows, just keep it at the max value.
          totalMaxWidth = Integer.MAX_VALUE;
        }
      }
    }

    // Step 1: Find the width taking into account various parameters.

    // 1.1. Find the parallelization based on cost. Use max cost of all operators in this fragment; this is
    //      consistent with the calculation that ExcessiveExchangeRemover uses.
    int width = (int) Math.ceil(stats.getMaxCost() / parameters.getSliceTarget());

    // 1.2. Make sure the width is at least the number of endpoints that require an assignment.
    width = Math.max(endpointPool.size(), width);

    // 1.3. Cap the parallelization width by the fragment level width limit.
    width = Math.max(1, Math.min(width, pInfo.getMaxWidth()));
    checkOrThrow(endpointPool.size() <= width, logger,
        "Number of mandatory endpoints ({}) that require an assignment is more than the allowed fragment max " +
            "width ({}).", endpointPool.size(), pInfo.getMaxWidth());

    // 1.4. Cap the parallelization width by the global max query width.
    width = Math.max(1, Math.min(width, parameters.getMaxGlobalWidth()));
    checkOrThrow(endpointPool.size() <= width, logger,
        "Number of mandatory endpoints ({}) that require an assignment is more than the allowed global query " +
            "width ({}).", endpointPool.size(), parameters.getMaxGlobalWidth());

    // 1.5. Cap the parallelization width by the max allowed parallelization per node. The product is computed
    //      as a long: with a very large per-node limit an int multiplication wraps negative, which would clamp
    //      the width to 1, possibly below the number of mandatory endpoints. The result never exceeds the
    //      current (int) width, so the narrowing cast is safe.
    final long perNodeCap = (long) endpointPool.size() * parameters.getMaxWidthPerNode();
    width = (int) Math.max(1L, Math.min((long) width, perNodeCap));

    // 1.6. Cap the parallelization width by the total of max allowed width per node. The reason is if the width
    //      is more, we end up allocating more work units to one or more endpoints that don't have that many
    //      work units.
    width = Math.min(totalMaxWidth, width);

    // Step 2: Select the endpoints.
    final Map<DrillbitEndpoint, Integer> endpoints = Maps.newHashMap();

    // 2.1. First add each endpoint from the pool once so that the mandatory assignment requirement is fulfilled.
    for (DrillbitEndpoint endpoint : endpointPool.keySet()) {
      endpoints.put(endpoint, 1);
    }
    int totalAssigned = endpoints.size();

    // 2.2. Assign the remaining slots to endpoints proportional to the affinity of each endpoint. Each pass of
    //      the while loop is one round over all endpoints; remainingSlots is only refreshed between rounds.
    int remainingSlots = width - endpoints.size();
    while (remainingSlots > 0) {
      for (EndpointAffinity epAf : endpointPool.values()) {
        final int moreAllocation = (int) Math.ceil(epAf.getAffinity() * remainingSlots);
        int currentAssignments = endpoints.get(epAf.getEndpoint());
        // Hand out up to moreAllocation slots, stopping early at the total width, the per-node limit or the
        // endpoint's own max width.
        for (int i = 0;
            i < moreAllocation &&
                totalAssigned < width &&
                currentAssignments < parameters.getMaxWidthPerNode() &&
                currentAssignments < epAf.getMaxWidth();
            i++) {
          totalAssigned++;
          currentAssignments++;
        }
        endpoints.put(epAf.getEndpoint(), currentAssignments);
      }

      final int previousRemainingSlots = remainingSlots;
      remainingSlots = width - totalAssigned;
      if (previousRemainingSlots == remainingSlots) {
        // A whole round placed nothing; looping again would never terminate.
        logger.error("Can't parallelize fragment: " +
            "Every mandatory node has exhausted the maximum width per node limit." + EOL +
            "Endpoint pool: {}" + EOL + "Assignment so far: {}" + EOL + "Width: {}", endpointPool, endpoints, width);
        throw new PhysicalOperatorSetupException("Can not parallelize fragment.");
      }
    }

    // Expand the per-endpoint counts into a flat list with one entry per assigned slot.
    final List<DrillbitEndpoint> assignedEndpoints = Lists.newArrayList();
    for (Entry<DrillbitEndpoint, Integer> entry : endpoints.entrySet()) {
      for (int i = 0; i < entry.getValue(); i++) {
        assignedEndpoints.add(entry.getKey());
      }
    }

    fragmentWrapper.setWidth(width);
    fragmentWrapper.assignEndpoints(assignedEndpoints);
  }

  /**
   * Logs the given message at error level and throws when the condition does not hold.
   *
   * @param expr condition that must be true for parallelization to proceed
   * @param logger logger that receives the error message
   * @param errMsg parameterized ({@code {}}) message to log on failure
   * @param args arguments substituted into {@code errMsg}
   * @throws PhysicalOperatorSetupException if {@code expr} is false
   */
  private static void checkOrThrow(final boolean expr, final Logger logger, final String errMsg, Object... args)
      throws PhysicalOperatorSetupException {
    if (!expr) {
      logger.error(errMsg, args);
      throw new PhysicalOperatorSetupException("Can not parallelize fragment.");
    }
  }
}