/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.apache.rocketmq.connect.runtime.connectorwrapper;

import com.alibaba.fastjson.JSON;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.internal.ConcurrentSet;
import io.openmessaging.connector.api.component.connector.Connector;
import io.openmessaging.connector.api.component.task.Task;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.component.task.source.SourceTask;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordConverter;
import io.openmessaging.connector.api.errors.ConnectException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
import org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
import org.apache.rocketmq.connect.runtime.converter.record.ConverterConfig;
import org.apache.rocketmq.connect.runtime.converter.record.ConverterType;
import org.apache.rocketmq.connect.runtime.errors.ReporterManagerUtil;
import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.DefaultConnectorContext;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
import org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
import org.apache.rocketmq.connect.runtime.utils.ServiceThread;
import org.apache.rocketmq.connect.runtime.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

/**
 * A worker to schedule all connectors and tasks in a process.
 */
public class Worker {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);

    /**
     * Current running connectors.
     */
    private Set<WorkerConnector> workingConnectors = new ConcurrentSet<>();

    /**
     * Current tasks state.
     */
    private Map<Runnable, Long/*timestamp*/> pendingTasks = new ConcurrentHashMap<>();
    private Set<Runnable> runningTasks = new ConcurrentSet<>();
    private Map<Runnable, Long/*timestamp*/> stoppingTasks = new ConcurrentHashMap<>();
    private Set<Runnable> stoppedTasks = new ConcurrentSet<>();
    private Set<Runnable> cleanedStoppedTasks = new ConcurrentSet<>();
    private Set<Runnable> errorTasks = new ConcurrentSet<>();
    private Set<Runnable> cleanedErrorTasks = new ConcurrentSet<>();

    Map<String, List<ConnectKeyValue>> latestTaskConfigs = new HashMap<>();
    /**
     * Current running tasks to its Future map.
     */
    private Map<Runnable, Future> taskToFutureMap = new ConcurrentHashMap<>();
    /**
     * Thread pool for connectors and tasks.
     */
    private final ExecutorService taskExecutor;
    /**
     * Position management for source tasks.
     */
    private final PositionManagementService positionManagementService;
    /**
     * A scheduled task to commit source position of source tasks.
     */
