/*******************************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *******************************************************************************/
package org.ofbiz.service.job;

import java.io.IOException;
import java.sql.Timestamp;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import javax.xml.parsers.ParserConfigurationException;

import org.apache.commons.lang.StringUtils;
import org.ofbiz.base.config.GenericConfigException;
import org.ofbiz.base.util.Debug;
import org.ofbiz.base.util.UtilDateTime;
import org.ofbiz.base.util.UtilGenerics;
import org.ofbiz.base.util.UtilValidate;
import org.ofbiz.entity.Delegator;
import org.ofbiz.entity.GenericEntityException;
import org.ofbiz.entity.GenericValue;
import org.ofbiz.entity.condition.EntityCondition;
import org.ofbiz.entity.condition.EntityFieldMap;
import org.ofbiz.entity.serialize.SerializeException;
import org.ofbiz.entity.serialize.XmlSerializer;
import org.ofbiz.service.DispatchContext;
import org.ofbiz.service.GenericRequester;
import org.ofbiz.service.ServiceUtil;
import org.ofbiz.service.calendar.RecurrenceInfo;
import org.ofbiz.service.calendar.RecurrenceInfoException;
import org.ofbiz.service.calendar.TemporalExpression;
import org.ofbiz.service.calendar.TemporalExpressionWorker;
import org.ofbiz.service.config.ServiceConfigUtil;
import org.xml.sax.SAXException;

import com.ibm.icu.util.Calendar;

/**
 * A {@link Job} that is backed by the entity engine. Job data is stored
 * in the JobSandbox entity.
 * <p>When the job is queued, this object "owns" the entity value. Any external changes
 * are ignored except the cancelDateTime field - jobs can be canceled after they are queued.</p>
 */
@SuppressWarnings("serial")
public class PersistedServiceJob extends GenericServiceJob {

    public static final String module = PersistedServiceJob.class.getName();

    private final transient Delegator delegator;
    private long nextRecurrence = -1;
    private final long maxRetry;
    private final long currentRetryCount;
    private final GenericValue jobValue;
    private final long startTime;

    /**
     * Creates a new PersistedServiceJob
     * @param dctx
     * @param jobValue
     * @param req
     */
    public PersistedServiceJob(DispatchContext dctx, GenericValue jobValue, GenericRequester req) {
        super(dctx, jobValue.getString("jobId"), jobValue.getString("jobName"), null, null, req);
        this.delegator = dctx.getDelegator();
        this.jobValue = jobValue;
        Timestamp storedDate = jobValue.getTimestamp("runTime");
        this.startTime = storedDate.getTime();
        this.maxRetry = jobValue.get("maxRetry") != null ? jobValue.getLong("maxRetry").longValue() : -1;
        Long retryCount = jobValue.getLong("currentRetryCount");
        if (retryCount != null) {
            this.currentRetryCount = retryCount.longValue();
        } else {
            // backward compatibility
            this.currentRetryCount = getRetries(this.delegator);
        }
    }

