| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.connector.base.source.reader.fetcher; |
| |
| import org.apache.flink.annotation.Internal; |
| import org.apache.flink.annotation.VisibleForTesting; |
| import org.apache.flink.api.connector.source.SourceSplit; |
| import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; |
| import org.apache.flink.connector.base.source.reader.SourceReaderBase; |
| import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; |
| import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.Consumer; |
| import java.util.function.Supplier; |
| |
| /** |
| * A class responsible for starting the {@link SplitFetcher} and manage the life cycles of them. |
| * This class works with the {@link SourceReaderBase}. |
| * |
| * <p>The split fetcher manager could be used to support different threading models by implementing |
| * the {@link #addSplits(List)} method differently. For example, a single thread split fetcher |
| * manager would only start a single fetcher and assign all the splits to it. A one-thread-per-split |
| * fetcher may spawn a new thread every time a new split is assigned. |
| */ |
| @Internal |
| public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> { |
| private static final Logger LOG = LoggerFactory.getLogger(SplitFetcherManager.class); |
| |
| private final Consumer<Throwable> errorHandler; |
| |
| /** An atomic integer to generate monotonically increasing fetcher ids. */ |
| private final AtomicInteger fetcherIdGenerator; |
| |
| /** A supplier to provide split readers. */ |
| private final Supplier<SplitReader<E, SplitT>> splitReaderFactory; |
| |
| /** Uncaught exception in the split fetchers. */ |
| private final AtomicReference<Throwable> uncaughtFetcherException; |
| |
| /** The element queue that the split fetchers will put elements into. */ |
| private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue; |
| |
| /** A map keeping track of all the split fetchers. */ |
| protected final Map<Integer, SplitFetcher<E, SplitT>> fetchers; |
| |
| /** |
| * An executor service with two threads. One for the fetcher and one for the future completing |
| * thread. |
| */ |
| private final ExecutorService executors; |
| |
| /** Indicating the split fetcher manager has closed or not. */ |
| private volatile boolean closed; |
| |
| /** |
| * Hook for handling finished splits in {@link SplitFetcher}, usually used for testing split |
| * finishing behavior of {@link SplitFetcher} and {@link SplitReader}. |
| */ |
| private final Consumer<Collection<String>> splitFinishedHook; |
| |
| /** |
| * Create a split fetcher manager. |
| * |
| * @param elementsQueue the queue that split readers will put elements into. |
| * @param splitReaderFactory a supplier that could be used to create split readers. |
| */ |
| public SplitFetcherManager( |
| FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, |
| Supplier<SplitReader<E, SplitT>> splitReaderFactory) { |
| this(elementsQueue, splitReaderFactory, (ignore) -> {}); |
| } |
| |
| /** |
| * Create a split fetcher manager. |
| * |
| * @param elementsQueue the queue that split readers will put elements into. |
| * @param splitReaderFactory a supplier that could be used to create split readers. |
| * @param splitFinishedHook Hook for handling finished splits in split fetchers. |
| */ |
| @VisibleForTesting |
| public SplitFetcherManager( |
| FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, |
| Supplier<SplitReader<E, SplitT>> splitReaderFactory, |
| Consumer<Collection<String>> splitFinishedHook) { |
| this.elementsQueue = elementsQueue; |
| this.errorHandler = |
| new Consumer<Throwable>() { |
| @Override |
| public void accept(Throwable t) { |
| LOG.error("Received uncaught exception.", t); |
| if (!uncaughtFetcherException.compareAndSet(null, t)) { |
| // Add the exception to the exception list. |
| uncaughtFetcherException.get().addSuppressed(t); |
| } |
| // Wake up the main thread to let it know the exception. |
| elementsQueue.notifyAvailable(); |
| } |
| }; |
| this.splitReaderFactory = splitReaderFactory; |
| this.splitFinishedHook = splitFinishedHook; |
| this.uncaughtFetcherException = new AtomicReference<>(null); |
| this.fetcherIdGenerator = new AtomicInteger(0); |
| this.fetchers = new ConcurrentHashMap<>(); |
| |
| // Create the executor with a thread factory that fails the source reader if one of |
| // the fetcher thread exits abnormally. |
| final String taskThreadName = Thread.currentThread().getName(); |
| this.executors = |
| Executors.newCachedThreadPool( |
| r -> new Thread(r, "Source Data Fetcher for " + taskThreadName)); |
| this.closed = false; |
| } |
| |
| public abstract void addSplits(List<SplitT> splitsToAdd); |
| |
| protected void startFetcher(SplitFetcher<E, SplitT> fetcher) { |
| executors.submit(fetcher); |
| } |
| |
| /** |
| * Synchronize method to ensure no fetcher is created after the split fetcher manager has |
| * closed. |
| * |
| * @return the created split fetcher. |
| * @throws IllegalStateException if the split fetcher manager has closed. |
| */ |
| protected synchronized SplitFetcher<E, SplitT> createSplitFetcher() { |
| if (closed) { |
| throw new IllegalStateException("The split fetcher manager has closed."); |
| } |
| // Create SplitReader. |
| SplitReader<E, SplitT> splitReader = splitReaderFactory.get(); |
| |
| int fetcherId = fetcherIdGenerator.getAndIncrement(); |
| SplitFetcher<E, SplitT> splitFetcher = |
| new SplitFetcher<>( |
| fetcherId, |
| elementsQueue, |
| splitReader, |
| errorHandler, |
| () -> { |
| fetchers.remove(fetcherId); |
| // We need this to synchronize status of fetchers to concurrent partners |
| // as |
| // ConcurrentHashMap's aggregate status methods including size, isEmpty, |
| // and |
| // containsValue are not designed for program control. |
| elementsQueue.notifyAvailable(); |
| }, |
| this.splitFinishedHook); |
| fetchers.put(fetcherId, splitFetcher); |
| return splitFetcher; |
| } |
| |
| /** |
| * Check and shutdown the fetchers that have completed their work. |
| * |
| * @return true if all the fetchers have completed the work, false otherwise. |
| */ |
| public boolean maybeShutdownFinishedFetchers() { |
| Iterator<Map.Entry<Integer, SplitFetcher<E, SplitT>>> iter = fetchers.entrySet().iterator(); |
| while (iter.hasNext()) { |
| Map.Entry<Integer, SplitFetcher<E, SplitT>> entry = iter.next(); |
| SplitFetcher<E, SplitT> fetcher = entry.getValue(); |
| if (fetcher.isIdle()) { |
| LOG.info("Closing splitFetcher {} because it is idle.", entry.getKey()); |
| fetcher.shutdown(); |
| iter.remove(); |
| } |
| } |
| return fetchers.isEmpty(); |
| } |
| |
| /** |
| * Close the split fetcher manager. |
| * |
| * @param timeoutMs the max time in milliseconds to wait. |
| * @throws Exception when failed to close the split fetcher manager. |
| */ |
| public synchronized void close(long timeoutMs) throws Exception { |
| closed = true; |
| fetchers.values().forEach(SplitFetcher::shutdown); |
| executors.shutdown(); |
| if (!executors.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) { |
| LOG.warn( |
| "Failed to close the source reader in {} ms. There are still {} split fetchers running", |
| timeoutMs, |
| fetchers.size()); |
| } |
| } |
| |
| public void checkErrors() { |
| if (uncaughtFetcherException.get() != null) { |
| throw new RuntimeException( |
| "One or more fetchers have encountered exception", |
| uncaughtFetcherException.get()); |
| } |
| } |
| |
| // ----------------------- |
| |
| @VisibleForTesting |
| public int getNumAliveFetchers() { |
| return fetchers.size(); |
| } |
| } |