| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.spark.stateful; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators; |
| import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; |
| import org.apache.beam.runners.core.LateDataUtils; |
| import org.apache.beam.runners.core.OutputWindowedValue; |
| import org.apache.beam.runners.core.ReduceFnRunner; |
| import org.apache.beam.runners.core.SystemReduceFn; |
| import org.apache.beam.runners.core.TimerInternals; |
| import org.apache.beam.runners.core.UnsupportedSideInputReader; |
| import org.apache.beam.runners.core.construction.SerializablePipelineOptions; |
| import org.apache.beam.runners.core.construction.TriggerTranslation; |
| import org.apache.beam.runners.core.metrics.CounterCell; |
| import org.apache.beam.runners.core.metrics.MetricsContainerImpl; |
| import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; |
| import org.apache.beam.runners.core.triggers.TriggerStateMachines; |
| import org.apache.beam.runners.spark.SparkPipelineOptions; |
| import org.apache.beam.runners.spark.coders.CoderHelpers; |
| import org.apache.beam.runners.spark.translation.TranslationUtils; |
| import org.apache.beam.runners.spark.util.ByteArray; |
| import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.coders.IterableCoder; |
| import org.apache.beam.sdk.coders.KvCoder; |
| import org.apache.beam.sdk.coders.VarLongCoder; |
| import org.apache.beam.sdk.metrics.MetricName; |
| import org.apache.beam.sdk.state.TimeDomain; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.transforms.windowing.PaneInfo; |
| import org.apache.beam.sdk.util.WindowedValue; |
| import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.TupleTag; |
| import org.apache.beam.sdk.values.WindowingStrategy; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Predicate; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.FluentIterable; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Table; |
| import org.apache.spark.api.java.JavaSparkContext$; |
| import org.apache.spark.api.java.function.FlatMapFunction; |
| import org.apache.spark.streaming.Duration; |
| import org.apache.spark.streaming.api.java.JavaDStream; |
| import org.apache.spark.streaming.api.java.JavaPairDStream; |
| import org.apache.spark.streaming.dstream.DStream; |
| import org.apache.spark.streaming.dstream.PairDStreamFunctions; |
| import org.joda.time.Instant; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import scala.Option; |
| import scala.Tuple2; |
| import scala.Tuple3; |
| import scala.collection.GenTraversable; |
| import scala.collection.Iterator; |
| import scala.collection.Seq; |
| import scala.runtime.AbstractFunction1; |
| |
| /** |
| * An implementation of {@link GroupAlsoByWindow} logic for grouping by windows and controlling |
| * trigger firings and pane accumulation. |
| * |
| * <p>This implementation is a composite of Spark transformations revolving around state management |
| * using Spark's {@link PairDStreamFunctions#updateStateByKey(scala.Function1, |
| * org.apache.spark.Partitioner, boolean, scala.reflect.ClassTag)} to update state with new data and |
| * timers. |
| * |
| * <p>Using updateStateByKey allows to scan through the entire state visiting not just the updated |
| * state (new values for key) but also check if timers are ready to fire. Since updateStateByKey |
| * bounds the types of state and output to be the same, a (state, output) tuple is used, filtering |
| * the state (and output if no firing) in the following steps. |
| */ |
| public class SparkGroupAlsoByWindowViaWindowSet implements Serializable { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(SparkGroupAlsoByWindowViaWindowSet.class); |
| |
  /**
   * State and Timers wrapper.
   *
   * <p>Both members are kept in serialized (byte) form so that the whole object can be
   * Java-serialized by Spark for shuffling and checkpointing. Do not rename the fields:
   * that would break restoring state from existing checkpoints.
   */
  public static class StateAndTimers implements Serializable {
    // Serializable state for internals (namespace to state tag to coded value).
    private final Table<String, String, byte[]> state;
    // Timers serialized with the TimerDataCoder (see SparkTimerInternals.serializeTimers).
    private final Collection<byte[]> serTimers;

    private StateAndTimers(
        final Table<String, String, byte[]> state, final Collection<byte[]> timers) {
      this.state = state;
      this.serTimers = timers;
    }

    /** Returns the per-key state table (namespace -> state tag -> coded value). */
    Table<String, String, byte[]> getState() {
      return state;
    }

    /** Returns the serialized timers set for this key. */
    Collection<byte[]> getTimers() {
      return serTimers;
    }
  }
| |
| private static class OutputWindowedValueHolder<K, V> |
| implements OutputWindowedValue<KV<K, Iterable<V>>> { |
| private final List<WindowedValue<KV<K, Iterable<V>>>> windowedValues = new ArrayList<>(); |
| |
| @Override |
| public void outputWindowedValue( |
| final KV<K, Iterable<V>> output, |
| final Instant timestamp, |
| final Collection<? extends BoundedWindow> windows, |
| final PaneInfo pane) { |
| windowedValues.add(WindowedValue.of(output, timestamp, windows, pane)); |
| } |
| |
| private List<WindowedValue<KV<K, Iterable<V>>>> getWindowedValues() { |
| return windowedValues; |
| } |
| |
| @Override |
| public <AdditionalOutputT> void outputWindowedValue( |
| final TupleTag<AdditionalOutputT> tag, |
| final AdditionalOutputT output, |
| final Instant timestamp, |
| final Collection<? extends BoundedWindow> windows, |
| final PaneInfo pane) { |
| throw new UnsupportedOperationException( |
| "Tagged outputs are not allowed in GroupAlsoByWindow."); |
| } |
| } |
| |
| private static class UpdateStateByKeyFunction<K, InputT, W extends BoundedWindow> |
| extends AbstractFunction1< |
| Iterator< |
| Tuple3< |
| /*K*/ ByteArray, |
| Seq</*Itr<WV<I>>*/ byte[]>, |
| Option<Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>>, |
| Iterator< |
| Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>> |
| implements Serializable { |
| |
| private class UpdateStateByKeyOutputIterator |
| extends AbstractIterator< |
| Tuple2< |
| /*K*/ ByteArray, |
| Tuple2<StateAndTimers, /*WV<KV<K, KV<Long(Time),Itr<I>>>>*/ List<byte[]>>>> { |
| |
| private final Iterator< |
| Tuple3<ByteArray, Seq<byte[]>, Option<Tuple2<StateAndTimers, List<byte[]>>>>> |
| input; |
| private final SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn; |
| private final CounterCell droppedDueToLateness; |
| |
| private SparkStateInternals<K> processPreviousState( |
| final Option<Tuple2<StateAndTimers, List<byte[]>>> prevStateAndTimersOpt, |
| final K key, |
| final SparkTimerInternals timerInternals) { |
| |
| final SparkStateInternals<K> stateInternals; |
| |
| if (prevStateAndTimersOpt.isEmpty()) { |
| // no previous state. |
| stateInternals = SparkStateInternals.forKey(key); |
| } else { |
| // with pre-existing state. |
| final StateAndTimers prevStateAndTimers = prevStateAndTimersOpt.get()._1(); |
| // get state(internals) per key. |
| stateInternals = SparkStateInternals.forKeyAndState(key, prevStateAndTimers.getState()); |
| |
| timerInternals.addTimers( |
| SparkTimerInternals.deserializeTimers( |
| prevStateAndTimers.getTimers(), timerDataCoder)); |
| } |
| |
| return stateInternals; |
| } |
| |
      /**
       * @param input one Spark partition (~bundle) of (key, new values, previous (state, output))
       *     tuples.
       * @param reduceFn the buffering reduce function that groups values per window.
       * @param droppedDueToLateness counter incremented for elements whose windows have expired.
       */
      UpdateStateByKeyOutputIterator(
          final Iterator<
                  Tuple3<ByteArray, Seq<byte[]>, Option<Tuple2<StateAndTimers, List<byte[]>>>>>
              input,
          final SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn,
          final CounterCell droppedDueToLateness) {
        this.input = input;
        this.reduceFn = reduceFn;
        this.droppedDueToLateness = droppedDueToLateness;
      }
| |
| /** |
| * Retrieves the timers that are eligible for processing by {@link |
| * org.apache.beam.runners.core.ReduceFnRunner}. |
| * |
| * @return A collection of timers that are eligible for processing. For a {@link |
| * TimeDomain#EVENT_TIME} timer, this implies that the watermark has passed the timer's |
| * timestamp. For other <code>TimeDomain</code>s (e.g., {@link |
| * TimeDomain#PROCESSING_TIME}), a timer is always considered eligible for processing (no |
| * restrictions). |
| */ |
| private Collection<TimerInternals.TimerData> filterTimersEligibleForProcessing( |
| final Collection<TimerInternals.TimerData> timers, final Instant inputWatermark) { |
| final Predicate<TimerInternals.TimerData> eligibleForProcessing = |
| timer -> |
| !timer.getDomain().equals(TimeDomain.EVENT_TIME) |
| || inputWatermark.isAfter(timer.getTimestamp()); |
| |
| return FluentIterable.from(timers).filter(eligibleForProcessing).toSet(); |
| } |
| |
      /**
       * Produces the next (key, (updated state, serialized fired output)) tuple, or signals
       * end-of-data once the partition is exhausted. Keys whose state is empty and which produced
       * no output are skipped entirely, which evicts them from Spark's state.
       */
      @Override
      protected Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>
          computeNext() {
        // input iterator is a Spark partition (~bundle), containing keys and their
        // (possibly) previous-state and (possibly) new data.
        while (input.hasNext()) {

          // for each element in the partition:
          final Tuple3<ByteArray, Seq<byte[]>, Option<Tuple2<StateAndTimers, List<byte[]>>>> next =
              input.next();

          final ByteArray encodedKey = next._1();
          final Seq<byte[]> encodedKeyedElements = next._2();
          final Option<Tuple2<StateAndTimers, List<byte[]>>> prevStateAndTimersOpt = next._3();

          final K key = CoderHelpers.fromByteArray(encodedKey.getValue(), keyCoder);

          // fetch the current watermarks (per source) for this micro-batch.
          final Map<Integer, GlobalWatermarkHolder.SparkWatermarks> watermarks =
              GlobalWatermarkHolder.get(getBatchDuration(options));

          final SparkTimerInternals timerInternals =
              SparkTimerInternals.forStreamFromSources(sourceIds, watermarks);

          // restore (or initialize) this key's state; re-registers previously-set timers.
          final SparkStateInternals<K> stateInternals =
              processPreviousState(prevStateAndTimersOpt, key, timerInternals);

          final ExecutableTriggerStateMachine triggerStateMachine =
              ExecutableTriggerStateMachine.create(
                  TriggerStateMachines.stateMachineForTrigger(
                      TriggerTranslation.toProto(windowingStrategy.getTrigger())));

          // buffers fired panes so they can be collected after processing.
          final OutputWindowedValueHolder<K, InputT> outputHolder =
              new OutputWindowedValueHolder<>();

          final ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner =
              new ReduceFnRunner<>(
                  key,
                  windowingStrategy,
                  triggerStateMachine,
                  stateInternals,
                  timerInternals,
                  outputHolder,
                  new UnsupportedSideInputReader("GroupAlsoByWindow"),
                  reduceFn,
                  options.get());

          if (!encodedKeyedElements.isEmpty()) {
            // new input for key.
            try {
              // cast to GenTraversable to avoid an ambiguous call to head() which can come from
              // multiple super interfaces of Seq<byte[]>
              byte[] headBytes = ((GenTraversable<byte[]>) encodedKeyedElements).head();
              // the head element decodes to (batch timestamp, windowed values for this key).
              final KV<Long, Iterable<WindowedValue<InputT>>> keyedElements =
                  CoderHelpers.fromByteArray(headBytes, KvCoder.of(VarLongCoder.of(), itrWvCoder));

              final Long rddTimestamp = keyedElements.getKey();

              LOG.debug(
                  logPrefix + ": processing RDD with timestamp: {}, watermarks: {}",
                  rddTimestamp,
                  watermarks);

              final Iterable<WindowedValue<InputT>> elements = keyedElements.getValue();

              LOG.trace(logPrefix + ": input elements: {}", elements);

              // Incoming expired windows are filtered based on
              // timerInternals.currentInputWatermarkTime() and the configured allowed
              // lateness. Note that this is done prior to calling
              // timerInternals.advanceWatermark so essentially the inputWatermark is
              // the highWatermark of the previous batch and the lowWatermark of the
              // current batch.
              // The highWatermark of the current batch will only affect filtering
              // as of the next batch.
              final Iterable<WindowedValue<InputT>> nonExpiredElements =
                  Lists.newArrayList(
                      LateDataUtils.dropExpiredWindows(
                          key, elements, timerInternals, windowingStrategy, droppedDueToLateness));

              LOG.trace(logPrefix + ": non expired input elements: {}", nonExpiredElements);

              reduceFnRunner.processElements(nonExpiredElements);
            } catch (final Exception e) {
              throw new RuntimeException("Failed to process element with ReduceFnRunner", e);
            }
          } else if (stateInternals.getState().isEmpty()) {
            // no input and no state -> GC evict now.
            continue;
          }
          try {
            // advance the watermark to HWM to fire by timers.
            LOG.debug(
                logPrefix + ": timerInternals before advance are {}", timerInternals.toString());

            // store the highWatermark as the new inputWatermark to calculate triggers
            timerInternals.advanceWatermark();

            final Collection<TimerInternals.TimerData> timersEligibleForProcessing =
                filterTimersEligibleForProcessing(
                    timerInternals.getTimers(), timerInternals.currentInputWatermarkTime());

            LOG.debug(
                logPrefix + ": timers eligible for processing are {}", timersEligibleForProcessing);

            // Note that at this point, the watermark has already advanced since
            // timerInternals.advanceWatermark() has been called and the highWatermark
            // is now stored as the new inputWatermark, according to which triggers are
            // calculated.
            // Note 2: The implicit contract between the runner and reduceFnRunner is that
            // event_time based triggers are only delivered if the watermark has passed their
            // timestamp.
            // Note 3: Timer cleanups are performed by the GC timer scheduled by reduceFnRunner as
            // part of processing timers.
            // Note 4: Even if a given timer is deemed eligible for processing, it does not
            // necessarily mean that it will actually fire (firing is determined by the trigger
            // itself, not the TimerInternals/TimerData objects).
            reduceFnRunner.onTimers(timersEligibleForProcessing);
          } catch (final Exception e) {
            throw new RuntimeException("Failed to process ReduceFnRunner onTimer.", e);
          }
          // this is mostly symbolic since actual persist is done by emitting output.
          reduceFnRunner.persist();
          // obtain output, if fired.
          final List<WindowedValue<KV<K, Iterable<InputT>>>> outputs =
              outputHolder.getWindowedValues();

          if (!outputs.isEmpty() || !stateInternals.getState().isEmpty()) {
            // empty outputs are filtered later using DStream filtering
            final StateAndTimers updated =
                new StateAndTimers(
                    stateInternals.getState(),
                    SparkTimerInternals.serializeTimers(
                        timerInternals.getTimers(), timerDataCoder));

            /*
            Not something we want to happen in production, but is very helpful
            when debugging - TRACE.
            */
            LOG.trace(logPrefix + ": output elements are {}", Joiner.on(", ").join(outputs));

            // persist Spark's state by outputting.
            final List<byte[]> serOutput = CoderHelpers.toByteArrays(outputs, wvKvIterCoder);
            return new Tuple2<>(encodedKey, new Tuple2<>(updated, serOutput));
          }
          // an empty state with no output, can be evicted completely - do nothing.
        }
        return endOfData();
      }
| } |
| |
| private final FullWindowedValueCoder<InputT> wvCoder; |
| private final Coder<K> keyCoder; |
| private final List<Integer> sourceIds; |
| private final TimerInternals.TimerDataCoder timerDataCoder; |
| private final WindowingStrategy<?, W> windowingStrategy; |
| private final SerializablePipelineOptions options; |
| private final IterableCoder<WindowedValue<InputT>> itrWvCoder; |
| private final String logPrefix; |
| private final Coder<WindowedValue<KV<K, Iterable<InputT>>>> wvKvIterCoder; |
| |
| UpdateStateByKeyFunction( |
| final List<Integer> sourceIds, |
| final WindowingStrategy<?, W> windowingStrategy, |
| final FullWindowedValueCoder<InputT> wvCoder, |
| final Coder<K> keyCoder, |
| final SerializablePipelineOptions options, |
| final String logPrefix) { |
| this.wvCoder = wvCoder; |
| this.keyCoder = keyCoder; |
| this.sourceIds = sourceIds; |
| this.timerDataCoder = timerDataCoderOf(windowingStrategy); |
| this.windowingStrategy = windowingStrategy; |
| this.options = options; |
| this.itrWvCoder = IterableCoder.of(wvCoder); |
| this.logPrefix = logPrefix; |
| this.wvKvIterCoder = |
| windowedValueKeyValueCoderOf( |
| keyCoder, |
| wvCoder.getValueCoder(), |
| ((FullWindowedValueCoder<InputT>) wvCoder).getWindowCoder()); |
| } |
| |
    /**
     * The stateful scan function passed to {@code updateStateByKey}: for every key in the
     * partition it combines the previous (state, output) with any new values, runs the
     * {@link ReduceFnRunner}, and emits the updated state together with any fired output.
     */
    @Override
    public Iterator<
            Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>
        apply(
            final Iterator<
                    Tuple3<
                        /*K*/ ByteArray,
                        Seq</*Itr<WV<I>>*/ byte[]>,
                        Option<Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>>
                input) {
      // --- ACTUAL STATEFUL OPERATION:
      //
      // Input Iterator: the partition (~bundle) of a co-grouping of the input
      // and the previous state (if exists).
      //
      // Output Iterator: the output key, and the updated state.
      //
      // possible input scenarios for (K, Seq, Option<S>):
      // (1) Option<S>.isEmpty: new data with no previous state.
      // (2) Seq.isEmpty: no new data, but evaluating previous state (timer-like behaviour).
      // (3) Seq.nonEmpty && Option<S>.isDefined: new data with previous state.

      // buffering reduce fn: groups all values per window (no combining).
      final SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn =
          SystemReduceFn.buffering(wvCoder.getValueCoder());

      final MetricsContainerImpl cellProvider = new MetricsContainerImpl("cellProvider");

      final CounterCell droppedDueToClosedWindow =
          cellProvider.getCounter(
              MetricName.named(
                  SparkGroupAlsoByWindowViaWindowSet.class,
                  GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER));

      final CounterCell droppedDueToLateness =
          cellProvider.getCounter(
              MetricName.named(
                  SparkGroupAlsoByWindowViaWindowSet.class,
                  GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER));

      // log if there's something to log.
      // NOTE(review): both counters come from the container created just above, and the
      // iterator below only increments droppedDueToLateness lazily (after this method
      // returns), so these cumulative values look like they are always 0 at this point —
      // confirm whether these log branches can ever trigger.
      final long lateDropped = droppedDueToLateness.getCumulative();
      if (lateDropped > 0) {
        LOG.info(String.format("Dropped %d elements due to lateness.", lateDropped));
        droppedDueToLateness.inc(-droppedDueToLateness.getCumulative());
      }
      final long closedWindowDropped = droppedDueToClosedWindow.getCumulative();
      if (closedWindowDropped > 0) {
        LOG.info(String.format("Dropped %d elements due to closed window.", closedWindowDropped));
        droppedDueToClosedWindow.inc(-droppedDueToClosedWindow.getCumulative());
      }

      // wrap the Java output iterator back as a Scala iterator for updateStateByKey.
      return scala.collection.JavaConversions.asScalaIterator(
          new UpdateStateByKeyOutputIterator(input, reduceFn, droppedDueToLateness));
    }
| } |
| |
| private static <K, InputT> |
| FullWindowedValueCoder<KV<K, Iterable<InputT>>> windowedValueKeyValueCoderOf( |
| final Coder<K> keyCoder, |
| final Coder<InputT> iCoder, |
| final Coder<? extends BoundedWindow> wCoder) { |
| return FullWindowedValueCoder.of(KvCoder.of(keyCoder, IterableCoder.of(iCoder)), wCoder); |
| } |
| |
| private static <W extends BoundedWindow> TimerInternals.TimerDataCoder timerDataCoderOf( |
| final WindowingStrategy<?, W> windowingStrategy) { |
| return TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder()); |
| } |
| |
| private static void checkpointIfNeeded( |
| final DStream<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>> firedStream, |
| final SerializablePipelineOptions options) { |
| |
| final Long checkpointDurationMillis = getBatchDuration(options); |
| |
| if (checkpointDurationMillis > 0) { |
| firedStream.checkpoint(new Duration(checkpointDurationMillis)); |
| } |
| } |
| |
  // NOTE(review): despite the name, this reads the *checkpoint* duration option
  // (checkpointDurationMillis), not the streaming batch interval; it is also passed to
  // GlobalWatermarkHolder.get as a cache-duration. Confirm intent before renaming/reusing.
  private static Long getBatchDuration(final SerializablePipelineOptions options) {
    return options.get().as(SparkPipelineOptions.class).getCheckpointDurationMillis();
  }
