| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.flink.translation.wrappers.streaming; |
| |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; |
| |
| import java.nio.ByteBuffer; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.beam.runners.core.DoFnRunner; |
| import org.apache.beam.runners.core.KeyedWorkItem; |
| import org.apache.beam.runners.core.KeyedWorkItems; |
| import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker; |
| import org.apache.beam.runners.core.OutputWindowedValue; |
| import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn; |
| import org.apache.beam.runners.core.StateInternals; |
| import org.apache.beam.runners.core.StateInternalsFactory; |
| import org.apache.beam.runners.core.TimerInternals; |
| import org.apache.beam.runners.core.TimerInternalsFactory; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.state.TimeDomain; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.DoFnSchemaInformation; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.transforms.windowing.PaneInfo; |
| import org.apache.beam.sdk.util.WindowedValue; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PCollectionView; |
| import org.apache.beam.sdk.values.TupleTag; |
| import org.apache.beam.sdk.values.WindowingStrategy; |
| import org.apache.flink.api.java.functions.KeySelector; |
| import org.apache.flink.runtime.state.StateInitializationContext; |
| import org.apache.flink.streaming.api.operators.InternalTimer; |
| import org.joda.time.Duration; |
| import org.joda.time.Instant; |
| |
| /** |
| * Flink operator for executing splittable {@link DoFn DoFns}. Specifically, for executing the |
| * {@code @ProcessElement} method of a splittable {@link DoFn}. |
| */ |
| public class SplittableDoFnOperator<InputT, OutputT, RestrictionT> |
| extends DoFnOperator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> { |
| |
| private transient ScheduledExecutorService executorService; |
| |
| public SplittableDoFnOperator( |
| DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> doFn, |
| String stepName, |
| Coder<WindowedValue<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>> windowedInputCoder, |
| Coder<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> inputCoder, |
| Map<TupleTag<?>, Coder<?>> outputCoders, |
| TupleTag<OutputT> mainOutputTag, |
| List<TupleTag<?>> additionalOutputTags, |
| OutputManagerFactory<OutputT> outputManagerFactory, |
| WindowingStrategy<?, ?> windowingStrategy, |
| Map<Integer, PCollectionView<?>> sideInputTagMapping, |
| Collection<PCollectionView<?>> sideInputs, |
| PipelineOptions options, |
| Coder<?> keyCoder, |
| KeySelector<WindowedValue<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>, ?> keySelector) { |
| super( |
| doFn, |
| stepName, |
| windowedInputCoder, |
| inputCoder, |
| outputCoders, |
| mainOutputTag, |
| additionalOutputTags, |
| outputManagerFactory, |
| windowingStrategy, |
| sideInputTagMapping, |
| sideInputs, |
| options, |
| keyCoder, |
| keySelector, |
| DoFnSchemaInformation.create(), |
| Collections.emptyMap()); |
| } |
| |
| @Override |
| protected DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> |
| createWrappingDoFnRunner( |
| DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> wrappedRunner) { |
| // don't wrap in anything because we don't need state cleanup because ProcessFn does |
| // all that |
| return wrappedRunner; |
| } |
| |
| @Override |
| public void initializeState(StateInitializationContext context) throws Exception { |
| super.initializeState(context); |
| |
| checkState(doFn instanceof ProcessFn); |
| |
| // this will implicitly be keyed by the key of the incoming |
| // element or by the key of a firing timer |
| StateInternalsFactory<byte[]> stateInternalsFactory = |
| key -> (StateInternals) keyedStateInternals; |
| |
| // this will implicitly be keyed like the StateInternalsFactory |
| TimerInternalsFactory<byte[]> timerInternalsFactory = key -> timerInternals; |
| |
| executorService = Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()); |
| |
| ((ProcessFn) doFn).setStateInternalsFactory(stateInternalsFactory); |
| ((ProcessFn) doFn).setTimerInternalsFactory(timerInternalsFactory); |
| ((ProcessFn) doFn) |
| .setProcessElementInvoker( |
| new OutputAndTimeBoundedSplittableProcessElementInvoker<>( |
| doFn, |
| serializedOptions.get(), |
| new OutputWindowedValue<OutputT>() { |
| @Override |
| public void outputWindowedValue( |
| OutputT output, |
| Instant timestamp, |
| Collection<? extends BoundedWindow> windows, |
| PaneInfo pane) { |
| outputManager.output( |
| mainOutputTag, WindowedValue.of(output, timestamp, windows, pane)); |
| } |
| |
| @Override |
| public <AdditionalOutputT> void outputWindowedValue( |
| TupleTag<AdditionalOutputT> tag, |
| AdditionalOutputT output, |
| Instant timestamp, |
| Collection<? extends BoundedWindow> windows, |
| PaneInfo pane) { |
| outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane)); |
| } |
| }, |
| sideInputReader, |
| executorService, |
| 10000, |
| Duration.standardSeconds(10))); |
| } |
| |
| @Override |
| protected void fireTimer(InternalTimer<ByteBuffer, TimerInternals.TimerData> timer) { |
| timerInternals.cleanupPendingTimer(timer.getNamespace()); |
| if (timer.getNamespace().getDomain().equals(TimeDomain.EVENT_TIME)) { |
| // ignore this, it can only be a state cleanup timers from StatefulDoFnRunner and ProcessFn |
| // does its own state cleanup and should never set event-time timers. |
| return; |
| } |
| doFnRunner.processElement( |
| WindowedValue.valueInGlobalWindow( |
| KeyedWorkItems.timersWorkItem( |
| (byte[]) keyedStateInternals.getKey(), |
| Collections.singletonList(timer.getNamespace())))); |
| } |
| |
| @Override |
| public void close() throws Exception { |
| super.close(); |
| |
| executorService.shutdown(); |
| |
| long shutdownTimeout = Duration.standardSeconds(10).getMillis(); |
| try { |
| if (!executorService.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) { |
| LOG.debug( |
| "The scheduled executor service did not properly terminate. Shutting " |
| + "it down now."); |
| executorService.shutdownNow(); |
| } |
| } catch (InterruptedException e) { |
| LOG.debug("Could not properly await the termination of the scheduled executor service.", e); |
| executorService.shutdownNow(); |
| } |
| } |
| } |