blob: 86b7b0560d4909adb721c8a2a54ca997dcf3e33d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.optimizer.plan;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.dag.EstimateProvider;
import org.apache.flink.optimizer.dag.TempMode;
import org.apache.flink.optimizer.dataproperties.GlobalProperties;
import org.apache.flink.optimizer.dataproperties.LocalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
import org.apache.flink.optimizer.plandump.DumpableConnection;
import org.apache.flink.optimizer.util.Utils;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A Channel represents the result produced by an operator and the data exchange before the
* consumption by the target operator.
*
* <p>The channel defines and tracks various properties and characteristics of the data set and data
* exchange.
*
* <p>Data set characteristics:
*
* <ul>
* <li>The "global properties" of the data, i.e., how the data is distributed across partitions
* <li>The "required global properties" of the data, i.e., the global properties that, if absent,
* would cause the program to return a wrong result.
* <li>The "local properties" of the data, i.e., how the data is organized within a partition
* <li>The "required local properties" of the data, i.e., the local properties that, if absent,
* would cause the program to return a wrong result.
* </ul>
*
* Data exchange parameters:
*
* <ul>
* <li>The "ship strategy", i.e., whether to forward the data, shuffle it, broadcast it, ...
* <li>The "ship keys", which are the positions of the key fields in the exchanged records.
* <li>The "data exchange mode", which defines whether to pipeline or batch the exchange
* <li>Several more...
* </ul>
*/
public class Channel implements EstimateProvider, Cloneable, DumpableConnection<PlanNode> {
private PlanNode source;
private PlanNode target;
private ShipStrategyType shipStrategy = ShipStrategyType.NONE;
private DataExchangeMode dataExchangeMode;
private LocalStrategy localStrategy = LocalStrategy.NONE;
private FieldList shipKeys;
private FieldList localKeys;
private boolean[] shipSortOrder;
private boolean[] localSortOrder;
private RequestedGlobalProperties requiredGlobalProps;
private RequestedLocalProperties requiredLocalProps;
private GlobalProperties globalProps;
private LocalProperties localProps;
private TypeSerializerFactory<?> serializer;
private TypeComparatorFactory<?> shipStrategyComparator;
private TypeComparatorFactory<?> localStrategyComparator;
private DataDistribution dataDistribution;
private Partitioner<?> partitioner;
private TempMode tempMode;
private double relativeTempMemory;
private double relativeMemoryLocalStrategy;
private int replicationFactor = 1;
// --------------------------------------------------------------------------------------------
public Channel(PlanNode sourceNode) {
this(sourceNode, null);
}
public Channel(PlanNode sourceNode, TempMode tempMode) {
this.source = sourceNode;
this.tempMode = (tempMode == null ? TempMode.NONE : tempMode);
}
// --------------------------------------------------------------------------------------------
// Accessors
// --------------------------------------------------------------------------------------------
/**
* Gets the source of this Channel.
*
* @return The source.
*/
@Override
public PlanNode getSource() {
return this.source;
}
public void setSource(PlanNode source) {
this.source = source;
}
/**
* Sets the target of this Channel.
*
* @param target The target.
*/
public void setTarget(PlanNode target) {
this.target = target;
}
/**
* Gets the target of this Channel.
*
* @return The target.
*/
public PlanNode getTarget() {
return this.target;
}
public void setShipStrategy(ShipStrategyType strategy, DataExchangeMode dataExchangeMode) {
setShipStrategy(strategy, null, null, null, dataExchangeMode);
}
public void setShipStrategy(
ShipStrategyType strategy, FieldList keys, DataExchangeMode dataExchangeMode) {
setShipStrategy(strategy, keys, null, null, dataExchangeMode);
}
public void setShipStrategy(
ShipStrategyType strategy,
FieldList keys,
boolean[] sortDirection,
DataExchangeMode dataExchangeMode) {
setShipStrategy(strategy, keys, sortDirection, null, dataExchangeMode);
}
public void setShipStrategy(
ShipStrategyType strategy,
FieldList keys,
Partitioner<?> partitioner,
DataExchangeMode dataExchangeMode) {
setShipStrategy(strategy, keys, null, partitioner, dataExchangeMode);
}
public void setShipStrategy(
ShipStrategyType strategy,
FieldList keys,
boolean[] sortDirection,
Partitioner<?> partitioner,
DataExchangeMode dataExchangeMode) {
this.shipStrategy = strategy;
this.shipKeys = keys;
this.shipSortOrder = sortDirection;
this.partitioner = partitioner;
this.dataExchangeMode = dataExchangeMode;
this.globalProps = null; // reset the global properties
}
/**
* Sets the data exchange mode (batch / pipelined) to use for the data exchange of this channel.
*/
public void setDataExchangeMode(DataExchangeMode dataExchangeMode) {
this.dataExchangeMode = checkNotNull(dataExchangeMode);
}
/**
* Gets the data exchange mode (batch / pipelined) to use for the data exchange of this channel.
*
* @return The data exchange mode of this channel.
*/
public DataExchangeMode getDataExchangeMode() {
return dataExchangeMode;
}
public ShipStrategyType getShipStrategy() {
return this.shipStrategy;
}
public FieldList getShipStrategyKeys() {
return this.shipKeys;
}
public boolean[] getShipStrategySortOrder() {
return this.shipSortOrder;
}
public void setLocalStrategy(LocalStrategy strategy) {
setLocalStrategy(strategy, null, null);
}
public void setLocalStrategy(LocalStrategy strategy, FieldList keys, boolean[] sortDirection) {
this.localStrategy = strategy;
this.localKeys = keys;
this.localSortOrder = sortDirection;
this.localProps = null; // reset the local properties
}
public LocalStrategy getLocalStrategy() {
return this.localStrategy;
}
public FieldList getLocalStrategyKeys() {
return this.localKeys;
}
public boolean[] getLocalStrategySortOrder() {
return this.localSortOrder;
}
public void setDataDistribution(DataDistribution dataDistribution) {
this.dataDistribution = dataDistribution;
}
public DataDistribution getDataDistribution() {
return this.dataDistribution;
}
public Partitioner<?> getPartitioner() {
return partitioner;
}
public TempMode getTempMode() {
return this.tempMode;
}
/**
* Sets the temp mode of the connection.
*
* @param tempMode The temp mode of the connection.
*/
public void setTempMode(TempMode tempMode) {
this.tempMode = tempMode;
}
/**
* Gets the memory for materializing the channel's result from this Channel.
*
* @return The temp memory.
*/
public double getRelativeTempMemory() {
return this.relativeTempMemory;
}
/**
* Sets the memory for materializing the channel's result from this Channel.
*
* @param relativeTempMemory The memory for materialization.
*/
public void setRelativeTempMemory(double relativeTempMemory) {
this.relativeTempMemory = relativeTempMemory;
}
/**
* Sets the replication factor of the connection.
*
* @param factor The replication factor of the connection.
*/
public void setReplicationFactor(int factor) {
this.replicationFactor = factor;
}
/**
* Returns the replication factor of the connection.
*
* @return The replication factor of the connection.
*/
public int getReplicationFactor() {
return this.replicationFactor;
}
/**
* Gets the serializer from this Channel.
*
* @return The serializer.
*/
public TypeSerializerFactory<?> getSerializer() {
return serializer;
}
/**
* Sets the serializer for this Channel.
*
* @param serializer The serializer to set.
*/
public void setSerializer(TypeSerializerFactory<?> serializer) {
this.serializer = serializer;
}
/**
* Gets the ship strategy comparator from this Channel.
*
* @return The ship strategy comparator.
*/
public TypeComparatorFactory<?> getShipStrategyComparator() {
return shipStrategyComparator;
}
/**
* Sets the ship strategy comparator for this Channel.
*
* @param shipStrategyComparator The ship strategy comparator to set.
*/
public void setShipStrategyComparator(TypeComparatorFactory<?> shipStrategyComparator) {
this.shipStrategyComparator = shipStrategyComparator;
}
/**
* Gets the local strategy comparator from this Channel.
*
* @return The local strategy comparator.
*/
public TypeComparatorFactory<?> getLocalStrategyComparator() {
return localStrategyComparator;
}
/**
* Sets the local strategy comparator for this Channel.
*
* @param localStrategyComparator The local strategy comparator to set.
*/
public void setLocalStrategyComparator(TypeComparatorFactory<?> localStrategyComparator) {
this.localStrategyComparator = localStrategyComparator;
}
public double getRelativeMemoryLocalStrategy() {
return relativeMemoryLocalStrategy;
}
public void setRelativeMemoryLocalStrategy(double relativeMemoryLocalStrategy) {
this.relativeMemoryLocalStrategy = relativeMemoryLocalStrategy;
}
public boolean isOnDynamicPath() {
return this.source.isOnDynamicPath();
}
public int getCostWeight() {
return this.source.getCostWeight();
}
// --------------------------------------------------------------------------------------------
// Statistic Estimates
// --------------------------------------------------------------------------------------------
@Override
public long getEstimatedOutputSize() {
long estimate = this.source.template.getEstimatedOutputSize();
return estimate < 0 ? estimate : estimate * this.replicationFactor;
}
@Override
public long getEstimatedNumRecords() {
long estimate = this.source.template.getEstimatedNumRecords();
return estimate < 0 ? estimate : estimate * this.replicationFactor;
}
@Override
public float getEstimatedAvgWidthPerOutputRecord() {
return this.source.template.getEstimatedAvgWidthPerOutputRecord();
}
// --------------------------------------------------------------------------------------------
// Data Property Handling
// --------------------------------------------------------------------------------------------
public RequestedGlobalProperties getRequiredGlobalProps() {
return requiredGlobalProps;
}
public void setRequiredGlobalProps(RequestedGlobalProperties requiredGlobalProps) {
this.requiredGlobalProps = requiredGlobalProps;
}
public RequestedLocalProperties getRequiredLocalProps() {
return requiredLocalProps;
}
public void setRequiredLocalProps(RequestedLocalProperties requiredLocalProps) {
this.requiredLocalProps = requiredLocalProps;
}
public GlobalProperties getGlobalProperties() {
if (this.globalProps == null) {
this.globalProps = this.source.getGlobalProperties().clone();
switch (this.shipStrategy) {
case BROADCAST:
this.globalProps.clearUniqueFieldCombinations();
this.globalProps.setFullyReplicated();
break;
case PARTITION_HASH:
this.globalProps.setHashPartitioned(this.shipKeys);
break;
case PARTITION_RANGE:
this.globalProps.setRangePartitioned(
Utils.createOrdering(this.shipKeys, this.shipSortOrder),
this.dataDistribution);
break;
case FORWARD:
break;
case PARTITION_RANDOM:
this.globalProps.reset();
break;
case PARTITION_FORCED_REBALANCE:
this.globalProps.setForcedRebalanced();
break;
case PARTITION_CUSTOM:
this.globalProps.setCustomPartitioned(this.shipKeys, this.partitioner);
break;
case NONE:
throw new CompilerException(
"Cannot produce GlobalProperties before ship strategy is set.");
}
}
return this.globalProps;
}
public LocalProperties getLocalProperties() {
if (this.localProps == null) {
computeLocalPropertiesAfterShippingOnly();
switch (this.localStrategy) {
case NONE:
break;
case SORT:
case COMBININGSORT:
this.localProps =
LocalProperties.forOrdering(
Utils.createOrdering(this.localKeys, this.localSortOrder));
break;
default:
throw new CompilerException("Unsupported local strategy for channel.");
}
}
return this.localProps;
}
private void computeLocalPropertiesAfterShippingOnly() {
switch (this.shipStrategy) {
case BROADCAST:
case PARTITION_HASH:
case PARTITION_CUSTOM:
case PARTITION_RANGE:
case PARTITION_RANDOM:
case PARTITION_FORCED_REBALANCE:
this.localProps = new LocalProperties();
break;
case FORWARD:
this.localProps = this.source.getLocalProperties();
break;
case NONE:
throw new CompilerException("ShipStrategy has not yet been set.");
default:
throw new CompilerException("Unknown ShipStrategy.");
}
}
public void adjustGlobalPropertiesForFullParallelismChange() {
if (this.shipStrategy == null || this.shipStrategy == ShipStrategyType.NONE) {
throw new IllegalStateException(
"Cannot adjust channel for parallelism "
+ "change before the ship strategy is set.");
}
// make sure the properties are acquired
if (this.globalProps == null) {
getGlobalProperties();
}
// some strategies globally reestablish properties
switch (this.shipStrategy) {
case FORWARD:
throw new CompilerException(
"Cannot use FORWARD strategy between operations "
+ "with different number of parallel instances.");
case NONE: // excluded by sanity check. left here for verification check completion
case BROADCAST:
case PARTITION_HASH:
case PARTITION_RANGE:
case PARTITION_RANDOM:
case PARTITION_FORCED_REBALANCE:
case PARTITION_CUSTOM:
return;
}
throw new CompilerException("Unrecognized Ship Strategy Type: " + this.shipStrategy);
}
// --------------------------------------------------------------------------------------------
/** Utility method used while swapping binary union nodes for n-ary union nodes. */
public void swapUnionNodes(PlanNode newUnionNode) {
if (!(this.source instanceof BinaryUnionPlanNode)) {
throw new IllegalStateException();
} else {
this.source = newUnionNode;
}
}
// --------------------------------------------------------------------------------------------
public int getMaxDepth() {
return this.source.getOptimizerNode().getMaxDepth() + 1;
}
// --------------------------------------------------------------------------------------------
@Override
public String toString() {
return "Channel ("
+ this.source
+ (this.target == null ? ')' : ") -> (" + this.target + ')')
+ '['
+ this.shipStrategy
+ "] ["
+ this.localStrategy
+ "] "
+ (this.tempMode == null || this.tempMode == TempMode.NONE
? "{NO-TEMP}"
: this.tempMode);
}
@Override
public Channel clone() {
try {
return (Channel) super.clone();
} catch (CloneNotSupportedException cnsex) {
throw new RuntimeException(cnsex);
}
}
}