blob: fe31c6431d0008433c7eb85812ebb8af36fdee6b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms.reflect;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.DoFn.StartBundle;
import org.apache.beam.sdk.transforms.DoFn.StateId;
import org.apache.beam.sdk.transforms.DoFn.TimerId;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Instant;
/**
 * Interface for invoking the {@code DoFn} processing methods.
 *
 * <p>Instantiating a {@link DoFnInvoker} associates it with a specific {@link DoFn} instance,
 * referred to as the bound {@link DoFn}.
 */
public interface DoFnInvoker<InputT, OutputT> {
  /** Invoke the {@link DoFn.Setup} method on the bound {@link DoFn}. */
  void invokeSetup();

  /**
   * Invoke the {@link DoFn.StartBundle} method on the bound {@link DoFn}.
   *
   * @param c the context passed to the {@link DoFn.StartBundle} method.
   */
  void invokeStartBundle(DoFn<InputT, OutputT>.StartBundleContext c);

  /**
   * Invoke the {@link DoFn.FinishBundle} method on the bound {@link DoFn}.
   *
   * @param c the context passed to the {@link DoFn.FinishBundle} method.
   */
  void invokeFinishBundle(DoFn<InputT, OutputT>.FinishBundleContext c);

  /** Invoke the {@link DoFn.Teardown} method on the bound {@link DoFn}. */
  void invokeTeardown();

  /**
   * Invoke the {@link DoFn.OnWindowExpiration} method on the bound {@link DoFn}.
   *
   * @param arguments provider of the parameter objects the method declares.
   */
  void invokeOnWindowExpiration(ArgumentProvider<InputT, OutputT> arguments);

  /**
   * Invoke the {@link DoFn.ProcessElement} method on the bound {@link DoFn}.
   *
   * @param extra Factory for producing extra parameter objects (such as window), if necessary.
   * @return The {@link DoFn.ProcessContinuation} returned by the underlying method, or {@link
   *     DoFn.ProcessContinuation#stop()} if it returns {@code void}.
   */
  DoFn.ProcessContinuation invokeProcessElement(ArgumentProvider<InputT, OutputT> extra);

  /**
   * Invoke the appropriate {@link DoFn.OnTimer} method on the bound {@link DoFn}.
   *
   * @param timerId identifies which {@link DoFn.OnTimer} method to invoke.
   * @param arguments provider of the parameter objects the method declares.
   */
  void invokeOnTimer(String timerId, ArgumentProvider<InputT, OutputT> arguments);

  /** Invoke the {@link DoFn.GetInitialRestriction} method on the bound {@link DoFn}. */
  @SuppressWarnings("TypeParameterUnusedInFormals")
  <RestrictionT> RestrictionT invokeGetInitialRestriction(InputT element);

  /**
   * Invoke the {@link DoFn.GetRestrictionCoder} method on the bound {@link DoFn}. Called only
   * during pipeline construction time.
   */
  <RestrictionT> Coder<RestrictionT> invokeGetRestrictionCoder(CoderRegistry coderRegistry);

  /** Invoke the {@link DoFn.SplitRestriction} method on the bound {@link DoFn}. */
  <RestrictionT> void invokeSplitRestriction(
      InputT element,
      RestrictionT restriction,
      DoFn.OutputReceiver<RestrictionT> restrictionReceiver);

  /** Invoke the {@link DoFn.NewTracker} method on the bound {@link DoFn}. */
  @SuppressWarnings("TypeParameterUnusedInFormals")
  <RestrictionT, PositionT> RestrictionTracker<RestrictionT, PositionT> invokeNewTracker(
      RestrictionT restriction);

  /** Get the bound {@link DoFn}. */
  DoFn<InputT, OutputT> getFn();

  /**
   * Interface for runner implementors to provide implementations of extra context information.
   *
   * <p>The methods on this interface are called by {@link DoFnInvoker} before invoking an annotated
   * {@link StartBundle}, {@link ProcessElement}, {@link FinishBundle}, {@link DoFn.OnTimer} or
   * {@link DoFn.OnWindowExpiration} method that has indicated it needs the given extra context.
   *
   * <p>In the case of {@link ProcessElement} it is called once per invocation of {@link
   * ProcessElement}.
   */
  interface ArgumentProvider<InputT, OutputT> {
    /**
     * Construct the {@link BoundedWindow} to use within a {@link DoFn} that needs it. This is
     * called if the {@link ProcessElement} method has a parameter of type {@link BoundedWindow}.
     *
     * @return {@link BoundedWindow} of the element currently being processed.
     */
    BoundedWindow window();

