blob: 4ce44aa2fdd4b57e3cf23c8fc05f78d6cde53248 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.operators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.functions.SemanticPropUtil;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
import java.lang.annotation.Annotation;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static java.util.Objects.requireNonNull;
/**
* The <tt>SingleInputUdfOperator</tt> is the base class of all unary operators that execute
* user-defined functions (UDFs). The UDFs encapsulated by this operator are naturally UDFs that
* have one input (such as {@link org.apache.flink.api.common.functions.RichMapFunction} or
* {@link org.apache.flink.api.common.functions.RichReduceFunction}).
*
* <p>This class encapsulates utilities for the UDFs, such as broadcast variables, parameterization
* through configuration objects, and semantic properties.
* @param <IN> The data type of the input data set.
* @param <OUT> The data type of the returned data set.
*/
@Public
public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOperator<IN, OUT, O>>
extends SingleInputOperator<IN, OUT, O> implements UdfOperator<O> {
private Configuration parameters;
private Map<String, DataSet<?>> broadcastVariables;
// NOTE: only set this variable via setSemanticProperties()
private SingleInputSemanticProperties udfSemantics;
private boolean analyzedUdfSemantics;
// --------------------------------------------------------------------------------------------
/**
* Creates a new operators with the given data set as input. The given result type
* describes the data type of the elements in the data set produced by the operator.
*
* @param input The data set that is the input to the operator.
* @param resultType The type of the elements in the resulting data set.
*/
protected SingleInputUdfOperator(DataSet<IN> input, TypeInformation<OUT> resultType) {
super(input, resultType);
}
protected abstract Function getFunction();
// --------------------------------------------------------------------------------------------
// Fluent API methods
// --------------------------------------------------------------------------------------------
@Override
public O withParameters(Configuration parameters) {
this.parameters = parameters;
@SuppressWarnings("unchecked")
O returnType = (O) this;
return returnType;
}
@Override
public O withBroadcastSet(DataSet<?> data, String name) {
if (data == null) {
throw new IllegalArgumentException("Broadcast variable data must not be null.");
}
if (name == null) {
throw new IllegalArgumentException("Broadcast variable name must not be null.");
}
if (this.broadcastVariables == null) {
this.broadcastVariables = new HashMap<String, DataSet<?>>();
}
this.broadcastVariables.put(name, data);
@SuppressWarnings("unchecked")
O returnType = (O) this;
return returnType;
}
/**
* Adds semantic information about forwarded fields of the user-defined function.
* The forwarded fields information declares fields which are never modified by the function and
* which are forwarded at the same position to the output or unchanged copied to another position in the output.
*
* <p>Fields that are forwarded at the same position are specified by their position.
* The specified position must be valid for the input and output data type and have the same type.
* For example <code>withForwardedFields("f2")</code> declares that the third field of a Java input tuple is
* copied to the third field of an output tuple.
*
* <p>Fields which are unchanged copied to another position in the output are declared by specifying the
* source field reference in the input and the target field reference in the output.
* {@code withForwardedFields("f0->f2")} denotes that the first field of the Java input tuple is
* unchanged copied to the third field of the Java output tuple. When using a wildcard ("*") ensure that
* the number of declared fields and their types in input and output type match.
*
* <p>Multiple forwarded fields can be annotated in one ({@code withForwardedFields("f2; f3->f0; f4")})
* or separate Strings ({@code withForwardedFields("f2", "f3->f0", "f4")}).
* Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for
* details on field references such as nested fields and wildcard.
*
* <p>It is not possible to override existing semantic information about forwarded fields which was
* for example added by a {@link org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields} class annotation.
*
* <p><b>NOTE: Adding semantic information for functions is optional!
* If used correctly, semantic information can help the Flink optimizer to generate more efficient execution plans.
* However, incorrect semantic information can cause the optimizer to generate incorrect execution plans which compute wrong results!
* So be careful when adding semantic information.
* </b>
*
* @param forwardedFields A list of field forward expressions.
* @return This operator with annotated forwarded field information.
*
* @see org.apache.flink.api.java.functions.FunctionAnnotation
* @see org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
*/
public O withForwardedFields(String... forwardedFields) {
if (this.udfSemantics == null) {
// extract semantic properties from function annotations
setSemanticProperties(extractSemanticAnnotations(getFunction().getClass()));
}
if (this.udfSemantics == null
|| this.analyzedUdfSemantics) { // discard analyzed semantic properties
setSemanticProperties(new SingleInputSemanticProperties());
SemanticPropUtil.getSemanticPropsSingleFromString(this.udfSemantics, forwardedFields, null, null, this.getInputType(), this.getResultType());
} else {
if (udfWithForwardedFieldsAnnotation(getFunction().getClass())) {
// refuse semantic information as it would override the function annotation
throw new SemanticProperties.InvalidSemanticAnnotationException("Forwarded field information " +
"has already been added by a function annotation for this operator. " +
"Cannot overwrite function annotations.");
} else {
SemanticPropUtil.getSemanticPropsSingleFromString(this.udfSemantics, forwardedFields, null, null, this.getInputType(), this.getResultType());
}
}
@SuppressWarnings("unchecked")
O returnType = (O) this;
return returnType;
}
// ------------------------------------------------------------------------
// type hinting
// ------------------------------------------------------------------------
/**
* Adds a type information hint about the return type of this operator. This method
* can be used in cases where Flink cannot determine automatically what the produced
* type of a function is. That can be the case if the function uses generic type variables
* in the return type that cannot be inferred from the input type.
*
* <p>Classes can be used as type hints for non-generic types (classes without generic parameters),
* but not for generic types like for example Tuples. For those generic types, please
* use the {@link #returns(TypeHint)} method.
*
* <p>Use this method the following way:
* <pre>{@code
* DataSet<String[]> result =
* data.flatMap(new FunctionWithNonInferrableReturnType())
* .returns(String[].class);
* }</pre>
*
* @param typeClass The class of the returned data type.
* @return This operator with the type information corresponding to the given type class.
*/
public O returns(Class<OUT> typeClass) {
requireNonNull(typeClass, "type class must not be null");
try {
return returns(TypeInformation.of(typeClass));
}
catch (InvalidTypesException e) {
throw new InvalidTypesException("Cannot infer the type information from the class alone." +
"This is most likely because the class represents a generic type. In that case," +
"please use the 'returns(TypeHint)' method instead.", e);
}
}
/**
* Adds a type information hint about the return type of this operator. This method
* can be used in cases where Flink cannot determine automatically what the produced
* type of a function is. That can be the case if the function uses generic type variables
* in the return type that cannot be inferred from the input type.
*
* <p>Use this method the following way:
* <pre>{@code
* DataSet<Tuple2<String, Double>> result =
* data.flatMap(new FunctionWithNonInferrableReturnType())
* .returns(new TypeHint<Tuple2<String, Double>>(){});
* }</pre>
*
* @param typeHint The type hint for the returned data type.
* @return This operator with the type information corresponding to the given type hint.
*/
public O returns(TypeHint<OUT> typeHint) {
requireNonNull(typeHint, "TypeHint must not be null");
try {
return returns(TypeInformation.of(typeHint));
}
catch (InvalidTypesException e) {
throw new InvalidTypesException("Cannot infer the type information from the type hint. " +
"Make sure that the TypeHint does not use any generic type variables.");
}
}
/**
* Adds a type information hint about the return type of this operator. This method
* can be used in cases where Flink cannot determine automatically what the produced
* type of a function is. That can be the case if the function uses generic type variables
* in the return type that cannot be inferred from the input type.
*
* <p>In most cases, the methods {@link #returns(Class)} and {@link #returns(TypeHint)}
* are preferable.
*
* @param typeInfo The type information for the returned data type.
* @return This operator using the given type information for the return type.
*/
public O returns(TypeInformation<OUT> typeInfo) {
requireNonNull(typeInfo, "TypeInformation must not be null");
fillInType(typeInfo);
@SuppressWarnings("unchecked")
O returnType = (O) this;
return returnType;
}
/**
* Adds a type information hint about the return type of this operator.
*
*
* <p>Type hints are important in cases where the Java compiler
* throws away generic type information necessary for efficient execution.
*
*
* <p>This method takes a type information string that will be parsed. A type information string can contain the following
* types:
*
* <ul>
* <li>Basic types such as <code>Integer</code>, <code>String</code>, etc.
* <li>Basic type arrays such as <code>Integer[]</code>,
* <code>String[]</code>, etc.
* <li>Tuple types such as <code>Tuple1&lt;TYPE0&gt;</code>,
* <code>Tuple2&lt;TYPE0, TYPE1&gt;</code>, etc.</li>
* <li>Pojo types such as <code>org.my.MyPojo&lt;myFieldName=TYPE0,myFieldName2=TYPE1&gt;</code>, etc.</li>
* <li>Generic types such as <code>java.lang.Class</code>, etc.
* <li>Custom type arrays such as <code>org.my.CustomClass[]</code>,
* <code>org.my.CustomClass$StaticInnerClass[]</code>, etc.
* <li>Value types such as <code>DoubleValue</code>,
* <code>StringValue</code>, <code>IntegerValue</code>, etc.</li>
* <li>Tuple array types such as <code>Tuple2&lt;TYPE0,TYPE1&gt;[], etc.</code></li>
* <li>Writable types such as <code>Writable&lt;org.my.CustomWritable&gt;</code></li>
* <li>Enum types such as <code>Enum&lt;org.my.CustomEnum&gt;</code></li>
* </ul>
*
* <p>Example:
* <code>"Tuple2&lt;String,Tuple2&lt;Integer,org.my.MyJob$Pojo&lt;word=String&gt;&gt;&gt;"</code>
*
* @param typeInfoString
* type information string to be parsed
* @return This operator with a given return type hint.
*
* @deprecated Please use {@link #returns(Class)} or {@link #returns(TypeHint)} instead.
*/
@Deprecated
@PublicEvolving
public O returns(String typeInfoString) {
if (typeInfoString == null) {
throw new IllegalArgumentException("Type information string must not be null.");
}
return returns(TypeInfoParser.<OUT>parse(typeInfoString));
}
// --------------------------------------------------------------------------------------------
// Accessors
// --------------------------------------------------------------------------------------------
@Override
@Internal
public Map<String, DataSet<?>> getBroadcastSets() {
return this.broadcastVariables == null ?
Collections.<String, DataSet<?>>emptyMap() :
Collections.unmodifiableMap(this.broadcastVariables);
}
@Override
public Configuration getParameters() {
return this.parameters;
}
@Override
@Internal
public SingleInputSemanticProperties getSemanticProperties() {
if (this.udfSemantics == null || analyzedUdfSemantics) {
SingleInputSemanticProperties props = extractSemanticAnnotations(getFunction().getClass());
if (props != null) {
setSemanticProperties(props);
}
}
if (this.udfSemantics == null) {
setSemanticProperties(new SingleInputSemanticProperties());
}
return this.udfSemantics;
}
/**
* Sets the semantic properties for the user-defined function (UDF). The semantic properties
* define how fields of tuples and other objects are modified or preserved through this UDF.
* The configured properties can be retrieved via {@link UdfOperator#getSemanticProperties()}.
*
* @param properties The semantic properties for the UDF.
* @see UdfOperator#getSemanticProperties()
*/
@Internal
public void setSemanticProperties(SingleInputSemanticProperties properties) {
this.udfSemantics = properties;
this.analyzedUdfSemantics = false;
}
protected boolean getAnalyzedUdfSemanticsFlag() {
return this.analyzedUdfSemantics;
}
protected void setAnalyzedUdfSemanticsFlag() {
this.analyzedUdfSemantics = true;
}
protected SingleInputSemanticProperties extractSemanticAnnotations(Class<?> udfClass) {
Set<Annotation> annotations = FunctionAnnotation.readSingleForwardAnnotations(udfClass);
return SemanticPropUtil.getSemanticPropsSingle(annotations, getInputType(), getResultType());
}
protected boolean udfWithForwardedFieldsAnnotation(Class<?> udfClass) {
return udfClass.getAnnotation(FunctionAnnotation.ForwardedFields.class) != null ||
udfClass.getAnnotation(FunctionAnnotation.NonForwardedFields.class) != null;
}
}