/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.base.source.reader;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkState;
/**
 * An abstract implementation of {@link SourceReader} which provides some synchronization between
 * the mailbox main thread and the SourceReader's internal threads. This class allows users to
 * just provide a {@link SplitReader} and snapshot the split state.
*
* <p>This implementation provides the following metrics out of the box:
*
* <ul>
* <li>{@link OperatorIOMetricGroup#getNumRecordsInCounter()}
* </ul>
*
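 * <p>Below is a minimal sketch of a subclass that wires a fetcher manager and record emitter into
 * this base class and implements the three state-handling hooks. The types {@code MyRecord},
 * {@code MySplit} (implementing {@link SourceSplit}), and {@code MySplitState} are hypothetical
 * placeholders for a connector's own element, split, and split-state types, not part of this API.
 *
 * <pre>{@code
 * class MySourceReader extends SourceReaderBase<MyRecord, MyRecord, MySplit, MySplitState> {
 *
 *     MySourceReader(
 *             FutureCompletingBlockingQueue<RecordsWithSplitIds<MyRecord>> elementsQueue,
 *             SplitFetcherManager<MyRecord, MySplit> splitFetcherManager,
 *             RecordEmitter<MyRecord, MyRecord, MySplitState> recordEmitter,
 *             Configuration config,
 *             SourceReaderContext context) {
 *         super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
 *     }
 *
 *     protected void onSplitFinished(Map<String, MySplitState> finishedSplitIds) {
 *         // No extra cleanup needed beyond the state removal done by the base class.
 *     }
 *
 *     protected MySplitState initializedState(MySplit split) {
 *         return new MySplitState(split);
 *     }
 *
 *     protected MySplit toSplitType(String splitId, MySplitState splitState) {
 *         return splitState.toSplit(splitId);
 *     }
 * }
 * }</pre>
 *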
* @param <E> The rich element type that contains information for split state update or timestamp
* extraction.
* @param <T> The final element type to emit.
* @param <SplitT> the immutable split type.
* @param <SplitStateT> the mutable type of split state.
*/
@PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
implements SourceReader<T, SplitT> {
private static final Logger LOG = LoggerFactory.getLogger(SourceReaderBase.class);
/** A queue to buffer the elements fetched by the fetcher thread. */
private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
/** The state of the splits. */
private final Map<String, SplitContext<T, SplitStateT>> splitStates;
/** The record emitter to handle the records read by the SplitReaders. */
protected final RecordEmitter<E, T, SplitStateT> recordEmitter;
/** The split fetcher manager to run split fetchers. */
protected final SplitFetcherManager<E, SplitT> splitFetcherManager;
/** The configuration for the reader. */
protected final SourceReaderOptions options;
/** The raw configurations that may be used by subclasses. */
protected final Configuration config;
private final Counter numRecordsInCounter;
/** The context of this source reader. */
protected SourceReaderContext context;
/** The latest fetched batch of records-by-split from the split reader. */
@Nullable private RecordsWithSplitIds<E> currentFetch;
@Nullable private SplitContext<T, SplitStateT> currentSplitContext;
@Nullable private SourceOutput<T> currentSplitOutput;
/** A flag indicating whether the SourceReader will be assigned more splits or not. */
private boolean noMoreSplitsAssignment;
public SourceReaderBase(
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
SplitFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
Configuration config,
SourceReaderContext context) {
this.elementsQueue = elementsQueue;
this.splitFetcherManager = splitFetcherManager;
this.recordEmitter = recordEmitter;
this.splitStates = new HashMap<>();
this.options = new SourceReaderOptions(config);
this.config = config;
this.context = context;
this.noMoreSplitsAssignment = false;
numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
}
@Override
public void start() {}
@Override
public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
// make sure we have a fetch we are working on, or move to the next
RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
if (recordsWithSplitId == null) {
recordsWithSplitId = getNextFetch(output);
if (recordsWithSplitId == null) {
return trace(finishedOrAvailableLater());
}
}
// we need to loop here, because we may have to go across splits
while (true) {
// Process one record.
final E record = recordsWithSplitId.nextRecordFromSplit();
if (record != null) {
// emit the record.
numRecordsInCounter.inc(1);
recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
LOG.trace("Emitted record: {}", record);
// We always emit MORE_AVAILABLE here, even though we do not strictly know whether
// more is available. If nothing more is available, the next invocation will find
// this out and return the correct status.
// That means we emit the occasional 'false positive' for availability, but this
// saves us doing checks for every record. Ultimately, this is cheaper.
return trace(InputStatus.MORE_AVAILABLE);
} else if (!moveToNextSplit(recordsWithSplitId, output)) {
// The fetch is done and we just discovered that without having emitted anything yet.
// We need to move to the next fetch. As a shortcut, we call pollNext() here again,
// rather than emitting nothing and waiting for the caller to call us again.
return pollNext(output);
}
// else fall through the loop
}
}
private InputStatus trace(InputStatus status) {
LOG.trace("Source reader status: {}", status);
return status;
}
@Nullable
private RecordsWithSplitIds<E> getNextFetch(final ReaderOutput<T> output) {
splitFetcherManager.checkErrors();
LOG.trace("Getting next source data batch from queue");
final RecordsWithSplitIds<E> recordsWithSplitId = elementsQueue.poll();
if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) {
// No element available, set to available later if needed.
return null;
}
currentFetch = recordsWithSplitId;
return recordsWithSplitId;
}
private void finishCurrentFetch(
final RecordsWithSplitIds<E> fetch, final ReaderOutput<T> output) {
currentFetch = null;
currentSplitContext = null;
currentSplitOutput = null;
final Set<String> finishedSplits = fetch.finishedSplits();
if (!finishedSplits.isEmpty()) {
LOG.info("Finished reading split(s) {}", finishedSplits);
Map<String, SplitStateT> stateOfFinishedSplits = new HashMap<>();
for (String finishedSplitId : finishedSplits) {
stateOfFinishedSplits.put(
finishedSplitId, splitStates.remove(finishedSplitId).state);
output.releaseOutputForSplit(finishedSplitId);
}
onSplitFinished(stateOfFinishedSplits);
}
fetch.recycle();
}
private boolean moveToNextSplit(
RecordsWithSplitIds<E> recordsWithSplitIds, ReaderOutput<T> output) {
final String nextSplitId = recordsWithSplitIds.nextSplit();
if (nextSplitId == null) {
LOG.trace("Current fetch is finished.");
finishCurrentFetch(recordsWithSplitIds, output);
return false;
}
currentSplitContext = splitStates.get(nextSplitId);
checkState(currentSplitContext != null, "Have records for a split that was not registered");
currentSplitOutput = currentSplitContext.getOrCreateSplitOutput(output);
LOG.trace("Emitting records from fetch for split {}", nextSplitId);
return true;
}
@Override
public CompletableFuture<Void> isAvailable() {
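        // If the reader is in the middle of a fetch, data is available right away;
        // otherwise availability is determined by the element queue.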
return currentFetch != null
? FutureCompletingBlockingQueue.AVAILABLE
: elementsQueue.getAvailabilityFuture();
}
@Override
public List<SplitT> snapshotState(long checkpointId) {
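        // Convert the mutable split states back into immutable splits for checkpointing.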
List<SplitT> splits = new ArrayList<>();
splitStates.forEach((id, context) -> splits.add(toSplitType(id, context.state)));
return splits;
}
@Override
public void addSplits(List<SplitT> splits) {
LOG.info("Adding split(s) to reader: {}", splits);
// Initialize the state for each split.
splits.forEach(
s ->
splitStates.put(
s.splitId(), new SplitContext<>(s.splitId(), initializedState(s))));
// Hand over the splits to the split fetcher to start fetching.
splitFetcherManager.addSplits(splits);
}
@Override
public void notifyNoMoreSplits() {
LOG.info("Reader received NoMoreSplits event.");
noMoreSplitsAssignment = true;
elementsQueue.notifyAvailable();
}
@Override
public void handleSourceEvents(SourceEvent sourceEvent) {
LOG.info("Received unhandled source event: {}", sourceEvent);
}
@Override
public void close() throws Exception {
LOG.info("Closing Source Reader.");
splitFetcherManager.close(options.sourceReaderCloseTimeout);
}
/**
 * Gets the number of splits the reader currently has assigned.
*
* <p>These are the splits that have been added via {@link #addSplits(List)} and have not yet
* been finished by returning them from the {@link SplitReader#fetch()} as part of {@link
* RecordsWithSplitIds#finishedSplits()}.
*/
public int getNumberOfCurrentlyAssignedSplits() {
return splitStates.size();
}
// -------------------- Abstract method to allow different implementations ------------------
/** Handles the finished splits to clean up the state if needed. */
protected abstract void onSplitFinished(Map<String, SplitStateT> finishedSplitIds);
/**
 * Handles a newly added split by initializing its state.
 *
 * @param split a newly added split.
 * @return the initial state of the split.
*/
protected abstract SplitStateT initializedState(SplitT split);
/**
 * Converts a mutable SplitStateT to an immutable SplitT.
 *
 * @param splitId the id of the split.
 * @param splitState the split state.
 * @return an immutable split.
*/
protected abstract SplitT toSplitType(String splitId, SplitStateT splitState);
// ------------------ private helper methods ---------------------
private InputStatus finishedOrAvailableLater() {
final boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();
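        // If more splits may still be assigned, or some fetchers are still running, more data
        // may become available later.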
if (!(noMoreSplitsAssignment && allFetchersHaveShutdown)) {
return InputStatus.NOTHING_AVAILABLE;
}
if (elementsQueue.isEmpty()) {
// We may reach here because of exceptional split fetcher, check it.
splitFetcherManager.checkErrors();
return InputStatus.END_OF_INPUT;
} else {
// We can reach this case if we just processed all data from the queue and finished a
// split, and concurrently the fetcher finished another split, whose data is then in
// the queue.
return InputStatus.MORE_AVAILABLE;
}
}
// ------------------ private helper classes ---------------------
private static final class SplitContext<T, SplitStateT> {
final String splitId;
final SplitStateT state;
SourceOutput<T> sourceOutput;
private SplitContext(String splitId, SplitStateT state) {
this.state = state;
this.splitId = splitId;
}
SourceOutput<T> getOrCreateSplitOutput(ReaderOutput<T> mainOutput) {
if (sourceOutput == null) {
sourceOutput = mainOutput.createOutputForSplit(splitId);
}
return sourceOutput;
}
}
}