blob: f2aa2379cdf031050a45397764c7e7452418443f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.base.source.reader.fetcher;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
/**
 * The internal fetcher runnable responsible for polling messages from the external system.
 *
 * <p>All tasks (split additions, fetches and any enqueued {@link SplitFetcherTask}) are executed
 * by the thread that runs {@link #run()}. {@link #addSplits(List)}, {@link
 * #enqueueTask(SplitFetcherTask)} and {@link #shutdown()} only enqueue work or set flags, then
 * wake that thread up.
 */
@Internal
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(SplitFetcher.class);

    /**
     * Marker task added by {@link #wakeUp(boolean)} so that a fetcher thread blocked on the task
     * queue returns from {@code take()}. Its {@code run()} never reports completion, and it is
     * never re-enqueued.
     */
    private static final SplitFetcherTask WAKEUP_TASK = new DummySplitFetcherTask("WAKEUP_TASK");

    private final int id;
    private final BlockingDeque<SplitFetcherTask> taskQueue;
    // Track the assigned splits so we can suspend the reader when there are no splits assigned.
    private final Map<String, SplitT> assignedSplits;
    private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
    private final SplitReader<E, SplitT> splitReader;
    private final Consumer<Throwable> errorHandler;
    private final Runnable shutdownHook;
    private final AtomicBoolean wakeUp;
    private final AtomicBoolean closed;
    private final FetchTask<E, SplitT> fetchTask;
    private volatile SplitFetcherTask runningTask = null;
    private final Object lock = new Object();

    /**
     * Flag whether this fetcher has no work assigned at the moment. A fetcher that has work (a
     * split) assigned but is currently blocked (for example enqueueing a fetch and hitting the
     * element queue limit) is NOT considered idle.
     */
    @GuardedBy("lock")
    private volatile boolean isIdle;

    /**
     * Creates a split fetcher. The fetcher starts idle and does nothing until {@link #run()} is
     * invoked on some thread.
     *
     * @param id the id of this fetcher, used in log and error messages.
     * @param elementsQueue the queue the fetch task hands fetched records to.
     * @param splitReader the reader used to fetch records; closed when {@link #run()} exits.
     * @param errorHandler receives any throwable escaping the fetch loop or {@code
     *     splitReader.close()}.
     * @param shutdownHook run last, when {@link #run()} exits.
     * @param splitFinishedHook receives the ids of splits the fetch task reports as finished,
     *     after they have been removed from the assigned splits.
     */
    SplitFetcher(
            int id,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitReader<E, SplitT> splitReader,
            Consumer<Throwable> errorHandler,
            Runnable shutdownHook,
            Consumer<Collection<String>> splitFinishedHook) {
        this.id = id;
        this.taskQueue = new LinkedBlockingDeque<>();
        this.elementsQueue = elementsQueue;
        this.assignedSplits = new HashMap<>();
        this.splitReader = splitReader;
        this.errorHandler = errorHandler;
        this.shutdownHook = shutdownHook;
        this.isIdle = true;
        this.wakeUp = new AtomicBoolean(false);
        this.closed = new AtomicBoolean(false);
        this.fetchTask =
                new FetchTask<>(
                        splitReader,
                        elementsQueue,
                        ids -> {
                            ids.forEach(assignedSplits::remove);
                            splitFinishedHook.accept(ids);
                            LOG.info("Finished reading from splits {}", ids);
                        },
                        id);
    }

    /**
     * Runs the fetch loop until {@link #shutdown()} is called or an error escapes. Then it closes
     * the split reader and always runs the shutdown hook.
     */
    @Override
    public void run() {
        LOG.info("Starting split fetcher {}", id);
        try {
            while (!closed.get()) {
                runOnce();
            }
        } catch (Throwable t) {
            errorHandler.accept(t);
        } finally {
            try {
                splitReader.close();
            } catch (Exception e) {
                errorHandler.accept(e);
            } finally {
                // Nested finally: the shutdown hook must run even if close() throws something
                // that is not an Exception, or if the error handler itself throws.
                LOG.info("Split fetcher {} exited.", id);
                // This executes after any errorHandler.accept(...) call made above on this
                // thread. Provided observers have a happens-before edge with shutdownHook.run(),
                // they can check the side effect of errorHandler.accept(...) to learn whether an
                // error was reported before the fetcher shut down.
                shutdownHook.run();
            }
        }
    }

    /** Package private method to help unit test. */
    void runOnce() {
        try {
            // The fetch task runs when there are assigned splits and no other pending task;
            // otherwise block until a task (or the WAKEUP_TASK marker) becomes available.
            if (shouldRunFetchTask()) {
                runningTask = fetchTask;
            } else {
                runningTask = taskQueue.take();
            }
            // Now the running task is not null. If wakeUp() is called after this point,
            // task.wakeUp() will be called. If the wakeUp() call was made before this point,
            // the wakeUp flag must already be set. The code hence checks the wakeUp flag first
            // to avoid an unnecessary task run.
            // Note that the runningTask may still be woken up before it starts running.
            LOG.debug("Prepare to run {}", runningTask);
            if (!wakeUp.get() && runningTask.run()) {
                LOG.debug("Finished running task {}", runningTask);
                // The task has finished running. Set it to null so it won't be re-enqueued.
                runningTask = null;
                checkAndSetIdle();
            }
        } catch (Exception e) {
            throw new RuntimeException(
                    String.format(
                            "SplitFetcher thread %d received unexpected exception while polling the records",
                            id),
                    e);
        }
        // A non-null task did not finish and needs to be re-executed. This only happens when
        // it is the fetch task, or when the task was skipped or interrupted by a wake-up.
        maybeEnqueueTask(runningTask);
        synchronized (wakeUp) {
            // Clear the running task so that a later wakeUp() does not wake up a task that is
            // no longer running.
            runningTask = null;
            // Reset the wakeUp flag for the next runOnce() invocation.
            wakeUp.set(false);
            LOG.debug("Cleaned wakeup flag.");
        }
    }

    /**
     * Adds splits to the split fetcher. This operation is asynchronous: an {@code AddSplitsTask}
     * is enqueued and a currently running task is woken up so the fetcher picks it up promptly.
     *
     * @param splitsToAdd the splits to add.
     */
    public void addSplits(List<SplitT> splitsToAdd) {
        enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits));
        wakeUp(true);
    }

    /**
     * Enqueues a task to be executed by the fetcher thread and marks the fetcher as not idle.
     * This does not wake up the fetcher; the task runs once the fetcher next takes from the
     * queue.
     *
     * @param task the task to enqueue.
     */
    public void enqueueTask(SplitFetcherTask task) {
        synchronized (lock) {
            taskQueue.offer(task);
            isIdle = false;
        }
    }

    public SplitReader<E, SplitT> getSplitReader() {
        return splitReader;
    }

    public int fetcherId() {
        return id;
    }

    /** Shuts down the split fetcher. Only the first call has an effect. */
    public void shutdown() {
        if (closed.compareAndSet(false, true)) {
            LOG.info("Shutting down split fetcher {}", id);
            wakeUp(false);
        }
    }

    /**
     * Package private for unit test.
     *
     * @return the assigned splits.
     */
    Map<String, SplitT> assignedSplits() {
        return assignedSplits;
    }

    /**
     * Package private for unit test.
     *
     * @return true if the fetcher is marked idle. It is marked idle when it last found no
     *     assigned splits and no queued tasks, and no task has been enqueued since. Returns
     *     false otherwise.
     */
    boolean isIdle() {
        return isIdle;
    }

    /**
     * Checks whether the fetch task should run. The fetch task should only run when both of the
     * following conditions are met: 1. there is no task in the task queue to run; 2. there are
     * assigned splits. Package private for testing purposes.
     *
     * @return whether the fetch task should be run.
     */
    boolean shouldRunFetchTask() {
        return taskQueue.isEmpty() && !assignedSplits.isEmpty();
    }

    /**
     * Wakes up the fetcher thread. There are only two blocking points in a running fetcher: 1.
     * taking the next task out of the task queue, and 2. running a task.
     *
     * <p>They need to be woken up differently. If the fetcher is blocked waiting for the next task
     * in the task queue, the {@code WAKEUP_TASK} marker is added to the queue so that the blocking
     * take returns. This is skipped when {@code taskOnly} is true. If the fetcher is running a
     * task, {@link SplitFetcherTask#wakeUp()} is called instead of interrupting the thread.
     *
     * <p>Think of correctness this way. The purpose of a wake-up is to let the fetcher thread go
     * back to the very beginning of the running loop. There are three major events in each run
     * of the loop:
     *
     * <ol>
     *   <li>pick a task (blocking);
     *   <li>assign the task to the runningTask variable;
     *   <li>run the runningTask (blocking).
     * </ol>
     *
     * <p>We don't need to worry about anything after step 3 because there is no blocking point
     * anymore.
     *
     * <p>We always set the wakeUp flag first, then use the value of runningTask to determine
     * where the fetcher thread is:
     *
     * <ul>
     *   <li>If runningTask is null (or is the WAKEUP_TASK marker), the fetcher is before step 2.
     *       In that case the queue is unblocked with WAKEUP_TASK, unless {@code taskOnly}. The
     *       wakeUp flag then prevents the picked task from running in this invocation. It is
     *       re-enqueued instead; the fetch task is simply picked again.
     *   <li>If runningTask is a real task, the fetcher is after step 2, so the task itself is
     *       woken up.
     * </ul>
     *
     * <p>The above logic only works within a single {@link #runOnce()} invocation, so this method
     * synchronizes on the same monitor {@code runOnce()} uses when clearing the running task and
     * the flag.
     *
     * @param taskOnly if true, only wake up a currently running task and never enqueue the
     *     WAKEUP_TASK marker.
     */
    void wakeUp(boolean taskOnly) {
        // Synchronize to make sure the wake up only affects the current invocation of runOnce().
        synchronized (wakeUp) {
            // Set the flag first so a task that has not started yet will be skipped.
            wakeUp.set(true);
            SplitFetcherTask currentTask = runningTask;
            if (isRunningTask(currentTask)) {
                // The running task may have missed our wakeUp flag and be running; wake it up.
                LOG.debug("Waking up running task {}", currentTask);
                currentTask.wakeUp();
            } else if (!taskOnly) {
                // The task has not started running yet, and it will not run in this runOnce()
                // invocation due to the wakeUp flag. The fetcher thread might still be blocked
                // on the task queue, so unblock it with the marker task.
                LOG.debug("Waking up fetcher thread.");
                taskQueue.add(WAKEUP_TASK);
            }
        }
    }

    /**
     * Puts an unfinished task back at the head of the task queue. The fetch task, the
     * WAKEUP_TASK marker, null, and any task of a closed fetcher are not re-enqueued.
     */
    private void maybeEnqueueTask(SplitFetcherTask task) {
        // The fetch task is scheduled implicitly via shouldRunFetchTask(), and the marker task
        // must never run again, so neither is re-enqueued.
        if (closed.get() || !isRunningTask(task) || task == fetchTask) {
            return;
        }
        // Put the task back at the head so it runs before anything enqueued after it.
        if (!taskQueue.offerFirst(task)) {
            throw new RuntimeException(
                    "The task queue is full. This is only theoretically possible when really bad thing happens.");
        }
        // Only log when the task has actually been enqueued.
        LOG.debug("Enqueued task {}", task);
    }

    /** Returns true if the given task is a real task, i.e. neither null nor WAKEUP_TASK. */
    private boolean isRunningTask(SplitFetcherTask task) {
        return task != null && task != WAKEUP_TASK;
    }

    /**
     * Marks the fetcher idle if it has no assigned splits and no queued tasks, then notifies the
     * elements queue.
     */
    private void checkAndSetIdle() {
        // Cheap pre-check, repeated under the lock so that a concurrent enqueueTask() (which
        // clears isIdle under the same lock) is not overwritten.
        if (shouldIdle()) {
            synchronized (lock) {
                if (shouldIdle()) {
                    isIdle = true;
                }
            }
            // Because this method might get invoked past the point when the source reader last
            // checked the elements queue, we need to notify availability in case we became idle.
            elementsQueue.notifyAvailable();
        }
    }

    /** Returns true when there are neither assigned splits nor queued tasks. */
    private boolean shouldIdle() {
        return assignedSplits.isEmpty() && taskQueue.isEmpty();
    }

    // --------------------- Helper class ------------------

    /** A no-op task that never reports completion; used for the WAKEUP_TASK marker. */
    private static class DummySplitFetcherTask implements SplitFetcherTask {
        private final String name;

        private DummySplitFetcherTask(String name) {
            this.name = name;
        }

        @Override
        public boolean run() {
            return false;
        }

        @Override
        public void wakeUp() {}

        @Override
        public String toString() {
            return name;
        }
    }
}