/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hugegraph.task;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.page.PageInfo;
import org.apache.hugegraph.backend.query.Condition;
import org.apache.hugegraph.backend.query.ConditionQuery;
import org.apache.hugegraph.backend.query.QueryResults;
import org.apache.hugegraph.backend.store.BackendStore;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.exception.ConnectionException;
import org.apache.hugegraph.exception.NotFoundException;
import org.apache.hugegraph.iterator.ExtendableIterator;
import org.apache.hugegraph.iterator.MapperIterator;
import org.apache.hugegraph.job.EphemeralJob;
import org.apache.hugegraph.schema.IndexLabel;
import org.apache.hugegraph.schema.PropertyKey;
import org.apache.hugegraph.schema.SchemaManager;
import org.apache.hugegraph.schema.VertexLabel;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.task.HugeTask.P;
import org.apache.hugegraph.task.TaskCallable.SysTaskCallable;
import org.apache.hugegraph.task.TaskManager.ContextCallable;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.Cardinality;
import org.apache.hugegraph.type.define.DataType;
import org.apache.hugegraph.type.define.HugeKeys;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.apache.tinkerpop.gremlin.structure.Graph.Hidden;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.slf4j.Logger;

import com.google.common.collect.ImmutableMap;

public class StandardTaskScheduler implements TaskScheduler {

    private static final Logger LOG = Log.logger(StandardTaskScheduler.class);

    // Graph params this scheduler was constructed with
    private final HugeGraphParams graph;
    // Created in the constructor; also used as the lock monitor in tx()
    private final ServerInfoManager serverManager;

    // Executor that tasks are submitted to by submitTask()/resubmitTask()
    private final ExecutorService taskExecutor;
    // Executor that call() submits all task-store operations to
    private final ExecutorService taskDbExecutor;

    // Tasks added by submitTask() and not yet removed by remove(), by task id
    private final Map<Id, HugeTask<?>> tasks;

    // Created lazily by tx(); null until then
    private volatile TaskTransaction taskTx;

    // Limit value for which queryTask() does not set a limit on the query
    private static final long NO_LIMIT = -1L;
    // Limit passed to each paged task query issued by this class
    private static final long PAGE_SIZE = 500L;
    // Polling interval of the wait methods, passed to sleep() (milliseconds)
    private static final long QUERY_INTERVAL = 100L;
    // Upper bound on the size of the tasks map, enforced by submitTask()
    private static final int MAX_PENDING_TASKS = 10000;

    /**
     * Create a task scheduler for the given graph.
     *
     * @param graph                graph params, must not be null
     * @param taskExecutor         executor tasks are submitted to,
     *                             must not be null
     * @param taskDbExecutor       executor for task-store operations,
     *                             must not be null
     * @param serverInfoDbExecutor executor handed to the ServerInfoManager
     */
    public StandardTaskScheduler(HugeGraphParams graph,
                                 ExecutorService taskExecutor,
                                 ExecutorService taskDbExecutor,
                                 ExecutorService serverInfoDbExecutor) {
        // Validate the mandatory collaborators before touching any state
        E.checkNotNull(graph, "graph");
        E.checkNotNull(taskExecutor, "taskExecutor");
        E.checkNotNull(taskDbExecutor, "dbExecutor");

        // The task transaction is created on demand by tx()
        this.taskTx = null;
        this.tasks = new ConcurrentHashMap<>();

        this.graph = graph;
        this.taskDbExecutor = taskDbExecutor;
        this.taskExecutor = taskExecutor;
        this.serverManager = new ServerInfoManager(graph,
                                                   serverInfoDbExecutor);
    }

    /**
     * @return the HugeGraph exposed by this scheduler's graph params
     */
    @Override
    public HugeGraph graph() {
        HugeGraph hugegraph = this.graph.graph();
        return hugegraph;
    }

    /**
     * @return the name reported by this scheduler's graph params
     */
    public String graphName() {
        String name = this.graph.name();
        return name;
    }

    /**
     * @return the number of tasks currently held in the in-memory task map
     */
    @Override
    public int pendingTasks() {
        int pending = this.tasks.size();
        return pending;
    }

    /**
     * Get the task transaction, creating it over the system store on the
     * first call.
     *
     * @return the task transaction, never null
     */
    private TaskTransaction tx() {
        // NOTE: only the owner thread can access task tx
        if (this.taskTx != null) {
            return this.taskTx;
        }
        /*
         * NOTE: the monitor is this.serverManager rather than this, because
         * the scheduler thread holds the lock of this through
         * scheduleTasks() while it queries tasks and waits for the db-worker
         * thread after call(); the db-worker may need to initialize the tx
         * but could not get that lock, which would cause a deadlock.
         */
        synchronized (this.serverManager) {
            // Check again, another thread may have created it meanwhile
            if (this.taskTx == null) {
                BackendStore systemStore = this.graph.loadSystemStore();
                TaskTransaction created = new TaskTransaction(this.graph,
                                                              systemStore);
                assert this.taskTx == null; // may be reentrant?
                this.taskTx = created;
            }
        }
        assert this.taskTx != null;
        return this.taskTx;
    }

