/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.kafka.dynamic.source.enumerator;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

/**
 * A proxy enumerator context that supports lifecycle management of the underlying threads related
 * to a sub {@link org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator}. This
 * is motivated by the need to cancel the scheduled periodic partition discovery tasks when sub
 * Kafka enumerators are restarted: after a source restart, the worker thread pool in {@link
 * org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext} should not retain tasks of
 * inactive KafkaSourceEnumerators.
 *
 * <p>Since scheduled tasks cannot be cancelled from the {@link
 * org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext}, this enumerator context
 * safely catches exceptions during enumerator restart and uses a closeable proxy scheduler to
 * invoke tasks on the coordinator main thread, maintaining the single-threaded property.
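 *
 * <p>A minimal usage sketch (variable names are illustrative, not part of this API):
 *
 * <pre>{@code
 * // one proxy context per Kafka cluster, created via the pluggable factory below
 * StoppableKafkaEnumContextProxy contextProxy =
 *         StoppableKafkaEnumContextProxyFactory.getDefaultFactory()
 *                 .create(enumContext, kafkaClusterId, kafkaMetadataService);
 *
 * // the sub KafkaSourceEnumerator schedules periodic partition discovery through the proxy
 *
 * // on metadata change, close only the proxy; the coordinator context keeps running
 * contextProxy.close();
 * }</pre>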
*/
@Internal
public class StoppableKafkaEnumContextProxy
implements SplitEnumeratorContext<KafkaPartitionSplit>, AutoCloseable {
    private static final Logger logger =
            LoggerFactory.getLogger(StoppableKafkaEnumContextProxy.class);

    private final String kafkaClusterId;
    private final KafkaMetadataService kafkaMetadataService;
    private final SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext;
    private final ScheduledExecutorService subEnumeratorWorker;

    private boolean noMoreSplits = false;
    private volatile boolean isClosing;

    /**
     * Constructor for the enumerator context.
     *
     * @param kafkaClusterId the Kafka cluster id, used to maintain the mapping to the sub
     *     KafkaSourceEnumerator
     * @param kafkaMetadataService the Kafka metadata service to facilitate error handling
     * @param enumContext the underlying enumerator context
     */
public StoppableKafkaEnumContextProxy(
String kafkaClusterId,
KafkaMetadataService kafkaMetadataService,
SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext) {
this.kafkaClusterId = kafkaClusterId;
this.kafkaMetadataService = kafkaMetadataService;
this.enumContext = enumContext;
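        // a single scheduler thread per Kafka cluster, so that this cluster's periodic
        // discovery tasks can be cancelled independently when this context is closed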
this.subEnumeratorWorker =
Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory(kafkaClusterId + "-enum-worker"));
this.isClosing = false;
}

    @Override
    public SplitEnumeratorMetricGroup metricGroup() {
        return enumContext.metricGroup();
    }

    @Override
    public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
        enumContext.sendEventToSourceReader(subtaskId, event);
    }

    @Override
    public int currentParallelism() {
        return enumContext.currentParallelism();
    }

    @Override
    public Map<Integer, ReaderInfo> registeredReaders() {
        return enumContext.registeredReaders();
    }

    /**
     * Wrap splits with cluster metadata.
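     *
     * <p>A sketch of the wrapping, assuming the sub enumerator assigned a split for partition
     * {@code my-topic-0} (the topic name is illustrative):
     *
     * <pre>{@code
     * KafkaPartitionSplit split = ...; // produced by the sub KafkaSourceEnumerator
     * DynamicKafkaSourceSplit wrapped = new DynamicKafkaSourceSplit(kafkaClusterId, split);
     * }</pre>
     */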
@Override
public void assignSplits(SplitsAssignment<KafkaPartitionSplit> newSplitAssignments) {
if (logger.isInfoEnabled()) {
logger.info(
"Assigning {} splits for cluster {}: {}",
newSplitAssignments.assignment().values().stream()
.mapToLong(Collection::size)
.sum(),
kafkaClusterId,
newSplitAssignments);
}
Map<Integer, List<DynamicKafkaSourceSplit>> readerToSplitsMap = new HashMap<>();
newSplitAssignments
.assignment()
.forEach(
(subtask, splits) ->
readerToSplitsMap.put(
subtask,
splits.stream()
.map(
split ->
new DynamicKafkaSourceSplit(
kafkaClusterId, split))
.collect(Collectors.toList())));
if (!readerToSplitsMap.isEmpty()) {
enumContext.assignSplits(new SplitsAssignment<>(readerToSplitsMap));
}
}

    @Override
    public void signalNoMoreSplits(int subtask) {
        // there are no more splits for this cluster
        noMoreSplits = true;
    }

    /** Execute the one-time callables in the coordinator. */
@Override
public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
enumContext.callAsync(
wrapCallAsyncCallable(callable), wrapCallAsyncCallableHandler(handler));
}

    /**
     * Schedules the task via the internal thread pool and proxies it so that the task handler
     * callback can execute in the single-threaded source coordinator thread pool, avoiding the
     * need for extra synchronization.
     *
     * <p>Keeping the scheduled task in the internal thread pool also allows us to cancel the task
     * when the context needs to close due to a dynamic enumerator restart.
     *
     * <p>In the case of KafkaEnumerator partition discovery, the callback modifies the
     * KafkaEnumerator's object state.
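     *
     * <p>A sketch of how the sub enumerator is expected to invoke this overload for periodic
     * partition discovery (method names follow {@code KafkaSourceEnumerator} and are shown for
     * illustration):
     *
     * <pre>{@code
     * // the callable runs in this proxy's worker pool; the handler runs on the coordinator thread
     * context.callAsync(
     *         this::getSubscribedTopicPartitions,
     *         this::checkPartitionChanges,
     *         0,
     *         partitionDiscoveryIntervalMs);
     * }</pre>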
*/
@Override
public <T> void callAsync(
Callable<T> callable,
BiConsumer<T, Throwable> handler,
long initialDelay,
long period) {
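        // delegate to the one-time overload so the callable is wrapped with exception handling
        // and the handler callback still executes on the coordinator thread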
subEnumeratorWorker.scheduleAtFixedRate(
() -> callAsync(callable, handler), initialDelay, period, TimeUnit.MILLISECONDS);
}

    @Override
    public void runInCoordinatorThread(Runnable runnable) {
        enumContext.runInCoordinatorThread(runnable);
    }

    public boolean isNoMoreSplits() {
        return noMoreSplits;
    }

    /**
* Note that we can't close the source coordinator here, because these contexts can be closed
* during metadata change when the coordinator still needs to continue to run. We can only close
* the coordinator context in Flink job shutdown, which Flink will do for us. That's why there
* is the complexity of the internal thread pools in this class.
*
* <p>TODO: Attach Flink JIRA ticket -- discuss with upstream how to cancel scheduled tasks
* belonging to enumerator.
*/
@Override
public void close() throws Exception {
logger.info("Closing enum context for {}", kafkaClusterId);
if (subEnumeratorWorker != null) {
// KafkaSubscriber worker thread will fail if admin client is closed in the middle.
// Swallow the error and set the context to closed state.
isClosing = true;
subEnumeratorWorker.shutdown();
subEnumeratorWorker.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
}

    /**
     * Wraps the callable that {@link #callAsync} executes in the worker thread pool, adding
     * exception propagation, so that IO is kept off the coordinator thread.
     */
protected <T> Callable<T> wrapCallAsyncCallable(Callable<T> callable) {
return () -> {
try {
return callable.call();
} catch (Exception e) {
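                // the context is closing, so the failure is expected and treated as benign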
if (isClosing) {
throw new HandledFlinkKafkaException(e, kafkaClusterId);
}
Optional<KafkaException> throwable =
ExceptionUtils.findThrowable(e, KafkaException.class);
// check if Kafka related and if Kafka cluster is inactive
if (throwable.isPresent()
&& !kafkaMetadataService.isClusterActive(kafkaClusterId)) {
throw new HandledFlinkKafkaException(throwable.get(), kafkaClusterId);
}
throw e;
}
};
}

    /**
     * Handles the exception propagated by a callable, executing on the coordinator thread.
     * Depending on the condition(s), the exception may be swallowed or forwarded to the main
     * handler. For example, this wraps the handler of the Kafka topic partition discovery
     * callable.
     */
protected <T> BiConsumer<T, Throwable> wrapCallAsyncCallableHandler(
BiConsumer<T, Throwable> mainHandler) {
return (result, t) -> {
// check if exception is handled
Optional<HandledFlinkKafkaException> throwable =
ExceptionUtils.findThrowable(t, HandledFlinkKafkaException.class);
if (throwable.isPresent()) {
logger.warn("Swallowed handled exception for {}.", kafkaClusterId, throwable.get());
return;
}
// let the main handler deal with the potential exception
mainHandler.accept(result, t);
};
}

    /**
     * General exception to signal to internal exception handling mechanisms that a benign error
     * occurred.
     */
    @Internal
    public static class HandledFlinkKafkaException extends RuntimeException {
        private static final String ERROR_MESSAGE = "An error occurred with %s";

        private final String kafkaClusterId;

        public HandledFlinkKafkaException(Throwable cause, String kafkaClusterId) {
            super(cause);
            this.kafkaClusterId = kafkaClusterId;
        }

        @Override
        public String getMessage() {
            return String.format(ERROR_MESSAGE, kafkaClusterId);
        }
    }

    /**
* This factory exposes a way to override the {@link StoppableKafkaEnumContextProxy} used in the
* enumerator. This pluggable factory is extended in unit tests to facilitate invoking the
* periodic discovery loops on demand.
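     *
     * <p>A sketch of a test override, assuming a hypothetical {@code
     * TestStoppableKafkaEnumContextProxy} subclass (the name is illustrative):
     *
     * <pre>{@code
     * StoppableKafkaEnumContextProxyFactory testFactory =
     *         (enumContext, kafkaClusterId, kafkaMetadataService) ->
     *                 new TestStoppableKafkaEnumContextProxy(
     *                         kafkaClusterId, kafkaMetadataService, enumContext);
     * }</pre>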
*/
@Internal
    public interface StoppableKafkaEnumContextProxyFactory {

        StoppableKafkaEnumContextProxy create(
                SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext,
                String kafkaClusterId,
                KafkaMetadataService kafkaMetadataService);

        static StoppableKafkaEnumContextProxyFactory getDefaultFactory() {
            return (enumContext, kafkaClusterId, kafkaMetadataService) ->
                    new StoppableKafkaEnumContextProxy(
                            kafkaClusterId, kafkaMetadataService, enumContext);
        }
    }
}