blob: 9be7c9a465d89669522ecf90eedd977385cf41c9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.TypeVariable;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
* A {@link Combine.CombineFn} delegating all relevant calls to given delegate. This is used to
* create a type anonymous class for cases where the CombineFn is a generic class. The anonymous
* class can then be used in a UDAF as
*
* <pre>
* .registerUdaf("UDAF", new TypedCombineFnDelegate<>(genericCombineFn) {})
* </pre>
*
* @param <InputT> the type of input
* @param <AccumT> the type of accumulator
* @param <OutputT> the type of output
*/
public class TypedCombineFnDelegate<InputT, AccumT, OutputT>
extends Combine.CombineFn<InputT, AccumT, OutputT> {
private final Combine.CombineFn<InputT, AccumT, OutputT> delegate;
protected TypedCombineFnDelegate(Combine.CombineFn<InputT, AccumT, OutputT> delegate) {
this.delegate = delegate;
}
@Override
public TypeDescriptor<OutputT> getOutputType() {
return Optional.<TypeDescriptor<OutputT>>ofNullable(getGenericSuperTypeAtIndex(2))
.orElse(delegate.getOutputType());
}
@Override
public TypeDescriptor<InputT> getInputType() {
return Optional.<TypeDescriptor<InputT>>ofNullable(getGenericSuperTypeAtIndex(0))
.orElse(delegate.getInputType());
}
@Override
public AccumT createAccumulator() {
return delegate.createAccumulator();
}
@Override
public AccumT addInput(AccumT mutableAccumulator, InputT input) {
return delegate.addInput(mutableAccumulator, input);
}
@Override
public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
return delegate.mergeAccumulators(accumulators);
}
@Override
public OutputT extractOutput(AccumT accumulator) {
return delegate.extractOutput(accumulator);
}
@Override
public AccumT compact(AccumT accumulator) {
return delegate.compact(accumulator);
}
@Override
public OutputT apply(Iterable<? extends InputT> inputs) {
return delegate.apply(inputs);
}
@Override
public OutputT defaultValue() {
return delegate.defaultValue();
}
@Override
public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
throws CannotProvideCoderException {
return delegate.getAccumulatorCoder(registry, inputCoder);
}
@Override
public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<InputT> inputCoder)
throws CannotProvideCoderException {
return delegate.getDefaultOutputCoder(registry, inputCoder);
}
@Override
public String getIncompatibleGlobalWindowErrorMessage() {
return delegate.getIncompatibleGlobalWindowErrorMessage();
}
@Override
public TypeVariable<?> getInputTVariable() {
return delegate.getInputTVariable();
}
@Override
public TypeVariable<?> getAccumTVariable() {
return delegate.getAccumTVariable();
}
@Override
public TypeVariable<?> getOutputTVariable() {
return delegate.getOutputTVariable();
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
delegate.populateDisplayData(builder);
}
@SuppressWarnings("unchecked")
@Nullable
private <T> TypeDescriptor<T> getGenericSuperTypeAtIndex(int index) {
Class<?> cls = Preconditions.checkArgumentNotNull(getClass());
do {
Class<?> superClass = cls.getSuperclass();
if (superClass == null) {
break;
}
if (superClass.equals(TypedCombineFnDelegate.class)) {
@Nonnull
ParameterizedType superType =
(ParameterizedType) Preconditions.checkArgumentNotNull(cls.getGenericSuperclass());
TypeDescriptor<T> candidate =
(TypeDescriptor<T>) TypeDescriptor.of(superType.getActualTypeArguments()[index]);
if (!(candidate instanceof TypeVariable)) {
return candidate;
}
}
cls = superClass;
} while (true);
return null;
}
}