    /**
     * Restore the stored tasks of every pending status that are assigned to
     * this server, reading them page by page when the backend supports it.
     */
    @Override
    public <V> void restoreTasks() {
        Id self = this.serverManager().selfServerId();
        // Restore 'RESTORING', 'RUNNING' and 'QUEUED' tasks in order.
        for (TaskStatus pending : TaskStatus.PENDING_STATUSES) {
            String page;
            if (this.supportsPaging()) {
                page = PageInfo.PAGE_NONE;
            } else {
                page = null;
            }
            do {
                Iterator<HugeTask<V>> found = this.findTask(pending,
                                                            PAGE_SIZE, page);
                while (found.hasNext()) {
                    HugeTask<V> task = found.next();
                    if (!self.equals(task.server())) {
                        // Assigned to another server (or to none)
                        continue;
                    }
                    this.restore(task);
                }
                // Move on to the next page, if paging is in use
                if (page != null) {
                    page = PageInfo.pageInfo(found);
                }
            } while (page != null);
        }
    }

    /**
     * Mark a stored task as RESTORING, count a retry and submit it again.
     *
     * @param task the task to restore; must not be null, already queued
     *             in memory, done or completed
     * @return the future returned by submitTask()
     */
    private <V> Future<?> restore(HugeTask<V> task) {
        E.checkArgumentNotNull(task, "Task can't be null");

        boolean queued = this.tasks.containsKey(task.id());
        E.checkArgument(!queued,
                        "Task '%s' is already in the queue", task.id());

        boolean finished = task.isDone() || task.completed();
        E.checkArgument(!finished,
                        "No need to restore completed task '%s' with status %s",
                        task.id(), task.status());

        task.status(TaskStatus.RESTORING);
        task.retry();
        return this.submitTask(task);
    }

    /**
     * Schedule a task.
     * <ul>
     *   <li>A QUEUED task is re-submitted to the task executor.</li>
     *   <li>An EphemeralJob is queued and submitted without being saved.</li>
     *   <li>Otherwise this server must be the master: on a single node
     *       (and for a non-computer task) the task is saved and submitted
     *       at once, else it is saved as SCHEDULING and the TaskManager
     *       is notified.</li>
     * </ul>
     *
     * @param task the task to schedule, must not be null
     * @return the future of the submission, or the task itself when it was
     *         only saved with status SCHEDULING
     */
    @Override
    public <V> Future<?> schedule(HugeTask<V> task) {
        E.checkArgumentNotNull(task, "Task can't be null");

        /*
         * status=QUEUED means re-schedule: just submit to the queue.
         * NOTE: schedule() method may be called multi times by
         * HugeTask.checkDependenciesSuccess() method
         */
        if (task.status() == TaskStatus.QUEUED) {
            return this.resubmitTask(task);
        }

        /*
         * EphemeralJob won't be serialized and deserialized through shared
         * storage, so submit it immediately on this server.
         * NOTE: don't need to save EphemeralJob task
         */
        if (task.callable() instanceof EphemeralJob) {
            task.status(TaskStatus.QUEUED);
            return this.submitTask(task);
        }

        // Only check if not EphemeralJob
        this.checkOnMasterNode("schedule");

        boolean submitNow = this.serverManager().onlySingleNode() &&
                            !task.computer();
        if (!submitNow) {
            /*
             * Just set SCHEDULING status and save task,
             * it will be scheduled by periodic scheduler worker
             */
            task.status(TaskStatus.SCHEDULING);
            this.save(task);

            // Notify master server to schedule and execute immediately
            TaskManager.instance().notifyNewTask(task);

            return task;
        }

        /*
         * Speed up for single node, submit task immediately,
         * this code can be removed without affecting logic
         */
        task.status(TaskStatus.QUEUED);
        task.server(this.serverManager().selfServerId());
        this.save(task);
        return this.submitTask(task);
    }

    /**
     * Initialize a task, register it in the in-memory task map and submit
     * it to the task executor.
     *
     * @param task the task to submit
     * @return the future returned by the task executor
     */
    private <V> Future<?> submitTask(HugeTask<V> task) {
        // Size of the task map once this task has been added
        int expected = 1 + this.tasks.size();
        E.checkArgument(expected <= MAX_PENDING_TASKS,
                        "Pending tasks size %s has exceeded the max limit %s",
                        expected, MAX_PENDING_TASKS);

        this.initTaskCallable(task);

        Id taskId = task.id();
        assert !this.tasks.containsKey(taskId) : task;
        this.tasks.put(taskId, task);

        return this.taskExecutor.submit(task);
    }