| |
| private static <K, InputT> JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> stripStateValues( |
| final DStream<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>> firedStream, |
| final Coder<K> keyCoder, |
| final FullWindowedValueCoder<InputT> wvCoder) { |
| |
| /*K*/ |
| /*WV<KV<K, Itr<I>>>*/ |
| /*K*/ |
| /*WV<KV<K, Itr<I>>>*/ |
| return JavaPairDStream.fromPairDStream( |
| firedStream, |
| JavaSparkContext$.MODULE$.fakeClassTag(), |
| JavaSparkContext$.MODULE$.fakeClassTag()) |
| .filter( |
| // filter output if defined. |
| t2 -> !t2._2()._2().isEmpty()) |
| .flatMap( |
| new FlatMapFunction< |
| Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>, |
| WindowedValue<KV<K, Iterable<InputT>>>>() { |
| |
| private final FullWindowedValueCoder<KV<K, Iterable<InputT>>> |
| windowedValueKeyValueCoder = |
| windowedValueKeyValueCoderOf( |
| keyCoder, wvCoder.getValueCoder(), wvCoder.getWindowCoder()); |
| |
| @Override |
| public java.util.Iterator<WindowedValue<KV<K, Iterable<InputT>>>> call( |
| final Tuple2< |
| /*K*/ ByteArray, |
| Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>> |
| t2) |
| throws Exception { |
| // drop the state since it is already persisted at this point. |
| // return in serialized form. |
| return CoderHelpers.fromByteArrays(t2._2()._2(), windowedValueKeyValueCoder) |
| .iterator(); |
| } |
| }); |
| } |
| |
  /**
   * Converts the input stream into a Scala-API pair-DStream of coder-encoded (key, value) byte
   * arrays, suitable for {@code updateStateByKey}. Each value is tagged with its batch timestamp.
   */
  private static <K, InputT> PairDStreamFunctions<ByteArray, byte[]> buildPairDStream(
      final JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> inputDStream,
      final Coder<K> keyCoder,
      final Coder<WindowedValue<InputT>> wvCoder) {

    // we have to switch to Scala API to avoid Optional in the Java API, see: SPARK-4819.
    // we also have a broader API for Scala (access to the actual key and entire iterator).
    // we use coders to convert objects in the PCollection to byte arrays, so they
    // can be transferred over the network for the shuffle and be in serialized form
    // for checkpointing.
    // for readability, we add comments with actual type next to byte[].
    // to shorten line length, we use:
    // ---- WV: WindowedValue
    // ---- Iterable: Itr
    // ---- AccumT: A
    // ---- InputT: I
    // we use mapPartitions with the RDD API because its the only available API
    // that allows to preserve partitioning.
    final DStream<Tuple2<ByteArray, byte[]>> tupleDStream =
        inputDStream
            .transformToPair(
                (rdd, time) ->
                    rdd.mapPartitions(
                            // strip the windowing; only the KV payload is needed from here on.
                            TranslationUtils.functionToFlatMapFunction(WindowedValue::getValue),
                            true)
                        .mapPartitionsToPair(TranslationUtils.toPairFlatMapFunction(), true)
                        .mapValues(
                            // add the batch timestamp for visibility (e.g., debugging)
                            values -> KV.of(time.milliseconds(), values))
                        // move to bytes representation and use coders for deserialization
                        // because of checkpointing.
                        .mapPartitionsToPair(
                            TranslationUtils.pairFunctionToPairFlatMapFunction(
                                CoderHelpers.toByteFunction(
                                    keyCoder,
                                    KvCoder.of(VarLongCoder.of(), IterableCoder.of(wvCoder)))),
                            true))
            .dstream();

    // wrap as Scala pair-DStream functions; the null conversion argument uses Spark's default.
    return DStream.toPairDStreamFunctions(
        tupleDStream,
        JavaSparkContext$.MODULE$.fakeClassTag(),
        JavaSparkContext$.MODULE$.fakeClassTag(),
        null);
  }
