blob: 9a3740e2494312bdc41a9f5595f11805be79ef35 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import java.util.Collection;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Materialization;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.Materializations.MultimapView;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
/**
* The minimal amount of information required to create and use a {@link SideInputReader} when using
* the portability framework within the Dataflow Runner harness.
*
* <p>Note that this is being used to satisfy type constraints for {@link SideInputReader} used
* throughout the Dataflow Runner harness and only supports returning the tag, the coder, the
* materialization, and the window coder. All other methods throw {@link
* UnsupportedOperationException}.
*
* <p>TODO: Migrate to a runner only specific concept of a side input to be used with {@link
* SideInputReader}s.
*/
public class DataflowPortabilityPCollectionView<K, V, W extends BoundedWindow>
implements PCollectionView<MultimapView<K, V>> {
public static <K, V> PCollectionView<MultimapView<K, V>> with(
TupleTag<KV<K, V>> tag, FullWindowedValueCoder<KV<K, V>> coder) {
return new DataflowPortabilityPCollectionView(tag, coder);
}
private final TupleTag<KV<K, V>> tag;
private final FullWindowedValueCoder<KV<K, V>> coder;
private DataflowPortabilityPCollectionView(
TupleTag<KV<K, V>> tag, FullWindowedValueCoder<KV<K, V>> coder) {
this.tag = tag;
this.coder = coder;
}
@Nullable
@Override
public PCollection<?> getPCollection() {
throw new UnsupportedOperationException();
}
@Override
public TupleTag<KV<K, V>> getTagInternal() {
return tag;
}
@Override
public ViewFn<?, MultimapView<K, V>> getViewFn() {
return (ViewFn) PortabilityViewFn.INSTANCE;
}
/**
* A minimal type {@link ViewFn} that satisfies requirements to be used when executing portable
* pipelines.
*/
public static class PortabilityViewFn<K, V>
extends ViewFn<MultimapView<K, V>, MultimapView<K, V>> {
private static final PortabilityViewFn<Object, Object> INSTANCE = new PortabilityViewFn<>();
// prevent instantiation
private PortabilityViewFn() {}
@Override
public Materialization<MultimapView<K, V>> getMaterialization() {
return Materializations.multimap();
}
@Override
public MultimapView<K, V> apply(MultimapView<K, V> o) {
return o;
}
};
@Override
public WindowMappingFn<?> getWindowMappingFn() {
throw new UnsupportedOperationException();
}
@Override
public WindowingStrategy<KV<K, V>, W> getWindowingStrategyInternal() {
return WindowingStrategy.of(
new WindowFn<KV<K, V>, W>() {
@Override
public Collection<W> assignWindows(AssignContext c) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public void mergeWindows(MergeContext c) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public boolean isCompatible(WindowFn<?, ?> other) {
throw new UnsupportedOperationException();
}
@Override
public Coder<W> windowCoder() {
return (Coder) coder.getWindowCoder();
}
@Override
public WindowMappingFn<W> getDefaultWindowMappingFn() {
throw new UnsupportedOperationException();
}
});
}
@Override
public Coder<?> getCoderInternal() {
return coder.getValueCoder();
}
@Override
public String getName() {
throw new UnsupportedOperationException();
}
@Override
public Pipeline getPipeline() {
throw new UnsupportedOperationException();
}
@Override
public Map<TupleTag<?>, PValue> expand() {
throw new UnsupportedOperationException();
}
@Override
public void finishSpecifyingOutput(
String transformName, PInput input, PTransform<?, ?> transform) {
throw new UnsupportedOperationException();
}
@Override
public void finishSpecifying(PInput upstreamInput, PTransform<?, ?> upstreamTransform) {
throw new UnsupportedOperationException();
}
}