    /**
     * Submit again a task that is already registered in the in-memory task
     * map (a re-schedule).
     *
     * @param task the task to resubmit; its status must be QUEUED and it
     *             must have been submitted before
     * @return the future returned by the task executor
     */
    private <V> Future<?> resubmitTask(HugeTask<V> task) {
        // Report the actual status of the task, not the expected one
        E.checkArgument(task.status() == TaskStatus.QUEUED,
                        "Can't resubmit task '%s' with status %s",
                        task.id(), task.status());
        E.checkArgument(this.tasks.containsKey(task.id()),
                        "Can't resubmit task '%s' not been submitted before",
                        task.id());
        return this.taskExecutor.submit(task);
    }

    /**
     * Bind a task to this scheduler and give its callable the task and the
     * graph; a SysTaskCallable additionally receives the graph params.
     *
     * @param task the task to initialize
     */
    public <V> void initTaskCallable(HugeTask<V> task) {
        task.scheduler(this);

        TaskCallable<V> job = task.callable();
        job.task(task);
        job.graph(this.graph());

        if (!(job instanceof SysTaskCallable)) {
            return;
        }
        // Only authorized to the necessary tasks
        SysTaskCallable<V> sysJob = (SysTaskCallable<V>) job;
        sysJob.params(this.graph);
    }

    /**
     * Cancel a task; only allowed on the master server.
     * A completed or cancelling task is left untouched. A task without an
     * assigned server is set to CANCELLED and saved. A task with a server is
     * set to CANCELLING and saved, then the TaskManager is notified.
     *
     * @param task the task to cancel, must not be null
     * @throws HugeException if this server is not the master, or if the
     *         task status could not be changed to CANCELLED/CANCELLING
     */
    @Override
    public synchronized <V> void cancel(HugeTask<V> task) {
        E.checkArgumentNotNull(task, "Task can't be null");
        this.checkOnMasterNode("cancel");

        // Nothing to do if already finished or being cancelled
        if (task.completed() || task.cancelling()) {
            return;
        }

        LOG.info("Cancel task '{}' in status {}", task.id(), task.status());

        if (task.server() == null) {
            // The task not scheduled to workers, set canceled immediately
            assert task.status().code() < TaskStatus.QUEUED.code();
            if (task.status(TaskStatus.CANCELLED)) {
                this.save(task);
                return;
            }
            // Status change was refused: fall through to the throw below
        } else if (task.status(TaskStatus.CANCELLING)) {
            // The task scheduled to workers, let the worker node to cancel
            this.save(task);
            assert task.server() != null : task;
            assert this.serverManager().master();
            if (!task.server().equals(this.serverManager().selfServerId())) {
                /*
                 * Remove task from memory if it's running on worker node,
                 * but keep task in memory if it's running on master node.
                 * cancel-scheduling will read task from backend store, if
                 * removed this instance from memory, there will be two task
                 * instances with same id, and can't cancel the real task that
                 * is running but removed from memory.
                 */
                this.remove(task);
            }
            // Notify master server to schedule and execute immediately
            TaskManager.instance().notifyNewTask(task);
            return;
        }

        // Reached only when the status update above returned false
        throw new HugeException("Can't cancel task '%s' in status %s",
                                task.id(), task.status());
    }

    /**
     * @return the server info manager created by the constructor
     */
    protected ServerInfoManager serverManager() {
        ServerInfoManager manager = this.serverManager;
        return manager;
    }

    /**
     * Assign every stored SCHEDULING task that has no server yet to a
     * worker picked by the server manager, then update the server infos.
     * Returns early, without updating the server infos, as soon as this
     * server is found not to be the master.
     */
    protected synchronized void scheduleTasks() {
        // Master server schedule all scheduling tasks to suitable worker nodes
        Collection<HugeServerInfo> servers = this.serverManager()
                                                 .allServerInfos();
        String page;
        if (this.supportsPaging()) {
            page = PageInfo.PAGE_NONE;
        } else {
            page = null;
        }
        do {
            Iterator<HugeTask<Object>> scheduling =
                    this.tasks(TaskStatus.SCHEDULING, PAGE_SIZE, page);
            for (; scheduling.hasNext();) {
                HugeTask<?> task = scheduling.next();
                // Skip if already scheduled
                if (task.server() != null) {
                    continue;
                }

                if (!this.serverManager.master()) {
                    return;
                }

                HugeServerInfo picked = this.serverManager()
                                            .pickWorkerNode(servers, task);
                if (picked == null) {
                    LOG.info("The master can't find suitable servers to " +
                             "execute task '{}', wait for next schedule",
                             task.id());
                    continue;
                }

                // Found suitable server, update task status
                assert picked.id() != null;
                task.server(picked.id());
                task.status(TaskStatus.SCHEDULED);
                this.save(task);

                // Update server load in memory, it will be saved at the ending
                picked.increaseLoad(task.load());

                LOG.info("Scheduled task '{}' to server '{}'",
                         task.id(), picked.id());
            }
            // Move on to the next page, if paging is in use
            if (page != null) {
                page = PageInfo.pageInfo(scheduling);
            }
        } while (page != null);

        this.serverManager().updateServerInfos(servers);
    }

