| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.flink.translation.wrappers.streaming; |
| |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayDeque; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.EnumMap; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.function.BiConsumer; |
| import java.util.function.Consumer; |
| import java.util.function.Supplier; |
| import java.util.stream.Collectors; |
| import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse; |
| import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse; |
| import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase; |
| import org.apache.beam.model.pipeline.v1.RunnerApi; |
| import org.apache.beam.runners.core.DoFnRunner; |
| import org.apache.beam.runners.core.LateDataUtils; |
| import org.apache.beam.runners.core.StateInternals; |
| import org.apache.beam.runners.core.StateNamespace; |
| import org.apache.beam.runners.core.StateNamespaces; |
| import org.apache.beam.runners.core.StateTag; |
| import org.apache.beam.runners.core.StateTags; |
| import org.apache.beam.runners.core.StatefulDoFnRunner; |
| import org.apache.beam.runners.core.TimerInternals; |
| import org.apache.beam.runners.core.construction.Timer; |
| import org.apache.beam.runners.core.construction.graph.ExecutableStage; |
| import org.apache.beam.runners.core.construction.graph.UserStateReference; |
| import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; |
| import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext; |
| import org.apache.beam.runners.flink.translation.functions.FlinkStreamingSideInputHandlerFactory; |
| import org.apache.beam.runners.fnexecution.control.BundleProgressHandler; |
| import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory; |
| import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors; |
| import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.TimerSpec; |
| import org.apache.beam.runners.fnexecution.control.RemoteBundle; |
| import org.apache.beam.runners.fnexecution.control.StageBundleFactory; |
| import org.apache.beam.runners.fnexecution.provisioning.JobInfo; |
| import org.apache.beam.runners.fnexecution.state.StateRequestHandler; |
| import org.apache.beam.runners.fnexecution.state.StateRequestHandlers; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.coders.VoidCoder; |
| import org.apache.beam.sdk.fn.data.FnDataReceiver; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.state.BagState; |
| import org.apache.beam.sdk.state.TimeDomain; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.DoFnSchemaInformation; |
| import org.apache.beam.sdk.transforms.join.RawUnionValue; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.transforms.windowing.PaneInfo; |
| import org.apache.beam.sdk.util.WindowedValue; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PCollectionView; |
| import org.apache.beam.sdk.values.TupleTag; |
| import org.apache.beam.sdk.values.WindowingStrategy; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions; |
| import org.apache.flink.api.java.functions.KeySelector; |
| import org.apache.flink.runtime.state.KeyedStateBackend; |
| import org.apache.flink.streaming.api.operators.InternalTimer; |
| import org.apache.flink.streaming.api.watermark.Watermark; |
| import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; |
| import org.joda.time.Instant; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
/**
 * This operator is the streaming equivalent of the {@link
 * org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction}. It sends all
 * received elements to the SDK harness and emits the elements it receives back from the SDK
 * harness to the downstream operators. It also takes care of handling side inputs and state.
 *
 * <p>TODO Integrate support for progress updates and metrics
 */
