blob: c4f624e443c97c7caeaf3daa97828be7958273e5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
import org.apache.flink.runtime.source.event.RequestSplitEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.FunctionWithException;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.configuration.PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
* Base source operator only used for integrating the source reader which is proposed by FLIP-27. It
* implements the interface of {@link PushingAsyncDataInput} which is naturally compatible with one
* input processing in runtime stack.
*
* <p><b>Important Note on Serialization:</b> The SourceOperator inherits the {@link
* java.io.Serializable} interface from the StreamOperator, but is in fact NOT serializable. The
* operator must only be instantiated in the StreamTask from its factory.
*
* @param <OUT> The output type of the operator.
*/
@Internal
public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStreamOperator<OUT>
implements OperatorEventHandler,
PushingAsyncDataInput<OUT>,
TimestampsAndWatermarks.WatermarkUpdateListener {
private static final long serialVersionUID = 1405537676017904695L;
// Package private for unit test.
static final ListStateDescriptor<byte[]> SPLITS_STATE_DESC =
new ListStateDescriptor<>("SourceReaderState", BytePrimitiveArraySerializer.INSTANCE);
/**
* The factory for the source reader. This is a workaround, because currently the SourceReader
* must be lazily initialized, which is mainly because the metrics groups that the reader relies
* on is lazily initialized.
*/
private final FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception>
readerFactory;
/**
* The serializer for the splits, applied to the split types before storing them in the reader
* state.
*/
private final SimpleVersionedSerializer<SplitT> splitSerializer;
/** The event gateway through which this operator talks to its coordinator. */
private final OperatorEventGateway operatorEventGateway;
/** The factory for timestamps and watermark generators. */
private final WatermarkStrategy<OUT> watermarkStrategy;
private final WatermarkAlignmentParams watermarkAlignmentParams;
/** The Flink configuration. */
private final Configuration configuration;
/**
* Host name of the machine where the operator runs, to support locality aware work assignment.
*/
private final String localHostname;
/** Whether to emit intermediate watermarks or only one final watermark at the end of input. */
private final boolean emitProgressiveWatermarks;
// ---- lazily initialized fields (these fields are the "hot" fields) ----
/** The source reader that does most of the work. */
private SourceReader<OUT, SplitT> sourceReader;
private ReaderOutput<OUT> currentMainOutput;
private DataOutput<OUT> lastInvokedOutput;
private long latestWatermark = Watermark.UNINITIALIZED.getTimestamp();
private boolean idle = false;
/** The state that holds the currently assigned splits. */
private ListState<SplitT> readerState;
/**
* The event time and watermarking logic. Ideally this would be eagerly passed into this
* operator, but we currently need to instantiate this lazily, because the metric groups exist
* only later.
*/
private TimestampsAndWatermarks<OUT> eventTimeLogic;
/** A mode to control the behaviour of the {@link #emitNext(DataOutput)} method. */
private OperatingMode operatingMode;
private final CompletableFuture<Void> finished = new CompletableFuture<>();
private final SourceOperatorAvailabilityHelper availabilityHelper =
new SourceOperatorAvailabilityHelper();
private final List<SplitT> outputPendingSplits = new ArrayList<>();
private int numSplits;
private final Map<String, Long> splitCurrentWatermarks = new HashMap<>();
private final Set<String> currentlyPausedSplits = new HashSet<>();
private enum OperatingMode {
READING,
WAITING_FOR_ALIGNMENT,
OUTPUT_NOT_INITIALIZED,
SOURCE_DRAINED,
SOURCE_STOPPED,
DATA_FINISHED
}
private InternalSourceReaderMetricGroup sourceMetricGroup;
private long currentMaxDesiredWatermark = Watermark.MAX_WATERMARK.getTimestamp();
/** Can be not completed only in {@link OperatingMode#WAITING_FOR_ALIGNMENT} mode. */
private CompletableFuture<Void> waitingForAlignmentFuture =
CompletableFuture.completedFuture(null);
private @Nullable LatencyMarkerEmitter<OUT> latencyMarkerEmitter;
private final boolean allowUnalignedSourceSplits;
private final CanEmitBatchOfRecordsChecker canEmitBatchOfRecords;
public SourceOperator(
FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception>
readerFactory,
OperatorEventGateway operatorEventGateway,
SimpleVersionedSerializer<SplitT> splitSerializer,
WatermarkStrategy<OUT> watermarkStrategy,
ProcessingTimeService timeService,
Configuration configuration,
String localHostname,
boolean emitProgressiveWatermarks,
CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
this.readerFactory = checkNotNull(readerFactory);
this.operatorEventGateway = checkNotNull(operatorEventGateway);
this.splitSerializer = checkNotNull(splitSerializer);
this.watermarkStrategy = checkNotNull(watermarkStrategy);
this.processingTimeService = timeService;
this.configuration = checkNotNull(configuration);
this.localHostname = checkNotNull(localHostname);
this.emitProgressiveWatermarks = emitProgressiveWatermarks;
this.operatingMode = OperatingMode.OUTPUT_NOT_INITIALIZED;
this.watermarkAlignmentParams = watermarkStrategy.getAlignmentParameters();
this.allowUnalignedSourceSplits = configuration.get(ALLOW_UNALIGNED_SOURCE_SPLITS);
this.canEmitBatchOfRecords = checkNotNull(canEmitBatchOfRecords);
}
@Override
public void setup(
StreamTask<?, ?> containingTask,
StreamConfig config,
Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);
initSourceMetricGroup();
}
@VisibleForTesting
protected void initSourceMetricGroup() {
sourceMetricGroup = InternalSourceReaderMetricGroup.wrap(getMetricGroup());
}
/**
* Initializes the reader. The code from this method should ideally happen in the constructor or
* in the operator factory even. It has to happen here at a slightly later stage, because of the
* lazy metric initialization.
*
* <p>Calling this method explicitly is an optional way to have the reader initialization a bit
* earlier than in open(), as needed by the {@link
* org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask}
*
* <p>This code should move to the constructor once the metric groups are available at task
* setup time.
*/
public void initReader() throws Exception {
if (sourceReader != null) {
return;
}
final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
final SourceReaderContext context =
new SourceReaderContext() {
@Override
public SourceReaderMetricGroup metricGroup() {
return sourceMetricGroup;
}
@Override
public Configuration getConfiguration() {
return configuration;
}
@Override
public String getLocalHostName() {
return localHostname;
}
@Override
public int getIndexOfSubtask() {
return subtaskIndex;
}
@Override
public void sendSplitRequest() {
operatorEventGateway.sendEventToCoordinator(
new RequestSplitEvent(getLocalHostName()));
}
@Override
public void sendSourceEventToCoordinator(SourceEvent event) {
operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));
}
@Override
public UserCodeClassLoader getUserCodeClassLoader() {
return new UserCodeClassLoader() {
@Override
public ClassLoader asClassLoader() {
return getRuntimeContext().getUserCodeClassLoader();
}
@Override
public void registerReleaseHookIfAbsent(
String releaseHookName, Runnable releaseHook) {
getRuntimeContext()
.registerUserCodeClassLoaderReleaseHookIfAbsent(
releaseHookName, releaseHook);
}
};
}
@Override
public int currentParallelism() {
return getRuntimeContext().getNumberOfParallelSubtasks();
}
};
sourceReader = readerFactory.apply(context);
}
public InternalSourceReaderMetricGroup getSourceMetricGroup() {
return sourceMetricGroup;
}
@Override
public void open() throws Exception {
initReader();
// in the future when we this one is migrated to the "eager initialization" operator
// (StreamOperatorV2), then we should evaluate this during operator construction.
if (emitProgressiveWatermarks) {
eventTimeLogic =
TimestampsAndWatermarks.createProgressiveEventTimeLogic(
watermarkStrategy,
sourceMetricGroup,
getProcessingTimeService(),
getExecutionConfig().getAutoWatermarkInterval());
} else {
eventTimeLogic =
TimestampsAndWatermarks.createNoOpEventTimeLogic(
watermarkStrategy, sourceMetricGroup);
}
// restore the state if necessary.
final List<SplitT> splits = CollectionUtil.iterableToList(readerState.get());
if (!splits.isEmpty()) {
LOG.info("Restoring state for {} split(s) to reader.", splits.size());
sourceReader.addSplits(splits);
}
// Register the reader to the coordinator.
registerReader();
sourceMetricGroup.idlingStarted();
// Start the reader after registration, sending messages in start is allowed.
sourceReader.start();
eventTimeLogic.startPeriodicWatermarkEmits();
}
@Override
public void finish() throws Exception {
stopInternalServices();
super.finish();
finished.complete(null);
}
private void stopInternalServices() {
if (eventTimeLogic != null) {
eventTimeLogic.stopPeriodicWatermarkEmits();
}
if (latencyMarkerEmitter != null) {
latencyMarkerEmitter.close();
}
}
public CompletableFuture<Void> stop(StopMode mode) {
switch (operatingMode) {
case WAITING_FOR_ALIGNMENT:
case OUTPUT_NOT_INITIALIZED:
case READING:
this.operatingMode =
mode == StopMode.DRAIN
? OperatingMode.SOURCE_DRAINED
: OperatingMode.SOURCE_STOPPED;
availabilityHelper.forceStop();
if (this.operatingMode == OperatingMode.SOURCE_STOPPED) {
stopInternalServices();
finished.complete(null);
return finished;
}
break;
}
return finished;
}
@Override
public void close() throws Exception {
if (sourceReader != null) {
sourceReader.close();
}
super.close();
}
@Override
public DataInputStatus emitNext(DataOutput<OUT> output) throws Exception {
// guarding an assumptions we currently make due to the fact that certain classes
// assume a constant output, this assumption does not need to stand if we emitted all
// records. In that case the output will change to FinishedDataOutput
assert lastInvokedOutput == output
|| lastInvokedOutput == null
|| this.operatingMode == OperatingMode.DATA_FINISHED;
// short circuit the hot path. Without this short circuit (READING handled in the
// switch/case) InputBenchmark.mapSink was showing a performance regression.
if (operatingMode != OperatingMode.READING) {
return emitNextNotReading(output);
}
InputStatus status;
do {
status = sourceReader.pollNext(currentMainOutput);
} while (status == InputStatus.MORE_AVAILABLE
&& canEmitBatchOfRecords.check()
&& !shouldWaitForAlignment());
return convertToInternalStatus(status);
}
private DataInputStatus emitNextNotReading(DataOutput<OUT> output) throws Exception {
switch (operatingMode) {
case OUTPUT_NOT_INITIALIZED:
if (watermarkAlignmentParams.isEnabled()) {
// Only wrap the output when watermark alignment is enabled, as otherwise this
// introduces a small performance regression (probably because of an extra
// virtual call)
processingTimeService.scheduleWithFixedDelay(
this::emitLatestWatermark,
watermarkAlignmentParams.getUpdateInterval(),
watermarkAlignmentParams.getUpdateInterval());
}
initializeMainOutput(output);
return convertToInternalStatus(sourceReader.pollNext(currentMainOutput));
case SOURCE_STOPPED:
this.operatingMode = OperatingMode.DATA_FINISHED;
sourceMetricGroup.idlingStarted();
return DataInputStatus.STOPPED;
case SOURCE_DRAINED:
this.operatingMode = OperatingMode.DATA_FINISHED;
sourceMetricGroup.idlingStarted();
return DataInputStatus.END_OF_DATA;
case DATA_FINISHED:
sourceMetricGroup.idlingStarted();
return DataInputStatus.END_OF_INPUT;
case WAITING_FOR_ALIGNMENT:
checkState(!waitingForAlignmentFuture.isDone());
checkState(shouldWaitForAlignment());
return convertToInternalStatus(InputStatus.NOTHING_AVAILABLE);
case READING:
default:
throw new IllegalStateException("Unknown operating mode: " + operatingMode);
}
}
private void initializeMainOutput(DataOutput<OUT> output) {
currentMainOutput = eventTimeLogic.createMainOutput(output, this);
initializeLatencyMarkerEmitter(output);
lastInvokedOutput = output;
// Create per-split output for pending splits added before main output is initialized
createOutputForSplits(outputPendingSplits);
this.operatingMode = OperatingMode.READING;
}
private void initializeLatencyMarkerEmitter(DataOutput<OUT> output) {
long latencyTrackingInterval =
getExecutionConfig().isLatencyTrackingConfigured()
? getExecutionConfig().getLatencyTrackingInterval()
: getContainingTask()
.getEnvironment()
.getTaskManagerInfo()
.getConfiguration()
.getLong(MetricOptions.LATENCY_INTERVAL);
if (latencyTrackingInterval > 0) {
latencyMarkerEmitter =
new LatencyMarkerEmitter<>(
getProcessingTimeService(),
output::emitLatencyMarker,
latencyTrackingInterval,
getOperatorID(),
getRuntimeContext().getIndexOfThisSubtask());
}
}
private DataInputStatus convertToInternalStatus(InputStatus inputStatus) {
switch (inputStatus) {
case MORE_AVAILABLE:
return DataInputStatus.MORE_AVAILABLE;
case NOTHING_AVAILABLE:
sourceMetricGroup.idlingStarted();
return DataInputStatus.NOTHING_AVAILABLE;
case END_OF_INPUT:
this.operatingMode = OperatingMode.DATA_FINISHED;
sourceMetricGroup.idlingStarted();
return DataInputStatus.END_OF_DATA;
default:
throw new IllegalArgumentException("Unknown input status: " + inputStatus);
}
}
private void emitLatestWatermark(long time) {
checkState(currentMainOutput != null);
if (latestWatermark == Watermark.UNINITIALIZED.getTimestamp()) {
return;
}
operatorEventGateway.sendEventToCoordinator(
new ReportedWatermarkEvent(
idle ? Watermark.MAX_WATERMARK.getTimestamp() : latestWatermark));
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
long checkpointId = context.getCheckpointId();
LOG.debug("Taking a snapshot for checkpoint {}", checkpointId);
readerState.update(sourceReader.snapshotState(checkpointId));
}
@Override
public CompletableFuture<?> getAvailableFuture() {
switch (operatingMode) {
case WAITING_FOR_ALIGNMENT:
return availabilityHelper.update(waitingForAlignmentFuture);
case OUTPUT_NOT_INITIALIZED:
case READING:
return availabilityHelper.update(sourceReader.isAvailable());
case SOURCE_STOPPED:
case SOURCE_DRAINED:
case DATA_FINISHED:
return AvailabilityProvider.AVAILABLE;
default:
throw new IllegalStateException("Unknown operating mode: " + operatingMode);
}
}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
final ListState<byte[]> rawState =
context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
readerState = new SimpleVersionedListState<>(rawState, splitSerializer);
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
sourceReader.notifyCheckpointComplete(checkpointId);
}
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
super.notifyCheckpointAborted(checkpointId);
sourceReader.notifyCheckpointAborted(checkpointId);
}
@SuppressWarnings("unchecked")
public void handleOperatorEvent(OperatorEvent event) {
if (event instanceof WatermarkAlignmentEvent) {
updateMaxDesiredWatermark((WatermarkAlignmentEvent) event);
checkWatermarkAlignment();
checkSplitWatermarkAlignment();
} else if (event instanceof AddSplitEvent) {
handleAddSplitsEvent(((AddSplitEvent<SplitT>) event));
} else if (event instanceof SourceEventWrapper) {
sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent());
} else if (event instanceof NoMoreSplitsEvent) {
sourceReader.notifyNoMoreSplits();
} else {
throw new IllegalStateException("Received unexpected operator event " + event);
}
}
private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) {
try {
List<SplitT> newSplits = event.splits(splitSerializer);
numSplits += newSplits.size();
if (operatingMode == OperatingMode.OUTPUT_NOT_INITIALIZED) {
// For splits arrived before the main output is initialized, store them into the
// pending list. Outputs of these splits will be created once the main output is
// ready.
outputPendingSplits.addAll(newSplits);
} else {
// Create output directly for new splits if the main output is already initialized.
createOutputForSplits(newSplits);
}
sourceReader.addSplits(newSplits);
} catch (IOException e) {
throw new FlinkRuntimeException("Failed to deserialize the splits.", e);
}
}
private void createOutputForSplits(List<SplitT> newSplits) {
for (SplitT split : newSplits) {
currentMainOutput.createOutputForSplit(split.splitId());
}
}
private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) {
currentMaxDesiredWatermark = event.getMaxWatermark();
sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark);
}
@Override
public void updateIdle(boolean isIdle) {
this.idle = isIdle;
}
@Override
public void updateCurrentEffectiveWatermark(long watermark) {
latestWatermark = watermark;
checkWatermarkAlignment();
}
@Override
public void updateCurrentSplitWatermark(String splitId, long watermark) {
splitCurrentWatermarks.put(splitId, watermark);
if (numSplits > 1
&& watermark > currentMaxDesiredWatermark
&& !currentlyPausedSplits.contains(splitId)) {
pauseOrResumeSplits(Collections.singletonList(splitId), Collections.emptyList());
currentlyPausedSplits.add(splitId);
}
}
/**
* Finds the splits that are beyond the current max watermark and pauses them. At the same time,
* splits that have been paused and where the global watermark caught up are resumed.
*
* <p>Note: This takes effect only if there are multiple splits, otherwise it does nothing.
*/
private void checkSplitWatermarkAlignment() {
if (numSplits <= 1) {
// A single split can't overtake any other splits assigned to this operator instance.
// It is sufficient for the source to stop processing.
return;
}
Collection<String> splitsToPause = new ArrayList<>();
Collection<String> splitsToResume = new ArrayList<>();
splitCurrentWatermarks.forEach(
(splitId, splitWatermark) -> {
if (splitWatermark > currentMaxDesiredWatermark) {
splitsToPause.add(splitId);
} else if (currentlyPausedSplits.contains(splitId)) {
splitsToResume.add(splitId);
}
});
splitsToPause.removeAll(currentlyPausedSplits);
if (!splitsToPause.isEmpty() || !splitsToResume.isEmpty()) {
pauseOrResumeSplits(splitsToPause, splitsToResume);
currentlyPausedSplits.addAll(splitsToPause);
splitsToResume.forEach(currentlyPausedSplits::remove);
}
}
private void pauseOrResumeSplits(
Collection<String> splitsToPause, Collection<String> splitsToResume) {
try {
sourceReader.pauseOrResumeSplits(splitsToPause, splitsToResume);
} catch (UnsupportedOperationException e) {
if (!allowUnalignedSourceSplits) {
throw e;
}
}
}
private void checkWatermarkAlignment() {
if (operatingMode == OperatingMode.READING) {
checkState(waitingForAlignmentFuture.isDone());
if (shouldWaitForAlignment()) {
operatingMode = OperatingMode.WAITING_FOR_ALIGNMENT;
waitingForAlignmentFuture = new CompletableFuture<>();
}
} else if (operatingMode == OperatingMode.WAITING_FOR_ALIGNMENT) {
checkState(!waitingForAlignmentFuture.isDone());
if (!shouldWaitForAlignment()) {
operatingMode = OperatingMode.READING;
waitingForAlignmentFuture.complete(null);
}
}
}
private boolean shouldWaitForAlignment() {
return currentMaxDesiredWatermark < latestWatermark;
}
private void registerReader() {
operatorEventGateway.sendEventToCoordinator(
new ReaderRegistrationEvent(
getRuntimeContext().getIndexOfThisSubtask(), localHostname));
}
// --------------- methods for unit tests ------------
@VisibleForTesting
public SourceReader<OUT, SplitT> getSourceReader() {
return sourceReader;
}
@VisibleForTesting
ListState<SplitT> getReaderState() {
return readerState;
}
private static class SourceOperatorAvailabilityHelper {
private final CompletableFuture<Void> forcedStopFuture = new CompletableFuture<>();
private final MultipleFuturesAvailabilityHelper availabilityHelper;
private SourceOperatorAvailabilityHelper() {
availabilityHelper = new MultipleFuturesAvailabilityHelper(2);
availabilityHelper.anyOf(0, forcedStopFuture);
}
public CompletableFuture<?> update(CompletableFuture<Void> sourceReaderFuture) {
if (sourceReaderFuture == AvailabilityProvider.AVAILABLE
|| sourceReaderFuture.isDone()) {
return AvailabilityProvider.AVAILABLE;
}
availabilityHelper.resetToUnAvailable();
availabilityHelper.anyOf(0, forcedStopFuture);
availabilityHelper.anyOf(1, sourceReaderFuture);
return availabilityHelper.getAvailableFuture();
}
public void forceStop() {
forcedStopFuture.complete(null);
}
}
}