    /**
     * Queue, save and submit every stored SCHEDULED task that is assigned
     * to the given server and is not already held in memory.
     *
     * @param server id of the server whose tasks should be executed
     */
    protected void executeTasksOnWorker(Id server) {
        String page;
        if (this.supportsPaging()) {
            page = PageInfo.PAGE_NONE;
        } else {
            page = null;
        }
        do {
            Iterator<HugeTask<Object>> scheduled =
                    this.tasks(TaskStatus.SCHEDULED, PAGE_SIZE, page);
            for (; scheduled.hasNext();) {
                HugeTask<?> task = scheduled.next();
                this.initTaskCallable(task);

                Id owner = task.server();
                if (owner == null) {
                    LOG.warn("Task '{}' may not be scheduled", task.id());
                    continue;
                }

                // A task already in memory has been picked up before
                HugeTask<?> inMemory = this.tasks.get(task.id());
                if (inMemory != null) {
                    assert inMemory.status().code() > task.status().code();
                    continue;
                }

                if (!owner.equals(server)) {
                    continue;
                }
                task.status(TaskStatus.QUEUED);
                this.save(task);
                this.submitTask(task);
            }
            // Move on to the next page, if paging is in use
            if (page != null) {
                page = PageInfo.pageInfo(scheduled);
            }
        } while (page != null);
    }

    /**
     * Cancel every stored CANCELLING task that is assigned to the given
     * server, preferring the in-memory instance of the task when there
     * is one.
     *
     * @param server id of the server whose tasks should be cancelled
     */
    protected void cancelTasksOnWorker(Id server) {
        String page;
        if (this.supportsPaging()) {
            page = PageInfo.PAGE_NONE;
        } else {
            page = null;
        }
        do {
            Iterator<HugeTask<Object>> cancelling =
                    this.tasks(TaskStatus.CANCELLING, PAGE_SIZE, page);
            for (; cancelling.hasNext();) {
                HugeTask<?> stored = cancelling.next();
                Id owner = stored.server();
                if (owner == null) {
                    LOG.warn("Task '{}' may not be scheduled", stored.id());
                    continue;
                }
                if (!owner.equals(server)) {
                    continue;
                }
                /*
                 * Task may be loaded from backend store and not initialized.
                 * like: A task is completed but failed to save in the last
                 * step, resulting in the status of the task not being
                 * updated to storage, the task is not in memory, so it's not
                 * initialized when canceled.
                 */
                HugeTask<?> inMemory = this.tasks.get(stored.id());
                HugeTask<?> target;
                if (inMemory == null) {
                    this.initTaskCallable(stored);
                    target = stored;
                } else {
                    target = inMemory;
                }
                boolean cancelled = target.cancel(true);
                LOG.info("Server '{}' cancel task '{}' with cancelled={}",
                         server, target.id(), cancelled);
            }
            // Move on to the next page, if paging is in use
            if (page != null) {
                page = PageInfo.pageInfo(cancelling);
            }
        } while (page != null);
    }

    /**
     * Drop a finished task from memory and decrease the load of this
     * server by the load of the task. A failure to decrease the load is
     * logged and not propagated.
     *
     * @param task the task that is done
     */
    protected void taskDone(HugeTask<?> task) {
        this.remove(task);

        Id self = this.serverManager().selfServerId();
        int load = task.load();
        try {
            this.serverManager().decreaseLoad(load);
        } catch (Throwable e) {
            LOG.error("Failed to decrease load for task '{}' on server '{}'",
                      task.id(), self, e);
        }
        LOG.debug("Task '{}' done on server '{}'", task.id(), self);
    }

    /**
     * Remove a task from the in-memory task map by its id, warning when
     * the removed instance is not the one passed in.
     *
     * @param task the task to remove, must not be null
     */
    protected void remove(HugeTask<?> task) {
        E.checkNotNull(task, "remove task");

        HugeTask<?> removed = this.tasks.remove(task.id());
        boolean otherInstance = removed != null && removed != task;
        if (otherInstance) {
            LOG.warn("Task '{}' may be inconsistent status {}(expect {})",
                     task.id(), task.status(), removed.status());
        }

        assert removed == null || removed.completed() ||
               removed.cancelling() || removed.isCancelled() : removed;
    }

    /**
     * Save a task to the task store through the db executor: the task is
     * converted to a vertex, the stale index of the stored vertex is
     * deleted if needed, then the vertex is added or updated.
     *
     * @param task the task to save, must not be null
     */
    @Override
    public <V> void save(HugeTask<V> task) {
        // Validate before the task is dereferenced below
        E.checkArgumentNotNull(task, "Task can't be null");
        LOG.debug("Saving task: {}", task);
        task.scheduler(this);
        this.call(() -> {
            // Construct vertex from task
            HugeVertex vertex = this.tx().constructVertex(task);
            // Delete index of old vertex to avoid stale index
            this.tx().deleteIndex(vertex);
            // Add or update task info to backend store
            return this.tx().addVertex(vertex);
        });
    }

    /**
     * Initialize the task schema through the db executor.
     */
    @Override
    public void init() {
        Runnable initSchema = () -> this.tx().initSchema();
        this.call(initSchema);
    }