| public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<InputT, OutputT> { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(ExecutableStageDoFnOperator.class); |
| |
| private final RunnerApi.ExecutableStagePayload payload; |
| private final JobInfo jobInfo; |
| private final FlinkExecutableStageContext.Factory contextFactory; |
| private final Map<String, TupleTag<?>> outputMap; |
| private final Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputIds; |
| /** A lock which has to be acquired when concurrently accessing state and timers. */ |
| private final ReentrantLock stateBackendLock; |
| |
| private transient FlinkExecutableStageContext stageContext; |
| private transient StateRequestHandler stateRequestHandler; |
| private transient BundleProgressHandler progressHandler; |
| private transient StageBundleFactory stageBundleFactory; |
| private transient ExecutableStage executableStage; |
| private transient SdkHarnessDoFnRunner<InputT, OutputT> sdkHarnessRunner; |
| private transient FlinkMetricContainer flinkMetricContainer; |
| private transient long backupWatermarkHold = Long.MIN_VALUE; |
| |
/**
 * Creates the operator for one executable stage.
 *
 * <p>The parent {@link DoFnOperator} receives a {@link NoOpDoFn} placeholder and a freshly created
 * {@link DoFnSchemaInformation}. The stage payload, job info, context factory, output map and side
 * input ids are kept in fields, and a new {@link ReentrantLock} is created for guarding state
 * backend access.
 */
public ExecutableStageDoFnOperator(
    String stepName,
    Coder<WindowedValue<InputT>> windowedInputCoder,
    Coder<InputT> inputCoder,
    Map<TupleTag<?>, Coder<?>> outputCoders,
    TupleTag<OutputT> mainOutputTag,
    List<TupleTag<?>> additionalOutputTags,
    OutputManagerFactory<OutputT> outputManagerFactory,
    Map<Integer, PCollectionView<?>> sideInputTagMapping,
    Collection<PCollectionView<?>> sideInputs,
    Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputIds,
    PipelineOptions options,
    RunnerApi.ExecutableStagePayload payload,
    JobInfo jobInfo,
    FlinkExecutableStageContext.Factory contextFactory,
    Map<String, TupleTag<?>> outputMap,
    WindowingStrategy windowingStrategy,
    Coder keyCoder,
    KeySelector<WindowedValue<InputT>, ?> keySelector) {
  super(
      new NoOpDoFn(),
      stepName,
      windowedInputCoder,
      inputCoder,
      outputCoders,
      mainOutputTag,
      additionalOutputTags,
      outputManagerFactory,
      windowingStrategy,
      sideInputTagMapping,
      sideInputs,
      options,
      keyCoder,
      keySelector,
      DoFnSchemaInformation.create());

  // Lock shared by everything that touches the keyed state backend.
  this.stateBackendLock = new ReentrantLock();

  // Stage description and the factory used to obtain its execution context.
  this.payload = payload;
  this.jobInfo = jobInfo;
  this.contextFactory = contextFactory;

  // Lookup tables for outputs and side inputs.
  this.outputMap = outputMap;
  this.sideInputIds = sideInputIds;
}
| |
| @Override |
| protected Lock getLockToAcquireForStateAccessDuringBundles() { |
| return stateBackendLock; |
| } |
| |
| @Override |
| public void open() throws Exception { |
| executableStage = ExecutableStage.fromPayload(payload); |
| // TODO: Wire this into the distributed cache and make it pluggable. |
| // TODO: Do we really want this layer of indirection when accessing the stage bundle factory? |
| // It's a little strange because this operator is responsible for the lifetime of the stage |
| // bundle "factory" (manager?) but not the job or Flink bundle factories. How do we make |
| // ownership of the higher level "factories" explicit? Do we care? |
| stageContext = contextFactory.get(jobInfo); |
| flinkMetricContainer = new FlinkMetricContainer(getRuntimeContext()); |
| |
| stageBundleFactory = stageContext.getStageBundleFactory(executableStage); |
| stateRequestHandler = getStateRequestHandler(executableStage); |
| progressHandler = |
| new BundleProgressHandler() { |
| @Override |
| public void onProgress(ProcessBundleProgressResponse progress) { |
| flinkMetricContainer.updateMetrics(stepName, progress.getMonitoringInfosList()); |
| } |
| |
| @Override |
| public void onCompleted(ProcessBundleResponse response) { |
| flinkMetricContainer.updateMetrics(stepName, response.getMonitoringInfosList()); |
| } |
| }; |
| |
| // This will call {@code createWrappingDoFnRunner} which needs the above dependencies. |
| super.open(); |
| } |
| |
| private StateRequestHandler getStateRequestHandler(ExecutableStage executableStage) { |
| |
| final StateRequestHandler sideInputStateHandler; |
| if (executableStage.getSideInputs().size() > 0) { |
| checkNotNull(super.sideInputHandler); |
| StateRequestHandlers.SideInputHandlerFactory sideInputHandlerFactory = |
| Preconditions.checkNotNull( |
| FlinkStreamingSideInputHandlerFactory.forStage( |
| executableStage, sideInputIds, super.sideInputHandler)); |
| try { |
| sideInputStateHandler = |
| StateRequestHandlers.forSideInputHandlerFactory( |
| ProcessBundleDescriptors.getSideInputs(executableStage), sideInputHandlerFactory); |
| } catch (IOException e) { |
| throw new RuntimeException("Failed to initialize SideInputHandler", e); |
| } |
| } else { |
| sideInputStateHandler = StateRequestHandler.unsupported(); |
| } |
| |
| final StateRequestHandler userStateRequestHandler; |
| if (executableStage.getUserStates().size() > 0) { |
| if (keyedStateInternals == null) { |
| throw new IllegalStateException("Input must be keyed when user state is used"); |
| } |
| userStateRequestHandler = |
| StateRequestHandlers.forBagUserStateHandlerFactory( |
| stageBundleFactory.getProcessBundleDescriptor(), |
| new BagUserStateFactory( |
| keyedStateInternals, getKeyedStateBackend(), stateBackendLock)); |
| } else { |
| userStateRequestHandler = StateRequestHandler.unsupported(); |
| } |
| |
| EnumMap<TypeCase, StateRequestHandler> handlerMap = new EnumMap<>(TypeCase.class); |
| handlerMap.put(TypeCase.MULTIMAP_SIDE_INPUT, sideInputStateHandler); |
| handlerMap.put(TypeCase.BAG_USER_STATE, userStateRequestHandler); |
| |
| return StateRequestHandlers.delegateBasedUponType(handlerMap); |
| } |
| |
| private static class BagUserStateFactory |
| implements StateRequestHandlers.BagUserStateHandlerFactory { |
| |
| private final StateInternals stateInternals; |
| private final KeyedStateBackend<ByteBuffer> keyedStateBackend; |
| private final Lock stateBackendLock; |
| |
| private BagUserStateFactory( |
| StateInternals stateInternals, |
| KeyedStateBackend<ByteBuffer> keyedStateBackend, |
| Lock stateBackendLock) { |
| |
| this.stateInternals = stateInternals; |
| this.keyedStateBackend = keyedStateBackend; |
| this.stateBackendLock = stateBackendLock; |
| } |
| |
| @Override |
| public <K, V, W extends BoundedWindow> |
| StateRequestHandlers.BagUserStateHandler<K, V, W> forUserState( |
| String pTransformId, |
| String userStateId, |
| Coder<K> keyCoder, |
| Coder<V> valueCoder, |
| Coder<W> windowCoder) { |
| return new StateRequestHandlers.BagUserStateHandler<K, V, W>() { |
| @Override |
| public Iterable<V> get(K key, W window) { |
| try { |
| stateBackendLock.lock(); |
| prepareStateBackend(key); |
| StateNamespace namespace = StateNamespaces.window(windowCoder, window); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug( |
| "State get for {} {} {} {}", |
| pTransformId, |
| userStateId, |
| Arrays.toString(keyedStateBackend.getCurrentKey().array()), |
| window); |
| } |
| BagState<V> bagState = |
| stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder)); |
| return bagState.read(); |
| } finally { |
| stateBackendLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void append(K key, W window, Iterator<V> values) { |
| try { |
| stateBackendLock.lock(); |
| prepareStateBackend(key); |
| StateNamespace namespace = StateNamespaces.window(windowCoder, window); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug( |
| "State append for {} {} {} {}", |
| pTransformId, |
| userStateId, |
| Arrays.toString(keyedStateBackend.getCurrentKey().array()), |
| window); |
| } |
| BagState<V> bagState = |
| stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder)); |
| while (values.hasNext()) { |
| bagState.add(values.next()); |
| } |
| } finally { |
| stateBackendLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void clear(K key, W window) { |
| try { |
| stateBackendLock.lock(); |
| prepareStateBackend(key); |
| StateNamespace namespace = StateNamespaces.window(windowCoder, window); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug( |
| "State clear for {} {} {} {}", |
| pTransformId, |
| userStateId, |
| Arrays.toString(keyedStateBackend.getCurrentKey().array()), |
| window); |
| } |
| BagState<V> bagState = |
| stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder)); |
| bagState.clear(); |
| } finally { |
| stateBackendLock.unlock(); |
| } |
| } |
| |
| private void prepareStateBackend(K key) { |
| // Key for state request is shipped already encoded as ByteString, |
| // this is mostly a wrapping with ByteBuffer. We still follow the |
| // usual key encoding procedure. |
| final ByteBuffer encodedKey = FlinkKeyUtils.encodeKey(key, keyCoder); |
| keyedStateBackend.setCurrentKey(encodedKey); |
| } |
| }; |
| } |
| } |
| |
| /** |
| * Note: This is only relevant when we have a stateful DoFn. We want to control the key of the |
| * state backend ourselves and we must avoid any concurrent setting of the current active key. By |
| * overwriting this, we also prevent unnecessary serialization as the key has to be encoded as a |
| * byte array. |
| */ |
| @Override |
| public void setKeyContextElement1(StreamRecord record) {} |
| |
| /** |
| * We don't want to set anything here. This is due to asynchronous nature of processing elements |
| * from the SDK Harness. The Flink runtime sets the current key before calling {@code |
| * processElement}, but this does not work when sending elements to the SDK harness which may be |
| * processed at an arbitrary later point in time. State for keys is also accessed asynchronously |
| * via state requests. |
| * |
| * <p>We set the key only as it is required for 1) State requests 2) Timers (setting/firing). |
| */ |
| @Override |
| public void setCurrentKey(Object key) {} |
| |
| @Override |
| public ByteBuffer getCurrentKey() { |
| // This is the key retrieved by HeapInternalTimerService when setting a Flink timer. |
| // Note: Only called by the TimerService. Must be guarded by a lock. |
| Preconditions.checkState( |
| stateBackendLock.isLocked(), |
| "State backend must be locked when retrieving the current key."); |
| return this.<ByteBuffer>getKeyedStateBackend().getCurrentKey(); |
| } |
| |
| private void setTimer(WindowedValue<InputT> timerElement, TimerInternals.TimerData timerData) { |
| try { |
| LOG.debug("Setting timer: {} {}", timerElement, timerData); |
| // KvToByteBufferKeySelector returns the key encoded |
| ByteBuffer encodedKey = (ByteBuffer) keySelector.getKey(timerElement); |
| // We have to synchronize to ensure the state backend is not concurrently accessed by the |
| // state requests |
| try { |
| stateBackendLock.lock(); |
| getKeyedStateBackend().setCurrentKey(encodedKey); |
| if (timerData.getTimestamp().isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) { |
| timerInternals.deleteTimer( |
| timerData.getNamespace(), timerData.getTimerId(), timerData.getDomain()); |
| } else { |
| timerInternals.setTimer(timerData); |
| } |
| } finally { |
| stateBackendLock.unlock(); |
| } |
| } catch (Exception e) { |
| throw new RuntimeException("Couldn't set timer", e); |
| } |
| } |
| |
| @Override |
| public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) { |
| final ByteBuffer encodedKey = (ByteBuffer) timer.getKey(); |
| // We have to synchronize to ensure the state backend is not concurrently accessed by the state |
| // requests |
| try { |
| stateBackendLock.lock(); |
| getKeyedStateBackend().setCurrentKey(encodedKey); |
| super.fireTimer(timer); |
| } finally { |
| stateBackendLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void dispose() throws Exception { |
| // may be called multiple times when an exception is thrown |
| if (stageContext != null) { |
| // Remove the reference to stageContext and make stageContext available for garbage |
| // collection. |
| try (AutoCloseable bundleFactoryCloser = stageBundleFactory; |
| AutoCloseable closable = stageContext) { |
| // DoFnOperator generates another "bundle" for the final watermark |
| // https://issues.apache.org/jira/browse/BEAM-5816 |
| super.dispose(); |
| } finally { |
| stageContext = null; |
| } |
| } |
| } |
| |
| @Override |
| protected void addSideInputValue(StreamRecord<RawUnionValue> streamRecord) { |
| @SuppressWarnings("unchecked") |
| WindowedValue<KV<Void, Iterable<?>>> value = |
| (WindowedValue<KV<Void, Iterable<?>>>) streamRecord.getValue().getValue(); |
| PCollectionView<?> sideInput = sideInputTagMapping.get(streamRecord.getValue().getUnionTag()); |
| sideInputHandler.addSideInputValue(sideInput, value.withValue(value.getValue().getValue())); |
| } |
| |
| @Override |
| protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner( |
| DoFnRunner<InputT, OutputT> wrappedRunner) { |
| sdkHarnessRunner = |
| new SdkHarnessDoFnRunner<>( |
| executableStage.getInputPCollection().getId(), |
| stageBundleFactory, |
| stateRequestHandler, |
| progressHandler, |
| outputManager, |
| outputMap, |
| (Coder<BoundedWindow>) windowingStrategy.getWindowFn().windowCoder(), |
| this::setTimer, |
| () -> FlinkKeyUtils.decodeKey(getCurrentKey(), keyCoder)); |
| |
| return ensureStateCleanup(sdkHarnessRunner); |
| } |
| |
| @Override |
| public void processWatermark(Watermark mark) throws Exception { |
| // Due to the asynchronous communication with the SDK harness, |
| // a bundle might still be in progress and not all items have |
| // yet been received from the SDK harness. If we just set this |
| // watermark as the new output watermark, we could violate the |
| // order of the records, i.e. pending items in the SDK harness |
| // could become "late" although they were "on time". |
| // |
| // We can solve this problem using one of the following options: |
| // |
| // 1) Finish the current bundle and emit this watermark as the |
| // new output watermark. Finishing the bundle ensures that |
| // all the items have been processed by the SDK harness and |
| // received by the outputQueue (see below), where they will |
| // have been emitted to the output stream. |
| // |
| // 2) Put a hold on the output watermark for as long as the current |
| // bundle has not been finished. We have to remember to manually |
| // finish the bundle in case we receive the final watermark. |
| // To avoid latency, we should process this watermark again as |
| // soon as the current bundle is finished. |
| // |
| // Approach 1) is the easiest and gives better latency, yet 2) |
| // gives better throughput due to the bundle not getting cut on |
| // every watermark. So we have implemented 2) below. |
| // |
| if (sdkHarnessRunner.isBundleInProgress()) { |
| if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { |
| invokeFinishBundle(); |
| setPushedBackWatermark(Long.MAX_VALUE); |
| } else { |
| // It is not safe to advance the output watermark yet, so add a hold on the current |
| // output watermark. |
| backupWatermarkHold = Math.max(backupWatermarkHold, getPushbackWatermarkHold()); |
| setPushedBackWatermark(Math.min(currentOutputWatermark, backupWatermarkHold)); |
| super.setBundleFinishedCallback( |
| () -> { |
| try { |
| LOG.debug("processing pushed back watermark: {}", mark); |
| // at this point the bundle is finished, allow the watermark to pass |
| // we are restoring the previous hold in case it was already set for side inputs |
| setPushedBackWatermark(backupWatermarkHold); |
| super.processWatermark(mark); |
| } catch (Exception e) { |
| throw new RuntimeException( |
| "Failed to process pushed back watermark after finished bundle.", e); |
| } |
| }); |
| } |
| } |
| super.processWatermark(mark); |
| } |
| |
| private static class SdkHarnessDoFnRunner<InputT, OutputT> |
| implements DoFnRunner<InputT, OutputT> { |
| |
| private final String mainInput; |
| private final LinkedBlockingQueue<KV<String, OutputT>> outputQueue; |
| private final StageBundleFactory stageBundleFactory; |
| private final StateRequestHandler stateRequestHandler; |
| private final BundleProgressHandler progressHandler; |
| private final BufferedOutputManager<OutputT> outputManager; |
| private final Map<String, TupleTag<?>> outputMap; |
| /** Timer Output Pcollection id => TimerSpec. */ |
| private final Map<String, TimerSpec> timerOutputIdToSpecMap; |
| |
| private final Coder<BoundedWindow> windowCoder; |
| private final BiConsumer<WindowedValue<InputT>, TimerInternals.TimerData> timerRegistration; |
| private final Supplier<Object> keyForTimer; |
| |
| private RemoteBundle remoteBundle; |
| private FnDataReceiver<WindowedValue<?>> mainInputReceiver; |
| |
| public SdkHarnessDoFnRunner( |
| String mainInput, |
| StageBundleFactory stageBundleFactory, |
| StateRequestHandler stateRequestHandler, |
| BundleProgressHandler progressHandler, |
| BufferedOutputManager<OutputT> outputManager, |
| Map<String, TupleTag<?>> outputMap, |
| Coder<BoundedWindow> windowCoder, |
| BiConsumer<WindowedValue<InputT>, TimerInternals.TimerData> timerRegistration, |
| Supplier<Object> keyForTimer) { |
| this.mainInput = mainInput; |
| this.stageBundleFactory = stageBundleFactory; |
| this.stateRequestHandler = stateRequestHandler; |
| this.progressHandler = progressHandler; |
| this.outputManager = outputManager; |
| this.outputMap = outputMap; |
| this.timerRegistration = timerRegistration; |
| this.timerOutputIdToSpecMap = new HashMap<>(); |
| this.keyForTimer = keyForTimer; |
| // Gather all timers from all transforms by their output pCollectionId which is unique |
| for (Map<String, ProcessBundleDescriptors.TimerSpec> transformTimerMap : |
| stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs().values()) { |
| for (ProcessBundleDescriptors.TimerSpec timerSpec : transformTimerMap.values()) { |
| timerOutputIdToSpecMap.put(timerSpec.outputCollectionId(), timerSpec); |
| } |
| } |
| this.windowCoder = windowCoder; |
| this.outputQueue = new LinkedBlockingQueue<>(); |
| } |
| |
| @Override |
| public void startBundle() { |
| OutputReceiverFactory receiverFactory = |
| new OutputReceiverFactory() { |
| @Override |
| public FnDataReceiver<OutputT> create(String pCollectionId) { |
| return receivedElement -> { |
| // handover to queue, do not block the grpc thread |
| outputQueue.put(KV.of(pCollectionId, receivedElement)); |
| }; |
| } |
| }; |
| |
| try { |
| remoteBundle = |
| stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, progressHandler); |
| mainInputReceiver = |
| Preconditions.checkNotNull( |
| remoteBundle.getInputReceivers().get(mainInput), |
| "Failed to retrieve main input receiver."); |
| } catch (Exception e) { |
| throw new RuntimeException("Failed to start remote bundle", e); |
| } |
| } |
| |
| @Override |
| public void processElement(WindowedValue<InputT> element) { |
| try { |
| LOG.debug("Sending value: {}", element); |
| mainInputReceiver.accept(element); |
| } catch (Exception e) { |
| throw new RuntimeException("Failed to process element with SDK harness.", e); |
| } |
| emitResults(); |
| } |
| |
| @Override |
| public void onTimer( |
| String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) { |
| Object timerKey = keyForTimer.get(); |
| Preconditions.checkNotNull(timerKey, "Key for timer needs to be set before calling onTimer"); |
| Preconditions.checkNotNull(remoteBundle, "Call to onTimer outside of a bundle"); |
| LOG.debug("timer callback: {} {} {} {}", timerId, window, timestamp, timeDomain); |
| FnDataReceiver<WindowedValue<?>> timerReceiver = |
| Preconditions.checkNotNull( |
| remoteBundle.getInputReceivers().get(timerId), |
| "No receiver found for timer %s", |
| timerId); |
| WindowedValue<KV<Object, Timer>> timerValue = |
| WindowedValue.of( |
| KV.of(timerKey, Timer.of(timestamp, new byte[0])), |
| timestamp, |
| Collections.singleton(window), |
| PaneInfo.NO_FIRING); |
| try { |
| timerReceiver.accept(timerValue); |
| } catch (Exception e) { |
| throw new RuntimeException( |
| String.format(Locale.ENGLISH, "Failed to process timer %s", timerReceiver), e); |
| } |
| } |
| |
| @Override |
| public void finishBundle() { |
| try { |
| // TODO: it would be nice to emit results as they arrive, can thread wait non-blocking? |
| // close blocks until all results are received |
| remoteBundle.close(); |
| emitResults(); |
| } catch (Exception e) { |
| throw new RuntimeException("Failed to finish remote bundle", e); |
| } finally { |
| remoteBundle = null; |
| } |
| } |
| |
| boolean isBundleInProgress() { |
| return remoteBundle != null; |
| } |
| |
| private void emitResults() { |
| KV<String, OutputT> result; |
| while ((result = outputQueue.poll()) != null) { |
| final String outputPCollectionId = Preconditions.checkNotNull(result.getKey()); |
| TupleTag<?> tag = outputMap.get(outputPCollectionId); |
| WindowedValue windowedValue = |
| Preconditions.checkNotNull( |
| (WindowedValue) result.getValue(), |
| "Received a null value from the SDK harness for %s", |
| outputPCollectionId); |
| if (tag != null) { |
| // process regular elements |
| outputManager.output(tag, windowedValue); |
| } else { |
| TimerSpec timerSpec = |
| Preconditions.checkNotNull( |
| timerOutputIdToSpecMap.get(outputPCollectionId), |
| "Unknown Pcollectionid %s", |
| outputPCollectionId); |
| Timer timer = |
| Preconditions.checkNotNull( |
| (Timer) ((KV) windowedValue.getValue()).getValue(), |
| "Received null Timer from SDK harness: %s", |
| windowedValue); |
| LOG.debug("Timer received: {} {}", outputPCollectionId, timer); |
| for (Object window : windowedValue.getWindows()) { |
| StateNamespace namespace = StateNamespaces.window(windowCoder, (BoundedWindow) window); |
| TimerInternals.TimerData timerData = |
| TimerInternals.TimerData.of( |
| timerSpec.inputCollectionId(), |
| namespace, |
| timer.getTimestamp(), |
| timerSpec.getTimerSpec().getTimeDomain()); |
| timerRegistration.accept(windowedValue, timerData); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public DoFn<InputT, OutputT> getFn() { |
| throw new UnsupportedOperationException(); |
| } |
| } |
| |
| private DoFnRunner<InputT, OutputT> ensureStateCleanup( |
| SdkHarnessDoFnRunner<InputT, OutputT> sdkHarnessRunner) { |
| if (keyCoder == null) { |
| // There won't be any state to clean up |
| // (stateful functions have to be keyed) |
| return sdkHarnessRunner; |
| } |
| // Takes care of state cleanup via StatefulDoFnRunner |
| Coder windowCoder = windowingStrategy.getWindowFn().windowCoder(); |
| CleanupTimer<InputT> cleanupTimer = |
| new CleanupTimer<>( |
| timerInternals, |
| stateBackendLock, |
| windowingStrategy, |
| keyCoder, |
| windowCoder, |
| getKeyedStateBackend()); |
| |
| List<String> userStates = |
| executableStage.getUserStates().stream() |
| .map(UserStateReference::localName) |
| .collect(Collectors.toList()); |
| |
| KeyedStateBackend<ByteBuffer> stateBackend = getKeyedStateBackend(); |
| StateCleaner stateCleaner = |
| new StateCleaner(userStates, windowCoder, () -> stateBackend.getCurrentKey()); |
| |
| return new StatefulDoFnRunner<InputT, OutputT, BoundedWindow>( |
| sdkHarnessRunner, windowingStrategy, cleanupTimer, stateCleaner) { |
| @Override |
| public void finishBundle() { |
| // Before cleaning up state, first finish bundle for all underlying DoFnRunners |
| super.finishBundle(); |
| // execute cleanup after the bundle is complete |
| if (!stateCleaner.cleanupQueue.isEmpty()) { |
| try { |
| stateBackendLock.lock(); |
| stateCleaner.cleanupState( |
| keyedStateInternals, (key) -> stateBackend.setCurrentKey(key)); |
| } finally { |
| stateBackendLock.unlock(); |
| } |
| } |
| } |
| }; |
| } |
| |
| static class CleanupTimer<InputT> implements StatefulDoFnRunner.CleanupTimer<InputT> { |
| private static final String GC_TIMER_ID = "__user-state-cleanup__"; |
| |
| private final TimerInternals timerInternals; |
| private final Lock stateBackendLock; |
| private final WindowingStrategy windowingStrategy; |
| private final Coder keyCoder; |
| private final Coder windowCoder; |
| private final KeyedStateBackend<ByteBuffer> keyedStateBackend; |
| |
| CleanupTimer( |
| TimerInternals timerInternals, |
| Lock stateBackendLock, |
| WindowingStrategy windowingStrategy, |
| Coder keyCoder, |
| Coder windowCoder, |
| KeyedStateBackend<ByteBuffer> keyedStateBackend) { |
| this.timerInternals = timerInternals; |
| this.stateBackendLock = stateBackendLock; |
| this.windowingStrategy = windowingStrategy; |
| this.keyCoder = keyCoder; |
| this.windowCoder = windowCoder; |
| this.keyedStateBackend = keyedStateBackend; |
| } |
| |
| @Override |
| public Instant currentInputWatermarkTime() { |
| return timerInternals.currentInputWatermarkTime(); |
| } |
| |
| @Override |
| public void setForWindow(InputT input, BoundedWindow window) { |
| Preconditions.checkNotNull(input, "Null input passed to CleanupTimer"); |
| // make sure this fires after any window.maxTimestamp() timers |
| Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy).plus(1); |
| // needs to match the encoding in prepareStateBackend for state request handler |
| final ByteBuffer key = FlinkKeyUtils.encodeKey(((KV) input).getKey(), keyCoder); |
| // Ensure the state backend is not concurrently accessed by the state requests |
| try { |
| stateBackendLock.lock(); |
| keyedStateBackend.setCurrentKey(key); |
| timerInternals.setTimer( |
| StateNamespaces.window(windowCoder, window), |
| GC_TIMER_ID, |
| gcTime, |
| TimeDomain.EVENT_TIME); |
| } finally { |
| stateBackendLock.unlock(); |
| } |
| } |
| |
| @Override |
| public boolean isForWindow( |
| String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) { |
| boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME); |
| Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy).plus(1); |
| return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp); |
| } |
| } |
| |
| static class StateCleaner implements StatefulDoFnRunner.StateCleaner<BoundedWindow> { |
| |
| private final List<String> userStateNames; |
| private final Coder windowCoder; |
| private final ArrayDeque<KV<ByteBuffer, BoundedWindow>> cleanupQueue; |
| private final Supplier<ByteBuffer> keyedStateBackend; |
| |
| StateCleaner( |
| List<String> userStateNames, Coder windowCoder, Supplier<ByteBuffer> keyedStateBackend) { |
| this.userStateNames = userStateNames; |
| this.windowCoder = windowCoder; |
| this.keyedStateBackend = keyedStateBackend; |
| this.cleanupQueue = new ArrayDeque<>(); |
| } |
| |
| @Override |
| public void clearForWindow(BoundedWindow window) { |
| // Executed in the context of onTimer(..) where the correct key will be set |
| cleanupQueue.add(KV.of(keyedStateBackend.get(), window)); |
| } |
| |
| @SuppressWarnings("ByteBufferBackingArray") |
| void cleanupState(StateInternals stateInternals, Consumer<ByteBuffer> keyContextConsumer) { |
| while (!cleanupQueue.isEmpty()) { |
| KV<ByteBuffer, BoundedWindow> kv = cleanupQueue.remove(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("State cleanup for {} {}", Arrays.toString(kv.getKey().array()), kv.getValue()); |
| } |
| keyContextConsumer.accept(kv.getKey()); |
| for (String userState : userStateNames) { |
| StateNamespace namespace = StateNamespaces.window(windowCoder, kv.getValue()); |
| StateTag<BagState<Void>> bagStateStateTag = StateTags.bag(userState, VoidCoder.of()); |
| BagState<?> state = stateInternals.state(namespace, bagStateStateTag); |
| state.clear(); |
| } |
| } |
| } |
| } |
| |
| private static class NoOpDoFn<InputT, OutputT> extends DoFn<InputT, OutputT> { |
| @ProcessElement |
| public void doNothing(ProcessContext context) {} |
| } |
| } |