    /** Provide a {@link PaneInfo}. */
    PaneInfo paneInfo(DoFn<InputT, OutputT> doFn);

    /** Provide {@link PipelineOptions}. */
    PipelineOptions pipelineOptions();

    /** Provide a {@link DoFn.StartBundleContext} to use with the given {@link DoFn}. */
    DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn);

    /** Provide a {@link DoFn.FinishBundleContext} to use with the given {@link DoFn}. */
    DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn);

    /** Provide a {@link DoFn.ProcessContext} to use with the given {@link DoFn}. */
    DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn);

    /** Provide a {@link DoFn.OnTimerContext} to use with the given {@link DoFn}. */
    DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn);

    /** Provide a reference to the input element. */
    InputT element(DoFn<InputT, OutputT> doFn);

    /** Provide a reference to the input sideInput with the specified tag. */
    Object sideInput(String tagId);

    /**
     * Provide a reference to the selected schema field corresponding to the input argument
     * specified by index.
     */
    Object schemaElement(int index);

    /** Provide a reference to the input element timestamp. */
    Instant timestamp(DoFn<InputT, OutputT> doFn);

    /** Provide a reference to the time domain for a timer firing. */
    TimeDomain timeDomain(DoFn<InputT, OutputT> doFn);

    /** Provide a {@link OutputReceiver} for outputting to the default output. */
    OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn);

    /** Provide a {@link OutputReceiver} for outputting rows to the default output. */
    OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn);

    /** Provide a {@link MultiOutputReceiver} for outputting to one or more tagged outputs. */
    MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn);

    /**
     * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with
     * the current {@link ProcessElement} call.
     */
    RestrictionTracker<?, ?> restrictionTracker();

    /** Returns the state cell for the given {@link StateId}. */
    State state(String stateId);

    /** Returns the timer for the given {@link TimerId}. */
    Timer timer(String timerId);
  }

  /**
   * For testing only, this {@link ArgumentProvider} throws {@link UnsupportedOperationException}
   * for all parameters. Tests subclass it and override only the methods they need.
   */
  class FakeArgumentProvider<InputT, OutputT> implements ArgumentProvider<InputT, OutputT> {
    /**
     * Creates the exception thrown by every method a subclass has not overridden. It is built in
     * one place so that all methods report the same message.
     */
    private static UnsupportedOperationException notOverridden() {
      return new UnsupportedOperationException(
          String.format(
              "Should never call non-overridden methods of %s",
              FakeArgumentProvider.class.getSimpleName()));
    }

    @Override
    public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
      throw notOverridden();
    }

    @Override
    public InputT element(DoFn<InputT, OutputT> doFn) {
      throw notOverridden();
    }

    // Declared as Object to match ArgumentProvider: a side input is not necessarily an InputT, and
    // narrowing the return type would prevent subclasses from returning arbitrary values.
    @Override
    public Object sideInput(String tagId) {
      throw notOverridden();
    }

    // Declared as Object to match ArgumentProvider: a selected schema field is not necessarily an
    // InputT, and narrowing the return type would prevent subclasses from returning other values.
    @Override
    public Object schemaElement(int index) {
      throw notOverridden();
    }

    @Override
    public Instant timestamp(DoFn<InputT, OutputT> doFn) {
      throw notOverridden();
    }

    @Override
    public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
      throw notOverridden();
    }

    @Override
    public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
      throw notOverridden();
    }

    @Override
    public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
      throw notOverridden();
    }

    @Override
    public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
      throw notOverridden();
    }

    @Override
    public BoundedWindow window() {
      throw notOverridden();
    }

    @Override
    public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
      throw notOverridden();
    }

    @Override
    public PipelineOptions pipelineOptions() {
      throw notOverridden();
    }

    @Override
    public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
      throw notOverridden();
    }

    @Override
    public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
        DoFn<InputT, OutputT> doFn) {
      throw notOverridden();
    }

    @Override
    public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
      throw notOverridden();
    }

    @Override
    public State state(String stateId) {
      throw notOverridden();
    }

    @Override
    public Timer timer(String timerId) {
      throw notOverridden();
    }

    @Override
    public RestrictionTracker<?, ?> restrictionTracker() {
      throw notOverridden();
    }
  }
}