    /**
     * Close the task transaction and the graph tx through the db executor
     * (unless that executor is already shut down), then close the server
     * manager.
     *
     * @return the result of closing the server manager
     */
    @Override
    public boolean close() {
        boolean dbExecutorShutdown = this.taskDbExecutor.isShutdown();
        if (!dbExecutorShutdown) {
            Runnable closeTx = () -> {
                try {
                    this.tx().close();
                } catch (ConnectionException ignored) {
                    // ConnectionException means no connection established
                }
                this.graph.closeTx();
            };
            this.call(closeTx);
        }
        return this.serverManager.close();
    }

    /**
     * Get a task by id, from memory if it is held there, else from the
     * task store.
     *
     * @param id the task id, must not be null
     * @return the task
     * @throws NotFoundException if the task is neither in memory nor stored
     */
    @Override
    public <V> HugeTask<V> task(Id id) {
        E.checkArgumentNotNull(id, "Parameter task id can't be null");
        @SuppressWarnings("unchecked")
        HugeTask<V> inMemory = (HugeTask<V>) this.tasks.get(id);
        if (inMemory == null) {
            return this.findTask(id);
        }
        return inMemory;
    }

    /**
     * Get tasks by ids: the tasks held in memory come first, followed by
     * the result of querying the task store for the remaining ids.
     *
     * @param ids the task ids
     * @return an iterator over the tasks
     */
    @Override
    public <V> Iterator<HugeTask<V>> tasks(List<Id> ids) {
        List<HugeTask<V>> inMemory = new ArrayList<>();
        List<Id> missing = new ArrayList<>();
        for (Id id : ids) {
            @SuppressWarnings("unchecked")
            HugeTask<V> task = (HugeTask<V>) this.tasks.get(id);
            if (task == null) {
                missing.add(id);
            } else {
                inMemory.add(task);
            }
        }

        ExtendableIterator<HugeTask<V>> results;
        if (!inMemory.isEmpty()) {
            results = new ExtendableIterator<>(inMemory.iterator());
        } else {
            results = new ExtendableIterator<>();
        }
        results.extend(this.findTasks(missing));
        return results;
    }

    /**
     * Query stored tasks, all of them when status is null, else only the
     * tasks with the given status.
     *
     * @param status the status to filter by, or null for no filter
     * @param limit  max number of tasks, NO_LIMIT for unlimited
     * @param page   page to query, or null to query without paging
     * @return an iterator over the matching tasks
     */
    @Override
    public <V> Iterator<HugeTask<V>> tasks(TaskStatus status,
                                           long limit, String page) {
        return status == null ?
               this.<V>findAllTask(limit, page) :
               this.<V>findTask(status, limit, page);
    }

    /**
     * Load a task from the task store by id.
     *
     * @param id the task id
     * @return the stored task
     * @throws NotFoundException if no vertex is stored for the id
     */
    public <V> HugeTask<V> findTask(Id id) {
        HugeTask<V> found = this.call(() -> {
            Vertex vertex = QueryResults.one(this.tx().queryVertices(id));
            if (vertex != null) {
                return HugeTask.fromVertex(vertex);
            }
            return null;
        });
        if (found != null) {
            return found;
        }
        throw new NotFoundException("Can't find task with id '%s'", id);
    }

    /**
     * Query the task store for the tasks with the given ids.
     *
     * @param ids the task ids
     * @return an iterator over the tasks found
     */
    public <V> Iterator<HugeTask<V>> findTasks(List<Id> ids) {
        Iterator<HugeTask<V>> found = this.queryTask(ids);
        return found;
    }

    /**
     * Query the task store for tasks without any property condition.
     *
     * @param limit max number of tasks, NO_LIMIT for unlimited
     * @param page  page to query, or null to query without paging
     * @return an iterator over the tasks found
     */
    public <V> Iterator<HugeTask<V>> findAllTask(long limit, String page) {
        Map<String, Object> noConditions = ImmutableMap.of();
        return this.queryTask(noConditions, limit, page);
    }

    /**
     * Query the task store for the tasks whose status property equals the
     * code of the given status.
     *
     * @param status the status to filter by
     * @param limit  max number of tasks, NO_LIMIT for unlimited
     * @param page   page to query, or null to query without paging
     * @return an iterator over the tasks found
     */
    public <V> Iterator<HugeTask<V>> findTask(TaskStatus status,
                                              long limit, String page) {
        Iterator<HugeTask<V>> found = this.queryTask(P.STATUS, status.code(),
                                                     limit, page);
        return found;
    }

