blob: 2377056bcc20eb6574638edf8e19b24d7479edce [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.fragment;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.drill.exec.planner.cost.NodeResource;
import com.google.common.collect.ImmutableList;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.Exchange;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Store;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
* Wrapper class that allows us to add additional information to each fragment
* node for planning purposes.
*/
public class Wrapper {
private final Fragment node;
private final int majorFragmentId;
private int width = -1;
private final Stats stats;
private boolean endpointsAssigned;
private long initialAllocation;
private long maxAllocation;
// Resources (i.e memory and cpu) are stored per drillbit in this map.
// A Drillbit can have n number of minor fragments then the NodeResource
// contains cumulative resources required for all the minor fragments
// for that major fragment on that Drillbit.
private Map<DrillbitEndpoint, NodeResource> nodeResourceMap;
// List of fragments this particular fragment depends on for determining its
// parallelization and endpoint assignments.
private final List<Wrapper> fragmentDependencies = Lists.newArrayList();
// List of assigned endpoints. Technically, there could repeated endpoints
// in this list if we'd like to assign the
// same fragment multiple times to the same endpoint.
private final List<DrillbitEndpoint> endpoints = Lists.newLinkedList();
public Wrapper(Fragment node, int majorFragmentId) {
this.majorFragmentId = majorFragmentId;
this.node = node;
this.stats = new Stats();
}
public Stats getStats() {
return stats;
}
public void resetAllocation() {
initialAllocation = 0;
maxAllocation = 0;
}
public int getMajorFragmentId() {
return majorFragmentId;
}
public int getWidth() {
return width;
}
public void setWidth(int width) {
Preconditions.checkState(this.width == -1);
this.width = width;
}
public Fragment getNode() {
return node;
}
public long getInitialAllocation() {
return initialAllocation;
}
public long getMaxAllocation() {
return maxAllocation;
}
public void addInitialAllocation(long memory) {
initialAllocation += memory;
}
public void addMaxAllocation(long memory) {
maxAllocation += memory;
}
private class AssignEndpointsToScanAndStore extends
AbstractPhysicalVisitor<Void, List<DrillbitEndpoint>, PhysicalOperatorSetupException> {
@Override
public Void visitExchange(Exchange exchange, List<DrillbitEndpoint> value)
throws PhysicalOperatorSetupException {
if (exchange == node.getSendingExchange()) {
return visitOp(exchange, value);
}
// stop on receiver exchange.
return null;
}
@Override
public Void visitGroupScan(GroupScan groupScan,
List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException {
groupScan.applyAssignments(value);
return super.visitGroupScan(groupScan, value);
}
@Override
public Void visitSubScan(SubScan subScan, List<DrillbitEndpoint> value)
throws PhysicalOperatorSetupException {
// TODO - implement this
return visitOp(subScan, value);
}
@Override
public Void visitStore(Store store, List<DrillbitEndpoint> value)
throws PhysicalOperatorSetupException {
store.applyAssignments(value);
return super.visitStore(store, value);
}
@Override
public Void visitOp(PhysicalOperator op, List<DrillbitEndpoint> value)
throws PhysicalOperatorSetupException {
return visitChildren(op, value);
}
}
public void assignEndpoints(List<DrillbitEndpoint> assignedEndpoints)
throws PhysicalOperatorSetupException {
Preconditions.checkState(!endpointsAssigned);
endpointsAssigned = true;
endpoints.addAll(assignedEndpoints);
// Set scan and store endpoints.
AssignEndpointsToScanAndStore visitor = new AssignEndpointsToScanAndStore();
node.getRoot().accept(visitor, endpoints);
// Set the endpoints for this (one at most) sending exchange.
if (node.getSendingExchange() != null) {
node.getSendingExchange().setupSenders(majorFragmentId, endpoints);
}
// Set the endpoints for each incoming exchange within this fragment.
for (ExchangeFragmentPair e : node.getReceivingExchangePairs()) {
e.getExchange().setupReceivers(majorFragmentId, endpoints);
}
}
@Override
public String toString() {
return "FragmentWrapper [majorFragmentId=" + majorFragmentId + ", width="
+ width + ", stats=" + stats + "]";
}
public List<DrillbitEndpoint> getAssignedEndpoints() {
Preconditions.checkState(endpointsAssigned);
return ImmutableList.copyOf(endpoints);
}
public DrillbitEndpoint getAssignedEndpoint(int minorFragmentId) {
Preconditions.checkState(endpointsAssigned);
return endpoints.get(minorFragmentId);
}
/**
* Add a parallelization dependency on given fragment.
*
* @param dependsOn
*/
public void addFragmentDependency(Wrapper dependsOn) {
fragmentDependencies.add(dependsOn);
}
/**
* Is the endpoints assignment done for this fragment?
*
* @return True if the endpoints assignment done for this fragment. False
* otherwise.
*/
public boolean isEndpointsAssignmentDone() {
return endpointsAssigned;
}
/**
* Get the list of fragements this particular fragment depends on.
*
* @return The list of fragements this particular fragment depends on.
*/
public List<Wrapper> getFragmentDependencies() {
return ImmutableList.copyOf(fragmentDependencies);
}
/**
* Compute the cpu resources required for all the minor fragments of this
* major fragment. This information is stored per DrillbitEndpoint. It is
* assumed that this function is called only once.
*/
public void computeCpuResources() {
Preconditions.checkArgument(nodeResourceMap == null);
BinaryOperator<NodeResource> merge = (first, second) -> {
NodeResource result = NodeResource.create();
result.add(first);
result.add(second);
return result;
};
Function<DrillbitEndpoint, NodeResource> cpuPerEndpoint =
endpoint -> new NodeResource(1, 0);
nodeResourceMap = endpoints.stream()
.collect(Collectors.groupingBy(Function.identity(),
Collectors.reducing(NodeResource.create(), cpuPerEndpoint, merge)));
}
public Map<DrillbitEndpoint, NodeResource> getResourceMap() {
return nodeResourceMap;
}
}