| |
| public static <K, InputT, W extends BoundedWindow> |
| JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> groupAlsoByWindow( |
| final JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> inputDStream, |
| final Coder<K> keyCoder, |
| final Coder<WindowedValue<InputT>> wvCoder, |
| final WindowingStrategy<?, W> windowingStrategy, |
| final SerializablePipelineOptions options, |
| final List<Integer> sourceIds, |
| final String transformFullName) { |
| |
| final PairDStreamFunctions<ByteArray, byte[]> pairDStream = |
| buildPairDStream(inputDStream, keyCoder, wvCoder); |
| |
| // use updateStateByKey to scan through the state and update elements and timers. |
| final UpdateStateByKeyFunction<K, InputT, W> updateFunc = |
| new UpdateStateByKeyFunction<>( |
| sourceIds, |
| windowingStrategy, |
| (FullWindowedValueCoder<InputT>) wvCoder, |
| keyCoder, |
| options, |
| transformFullName); |
| |
| final DStream< |
| Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>> |
| firedStream = |
| pairDStream.updateStateByKey( |
| updateFunc, |
| pairDStream.defaultPartitioner(pairDStream.defaultPartitioner$default$1()), |
| true, |
| JavaSparkContext$.MODULE$.fakeClassTag()); |
| |
| checkpointIfNeeded(firedStream, options); |
| |
| // filter state-only output (nothing to fire) and remove the state from the output. |
| return stripStateValues(firedStream, keyCoder, (FullWindowedValueCoder<InputT>) wvCoder); |
| } |
| } |