    /**
     * Delete a completed task from memory and from the task store; only
     * allowed on the master server.
     *
     * @param id the id of the task to delete
     * @return the deleted task as read from the store, or null if no
     *         vertex was stored for the id
     */
    @Override
    public <V> HugeTask<V> delete(Id id) {
        this.checkOnMasterNode("delete");

        /*
         * Look the task up through task(id) rather than this.tasks.get(id),
         * which is out of date when the task is running on a worker node.
         * Tasks are removed from memory after completed at most time,
         * but there is a tiny gap between tasks are completed and
         * removed from memory.
         * We assume tasks only in memory may be incomplete status,
         * in fact, it is also possible to appear on the backend tasks
         * when the database status is inconsistent.
         */
        HugeTask<?> existing = this.task(id);
        if (existing != null) {
            E.checkArgument(existing.completed(),
                            "Can't delete incomplete task '%s' in status %s" +
                            ", Please try to cancel the task first",
                            id, existing.status());
            this.remove(existing);
        }

        return this.call(() -> {
            Iterator<Vertex> found = this.tx().queryVertices(id);
            HugeVertex vertex = (HugeVertex) QueryResults.one(found);
            if (vertex == null) {
                return null;
            }
            // Check the stored status too before removing the vertex
            HugeTask<V> stored = HugeTask.fromVertex(vertex);
            E.checkState(stored.completed(),
                         "Can't delete incomplete task '%s' in status %s",
                         id, stored.status());
            this.tx().removeVertex(vertex);
            return stored;
        });
    }

    /**
     * Wait for a task to complete, polling every QUERY_INTERVAL.
     *
     * @param id      the task id
     * @param seconds the timeout in seconds
     * @return the completed task
     * @throws TimeoutException if the task did not complete in time
     */
    @Override
    public <V> HugeTask<V> waitUntilTaskCompleted(Id id, long seconds)
                                                  throws TimeoutException {
        long intervalMs = QUERY_INTERVAL;
        return this.waitUntilTaskCompleted(id, seconds, intervalMs);
    }

    /**
     * Wait for a task to complete, using the configured
     * CoreOptions.TASK_WAIT_TIMEOUT as timeout and polling every 1ms.
     *
     * @param id the task id
     * @return the completed task
     * @throws TimeoutException if the task did not complete in time
     */
    @Override
    public <V> HugeTask<V> waitUntilTaskCompleted(Id id)
                                                  throws TimeoutException {
        // This method is just used by tests
        long intervalMs = 1L;
        long timeoutSeconds = this.graph.configuration()
                                        .get(CoreOptions.TASK_WAIT_TIMEOUT);
        return this.waitUntilTaskCompleted(id, timeoutSeconds, intervalMs);
    }

    /**
     * Poll a task until it is completed or the timeout has elapsed.
     *
     * @param id         the task id
     * @param seconds    the timeout in seconds
     * @param intervalMs the time to sleep between two polls
     * @return the completed task
     * @throws TimeoutException  if the task is still not completed after
     *                           the last polling pass
     * @throws NotFoundException if the task can't be found, unless the
     *                           instance fetched by an earlier pass has
     *                           completed in the meantime
     */
    private <V> HugeTask<V> waitUntilTaskCompleted(Id id, long seconds,
                                                   long intervalMs)
                                                   throws TimeoutException {
        // Number of polling passes that fit into the timeout
        long passes = seconds * 1000 / intervalMs;
        // Keeps the instance fetched by the previous pass across iterations
        HugeTask<V> task = null;
        for (long pass = 0;; pass++) {
            try {
                task = this.task(id);
            } catch (NotFoundException e) {
                /*
                 * The lookup failed in this pass: if the instance fetched
                 * by an earlier pass is completed by now, return it,
                 * otherwise propagate the error.
                 */
                if (task != null && task.completed()) {
                    assert task.id().asLong() < 0L : task.id();
                    sleep(intervalMs);
                    return task;
                }
                throw e;
            }
            if (task.completed()) {
                // Wait for task result being set after status is completed
                sleep(intervalMs);
                return task;
            }
            // Out of passes: leave the loop and report the timeout
            if (pass >= passes) {
                break;
            }
            sleep(intervalMs);
        }
        throw new TimeoutException(String.format(
                  "Task '%s' was not completed in %s seconds", id, seconds));
    }

    /**
     * Wait until the in-memory task map is empty, polling every
     * QUERY_INTERVAL.
     *
     * @param seconds the timeout in seconds
     * @throws TimeoutException if tasks are still pending after the last
     *                          polling pass
     */
    @Override
    public void waitUntilAllTasksCompleted(long seconds)
                                           throws TimeoutException {
        long maxPasses = seconds * 1000 / QUERY_INTERVAL;
        long pass = 0;
        while (true) {
            int remaining = this.pendingTasks();
            if (remaining == 0) {
                sleep(QUERY_INTERVAL);
                return;
            }
            if (pass >= maxPasses) {
                throw new TimeoutException(String.format(
                          "There are still %s incomplete tasks after %s seconds",
                          remaining, seconds));
            }
            sleep(QUERY_INTERVAL);
            pass++;
        }
    }

    /**
     * Check that the given operation may be performed on this server,
     * i.e. that this server is the master.
     *
     * @param op name of the operation, used in the error message
     */
    @Override
    public void checkRequirement(String op) {
        // The only requirement so far: being the master server
        this.checkOnMasterNode(op);
    }

