blob: b1a6c4acdd9c796f0e2b6c951c7f3458f45daba8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.core.function;
import org.apache.wayang.core.optimizer.costs.LoadEstimator;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
import org.apache.wayang.core.optimizer.costs.NestableLoadProfileEstimator;
import org.apache.wayang.core.types.BasicDataUnitType;
import org.apache.wayang.core.types.DataUnitGroupType;
import org.apache.wayang.core.types.DataUnitType;
import java.util.Iterator;
/**
* This descriptor pertains to functions that take multiple data units and aggregate them into a single data unit.
*/
public abstract class AggregationDescriptor<InputType, OutputType> extends FunctionDescriptor {
private final DataUnitGroupType<InputType> inputType;
private final BasicDataUnitType<OutputType> outputType;
// TODO: What about aggregation functions?
public AggregationDescriptor(DataUnitGroupType<InputType> inputType, BasicDataUnitType<OutputType> outputType) {
this(inputType, outputType, new NestableLoadProfileEstimator(
LoadEstimator.createFallback(1, 1),
LoadEstimator.createFallback(1, 1)
));
}
public AggregationDescriptor(Class<InputType> inputTypeClass, Class<OutputType> outputTypeClass) {
this(inputTypeClass, outputTypeClass, new NestableLoadProfileEstimator(
LoadEstimator.createFallback(1, 1),
LoadEstimator.createFallback(1, 1)
));
}
public AggregationDescriptor(Class<InputType> inputTypeClass,
Class<OutputType> outputTypeClass,
LoadProfileEstimator loadProfileEstimator) {
this(DataUnitType.createGrouped(inputTypeClass),
DataUnitType.createBasic(outputTypeClass),
loadProfileEstimator);
}
public AggregationDescriptor(DataUnitGroupType<InputType> inputType, BasicDataUnitType<OutputType> outputType,
LoadProfileEstimator loadProfileEstimator) {
super(loadProfileEstimator);
this.inputType = inputType;
this.outputType = outputType;
}
/**
* This is function is not built to last. It is thought to help out devising programs while we are still figuring
* out how to express functions in a platform-independent way.
*
* @param <Input> input type of the function
* @param <Output> output type of the function
* @return a function that can perform the reduce
*/
public abstract <Input, Output> FlatMapDescriptor.SerializableFunction<Iterator<Input>, Output> getJavaImplementation();
/**
* In generic code, we do not have the type parameter values of operators, functions etc. This method avoids casting issues.
*
* @return this instance with type parameters set to {@link Object}
*/
@SuppressWarnings("unchecked")
public AggregationDescriptor<Object, Object> unchecked() {
return (AggregationDescriptor<Object, Object>) this;
}
public DataUnitGroupType<InputType> getInputType() {
return this.inputType;
}
public BasicDataUnitType<OutputType> getOutputType() {
return this.outputType;
}
@Override
public String toString() {
return String.format("%s[%s]", this.getClass().getSimpleName(), this.getJavaImplementation());
}
}