//    private final TaskPositionCommitService taskPositionCommitService;
    private Optional<SourceTaskOffsetCommitter> sourceTaskOffsetCommitter;
    private final ConnectConfig workerConfig;
    private final Plugin plugin;

    /**
     * Atomic state variable
     */
    private AtomicReference<WorkerState> workerState;
    private StateMachineService stateMachineService = new StateMachineService();
    private final ConnectStatsManager connectStatsManager;
    private final ConnectStatsService connectStatsService;

    public Worker(ConnectConfig workerConfig,
        PositionManagementService positionManagementService,
        ConfigManagementService configManagementService,
        Plugin plugin, AbstractConnectController connectController) {
        this.workerConfig = workerConfig;
        this.taskExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory("task-Worker-Executor-"));
        this.positionManagementService = positionManagementService;
        this.plugin = plugin;
        this.connectStatsManager = connectController.getConnectStatsManager();
        this.connectStatsService = connectController.getConnectStatsService();
        this.sourceTaskOffsetCommitter = Optional.of(new SourceTaskOffsetCommitter(workerConfig));
    }

    public void start() {
        workerState = new AtomicReference<>(WorkerState.STARTED);
        stateMachineService.start();
    }

    /**
     * Start a collection of connectors with the given configs. If a connector is already started with the same configs,
     * it will not start again. If a connector is already started but not contained in the new configs, it will stop.
     *
     * @param connectorConfigs
     * @param connectController
     * @throws Exception
     */
    public synchronized void startConnectors(Map<String, ConnectKeyValue> connectorConfigs,
        AbstractConnectController connectController) throws Exception {
        Set<WorkerConnector> stoppedConnector = new HashSet<>();
        for (WorkerConnector workerConnector : workingConnectors) {
            try {
                String connectorName = workerConnector.getConnectorName();
                ConnectKeyValue keyValue = connectorConfigs.get(connectorName);
                if (null == keyValue || 0 != keyValue.getInt(RuntimeConfigDefine.CONFIG_DELETED)) {
                    workerConnector.stop();
                    log.info("Connector {} stop", workerConnector.getConnectorName());
                    stoppedConnector.add(workerConnector);
                } else if (!keyValue.equals(workerConnector.getKeyValue())) {
                    workerConnector.reconfigure(keyValue);
                }
            } catch (Exception e) {
                log.error("stop or reconfigure connector error, connectName: " + workerConnector.getConnectorName(), e);
            }
        }
        workingConnectors.removeAll(stoppedConnector);

        if (null == connectorConfigs || 0 == connectorConfigs.size()) {
            return;
        }
        Map<String, ConnectKeyValue> newConnectors = new HashMap<>();
        for (String connectorName : connectorConfigs.keySet()) {
            boolean isNewConnector = true;
            for (WorkerConnector workerConnector : workingConnectors) {
                if (workerConnector.getConnectorName().equals(connectorName)) {
                    isNewConnector = false;
                    break;
                }
            }
            if (isNewConnector) {
                newConnectors.put(connectorName, connectorConfigs.get(connectorName));
            }
        }

        for (String connectorName : newConnectors.keySet()) {
            ClassLoader savedLoader = plugin.currentThreadLoader();
            try {
                ConnectKeyValue keyValue = newConnectors.get(connectorName);
                String connectorClass = keyValue.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
                ClassLoader connectorLoader = plugin.delegatingLoader().pluginClassLoader(connectorClass);
                savedLoader = Plugin.compareAndSwapLoaders(connectorLoader);

                // instance connector
                final Connector connector = plugin.newConnector(connectorClass);
                WorkerConnector workerConnector = new WorkerConnector(connectorName, connector, connectorConfigs.get(connectorName), new DefaultConnectorContext(connectorName, connectController));

                // start connector
                workerConnector.initialize();
                workerConnector.start();
                log.info("Connector {} start", workerConnector.getConnectorName());
                this.workingConnectors.add(workerConnector);
            } catch (Exception e) {
                log.error("worker connector start exception. workerName: " + connectorName, e);
            } finally {
                // compare and swap
                Plugin.compareAndSwapLoaders(savedLoader);
            }
        }
    }

    /**
     * Start a collection of tasks with the given configs. If a task is already started with the same configs, it will
     * not start again. If a task is already started but not contained in the new configs, it will stop.
     *
     * @param taskConfigs
     * @throws Exception
     */
    public void startTasks(Map<String, List<ConnectKeyValue>> taskConfigs) {
        synchronized (latestTaskConfigs) {
            this.latestTaskConfigs = taskConfigs;
        }
    }

    private boolean isConfigInSet(ConnectKeyValue keyValue, Set<Runnable> set) {
        for (Runnable runnable : set) {
            ConnectKeyValue taskConfig = null;
            if (runnable instanceof WorkerSourceTask) {
                taskConfig = ((WorkerSourceTask) runnable).currentTaskConfig();
            } else if (runnable instanceof WorkerSinkTask) {
                taskConfig = ((WorkerSinkTask) runnable).currentTaskConfig();
            } else if (runnable instanceof WorkerDirectTask) {
                taskConfig = ((WorkerDirectTask) runnable).currentTaskConfig();
            }
            if (keyValue.equals(taskConfig)) {
                return true;
            }
        }
        return false;
    }

    /**
     * We can choose to persist in-memory task status
     * so we can view history tasks
     */
    public void stop() {
        workerState.set(WorkerState.TERMINATED);
        try {
            sourceTaskOffsetCommitter.ifPresent(committer -> committer.close(5000));
            taskExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Task termination error.", e);
        }
        stateMachineService.shutdown();

    }

    public Set<WorkerConnector> getWorkingConnectors() {
        return workingConnectors;
    }

    public void setWorkingConnectors(
        Set<WorkerConnector> workingConnectors) {
        this.workingConnectors = workingConnectors;
    }

    /**
     * Beaware that we are not creating a defensive copy of these tasks
     * So developers should only use these references for read-only purposes.
     * These variables should be immutable
     *
     * @return
     */
    public Set<Runnable> getWorkingTasks() {
        return runningTasks;
    }

    public Set<Runnable> getErrorTasks() {
        return errorTasks;
    }

    public Set<Runnable> getPendingTasks() {
        return pendingTasks.keySet();
    }

    public Set<Runnable> getStoppedTasks() {
        return stoppedTasks;
    }

    public Set<Runnable> getStoppingTasks() {
        return stoppingTasks.keySet();
    }

    public Set<Runnable> getCleanedErrorTasks() {
        return cleanedErrorTasks;
    }

    public Set<Runnable> getCleanedStoppedTasks() {
        return cleanedStoppedTasks;
    }

    public void setWorkingTasks(Set<Runnable> workingTasks) {
        this.runningTasks = workingTasks;
    }

    public void maintainConnectorState() {

    }

    /**
     * maintain task state
     *
     * @throws Exception
     */
    public void maintainTaskState() throws Exception {
        Map<String, List<ConnectKeyValue>> connectorConfig = new HashMap<>();
        synchronized (latestTaskConfigs) {
            connectorConfig.putAll(latestTaskConfigs);
        }

        //  STEP 1: check running tasks and put to error status
        checkRunningTasks(connectorConfig);

        // get new Tasks
        Map<String, List<ConnectKeyValue>> newTasks = newTasks(connectorConfig);

        //  STEP 2: try to create new tasks
        startTask(newTasks);

        //  STEP 3: check all pending state
        checkPendingTask();

        //  STEP 4 check stopping tasks
        checkStoppingTasks();

        //  STEP 5 check error tasks
        checkErrorTasks();

        //  STEP 6 check errorTasks and stopped tasks
        checkStoppedTasks();
    }

    /**
     * check running task
     *
     * @param connectorConfig
     */
    private void checkRunningTasks(Map<String, List<ConnectKeyValue>> connectorConfig) {
        //  STEP 1: check running tasks and put to error status
        for (Runnable runnable : runningTasks) {
            WorkerTask workerTask = (WorkerTask) runnable;
            String connectorName = workerTask.id().connector();
            ConnectKeyValue taskConfig = workerTask.currentTaskConfig();
            List<ConnectKeyValue> taskConfigs = connectorConfig.get(connectorName);
            WorkerTaskState state = ((WorkerTask) runnable).getState();
            switch (state) {
                case ERROR:
                    errorTasks.add(runnable);
                    runningTasks.remove(runnable);
                    break;
                case RUNNING:
                    if (isNeedStop(taskConfig, taskConfigs)) {
                        try {
                            // remove committer offset
                            sourceTaskOffsetCommitter.ifPresent(commiter -> commiter.remove(workerTask.id()));
                            workerTask.doClose();
                        } catch (Exception e) {
                            log.error("workerTask stop exception, workerTask: " + workerTask.currentTaskConfig(), e);
                        }
                        log.info("Task stopping, connector name {}, config {}", workerTask.id().connector(), workerTask.currentTaskConfig());
                        runningTasks.remove(runnable);
                        stoppingTasks.put(runnable, System.currentTimeMillis());
                    }
                    break;
                default:
                    log.error("[BUG] Illegal State in when checking running tasks, {} is in {} state",
                        ((WorkerTask) runnable).id().connector(), state);
                    break;
            }
        }
    }

    /**
     * check is need stop
     *
     * @param taskConfig
     * @param keyValues
     * @return
     */
    private boolean isNeedStop(ConnectKeyValue taskConfig, List<ConnectKeyValue> keyValues) {
        if (CollectionUtils.isEmpty(keyValues)) {
            return true;
        }
        for (ConnectKeyValue keyValue : keyValues) {
            if (keyValue.equals(taskConfig)) {
                // not stop
                return false;
            }
        }
        return true;
    }

    /**
     * check stopped tasks
     */
    private void checkStoppedTasks() {
        for (Runnable runnable : stoppedTasks) {
            WorkerTask workerTask = (WorkerTask) runnable;
            workerTask.cleanup();
            Future future = taskToFutureMap.get(runnable);
            try {
                if (null != future) {
                    future.get(workerConfig.getMaxStartTimeoutMills(), TimeUnit.MILLISECONDS);
                } else {
                    log.error("[BUG] stopped Tasks reference not found in taskFutureMap");
                }
            } catch (ExecutionException e) {
                Throwable t = e.getCause();
                log.info("[BUG] Stopped Tasks should not throw any exception");
                t.printStackTrace();
            } catch (CancellationException e) {
                log.info("[BUG] Stopped Tasks throws PrintStackTrace");
                e.printStackTrace();
            } catch (TimeoutException e) {
                log.info("[BUG] Stopped Tasks should not throw any exception");
                e.printStackTrace();
            } catch (InterruptedException e) {
                log.info("[BUG] Stopped Tasks should not throw any exception");
                e.printStackTrace();
            } finally {
                // remove committer offset
                sourceTaskOffsetCommitter.ifPresent(commiter -> commiter.remove(workerTask.id()));
                future.cancel(true);
                taskToFutureMap.remove(runnable);
                stoppedTasks.remove(runnable);
                cleanedStoppedTasks.add(runnable);
            }
        }
    }

    private void checkErrorTasks() {
        for (Runnable runnable : errorTasks) {
            WorkerTask workerTask = (WorkerTask) runnable;
            Future future = taskToFutureMap.get(runnable);
            try {
                if (null != future) {
                    future.get(workerConfig.getMaxStopTimeoutMills(), TimeUnit.MILLISECONDS);
                } else {
                    log.error("[BUG] errorTasks reference not found in taskFutureMap");
                }
            } catch (ExecutionException e) {
                log.error("Execution exception , {}", e);
            } catch (CancellationException | TimeoutException | InterruptedException e) {
                log.error("error, {}", e);
            } finally {
                // remove committer offset
                sourceTaskOffsetCommitter.ifPresent(commiter -> commiter.remove(workerTask.id()));

                future.cancel(true);
                workerTask.cleanup();
                taskToFutureMap.remove(runnable);
                errorTasks.remove(runnable);
                cleanedErrorTasks.add(runnable);
            }
        }
    }

    private void checkStoppingTasks() {
        for (Map.Entry<Runnable, Long> entry : stoppingTasks.entrySet()) {
            Runnable runnable = entry.getKey();
            Long stopTimestamp = entry.getValue();
            Long currentTimeMillis = System.currentTimeMillis();
            Future future = taskToFutureMap.get(runnable);
            WorkerTaskState state = ((WorkerTask) runnable).getState();
            // exited normally
            switch (state) {
                case STOPPED:
                    // concurrent modification Exception ? Will it pop that in the
                    if (null == future || !future.isDone()) {
                        log.error("[BUG] future is null or Stopped task should have its Future.isDone() true, but false");
                    }
                    stoppingTasks.remove(runnable);
                    stoppedTasks.add(runnable);
                    break;
                case ERROR:
                    stoppingTasks.remove(runnable);
                    errorTasks.add(runnable);
                    break;
                case STOPPING:
                    if (currentTimeMillis - stopTimestamp > workerConfig.getMaxStopTimeoutMills()) {
                        ((WorkerTask) runnable).timeout();
                        stoppingTasks.remove(runnable);
                        errorTasks.add(runnable);
                    }
                    break;
                default:
                    log.error("[BUG] Illegal State in when checking stopping tasks, {} is in {} state",
                        ((WorkerTask) runnable).id().connector(), state.toString());
            }
        }
    }

    private void checkPendingTask() {
        for (Map.Entry<Runnable, Long> entry : pendingTasks.entrySet()) {
            Runnable runnable = entry.getKey();
            Long startTimestamp = entry.getValue();
            Long currentTimeMillis = System.currentTimeMillis();
            WorkerTaskState state = ((WorkerTask) runnable).getState();
            switch (state) {
                case ERROR:
                    errorTasks.add(runnable);
                    pendingTasks.remove(runnable);
                    break;
                case RUNNING:
                    runningTasks.add(runnable);
                    pendingTasks.remove(runnable);
                    break;
                case NEW:
                    log.info("[RACE CONDITION] we checked the pending tasks before state turns to PENDING");
                    break;
                case PENDING:
                    if (currentTimeMillis - startTimestamp > workerConfig.getMaxStartTimeoutMills()) {
                        ((WorkerTask) runnable).timeout();
                        pendingTasks.remove(runnable);
                        errorTasks.add(runnable);
                    }
                    break;
                default:
                    log.error("[BUG] Illegal State in when checking pending tasks, {} is in {} state",
                        ((WorkerTask) runnable).id().connector(), state.toString());
                    break;
            }
        }
    }

    /**
     * start task
     *
     * @param newTasks
     * @throws Exception
     */
    private void startTask(Map<String, List<ConnectKeyValue>> newTasks) throws Exception {
        for (String connectorName : newTasks.keySet()) {
            for (ConnectKeyValue keyValue : newTasks.get(connectorName)) {
                int taskId = keyValue.getInt(RuntimeConfigDefine.TASK_ID);
                ConnectorTaskId id = new ConnectorTaskId(connectorName, taskId);
                String taskType = keyValue.getString(RuntimeConfigDefine.TASK_TYPE);
                if (TaskType.DIRECT.name().equalsIgnoreCase(taskType)) {
                    createDirectTask(id, keyValue);
                    continue;
                }

                ClassLoader savedLoader = plugin.currentThreadLoader();
                try {
                    String connType = keyValue.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
                    ClassLoader connectorLoader = plugin.delegatingLoader().connectorLoader(connType);
                    savedLoader = Plugin.compareAndSwapLoaders(connectorLoader);
                    // new task
                    final Class<? extends Task> taskClass = plugin.currentThreadLoader().loadClass(keyValue.getString(RuntimeConfigDefine.TASK_CLASS)).asSubclass(Task.class);
                    final Task task = plugin.newTask(taskClass);

                    /**
                     * create key/value converter
                     */
                    RecordConverter valueConverter = plugin.newConverter(keyValue, RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, Plugin.ClassLoaderUsage.CURRENT_CLASSLOADER);
                    RecordConverter keyConverter = plugin.newConverter(keyValue, RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER, Plugin.ClassLoaderUsage.CURRENT_CLASSLOADER);

                    if (keyConverter == null) {
                        keyConverter = plugin.newConverter(keyValue, RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER, Plugin.ClassLoaderUsage.PLUGINS);
                        log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id);
                    } else {
                        log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id);
                    }
                    if (valueConverter == null) {
                        valueConverter = plugin.newConverter(keyValue, RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, Plugin.ClassLoaderUsage.PLUGINS);
                        log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id);
                    } else {
                        log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id);
                    }

                    if (task instanceof SourceTask) {
                        DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(workerConfig);
                        TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
                        // create retry operator
                        RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue);
                        retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters(connectorName, keyValue));

                        WorkerSourceTask workerSourceTask = new WorkerSourceTask(workerConfig, id,
                            (SourceTask) task, savedLoader, keyValue, positionManagementService, keyConverter, valueConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain, retryWithToleranceOperator);

                        Future future = taskExecutor.submit(workerSourceTask);
                        // schedule offset committer
                        sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, workerSourceTask));

                        taskToFutureMap.put(workerSourceTask, future);
                        this.pendingTasks.put(workerSourceTask, System.currentTimeMillis());

                    } else if (task instanceof SinkTask) {
                        log.info("sink task config keyValue is {}", keyValue.getProperties());
                        DefaultMQPullConsumer consumer = ConnectUtil.initDefaultMQPullConsumer(workerConfig, id, keyValue);
                        Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(workerConfig);
                        if (!consumerGroupSet.contains(consumer.getConsumerGroup())) {
                            ConnectUtil.createSubGroup(workerConfig, consumer.getConsumerGroup());
                        }
                        TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
                        // create retry operator
                        RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue);
                        retryWithToleranceOperator.reporters(ReporterManagerUtil.sinkTaskReporters(connectorName, keyValue, workerConfig));

                        WorkerSinkTask workerSinkTask = new WorkerSinkTask(workerConfig, id,
                            (SinkTask) task, savedLoader, keyValue, keyConverter, valueConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain,
                            retryWithToleranceOperator, ReporterManagerUtil.createWorkerErrorRecordReporter(keyValue, retryWithToleranceOperator, valueConverter));
                        Future future = taskExecutor.submit(workerSinkTask);
                        taskToFutureMap.put(workerSinkTask, future);
                        this.pendingTasks.put(workerSinkTask, System.currentTimeMillis());
                    }
                    Plugin.compareAndSwapLoaders(savedLoader);
                } catch (Exception e) {
                    log.error("start worker task exception. config {}" + JSON.toJSONString(keyValue), e);
                    Plugin.compareAndSwapLoaders(savedLoader);
                }
            }
        }
    }

    private Map<String, List<ConnectKeyValue>> newTasks(Map<String, List<ConnectKeyValue>> taskConfigs) {
        Map<String, List<ConnectKeyValue>> newTasks = new HashMap<>();
        for (String connectorName : taskConfigs.keySet()) {
            for (ConnectKeyValue keyValue : taskConfigs.get(connectorName)) {
                boolean isNewTask = true;
                if (isConfigInSet(keyValue, runningTasks) || isConfigInSet(keyValue, pendingTasks.keySet()) || isConfigInSet(keyValue, errorTasks)) {
                    isNewTask = false;
                }
                if (isNewTask) {
                    if (!newTasks.containsKey(connectorName)) {
                        newTasks.put(connectorName, new ArrayList<>());
                    }
                    log.info("Add new tasks,connector name {}, config {}", connectorName, keyValue);
                    newTasks.get(connectorName).add(keyValue);
                }
            }
        }
        return newTasks;
    }

    private void createDirectTask(ConnectorTaskId id, ConnectKeyValue keyValue) throws Exception {
        String sourceTaskClass = keyValue.getString(RuntimeConfigDefine.SOURCE_TASK_CLASS);
        Task sourceTask = getTask(sourceTaskClass);

        String sinkTaskClass = keyValue.getString(RuntimeConfigDefine.SINK_TASK_CLASS);
        Task sinkTask = getTask(sinkTaskClass);

        TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
        // create retry operator
        RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue);
        retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters(id.connector(), keyValue));

        WorkerDirectTask workerDirectTask = new WorkerDirectTask(
            workerConfig,
            id,
            (SourceTask) sourceTask,
            null,
            (SinkTask) sinkTask,
            keyValue,
            positionManagementService,
            workerState,
            connectStatsManager,
            connectStatsService,
            transformChain,
            retryWithToleranceOperator);

        Future future = taskExecutor.submit(workerDirectTask);

        // schedule offset committer
        sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, workerDirectTask));

        taskToFutureMap.put(workerDirectTask, future);
        this.pendingTasks.put(workerDirectTask, System.currentTimeMillis());
    }

    private Task getTask(String taskClass){
        ClassLoader savedLoader = plugin.currentThreadLoader();
        Task task = null;
        try {
            // Get plugin loader
            ClassLoader taskLoader = plugin.delegatingLoader().pluginClassLoader(taskClass);
            // Compare and set current loader
            savedLoader = Plugin.compareAndSwapLoaders(taskLoader);
            // load class
            Class taskClazz = Utils.getContextCurrentClassLoader().loadClass(taskClass).asSubclass(Task.class);
            // new task
            task = plugin.newTask(taskClazz);
        } catch (Exception ex ){
            throw new ConnectException("Create direct task failure", ex);
        } finally {
            Plugin.compareAndSwapLoaders(savedLoader);
        }
        return task;
    }

    public class StateMachineService extends ServiceThread {
        @Override
        public void run() {
            log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                this.waitForRunning(1000);
                try {
                    Worker.this.maintainConnectorState();
                    Worker.this.maintainTaskState();
                } catch (Exception e) {
                    log.error("RebalanceImpl#StateMachineService start connector or task failed", e);
                }
            }
            log.info(this.getServiceName() + " service end");
        }

        @Override
        public String getServiceName() {
            return StateMachineService.class.getSimpleName();
        }
    }

    public enum TaskType {
        SOURCE,
        SINK,
        DIRECT;
    }
}