    /**
     * Query stored tasks by a single property condition.
     *
     * @param key   name of the property key
     * @param value value the property must equal
     * @param limit max number of tasks, NO_LIMIT for unlimited
     * @param page  page to query, or null to query without paging
     * @return an iterator over the tasks found
     */
    private <V> Iterator<HugeTask<V>> queryTask(String key, Object value,
                                                long limit, String page) {
        Map<String, Object> condition = ImmutableMap.of(key, value);
        return this.queryTask(condition, limit, page);
    }

    /**
     * Query stored task vertices matching all the given property
     * conditions, through the db executor.
     *
     * @param conditions property key name to required value
     * @param limit      max number of tasks, NO_LIMIT for unlimited
     * @param page       page to query, or null to query without paging
     * @return an iterator over the tasks found
     */
    private <V> Iterator<HugeTask<V>> queryTask(Map<String, Object> conditions,
                                                long limit, String page) {
        return this.call(() -> {
            ConditionQuery query = new ConditionQuery(HugeType.VERTEX);
            if (page != null) {
                query.page(page);
            }

            // Restrict the query to vertices of the task label
            VertexLabel taskLabel = this.graph().vertexLabel(P.TASK);
            query.eq(HugeKeys.LABEL, taskLabel.id());

            conditions.forEach((name, value) -> {
                PropertyKey key = this.graph().propertyKey(name);
                query.query(Condition.eq(key.id(), value));
            });

            query.showHidden(true);
            if (limit != NO_LIMIT) {
                query.limit(limit);
            }

            Iterator<Vertex> vertices = this.tx().queryVertices(query);
            Iterator<HugeTask<V>> found =
                    new MapperIterator<>(vertices, HugeTask::fromVertex);
            // Convert iterator to list to avoid across thread tx accessed
            return QueryResults.toList(found);
        });
    }

    /**
     * Query stored task vertices by ids, through the db executor.
     *
     * @param ids the task ids
     * @return an iterator over the tasks found
     */
    private <V> Iterator<HugeTask<V>> queryTask(List<Id> ids) {
        return this.call(() -> {
            Object[] vertexIds = ids.toArray(new Id[0]);
            Iterator<HugeTask<V>> found = new MapperIterator<>(
                    this.tx().queryVertices(vertexIds),
                    HugeTask::fromVertex);
            // Convert iterator to list to avoid across thread tx accessed
            return QueryResults.toList(found);
        });
    }

    /**
     * Run an action in the db executor and wait for it to finish.
     *
     * @param runnable the action to run
     * @return always null
     */
    private <V> V call(Runnable runnable) {
        // Adapt the action to a callable that yields null
        Callable<V> adapted = Executors.callable(runnable, null);
        return this.call(adapted);
    }

    /**
     * Run a callable in the db executor and wait for its result.
     * Must not be called from a "task-db-worker" thread (asserted).
     *
     * @param callable the operation to run
     * @return the result of the callable
     * @throws HugeException if submitting, running or waiting for the
     *                       callable fails; the failure is set as cause
     */
    private <V> V call(Callable<V> callable) {
        assert !Thread.currentThread().getName().startsWith(
               "task-db-worker") : "can't call by itself";
        try {
            // Pass task context for db thread
            callable = new ContextCallable<>(callable);
            // Ensure all db operations are executed in dbExecutor thread(s)
            return this.taskDbExecutor.submit(callable).get();
        } catch (InterruptedException e) {
            /*
             * Future.get() clears the interrupt status when it throws:
             * restore it so that the callers can still observe the
             * interruption, then report the failure as usual.
             */
            Thread.currentThread().interrupt();
            throw new HugeException("Failed to update/query TaskStore: %s",
                                    e, e.toString());
        } catch (Throwable e) {
            throw new HugeException("Failed to update/query TaskStore: %s",
                                    e, e.toString());
        }
    }

    /**
     * Ensure this server is the master.
     *
     * @param op name of the operation, used in the error message
     * @throws HugeException if this server is not the master
     */
    private void checkOnMasterNode(String op) {
        if (this.serverManager().master()) {
            return;
        }
        throw new HugeException("Can't %s task on non-master server", op);
    }

    /**
     * @return whether the backend store features report support for
     *         querying by page
     */
    private boolean supportsPaging() {
        boolean byPage = this.graph.backendStoreFeatures()
                                   .supportsQueryByPage();
        return byPage;
    }

    /**
     * Sleep for the given time, ignoring an interruption.
     *
     * @param ms the time to sleep in milliseconds
     * @return true if the sleep ran to its end, false if it was interrupted
     */
    private static boolean sleep(long ms) {
        boolean completed;
        try {
            Thread.sleep(ms);
            completed = true;
        } catch (InterruptedException ignored) {
            // Ignore InterruptedException
            completed = false;
        }
        return completed;
    }

    /**
     * Auto-commit graph transaction used to store tasks as vertices of
     * the task label and to create the schema they need.
     */
    private static class TaskTransaction extends GraphTransaction {

