blob: 72ae6cd79f3cff7b153b2ddf0b2256427f1d6587 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.RequestSplitEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.FunctionWithException;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * Base source operator only used for integrating the source reader which is proposed by FLIP-27. It
 * implements the interface of {@link PushingAsyncDataInput} so that it is naturally compatible with
 * the one-input processing in the runtime stack.
 *
 * <p><b>Important Note on Serialization:</b> The SourceOperator inherits the {@link
 * java.io.Serializable} interface from the StreamOperator, but is in fact NOT serializable. The
 * operator must only be instantiated in the StreamTask from its factory.
 *
 * @param <OUT> The output type of the operator.
 * @param <SplitT> The type of the source splits handled by the source reader.
 */
@Internal
public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStreamOperator<OUT>
        implements OperatorEventHandler, PushingAsyncDataInput<OUT> {

    private static final long serialVersionUID = 1405537676017904695L;

    // Package private for unit test.
    static final ListStateDescriptor<byte[]> SPLITS_STATE_DESC =
            new ListStateDescriptor<>("SourceReaderState", BytePrimitiveArraySerializer.INSTANCE);

    /**
     * The factory for the source reader. This is a workaround, because currently the SourceReader
     * must be lazily initialized, which is mainly because the metric groups that the reader relies
     * on are lazily initialized.
     */
    private final FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception>
            readerFactory;

    /**
     * The serializer for the splits, applied to the split types before storing them in the reader
     * state.
     */
    private final SimpleVersionedSerializer<SplitT> splitSerializer;

    /** The event gateway through which this operator talks to its coordinator. */
    private final OperatorEventGateway operatorEventGateway;

    /** The factory for timestamps and watermark generators. */
    private final WatermarkStrategy<OUT> watermarkStrategy;

    /** The Flink configuration. */
    private final Configuration configuration;

    /**
     * Host name of the machine where the operator runs, to support locality aware work assignment.
     */
    private final String localHostname;

    /** Whether to emit intermediate watermarks or only one final watermark at the end of input. */
    private final boolean emitProgressiveWatermarks;

    // ---- lazily initialized fields (these fields are the "hot" fields) ----

    /** The source reader that does most of the work. */
    private SourceReader<OUT, SplitT> sourceReader;

    /** The reader output wrapping the data output; created on the first emitNext() call. */
    private ReaderOutput<OUT> currentMainOutput;

    /** The data output passed to the call that created {@link #currentMainOutput}. */
    private DataOutput<OUT> lastInvokedOutput;

    /** The state that holds the currently assigned splits. */
    private ListState<SplitT> readerState;

    /**
     * The event time and watermarking logic. Ideally this would be eagerly passed into this
     * operator, but we currently need to instantiate this lazily, because the metric groups exist
     * only later.
     */
    private TimestampsAndWatermarks<OUT> eventTimeLogic;

    /** A mode to control the behaviour of the {@link #emitNext(DataOutput)} method. */
    private OperatingMode operatingMode;

    /** Completed at the end of {@link #finish()}; returned to callers of {@link #stop()}. */
    private final CompletableFuture<Void> finished = new CompletableFuture<>();

    private final SourceOperatorAvailabilityHelper availabilityHelper =
            new SourceOperatorAvailabilityHelper();

    private enum OperatingMode {
        READING,
        OUTPUT_NOT_INITIALIZED,
        SOURCE_STOPPED,
        DATA_FINISHED
    }

    private InternalSourceReaderMetricGroup sourceMetricGroup;

    public SourceOperator(
            FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception>
                    readerFactory,
            OperatorEventGateway operatorEventGateway,
            SimpleVersionedSerializer<SplitT> splitSerializer,
            WatermarkStrategy<OUT> watermarkStrategy,
            ProcessingTimeService timeService,
            Configuration configuration,
            String localHostname,
            boolean emitProgressiveWatermarks) {

        this.readerFactory = checkNotNull(readerFactory);
        this.operatorEventGateway = checkNotNull(operatorEventGateway);
        this.splitSerializer = checkNotNull(splitSerializer);
        this.watermarkStrategy = checkNotNull(watermarkStrategy);
        this.processingTimeService = timeService;
        this.configuration = checkNotNull(configuration);
        this.localHostname = checkNotNull(localHostname);
        this.emitProgressiveWatermarks = emitProgressiveWatermarks;
        this.operatingMode = OperatingMode.OUTPUT_NOT_INITIALIZED;
    }

    @Override
    public void setup(
            StreamTask<?, ?> containingTask,
            StreamConfig config,
            Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);
        initSourceMetricGroup();
    }

    /** Wraps the operator metric group into the source reader metric group. */
    @VisibleForTesting
    protected void initSourceMetricGroup() {
        sourceMetricGroup = InternalSourceReaderMetricGroup.wrap(getMetricGroup());
    }

    /**
     * Initializes the reader. The code from this method should ideally happen in the constructor or
     * in the operator factory even. It has to happen here at a slightly later stage, because of the
     * lazy metric initialization.
     *
     * <p>Calling this method explicitly is an optional way to have the reader initialization a bit
     * earlier than in open(), as needed by the {@link
     * org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask}. Repeated calls are no-ops
     * once the reader has been created.
     *
     * <p>This code should move to the constructor once the metric groups are available at task
     * setup time.
     */
    public void initReader() throws Exception {
        if (sourceReader != null) {
            return;
        }

        final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();

        final SourceReaderContext context =
                new SourceReaderContext() {
                    @Override
                    public SourceReaderMetricGroup metricGroup() {
                        return sourceMetricGroup;
                    }

                    @Override
                    public Configuration getConfiguration() {
                        return configuration;
                    }

                    @Override
                    public String getLocalHostName() {
                        return localHostname;
                    }

                    @Override
                    public int getIndexOfSubtask() {
                        return subtaskIndex;
                    }

                    @Override
                    public void sendSplitRequest() {
                        operatorEventGateway.sendEventToCoordinator(
                                new RequestSplitEvent(getLocalHostName()));
                    }

                    @Override
                    public void sendSourceEventToCoordinator(SourceEvent event) {
                        operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));
                    }

                    @Override
                    public UserCodeClassLoader getUserCodeClassLoader() {
                        return new UserCodeClassLoader() {
                            @Override
                            public ClassLoader asClassLoader() {
                                return getRuntimeContext().getUserCodeClassLoader();
                            }

                            @Override
                            public void registerReleaseHookIfAbsent(
                                    String releaseHookName, Runnable releaseHook) {
                                getRuntimeContext()
                                        .registerUserCodeClassLoaderReleaseHookIfAbsent(
                                                releaseHookName, releaseHook);
                            }
                        };
                    }
                };

        sourceReader = readerFactory.apply(context);
    }

    public InternalSourceReaderMetricGroup getSourceMetricGroup() {
        return sourceMetricGroup;
    }

    /**
     * Creates the reader (if not yet done), sets up the event-time logic, restores the splits from
     * state, registers with the coordinator and finally starts the reader and the periodic
     * watermark emission.
     */
    @Override
    public void open() throws Exception {
        initReader();

        // In the future, when this operator is migrated to the "eager initialization" operator
        // (StreamOperatorV2), we should evaluate this during operator construction.
        if (emitProgressiveWatermarks) {
            eventTimeLogic =
                    TimestampsAndWatermarks.createProgressiveEventTimeLogic(
                            watermarkStrategy,
                            sourceMetricGroup,
                            getProcessingTimeService(),
                            getExecutionConfig().getAutoWatermarkInterval());
        } else {
            eventTimeLogic =
                    TimestampsAndWatermarks.createNoOpEventTimeLogic(
                            watermarkStrategy, sourceMetricGroup);
        }

        // restore the state if necessary.
        final List<SplitT> splits = CollectionUtil.iterableToList(readerState.get());
        if (!splits.isEmpty()) {
            sourceReader.addSplits(splits);
        }

        // Register the reader to the coordinator.
        registerReader();

        sourceMetricGroup.idlingStarted();
        // Start the reader after registration, sending messages in start is allowed.
        sourceReader.start();

        eventTimeLogic.startPeriodicWatermarkEmits();
    }

    /**
     * Stops the periodic watermark emission (if the event-time logic was created), finishes the
     * base operator and then completes the future returned by {@link #stop()}.
     */
    @Override
    public void finish() throws Exception {
        if (eventTimeLogic != null) {
            eventTimeLogic.stopPeriodicWatermarkEmits();
        }
        super.finish();

        finished.complete(null);
    }

    /**
     * Requests the source to stop emitting records. If the operator is still reading (or has not
     * initialized its output yet), it switches to the stopped mode and wakes up anyone waiting on
     * {@link #getAvailableFuture()}, so that the next {@link #emitNext(DataOutput)} call reports
     * the end of data. Calls in any other mode do not change the state.
     *
     * @return a future that is completed once {@link #finish()} has run.
     */
    public CompletableFuture<Void> stop() {
        switch (operatingMode) {
            case OUTPUT_NOT_INITIALIZED:
            case READING:
                this.operatingMode = OperatingMode.SOURCE_STOPPED;
                availabilityHelper.forceStop();
                break;
        }
        return finished;
    }

    /**
     * Closes the source reader (if it was created) and then the base operator. The base operator
     * is closed even if closing the reader throws, so that its own cleanup is never skipped; the
     * reader's exception is still propagated in that case.
     */
    @Override
    public void close() throws Exception {
        try {
            if (sourceReader != null) {
                sourceReader.close();
            }
        } finally {
            super.close();
        }
    }

    /**
     * Emits the next available record(s) from the source reader into the given output.
     *
     * @param output the data output; expected to be the same instance on every call until all
     *     data has been emitted.
     * @return the status of the input after this call.
     */
    @Override
    public DataInputStatus emitNext(DataOutput<OUT> output) throws Exception {
        // Guard an assumption we currently make because certain classes assume a constant output.
        // This assumption does not need to hold once all records have been emitted; in that case
        // the output changes to FinishedDataOutput.
        assert lastInvokedOutput == output
                || lastInvokedOutput == null
                || this.operatingMode == OperatingMode.DATA_FINISHED;

        // Short-circuit the hot path. Without this short circuit (READING handled in the
        // switch/case) InputBenchmark.mapSink was showing a performance regression.
        if (operatingMode == OperatingMode.READING) {
            return convertToInternalStatus(sourceReader.pollNext(currentMainOutput));
        }
        return emitNextNotReading(output);
    }

    /**
     * Handles {@link #emitNext(DataOutput)} for every mode except READING: lazily creates the main
     * output on the first call, and reports the end of data/input once the source is stopped or
     * finished.
     */
    private DataInputStatus emitNextNotReading(DataOutput<OUT> output) throws Exception {
        switch (operatingMode) {
            case OUTPUT_NOT_INITIALIZED:
                currentMainOutput = eventTimeLogic.createMainOutput(output);
                lastInvokedOutput = output;
                this.operatingMode = OperatingMode.READING;
                return convertToInternalStatus(sourceReader.pollNext(currentMainOutput));
            case SOURCE_STOPPED:
                this.operatingMode = OperatingMode.DATA_FINISHED;
                sourceMetricGroup.idlingStarted();
                return DataInputStatus.END_OF_DATA;
            case DATA_FINISHED:
                sourceMetricGroup.idlingStarted();
                return DataInputStatus.END_OF_INPUT;
            case READING:
            default:
                // READING is handled by the fast path in emitNext(); reaching here is a bug.
                throw new IllegalStateException("Unknown operating mode: " + operatingMode);
        }
    }

    /**
     * Maps the reader's {@link InputStatus} to the runtime's {@link DataInputStatus}, marking the
     * source as idle when nothing is available or input has ended, and switching to DATA_FINISHED
     * on end of input.
     */
    private DataInputStatus convertToInternalStatus(InputStatus inputStatus) {
        switch (inputStatus) {
            case MORE_AVAILABLE:
                return DataInputStatus.MORE_AVAILABLE;
            case NOTHING_AVAILABLE:
                sourceMetricGroup.idlingStarted();
                return DataInputStatus.NOTHING_AVAILABLE;
            case END_OF_INPUT:
                this.operatingMode = OperatingMode.DATA_FINISHED;
                sourceMetricGroup.idlingStarted();
                return DataInputStatus.END_OF_DATA;
            default:
                throw new IllegalArgumentException("Unknown input status: " + inputStatus);
        }
    }

    /** Stores the reader's snapshot of its splits for the given checkpoint in the reader state. */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        long checkpointId = context.getCheckpointId();
        LOG.debug("Taking a snapshot for checkpoint {}", checkpointId);
        readerState.update(sourceReader.snapshotState(checkpointId));
    }

    /**
     * Returns the availability future. While reading (or before the output is initialized) this
     * combines the reader's availability with a forced-stop signal, so that {@link #stop()} can
     * wake up a waiting task; once stopped or finished the source is always available.
     */
    @Override
    public CompletableFuture<?> getAvailableFuture() {
        switch (operatingMode) {
            case OUTPUT_NOT_INITIALIZED:
            case READING:
                return availabilityHelper.update(sourceReader.isAvailable());
            case SOURCE_STOPPED:
            case DATA_FINISHED:
                return AvailabilityProvider.AVAILABLE;
            default:
                throw new IllegalStateException("Unknown operating mode: " + operatingMode);
        }
    }

    /**
     * Obtains the raw split list state and wraps it in a versioned list state that (de)serializes
     * splits with the split serializer.
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        final ListState<byte[]> rawState =
                context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
        readerState = new SimpleVersionedListState<>(rawState, splitSerializer);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        sourceReader.notifyCheckpointComplete(checkpointId);
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        super.notifyCheckpointAborted(checkpointId);
        sourceReader.notifyCheckpointAborted(checkpointId);
    }

    /**
     * Dispatches an event received from the coordinator to the source reader.
     *
     * @throws FlinkRuntimeException if the splits of an {@link AddSplitEvent} cannot be
     *     deserialized.
     * @throws IllegalStateException if the event type is not supported.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void handleOperatorEvent(OperatorEvent event) {
        if (event instanceof AddSplitEvent) {
            try {
                sourceReader.addSplits(((AddSplitEvent<SplitT>) event).splits(splitSerializer));
            } catch (IOException e) {
                throw new FlinkRuntimeException("Failed to deserialize the splits.", e);
            }
        } else if (event instanceof SourceEventWrapper) {
            sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent());
        } else if (event instanceof NoMoreSplitsEvent) {
            sourceReader.notifyNoMoreSplits();
        } else {
            throw new IllegalStateException("Received unexpected operator event " + event);
        }
    }

    /** Announces this subtask (index and host name) to the coordinator. */
    private void registerReader() {
        operatorEventGateway.sendEventToCoordinator(
                new ReaderRegistrationEvent(
                        getRuntimeContext().getIndexOfThisSubtask(), localHostname));
    }

    // --------------- methods for unit tests ------------

    @VisibleForTesting
    public SourceReader<OUT, SplitT> getSourceReader() {
        return sourceReader;
    }

    @VisibleForTesting
    ListState<SplitT> getReaderState() {
        return readerState;
    }

    /**
     * Combines the reader's availability future with a forced-stop future, caching the combined
     * future for as long as the reader keeps returning the same future instance.
     */
    private static class SourceOperatorAvailabilityHelper {
        private final CompletableFuture<Void> forcedStopFuture = new CompletableFuture<>();
        private CompletableFuture<Void> currentReaderFuture;
        private CompletableFuture<?> currentCombinedFuture;

        public CompletableFuture<?> update(CompletableFuture<Void> sourceReaderFuture) {
            if (sourceReaderFuture == AvailabilityProvider.AVAILABLE) {
                return sourceReaderFuture;
            } else if (sourceReaderFuture == currentReaderFuture) {
                return currentCombinedFuture;
            } else {
                currentReaderFuture = sourceReaderFuture;
                currentCombinedFuture =
                        CompletableFuture.anyOf(forcedStopFuture, sourceReaderFuture);
                return currentCombinedFuture;
            }
        }

        public void forceStop() {
            this.forcedStopFuture.complete(null);
        }
    }
}