    @Override
    public void queue() throws InvalidJobException {
        super.queue();
        try {
            jobValue.refresh();
        } catch (GenericEntityException e) {
            throw new InvalidJobException("Unable to refresh JobSandbox value", e);
        }
        if (!JobManager.instanceId.equals(jobValue.getString("runByInstanceId"))) {
            throw new InvalidJobException("Job has been accepted by a different instance");
        }
        Timestamp cancelTime = jobValue.getTimestamp("cancelDateTime");
        Timestamp startTime = jobValue.getTimestamp("startDateTime");
        if (cancelTime != null || startTime != null) {
            // job not available
            throw new InvalidJobException("Job [" + getJobId() + "] is not available");
        } else {
            jobValue.set("statusId", "SERVICE_QUEUED");
            try {
                jobValue.store();
            } catch (GenericEntityException e) {
                throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + "]; not running!", e);
            }
            if (Debug.verboseOn()) {
                Debug.logVerbose("Placing job [" + getJobId() + "] in queue", module);
            }
        }
    }

    @Override
    protected void init() throws InvalidJobException {
        super.init();
        try {
            jobValue.refresh();
        } catch (GenericEntityException e) {
            throw new InvalidJobException("Unable to refresh JobSandbox value", e);
        }
        if (!JobManager.instanceId.equals(jobValue.getString("runByInstanceId"))) {
            throw new InvalidJobException("Job has been accepted by a different instance");
        }
        if (jobValue.getTimestamp("cancelDateTime") != null) {
            // Job cancelled
            throw new InvalidJobException("Job [" + getJobId() + "] was cancelled");
        }
        jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
        jobValue.set("statusId", "SERVICE_RUNNING");
        try {
            jobValue.store();
        } catch (GenericEntityException e) {
            throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + "]; not running!", e);
        }
        if (Debug.verboseOn()) {
            Debug.logVerbose("Job [" + getJobId() + "] running", module);
        }
        // configure any additional recurrences
        long maxRecurrenceCount = -1;
        long currentRecurrenceCount = 0;
        TemporalExpression expr = null;
        RecurrenceInfo recurrence = getRecurrenceInfo();
        if (recurrence != null) {
            Debug.logWarning("Persisted Job [" + getJobId() + "] references a RecurrenceInfo, recommend using TemporalExpression instead", module);
            currentRecurrenceCount = recurrence.getCurrentCount();
            expr = RecurrenceInfo.toTemporalExpression(recurrence);
        }
        if (expr == null && UtilValidate.isNotEmpty(jobValue.getString("tempExprId"))) {
            try {
                expr = TemporalExpressionWorker.getTemporalExpression(this.delegator, jobValue.getString("tempExprId"));
            } catch (GenericEntityException e) {
                throw new RuntimeException(e.getMessage());
            }
        }
        if (jobValue.get("maxRecurrenceCount") != null) {
            maxRecurrenceCount = jobValue.getLong("maxRecurrenceCount").longValue();
        }
        if (jobValue.get("currentRecurrenceCount") != null) {
            currentRecurrenceCount = jobValue.getLong("currentRecurrenceCount").longValue();
        }
        if (maxRecurrenceCount != -1) {
            currentRecurrenceCount++;
            jobValue.set("currentRecurrenceCount", currentRecurrenceCount);
        }
        try {
            if (expr != null && (maxRecurrenceCount == -1 || currentRecurrenceCount <= maxRecurrenceCount)) {
                if (recurrence != null) {
                    recurrence.incrementCurrentCount();
                }
                Calendar next = expr.next(Calendar.getInstance());
                if (next != null) {
                    createRecurrence(next.getTimeInMillis(), false);
                }
            }
        } catch (GenericEntityException e) {
            throw new InvalidJobException(e);
        }
        if (Debug.infoOn()) Debug.logInfo("Job  [" + getJobName() + "] Id ["  + getJobId() + "] -- Next runtime: " + new Date(nextRecurrence), module);
    }

    private void createRecurrence(long next, boolean isRetryOnFailure) throws GenericEntityException {
        if (Debug.verboseOn()) Debug.logVerbose("Next runtime returned: " + next, module);
        if (next > startTime) {
            String pJobId = jobValue.getString("parentJobId");
            if (pJobId == null) {
                pJobId = jobValue.getString("jobId");
            }
            GenericValue newJob = GenericValue.create(jobValue);
            newJob.remove("jobId");
            newJob.set("previousJobId", jobValue.getString("jobId"));
            newJob.set("parentJobId", pJobId);
            newJob.set("statusId", "SERVICE_PENDING");
            newJob.set("startDateTime", null);
            newJob.set("runByInstanceId", null);
            newJob.set("runTime", new java.sql.Timestamp(next));
            if (isRetryOnFailure) {
                newJob.set("currentRetryCount", new Long(currentRetryCount + 1));
            } else {
                newJob.set("currentRetryCount", new Long(0));
            }
            nextRecurrence = next;
            delegator.createSetNextSeqId(newJob);
            if (Debug.verboseOn()) Debug.logVerbose("Created next job entry: " + newJob, module);
        }
    }

    @Override
    protected void finish(Map<String, Object> result) throws InvalidJobException {
        super.finish(result);
        // set the finish date
        jobValue.set("statusId", "SERVICE_FINISHED");
        jobValue.set("finishDateTime", UtilDateTime.nowTimestamp());
        String jobResult = null;
        if (ServiceUtil.isError(result)) {
            jobResult = StringUtils.substring(ServiceUtil.getErrorMessage(result), 0, 255);
        } else {
            jobResult = StringUtils.substring(ServiceUtil.makeSuccessMessage(result, "", "", "", ""), 0, 255);
        }
        if (UtilValidate.isNotEmpty(jobResult)) {
            jobValue.set("jobResult", jobResult);
        }
        try {
            jobValue.store();
        } catch (GenericEntityException e) {
            Debug.logError(e, "Cannot update the job [" + getJobId() + "] sandbox", module);
        }
    }

    @Override
    protected void failed(Throwable t) throws InvalidJobException {
        super.failed(t);
        // if the job has not been re-scheduled; we need to re-schedule and run again
        if (nextRecurrence == -1) {
            if (this.canRetry()) {
                // create a recurrence
                Calendar cal = Calendar.getInstance();
                try {
                    cal.add(Calendar.MINUTE, ServiceConfigUtil.getServiceEngine().getThreadPool().getFailedRetryMin());
                } catch (GenericConfigException e) {
                    Debug.logWarning(e, "Unable to get retry minutes for job [" + getJobId() + "], defaulting to now: ", module);
                }
                long next = cal.getTimeInMillis();
                try {
                    createRecurrence(next, true);
                } catch (GenericEntityException e) {
                    Debug.logError(e, "Unable to re-schedule job [" + getJobId() + "]: ", module);
                }
                Debug.logInfo("Persisted Job [" + getJobId() + "] Failed. Re-Scheduling : " + next, module);
            } else {
                Debug.logWarning("Persisted Job [" + getJobId() + "] Failed. Max Retry Hit, not re-scheduling", module);
            }
        }
        // set the failed status
        jobValue.set("statusId", "SERVICE_FAILED");
        jobValue.set("finishDateTime", UtilDateTime.nowTimestamp());
        jobValue.set("jobResult", StringUtils.substring(t.getMessage(), 0, 255));
        try {
            jobValue.store();
        } catch (GenericEntityException e) {
            Debug.logError(e, "Cannot update the JobSandbox entity", module);
        }
    }

    @Override
    protected String getServiceName() {
        if (jobValue == null || jobValue.get("serviceName") == null) {
            return null;
        }
        return jobValue.getString("serviceName");
    }

    @Override
    protected Map<String, Object> getContext() throws InvalidJobException {
        Map<String, Object> context = null;
        try {
            if (!UtilValidate.isEmpty(jobValue.getString("runtimeDataId"))) {
                GenericValue contextObj = jobValue.getRelatedOne("RuntimeData", false);
                if (contextObj != null) {
                    context = UtilGenerics.checkMap(XmlSerializer.deserialize(contextObj.getString("runtimeInfo"), delegator), String.class, Object.class);
                }
            }
            if (context == null) {
                context = new HashMap<String, Object>();
            }
            // check the runAsUser
            if (!UtilValidate.isEmpty(jobValue.getString("runAsUser"))) {
                context.put("userLogin", ServiceUtil.getUserLogin(dctx, context, jobValue.getString("runAsUser")));
            }
        } catch (GenericEntityException e) {
            Debug.logError(e, "PersistedServiceJob.getContext(): Entity Exception", module);
        } catch (SerializeException e) {
            Debug.logError(e, "PersistedServiceJob.getContext(): Serialize Exception", module);
        } catch (ParserConfigurationException e) {
            Debug.logError(e, "PersistedServiceJob.getContext(): Parse Exception", module);
        } catch (SAXException e) {
            Debug.logError(e, "PersistedServiceJob.getContext(): SAXException", module);
        } catch (IOException e) {
            Debug.logError(e, "PersistedServiceJob.getContext(): IOException", module);
        }
        if (context == null) {
            Debug.logError("Job context is null", module);
        }
        return context;
    }

    // returns the number of current retries
    private long getRetries(Delegator delegator) {
        String pJobId = jobValue.getString("parentJobId");
        if (pJobId == null) {
            return 0;
        }
        long count = 0;
        try {
            EntityFieldMap ecl = EntityCondition.makeConditionMap("parentJobId", pJobId, "statusId", "SERVICE_FAILED");
            count = delegator.findCountByCondition("JobSandbox", ecl, null, null);
        } catch (GenericEntityException e) {
            Debug.logError(e, "Exception thrown while counting retries: ", module);
        }
        return count + 1; // add one for the parent
    }

    private boolean canRetry() {
        if (maxRetry == -1) {
            return true;
        }
        return currentRetryCount < maxRetry;
    }

    private RecurrenceInfo getRecurrenceInfo() {
        try {
            if (UtilValidate.isNotEmpty(jobValue.getString("recurrenceInfoId"))) {
                GenericValue ri = jobValue.getRelatedOne("RecurrenceInfo", false);
                if (ri != null) {
                    return new RecurrenceInfo(ri);
                }
            }
        } catch (GenericEntityException e) {
            Debug.logError(e, "Problem getting RecurrenceInfo entity from JobSandbox", module);
        } catch (RecurrenceInfoException re) {
            Debug.logError(re, "Problem creating RecurrenceInfo instance: " + re.getMessage(), module);
        }
        return null;
    }

    @Override
    public void deQueue() throws InvalidJobException {
        if (currentState != State.QUEUED) {
            throw new InvalidJobException("Illegal state change");
        }
        currentState = State.CREATED;
        try {
            jobValue.refresh();
            jobValue.set("startDateTime", null);
            jobValue.set("runByInstanceId", null);
            jobValue.set("statusId", "SERVICE_PENDING");
            jobValue.store();
        } catch (GenericEntityException e) {
            throw new InvalidJobException("Unable to dequeue job [" + getJobId() + "]", e);
        }
        if (Debug.verboseOn()) {
            Debug.logVerbose("Job [" + getJobId() + "] not queued, rescheduling", module);
        }
    }

    @Override
    public Date getStartTime() {
        return new Date(startTime);
    }
}
