blob: 369ec9d15f0e1740f8968f26eff5a36da56deedb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.connect.runtime.connectorwrapper;
import com.alibaba.fastjson.JSON;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.internal.ConcurrentSet;
import io.openmessaging.connector.api.component.connector.Connector;
import io.openmessaging.connector.api.component.task.Task;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.component.task.source.SourceTask;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordConverter;
import io.openmessaging.connector.api.errors.ConnectException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
import org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
import org.apache.rocketmq.connect.runtime.converter.record.ConverterConfig;
import org.apache.rocketmq.connect.runtime.converter.record.ConverterType;
import org.apache.rocketmq.connect.runtime.errors.ReporterManagerUtil;
import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.DefaultConnectorContext;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
import org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
import org.apache.rocketmq.connect.runtime.utils.ServiceThread;
import org.apache.rocketmq.connect.runtime.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
/**
* A worker to schedule all connectors and tasks in a process.
*/
public class Worker {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
/**
* Current running connectors.
*/
private Set<WorkerConnector> workingConnectors = new ConcurrentSet<>();
/**
* Current tasks state.
*/
private Map<Runnable, Long/*timestamp*/> pendingTasks = new ConcurrentHashMap<>();
private Set<Runnable> runningTasks = new ConcurrentSet<>();
private Map<Runnable, Long/*timestamp*/> stoppingTasks = new ConcurrentHashMap<>();
private Set<Runnable> stoppedTasks = new ConcurrentSet<>();
private Set<Runnable> cleanedStoppedTasks = new ConcurrentSet<>();
private Set<Runnable> errorTasks = new ConcurrentSet<>();
private Set<Runnable> cleanedErrorTasks = new ConcurrentSet<>();
Map<String, List<ConnectKeyValue>> latestTaskConfigs = new HashMap<>();
/**
* Current running tasks to its Future map.
*/
private Map<Runnable, Future> taskToFutureMap = new ConcurrentHashMap<>();
/**
* Thread pool for connectors and tasks.
*/
private final ExecutorService taskExecutor;
/**
* Position management for source tasks.
*/
private final PositionManagementService positionManagementService;
/**
* A scheduled task to commit source position of source tasks.
*/
// private final TaskPositionCommitService taskPositionCommitService;
private Optional<SourceTaskOffsetCommitter> sourceTaskOffsetCommitter;
private final ConnectConfig workerConfig;
private final Plugin plugin;
/**
* Atomic state variable
*/
private AtomicReference<WorkerState> workerState;
private StateMachineService stateMachineService = new StateMachineService();
private final ConnectStatsManager connectStatsManager;
private final ConnectStatsService connectStatsService;
public Worker(ConnectConfig workerConfig,
PositionManagementService positionManagementService,
ConfigManagementService configManagementService,
Plugin plugin, AbstractConnectController connectController) {
this.workerConfig = workerConfig;
this.taskExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory("task-Worker-Executor-"));
this.positionManagementService = positionManagementService;
this.plugin = plugin;
this.connectStatsManager = connectController.getConnectStatsManager();
this.connectStatsService = connectController.getConnectStatsService();
this.sourceTaskOffsetCommitter = Optional.of(new SourceTaskOffsetCommitter(workerConfig));
}
public void start() {
workerState = new AtomicReference<>(WorkerState.STARTED);
stateMachineService.start();
}
/**
* Start a collection of connectors with the given configs. If a connector is already started with the same configs,
* it will not start again. If a connector is already started but not contained in the new configs, it will stop.
*
* @param connectorConfigs
* @param connectController
* @throws Exception
*/
public synchronized void startConnectors(Map<String, ConnectKeyValue> connectorConfigs,
AbstractConnectController connectController) throws Exception {
Set<WorkerConnector> stoppedConnector = new HashSet<>();
for (WorkerConnector workerConnector : workingConnectors) {
try {
String connectorName = workerConnector.getConnectorName();
ConnectKeyValue keyValue = connectorConfigs.get(connectorName);
if (null == keyValue || 0 != keyValue.getInt(RuntimeConfigDefine.CONFIG_DELETED)) {
workerConnector.stop();
log.info("Connector {} stop", workerConnector.getConnectorName());
stoppedConnector.add(workerConnector);
} else if (!keyValue.equals(workerConnector.getKeyValue())) {
workerConnector.reconfigure(keyValue);
}
} catch (Exception e) {
log.error("stop or reconfigure connector error, connectName: " + workerConnector.getConnectorName(), e);
}
}
workingConnectors.removeAll(stoppedConnector);
if (null == connectorConfigs || 0 == connectorConfigs.size()) {
return;
}
Map<String, ConnectKeyValue> newConnectors = new HashMap<>();
for (String connectorName : connectorConfigs.keySet()) {
boolean isNewConnector = true;
for (WorkerConnector workerConnector : workingConnectors) {
if (workerConnector.getConnectorName().equals(connectorName)) {
isNewConnector = false;
break;
}
}
if (isNewConnector) {
newConnectors.put(connectorName, connectorConfigs.get(connectorName));
}
}
for (String connectorName : newConnectors.keySet()) {
ClassLoader savedLoader = plugin.currentThreadLoader();
try {
ConnectKeyValue keyValue = newConnectors.get(connectorName);
String connectorClass = keyValue.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
ClassLoader connectorLoader = plugin.delegatingLoader().pluginClassLoader(connectorClass);
savedLoader = Plugin.compareAndSwapLoaders(connectorLoader);
// instance connector
final Connector connector = plugin.newConnector(connectorClass);
WorkerConnector workerConnector = new WorkerConnector(connectorName, connector, connectorConfigs.get(connectorName), new DefaultConnectorContext(connectorName, connectController));
// start connector
workerConnector.initialize();
workerConnector.start();
log.info("Connector {} start", workerConnector.getConnectorName());
this.workingConnectors.add(workerConnector);
} catch (Exception e) {
log.error("worker connector start exception. workerName: " + connectorName, e);
} finally {
// compare and swap
Plugin.compareAndSwapLoaders(savedLoader);
}
}
}
/**
* Start a collection of tasks with the given configs. If a task is already started with the same configs, it will
* not start again. If a task is already started but not contained in the new configs, it will stop.
*
* @param taskConfigs
* @throws Exception
*/
public void startTasks(Map<String, List<ConnectKeyValue>> taskConfigs) {
synchronized (latestTaskConfigs) {
this.latestTaskConfigs = taskConfigs;
}
}
private boolean isConfigInSet(ConnectKeyValue keyValue, Set<Runnable> set) {
for (Runnable runnable : set) {
ConnectKeyValue taskConfig = null;
if (runnable instanceof WorkerSourceTask) {
taskConfig = ((WorkerSourceTask) runnable).currentTaskConfig();
} else if (runnable instanceof WorkerSinkTask) {
taskConfig = ((WorkerSinkTask) runnable).currentTaskConfig();
} else if (runnable instanceof WorkerDirectTask) {
taskConfig = ((WorkerDirectTask) runnable).currentTaskConfig();
}
if (keyValue.equals(taskConfig)) {
return true;
}
}
return false;
}
/**
* We can choose to persist in-memory task status
* so we can view history tasks
*/
public void stop() {
workerState.set(WorkerState.TERMINATED);
try {
sourceTaskOffsetCommitter.ifPresent(committer -> committer.close(5000));
taskExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Task termination error.", e);
}
stateMachineService.shutdown();
}
public Set<WorkerConnector> getWorkingConnectors() {
return workingConnectors;
}
public void setWorkingConnectors(
Set<WorkerConnector> workingConnectors) {
this.workingConnectors = workingConnectors;
}
/**
* Beaware that we are not creating a defensive copy of these tasks
* So developers should only use these references for read-only purposes.
* These variables should be immutable
*
* @return
*/
public Set<Runnable> getWorkingTasks() {
return runningTasks;
}
public Set<Runnable> getErrorTasks() {
return errorTasks;
}
public Set<Runnable> getPendingTasks() {
return pendingTasks.keySet();
}
public Set<Runnable> getStoppedTasks() {
return stoppedTasks;
}
public Set<Runnable> getStoppingTasks() {
return stoppingTasks.keySet();
}
public Set<Runnable> getCleanedErrorTasks() {
return cleanedErrorTasks;
}
public Set<Runnable> getCleanedStoppedTasks() {
return cleanedStoppedTasks;
}
public void setWorkingTasks(Set<Runnable> workingTasks) {
this.runningTasks = workingTasks;
}
public void maintainConnectorState() {
}
/**
* maintain task state
*
* @throws Exception
*/
public void maintainTaskState() throws Exception {
Map<String, List<ConnectKeyValue>> connectorConfig = new HashMap<>();
synchronized (latestTaskConfigs) {
connectorConfig.putAll(latestTaskConfigs);
}
// STEP 1: check running tasks and put to error status
checkRunningTasks(connectorConfig);
// get new Tasks
Map<String, List<ConnectKeyValue>> newTasks = newTasks(connectorConfig);
// STEP 2: try to create new tasks
startTask(newTasks);
// STEP 3: check all pending state
checkPendingTask();
// STEP 4 check stopping tasks
checkStoppingTasks();
// STEP 5 check error tasks
checkErrorTasks();
// STEP 6 check errorTasks and stopped tasks
checkStoppedTasks();
}
/**
* check running task
*
* @param connectorConfig
*/
private void checkRunningTasks(Map<String, List<ConnectKeyValue>> connectorConfig) {
// STEP 1: check running tasks and put to error status
for (Runnable runnable : runningTasks) {
WorkerTask workerTask = (WorkerTask) runnable;
String connectorName = workerTask.id().connector();
ConnectKeyValue taskConfig = workerTask.currentTaskConfig();
List<ConnectKeyValue> taskConfigs = connectorConfig.get(connectorName);
WorkerTaskState state = ((WorkerTask) runnable).getState();
switch (state) {
case ERROR:
errorTasks.add(runnable);
runningTasks.remove(runnable);
break;
case RUNNING:
if (isNeedStop(taskConfig, taskConfigs)) {
try {
// remove committer offset
sourceTaskOffsetCommitter.ifPresent(commiter -> commiter.remove(workerTask.id()));
workerTask.doClose();
} catch (Exception e) {
log.error("workerTask stop exception, workerTask: " + workerTask.currentTaskConfig(), e);
}
log.info("Task stopping, connector name {}, config {}", workerTask.id().connector(), workerTask.currentTaskConfig());
runningTasks.remove(runnable);
stoppingTasks.put(runnable, System.currentTimeMillis());
}
break;
default:
log.error("[BUG] Illegal State in when checking running tasks, {} is in {} state",
((WorkerTask) runnable).id().connector(), state);
break;
}
}
}
/**
* check is need stop
*
* @param taskConfig
* @param keyValues
* @return
*/
private boolean isNeedStop(ConnectKeyValue taskConfig, List<ConnectKeyValue> keyValues) {
if (CollectionUtils.isEmpty(keyValues)) {
return true;
}
for (ConnectKeyValue keyValue : keyValues) {
if (keyValue.equals(taskConfig)) {
// not stop
return false;
}
}
return true;
}
/**
* check stopped tasks
*/
private void checkStoppedTasks() {
for (Runnable runnable : stoppedTasks) {
WorkerTask workerTask = (WorkerTask) runnable;
workerTask.cleanup();
Future future = taskToFutureMap.get(runnable);
try {
if (null != future) {
future.get(workerConfig.getMaxStartTimeoutMills(), TimeUnit.MILLISECONDS);
} else {
log.error("[BUG] stopped Tasks reference not found in taskFutureMap");
}
} catch (ExecutionException e) {
Throwable t = e.getCause();
log.info("[BUG] Stopped Tasks should not throw any exception");
t.printStackTrace();
} catch (CancellationException e) {
log.info("[BUG] Stopped Tasks throws PrintStackTrace");
e.printStackTrace();
} catch (TimeoutException e) {
log.info("[BUG] Stopped Tasks should not throw any exception");
e.printStackTrace();
} catch (InterruptedException e) {
log.info("[BUG] Stopped Tasks should not throw any exception");
e.printStackTrace();
} finally {
// remove committer offset
sourceTaskOffsetCommitter.ifPresent(commiter -> commiter.remove(workerTask.id()));
future.cancel(true);
taskToFutureMap.remove(runnable);
stoppedTasks.remove(runnable);
cleanedStoppedTasks.add(runnable);
}
}
}
private void checkErrorTasks() {
for (Runnable runnable : errorTasks) {
WorkerTask workerTask = (WorkerTask) runnable;
Future future = taskToFutureMap.get(runnable);
try {
if (null != future) {
future.get(workerConfig.getMaxStopTimeoutMills(), TimeUnit.MILLISECONDS);
} else {
log.error("[BUG] errorTasks reference not found in taskFutureMap");
}
} catch (ExecutionException e) {
log.error("Execution exception , {}", e);
} catch (CancellationException | TimeoutException | InterruptedException e) {
log.error("error, {}", e);
} finally {
// remove committer offset
sourceTaskOffsetCommitter.ifPresent(commiter -> commiter.remove(workerTask.id()));
future.cancel(true);
workerTask.cleanup();
taskToFutureMap.remove(runnable);
errorTasks.remove(runnable);
cleanedErrorTasks.add(runnable);
}
}
}
private void checkStoppingTasks() {
for (Map.Entry<Runnable, Long> entry : stoppingTasks.entrySet()) {
Runnable runnable = entry.getKey();
Long stopTimestamp = entry.getValue();
Long currentTimeMillis = System.currentTimeMillis();
Future future = taskToFutureMap.get(runnable);
WorkerTaskState state = ((WorkerTask) runnable).getState();
// exited normally
switch (state) {
case STOPPED:
// concurrent modification Exception ? Will it pop that in the
if (null == future || !future.isDone()) {
log.error("[BUG] future is null or Stopped task should have its Future.isDone() true, but false");
}
stoppingTasks.remove(runnable);
stoppedTasks.add(runnable);
break;
case ERROR:
stoppingTasks.remove(runnable);
errorTasks.add(runnable);
break;
case STOPPING:
if (currentTimeMillis - stopTimestamp > workerConfig.getMaxStopTimeoutMills()) {
((WorkerTask) runnable).timeout();
stoppingTasks.remove(runnable);
errorTasks.add(runnable);
}
break;
default:
log.error("[BUG] Illegal State in when checking stopping tasks, {} is in {} state",
((WorkerTask) runnable).id().connector(), state.toString());
}
}
}
private void checkPendingTask() {
for (Map.Entry<Runnable, Long> entry : pendingTasks.entrySet()) {
Runnable runnable = entry.getKey();
Long startTimestamp = entry.getValue();
Long currentTimeMillis = System.currentTimeMillis();
WorkerTaskState state = ((WorkerTask) runnable).getState();
switch (state) {
case ERROR:
errorTasks.add(runnable);
pendingTasks.remove(runnable);
break;
case RUNNING:
runningTasks.add(runnable);
pendingTasks.remove(runnable);
break;
case NEW:
log.info("[RACE CONDITION] we checked the pending tasks before state turns to PENDING");
break;
case PENDING:
if (currentTimeMillis - startTimestamp > workerConfig.getMaxStartTimeoutMills()) {
((WorkerTask) runnable).timeout();
pendingTasks.remove(runnable);
errorTasks.add(runnable);
}
break;
default:
log.error("[BUG] Illegal State in when checking pending tasks, {} is in {} state",
((WorkerTask) runnable).id().connector(), state.toString());
break;
}
}
}
/**
* start task
*
* @param newTasks
* @throws Exception
*/
private void startTask(Map<String, List<ConnectKeyValue>> newTasks) throws Exception {
for (String connectorName : newTasks.keySet()) {
for (ConnectKeyValue keyValue : newTasks.get(connectorName)) {
int taskId = keyValue.getInt(RuntimeConfigDefine.TASK_ID);
ConnectorTaskId id = new ConnectorTaskId(connectorName, taskId);
String taskType = keyValue.getString(RuntimeConfigDefine.TASK_TYPE);
if (TaskType.DIRECT.name().equalsIgnoreCase(taskType)) {
createDirectTask(id, keyValue);
continue;
}
ClassLoader savedLoader = plugin.currentThreadLoader();
try {
String connType = keyValue.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
ClassLoader connectorLoader = plugin.delegatingLoader().connectorLoader(connType);
savedLoader = Plugin.compareAndSwapLoaders(connectorLoader);
// new task
final Class<? extends Task> taskClass = plugin.currentThreadLoader().loadClass(keyValue.getString(RuntimeConfigDefine.TASK_CLASS)).asSubclass(Task.class);
final Task task = plugin.newTask(taskClass);
/**
* create key/value converter
*/
RecordConverter valueConverter = plugin.newConverter(keyValue, RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, Plugin.ClassLoaderUsage.CURRENT_CLASSLOADER);
RecordConverter keyConverter = plugin.newConverter(keyValue, RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER, Plugin.ClassLoaderUsage.CURRENT_CLASSLOADER);
if (keyConverter == null) {
keyConverter = plugin.newConverter(keyValue, RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER, Plugin.ClassLoaderUsage.PLUGINS);
log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id);
} else {
log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id);
}
if (valueConverter == null) {
valueConverter = plugin.newConverter(keyValue, RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, Plugin.ClassLoaderUsage.PLUGINS);
log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id);
} else {
log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id);
}
if (task instanceof SourceTask) {
DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(workerConfig);
TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
// create retry operator
RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue);
retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters(connectorName, keyValue));
WorkerSourceTask workerSourceTask = new WorkerSourceTask(workerConfig, id,
(SourceTask) task, savedLoader, keyValue, positionManagementService, keyConverter, valueConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain, retryWithToleranceOperator);
Future future = taskExecutor.submit(workerSourceTask);
// schedule offset committer
sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, workerSourceTask));
taskToFutureMap.put(workerSourceTask, future);
this.pendingTasks.put(workerSourceTask, System.currentTimeMillis());
} else if (task instanceof SinkTask) {
log.info("sink task config keyValue is {}", keyValue.getProperties());
DefaultMQPullConsumer consumer = ConnectUtil.initDefaultMQPullConsumer(workerConfig, id, keyValue);
Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(workerConfig);
if (!consumerGroupSet.contains(consumer.getConsumerGroup())) {
ConnectUtil.createSubGroup(workerConfig, consumer.getConsumerGroup());
}
TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
// create retry operator
RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue);
retryWithToleranceOperator.reporters(ReporterManagerUtil.sinkTaskReporters(connectorName, keyValue, workerConfig));
WorkerSinkTask workerSinkTask = new WorkerSinkTask(workerConfig, id,
(SinkTask) task, savedLoader, keyValue, keyConverter, valueConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain,
retryWithToleranceOperator, ReporterManagerUtil.createWorkerErrorRecordReporter(keyValue, retryWithToleranceOperator, valueConverter));
Future future = taskExecutor.submit(workerSinkTask);
taskToFutureMap.put(workerSinkTask, future);
this.pendingTasks.put(workerSinkTask, System.currentTimeMillis());
}
Plugin.compareAndSwapLoaders(savedLoader);
} catch (Exception e) {
log.error("start worker task exception. config {}" + JSON.toJSONString(keyValue), e);
Plugin.compareAndSwapLoaders(savedLoader);
}
}
}
}
private Map<String, List<ConnectKeyValue>> newTasks(Map<String, List<ConnectKeyValue>> taskConfigs) {
Map<String, List<ConnectKeyValue>> newTasks = new HashMap<>();
for (String connectorName : taskConfigs.keySet()) {
for (ConnectKeyValue keyValue : taskConfigs.get(connectorName)) {
boolean isNewTask = true;
if (isConfigInSet(keyValue, runningTasks) || isConfigInSet(keyValue, pendingTasks.keySet()) || isConfigInSet(keyValue, errorTasks)) {
isNewTask = false;
}
if (isNewTask) {
if (!newTasks.containsKey(connectorName)) {
newTasks.put(connectorName, new ArrayList<>());
}
log.info("Add new tasks,connector name {}, config {}", connectorName, keyValue);
newTasks.get(connectorName).add(keyValue);
}
}
}
return newTasks;
}
private void createDirectTask(ConnectorTaskId id, ConnectKeyValue keyValue) throws Exception {
String sourceTaskClass = keyValue.getString(RuntimeConfigDefine.SOURCE_TASK_CLASS);
Task sourceTask = getTask(sourceTaskClass);
String sinkTaskClass = keyValue.getString(RuntimeConfigDefine.SINK_TASK_CLASS);
Task sinkTask = getTask(sinkTaskClass);
TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
// create retry operator
RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue);
retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters(id.connector(), keyValue));
WorkerDirectTask workerDirectTask = new WorkerDirectTask(
workerConfig,
id,
(SourceTask) sourceTask,
null,
(SinkTask) sinkTask,
keyValue,
positionManagementService,
workerState,
connectStatsManager,
connectStatsService,
transformChain,
retryWithToleranceOperator);
Future future = taskExecutor.submit(workerDirectTask);
// schedule offset committer
sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, workerDirectTask));
taskToFutureMap.put(workerDirectTask, future);
this.pendingTasks.put(workerDirectTask, System.currentTimeMillis());
}
private Task getTask(String taskClass){
ClassLoader savedLoader = plugin.currentThreadLoader();
Task task = null;
try {
// Get plugin loader
ClassLoader taskLoader = plugin.delegatingLoader().pluginClassLoader(taskClass);
// Compare and set current loader
savedLoader = Plugin.compareAndSwapLoaders(taskLoader);
// load class
Class taskClazz = Utils.getContextCurrentClassLoader().loadClass(taskClass).asSubclass(Task.class);
// new task
task = plugin.newTask(taskClazz);
} catch (Exception ex ){
throw new ConnectException("Create direct task failure", ex);
} finally {
Plugin.compareAndSwapLoaders(savedLoader);
}
return task;
}
public class StateMachineService extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(1000);
try {
Worker.this.maintainConnectorState();
Worker.this.maintainTaskState();
} catch (Exception e) {
log.error("RebalanceImpl#StateMachineService start connector or task failed", e);
}
}
log.info(this.getServiceName() + " service end");
}
@Override
public String getServiceName() {
return StateMachineService.class.getSimpleName();
}
}
public enum TaskType {
SOURCE,
SINK,
DIRECT;
}
}