blob: 4748ebb0350a0cfb87c9e36d3112fa74643a7253 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.samza.runtime;
import java.util.Collections;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.samza.SamzaExecutionContext;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.operators.Scheduler;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Samza operator for {@link org.apache.beam.sdk.transforms.GroupByKey}. */
public class GroupByKeyOp<K, InputT, OutputT>
implements Op<KeyedWorkItem<K, InputT>, KV<K, OutputT>, K> {
private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyOp.class);
private static final String TIMER_STATE_ID = "timer";
private final TupleTag<KV<K, OutputT>> mainOutputTag;
private final KeyedWorkItemCoder<K, InputT> inputCoder;
private final WindowingStrategy<?, BoundedWindow> windowingStrategy;
private final OutputManagerFactory<KV<K, OutputT>> outputManagerFactory;
private final Coder<K> keyCoder;
private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn;
private final String stepName;
private final String stepId;
private final IsBounded isBounded;
private transient StateInternalsFactory<K> stateInternalsFactory;
private transient SamzaTimerInternalsFactory<K> timerInternalsFactory;
private transient DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> fnRunner;
private transient SamzaPipelineOptions pipelineOptions;
public GroupByKeyOp(
TupleTag<KV<K, OutputT>> mainOutputTag,
Coder<KeyedWorkItem<K, InputT>> inputCoder,
SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn,
WindowingStrategy<?, BoundedWindow> windowingStrategy,
OutputManagerFactory<KV<K, OutputT>> outputManagerFactory,
String stepName,
String stepId,
IsBounded isBounded) {
this.mainOutputTag = mainOutputTag;
this.windowingStrategy = windowingStrategy;
this.outputManagerFactory = outputManagerFactory;
this.stepName = stepName;
this.stepId = stepId;
this.isBounded = isBounded;
if (!(inputCoder instanceof KeyedWorkItemCoder)) {
throw new IllegalArgumentException(
String.format(
"GroupByKeyOp requires input to use KeyedWorkItemCoder. Got: %s",
inputCoder.getClass()));
}
this.inputCoder = (KeyedWorkItemCoder<K, InputT>) inputCoder;
this.keyCoder = this.inputCoder.getKeyCoder();
this.reduceFn = reduceFn;
}
@Override
public void open(
Config config,
Context context,
Scheduler<KeyedTimerData<K>> timerRegistry,
OpEmitter<KV<K, OutputT>> emitter) {
this.pipelineOptions =
Base64Serializer.deserializeUnchecked(
config.get("beamPipelineOptions"), SerializablePipelineOptions.class)
.get()
.as(SamzaPipelineOptions.class);
final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
SamzaStoreStateInternals.createStateInternalFactory(
stepId, null, context.getTaskContext(), pipelineOptions, null);
final DoFnRunners.OutputManager outputManager = outputManagerFactory.create(emitter);
this.stateInternalsFactory =
new SamzaStoreStateInternals.Factory<>(
stepId,
Collections.singletonMap(
SamzaStoreStateInternals.BEAM_STORE,
SamzaStoreStateInternals.getBeamStore(context.getTaskContext())),
keyCoder,
pipelineOptions.getStoreBatchGetSize());
this.timerInternalsFactory =
SamzaTimerInternalsFactory.createTimerInternalFactory(
keyCoder,
timerRegistry,
TIMER_STATE_ID,
nonKeyedStateInternalsFactory,
windowingStrategy,
isBounded,
pipelineOptions);
final DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFn =
GroupAlsoByWindowViaWindowSetNewDoFn.create(
windowingStrategy,
stateInternalsFactory,
timerInternalsFactory,
NullSideInputReader.of(Collections.emptyList()),
reduceFn,
outputManager,
mainOutputTag);
final KeyedInternals<K> keyedInternals =
new KeyedInternals<>(stateInternalsFactory, timerInternalsFactory);
final StepContext stepContext =
new StepContext() {
@Override
public StateInternals stateInternals() {
return keyedInternals.stateInternals();
}
@Override
public TimerInternals timerInternals() {
return keyedInternals.timerInternals();
}
};
final DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner =
DoFnRunners.simpleRunner(
PipelineOptionsFactory.create(),
doFn,
NullSideInputReader.of(Collections.emptyList()),
outputManager,
mainOutputTag,
Collections.emptyList(),
stepContext,
null,
Collections.emptyMap(),
windowingStrategy,
DoFnSchemaInformation.create());
final SamzaExecutionContext executionContext =
(SamzaExecutionContext) context.getApplicationContainerContext();
this.fnRunner =
DoFnRunnerWithMetrics.wrap(doFnRunner, executionContext.getMetricsContainer(), stepName);
}
@Override
public void processElement(
WindowedValue<KeyedWorkItem<K, InputT>> inputElement, OpEmitter<KV<K, OutputT>> emitter) {
fnRunner.startBundle();
fnRunner.processElement(inputElement);
fnRunner.finishBundle();
}
@Override
public void processWatermark(Instant watermark, OpEmitter<KV<K, OutputT>> ctx) {
timerInternalsFactory.setInputWatermark(watermark);
fnRunner.startBundle();
for (KeyedTimerData<K> keyedTimerData : timerInternalsFactory.removeReadyTimers()) {
fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData());
}
fnRunner.finishBundle();
if (timerInternalsFactory.getOutputWatermark() == null
|| timerInternalsFactory.getOutputWatermark().isBefore(watermark)) {
timerInternalsFactory.setOutputWatermark(watermark);
ctx.emitWatermark(timerInternalsFactory.getOutputWatermark());
}
}
@Override
public void processTimer(KeyedTimerData<K> keyedTimerData) {
fnRunner.startBundle();
fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData());
fnRunner.finishBundle();
timerInternalsFactory.removeProcessingTimer(keyedTimerData);
}
private void fireTimer(K key, TimerData timer) {
LOG.debug("Firing timer {} for key {}", timer, key);
fnRunner.processElement(
WindowedValue.valueInGlobalWindow(
KeyedWorkItems.timersWorkItem(key, Collections.singletonList(timer))));
}
}