        public static final String TASK = P.TASK;

        public TaskTransaction(HugeGraphParams graph, BackendStore store) {
            super(graph, store);
            this.autoCommit(true);
        }

        /**
         * Build the vertex of a task from its property array.
         *
         * @throws HugeException if the task vertex label does not exist
         */
        public HugeVertex constructVertex(HugeTask<?> task) {
            if (this.graph().existsVertexLabel(TASK)) {
                return this.constructVertex(false, task.asArray());
            }
            throw new HugeException("Schema is missing for task(%s) '%s'",
                                    task.id(), task.name());
        }

        /**
         * Delete the status index of the vertex stored under the same id,
         * if there is one and its status differs from the new vertex.
         */
        public void deleteIndex(HugeVertex vertex) {
            // Delete the old record if exist
            Iterator<Vertex> stored = this.queryVertices(vertex.id());
            HugeVertex previous = (HugeVertex) QueryResults.one(stored);
            if (previous != null) {
                this.deleteIndexIfNeeded(previous, vertex);
            }
        }

        private boolean deleteIndexIfNeeded(HugeVertex oldV, HugeVertex newV) {
            Object oldStatus = oldV.value(P.STATUS);
            Object newStatus = newV.value(P.STATUS);
            if (oldStatus.equals(newStatus)) {
                // Index value unchanged, the new vertex just overrides it
                return false;
            }
            // Only delete vertex if index value changed else override it
            this.updateIndex(this.indexLabel(P.STATUS).id(), oldV, true);
            return true;
        }

        /**
         * Create the property keys, the task vertex label and the status
         * index, unless the task vertex label already exists.
         */
        public void initSchema() {
            if (this.existVertexLabel(TASK)) {
                return;
            }

            HugeGraph hugegraph = this.graph();
            String[] propertyNames = this.initProperties();

            // Create vertex label '~task'
            VertexLabel taskLabel = hugegraph.schema().vertexLabel(TASK)
                                             .properties(propertyNames)
                                             .useCustomizeNumberId()
                                             .nullableKeys(P.DESCRIPTION,
                                                           P.CONTEXT,
                                                           P.UPDATE, P.INPUT,
                                                           P.RESULT,
                                                           P.DEPENDENCIES,
                                                           P.SERVER)
                                             .enableLabelIndex(true)
                                             .build();
            this.params().schemaTransaction().addVertexLabel(taskLabel);

            // Create index
            this.createIndexLabel(taskLabel, P.STATUS);
        }

        private boolean existVertexLabel(String label) {
            return null != this.params().schemaTransaction()
                                        .getVertexLabel(label);
        }

        /**
         * Create all the property keys of a task, in declaration order.
         *
         * @return the names of the created property keys
         */
        private String[] initProperties() {
            // Array elements are evaluated, hence created, in this order
            return new String[]{
                createPropertyKey(P.TYPE),
                createPropertyKey(P.NAME),
                createPropertyKey(P.CALLABLE),
                createPropertyKey(P.DESCRIPTION),
                createPropertyKey(P.CONTEXT),
                createPropertyKey(P.STATUS, DataType.BYTE),
                createPropertyKey(P.PROGRESS, DataType.INT),
                createPropertyKey(P.CREATE, DataType.DATE),
                createPropertyKey(P.UPDATE, DataType.DATE),
                createPropertyKey(P.RETRIES, DataType.INT),
                createPropertyKey(P.INPUT, DataType.BLOB),
                createPropertyKey(P.RESULT, DataType.BLOB),
                createPropertyKey(P.DEPENDENCIES, DataType.LONG,
                                  Cardinality.SET),
                createPropertyKey(P.SERVER)
            };
        }

        // Text property key with single cardinality
        private String createPropertyKey(String name) {
            return this.createPropertyKey(name, DataType.TEXT);
        }

        // Property key of the given data type with single cardinality
        private String createPropertyKey(String name, DataType dataType) {
            return this.createPropertyKey(name, dataType, Cardinality.SINGLE);
        }

        private String createPropertyKey(String name, DataType dataType,
                                         Cardinality cardinality) {
            SchemaManager schema = this.graph().schema();
            PropertyKey key = schema.propertyKey(name)
                                    .dataType(dataType)
                                    .cardinality(cardinality)
                                    .build();
            this.params().schemaTransaction().addPropertyKey(key);
            return name;
        }

        private IndexLabel createIndexLabel(VertexLabel label, String field) {
            String indexName = Hidden.hide("task-index-by-" + field);
            SchemaManager schema = this.graph().schema();
            IndexLabel created = schema.indexLabel(indexName)
                                       .on(HugeType.VERTEX_LABEL, TASK)
                                       .by(field)
                                       .build();
            this.params().schemaTransaction().addIndexLabel(label, created);
            return created;
        }

        private IndexLabel indexLabel(String field) {
            String indexName = Hidden.hide("task-index-by-" + field);
            return this.graph().indexLabel(indexName);
        }
    }
}
