/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.ode.dao.jpa;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.bpel.common.ProcessState;
import org.apache.ode.bpel.dao.ActivityRecoveryDAO;
import org.apache.ode.bpel.dao.BpelDAOConnection;
import org.apache.ode.bpel.dao.CorrelationSetDAO;
import org.apache.ode.bpel.dao.CorrelatorDAO;
import org.apache.ode.bpel.dao.FaultDAO;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
import org.apache.ode.bpel.dao.ScopeDAO;
import org.apache.ode.bpel.dao.ScopeStateEnum;
import org.apache.ode.bpel.dao.XmlDataDAO;
import org.apache.ode.bpel.evt.ProcessInstanceEvent;
import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
import org.w3c.dom.Element;

import javax.persistence.Basic;
import javax.persistence.CascadeType;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.FetchType;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Lob;
import javax.persistence.ManyToOne;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.OneToMany;
import javax.persistence.OneToOne;
import javax.persistence.Table;
import javax.xml.namespace.QName;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

@Entity
@Table(name="ODE_PROCESS_INSTANCE")
@NamedQueries({
    @NamedQuery(name=ProcessInstanceDAOImpl.DELETE_INSTANCES_BY_PROCESS, query="delete from ProcessInstanceDAOImpl as i where i._process = :process"),
    @NamedQuery(name=ProcessInstanceDAOImpl.SELECT_INSTANCE_IDS_BY_PROCESS, query="select i._instanceId from ProcessInstanceDAOImpl as i where i._process = :process"),
    @NamedQuery(name=ProcessInstanceDAOImpl.COUNT_INSTANCE_IDS_BY_PROCESS, query="select count(i._instanceId) from ProcessInstanceDAOImpl as i where i._process = :process"),
    @NamedQuery(name=ProcessInstanceDAOImpl.SELECT_FAULT_IDS_BY_PROCESS, query="select i._faultId from ProcessInstanceDAOImpl as i where i._process = :process and i._faultId is not null"),
    @NamedQuery(name=ProcessInstanceDAOImpl.COUNT_FAILED_INSTANCES_BY_STATUS_AND_PROCESS_ID,
            query="select count(i._instanceId), max(i._lastRecovery) from ProcessInstanceDAOImpl as i where i._process._processId = :processId and i._state in(:states) and exists(select r from ActivityRecoveryDAOImpl r where i = r._instance)"),
    @NamedQuery(name=ProcessInstanceDAOImpl.COUNT_FAILED_INSTANCES_BY_STATUS_AND_PROCESS_IDS,
            query="select i._process._processId, count(i._instanceId), max(i._lastRecovery) from ProcessInstanceDAOImpl as i where i._process._processId in (:processIds) and i._state = 20 and exists(select r from ActivityRecoveryDAOImpl r where i = r._instance) group by i._process._processId"),
    @NamedQuery(name=ProcessInstanceDAOImpl.COUNT_INSTANCES_BY_PROCESSES_IDS_AND_STATES, 
            query="select i._process._processId, count(i._instanceId) from ProcessInstanceDAOImpl as i where i._process._processId in (:processIds) and i._state in(:states) group by i._process._processId")
})
public class ProcessInstanceDAOImpl extends OpenJPADAO implements ProcessInstanceDAO {
    private static final Logger __log = LoggerFactory.getLogger(ProcessInstanceDAOImpl.class);

    public final static String DELETE_INSTANCES_BY_PROCESS = "DELETE_INSTANCES_BY_PROCESS";
    public final static String SELECT_INSTANCE_IDS_BY_PROCESS = "SELECT_INSTANCE_IDS_BY_PROCESS";
    public final static String COUNT_INSTANCE_IDS_BY_PROCESS = "COUNT_INSTANCE_IDS_BY_PROCESS";

    public final static String SELECT_FAULT_IDS_BY_PROCESS = "SELECT_FAULT_IDS_BY_PROCESS";
    public final static String COUNT_FAILED_INSTANCES_BY_STATUS_AND_PROCESS_ID = "COUNT_FAILED_INSTANCES_BY_STATUS_AND_PROCESS_ID";
    public final static String COUNT_FAILED_INSTANCES_BY_STATUS_AND_PROCESS_IDS = "COUNT_FAILED_INSTANCES_BY_STATUS_AND_PROCESS_IDS";    
    public static final String COUNT_INSTANCES_BY_PROCESSES_IDS_AND_STATES = "COUNT_INSTANCES_BY_PROCESSES_IDS_AND_STATES";
    
    @Id @Column(name="ID")
    @GeneratedValue(strategy=GenerationType.AUTO)
    private Long _instanceId;
    @Basic @Column(name="LAST_RECOVERY_DATE")
    private Date _lastRecovery;
    @Basic @Column(name="LAST_ACTIVE_TIME")
    private Date _lastActive;
    @Basic @Column(name="INSTANCE_STATE")
    private short _state;
    @Basic @Column(name="PREVIOUS_STATE")
    private short _previousState;
    @Lob @Column(name="EXECUTION_STATE")
    private byte[] _executionState;
    @Basic @Column(name="SEQUENCE")
    private long _sequence;
    @Basic @Column(name="DATE_CREATED")
    private Date _dateCreated = new Date();

    @OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.MERGE, CascadeType.PERSIST, CascadeType.REFRESH}) @Column(name="ROOT_SCOPE_ID")
    private ScopeDAOImpl _rootScope;
    @OneToMany(targetEntity=ScopeDAOImpl.class,mappedBy="_processInstance",fetch=FetchType.LAZY,cascade={CascadeType.MERGE, CascadeType.PERSIST, CascadeType.REFRESH})
    private Collection<ScopeDAO> _scopes = new ArrayList<ScopeDAO>();
    @OneToMany(targetEntity=ActivityRecoveryDAOImpl.class,mappedBy="_instance",fetch=FetchType.LAZY,cascade={CascadeType.ALL})
    private Collection<ActivityRecoveryDAO> _recoveries = new ArrayList<ActivityRecoveryDAO>();

    @SuppressWarnings("unused")
    @Basic @Column(name="FAULT_ID", insertable=false, updatable=false, nullable=true)
    private long _faultId;
    @OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.MERGE, CascadeType.PERSIST, CascadeType.REFRESH}) @Column(name="FAULT_ID")
    private FaultDAOImpl _fault;
    @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="PROCESS_ID")
    private ProcessDAOImpl _process;
    @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="INSTANTIATING_CORRELATOR_ID")
    private CorrelatorDAOImpl _instantiatingCorrelator;

    @OneToMany(targetEntity=MessageExchangeDAOImpl.class,mappedBy="_processInst",fetch=FetchType.LAZY)
    @SuppressWarnings("unused")
    private Collection<MessageExchangeDAO> _messageExchanges = new ArrayList<MessageExchangeDAO>();

    private transient int _activityFailureCount = -1;

    public ProcessInstanceDAOImpl() {}
    public ProcessInstanceDAOImpl(CorrelatorDAOImpl correlator, ProcessDAOImpl process) {
        _instantiatingCorrelator = correlator;
        _process = process;
    }

    public void createActivityRecovery(String channel, long activityId,
            String reason, Date dateTime, Element data, String[] actions,
            int retries) {
        ActivityRecoveryDAOImpl ar = new ActivityRecoveryDAOImpl(channel, activityId, reason, dateTime, data, actions, retries);
        _recoveries.add(ar);
        ar.setInstance(this);
        _lastRecovery = dateTime;
    }

    public ScopeDAO createScope(ScopeDAO parentScope, String name, int scopeModelId) {
        ScopeDAOImpl ret = new ScopeDAOImpl((ScopeDAOImpl)parentScope,name,scopeModelId,this);
        ret.setState(ScopeStateEnum.ACTIVE);
        _scopes.add(ret);
        _rootScope = (parentScope == null)?ret:_rootScope;

        // Must persist the scope to generate a scope ID
        getEM().persist(ret);
        return ret;
    }

    @SuppressWarnings("unchecked")
    public Collection<CorrelationSetDAO> selectCorrelationSets(Collection<ProcessInstanceDAO> instances) {
        return getEM().createNamedQuery(CorrelationSetDAOImpl.SELECT_CORRELATION_SETS_BY_INSTANCES).setParameter("instances", instances).getResultList();
    }

    public void delete(Set<CLEANUP_CATEGORY> cleanupCategories) {
        delete(cleanupCategories, true);
    }

    public void delete(Set<CLEANUP_CATEGORY> cleanupCategories, boolean deleteMyRoleMex) {
        if(__log.isDebugEnabled()) __log.debug("Cleaning up instance Data with " + cleanupCategories);

        // remove jacob state
        setExecutionState(null);
        if (getEM() != null) {
            if( !cleanupCategories.isEmpty() ) {
                // by default, we do not flush before select; flush it, so we can delete no matter if an entity is loaded up
                // or not; more importantly, OpenJPA will secretly load from the entire table if some entities reside only
                // in memory
                getEM().flush();
            }

            if (cleanupCategories.contains(CLEANUP_CATEGORY.EVENTS)) {
                deleteEvents();
            }
            if (cleanupCategories.contains(CLEANUP_CATEGORY.CORRELATIONS)) {
                deleteCorrelations();
            }
            if( cleanupCategories.contains(CLEANUP_CATEGORY.MESSAGES) ) {
                deleteMessageRoutes();
            }
            if (cleanupCategories.contains(CLEANUP_CATEGORY.VARIABLES)) {
                deleteVariables();
            }
            if (cleanupCategories.contains(CLEANUP_CATEGORY.INSTANCE)) {
                deleteInstance();
            }

            getEM().flush();
        }
    }

    private void deleteInstance() {
        if( _fault != null ) {
            getEM().remove(_fault);
        }
        getEM().remove(this); // This deletes ActivityRecoveryDAO
    }

    @SuppressWarnings("unchecked")
    private void deleteVariables() {
        Collection xmlDataIds = getEM().createNamedQuery(XmlDataDAOImpl.SELECT_XMLDATA_IDS_BY_INSTANCE).setParameter("instance", this).getResultList();
        batchUpdateByIds(xmlDataIds.iterator(), getEM().createNamedQuery(XmlDataProperty.DELETE_XML_DATA_PROPERTIES_BY_XML_DATA_IDS), "xmlDataIds");
        Collection scopeIds = getEM().createNamedQuery(ScopeDAOImpl.SELECT_SCOPE_IDS_BY_INSTANCE).setParameter("instance", this).getResultList();
        batchUpdateByIds(scopeIds.iterator(), getEM().createNamedQuery(XmlDataDAOImpl.DELETE_XMLDATA_BY_SCOPE_IDS), "scopeIds");

        batchUpdateByIds(scopeIds.iterator(), getEM().createNamedQuery(PartnerLinkDAOImpl.DELETE_PARTNER_LINKS_BY_SCOPE_IDS), "scopeIds");
        batchUpdateByIds(scopeIds.iterator(), getEM().createNamedQuery(ScopeDAOImpl.DELETE_SCOPES_BY_SCOPE_IDS), "ids");
    }

    private void deleteMessageRoutes() {
        getEM().createNamedQuery(MessageRouteDAOImpl.DELETE_MESSAGE_ROUTES_BY_INSTANCE).setParameter ("instance", this).executeUpdate();
    }

    @SuppressWarnings("unchecked")
    private void deleteCorrelations() {
        Collection corrSetIds = getEM().createNamedQuery(CorrelationSetDAOImpl.SELECT_CORRELATION_SET_IDS_BY_INSTANCE).setParameter("instance", this).getResultList();
        batchUpdateByIds(corrSetIds.iterator(), getEM().createNamedQuery(CorrSetProperty.DELETE_CORSET_PROPERTIES_BY_PROPERTY_IDS), "corrSetIds");
        batchUpdateByIds(corrSetIds.iterator(), getEM().createNamedQuery(CorrelationSetDAOImpl.DELETE_CORRELATION_SETS_BY_IDS), "ids");
    }

    private void deleteEvents() {
        getEM().createNamedQuery(EventDAOImpl.DELETE_EVENTS_BY_INSTANCE).setParameter ("instance", this).executeUpdate();
    }

    public void deleteActivityRecovery(String channel) {
        ActivityRecoveryDAOImpl toRemove = null;
        for (ActivityRecoveryDAO _recovery : _recoveries) {
            ActivityRecoveryDAOImpl arElement = (ActivityRecoveryDAOImpl) _recovery;
            if (arElement.getChannel().equals(channel)) {
                toRemove = arElement;
                break;
            }
        }
        if (toRemove != null) {
            getEM().remove(toRemove);
            _recoveries.remove(toRemove);
        }

    }

    public void finishCompletion() {
        // make sure we have completed.
        assert (ProcessState.isFinished(this.getState()));
        // let our process know that we've done our work.
    }

    public long genMonotonic() {
        return _sequence++;
    }

    public int getActivityFailureCount() {
        if( _activityFailureCount == -1 ) {
            _activityFailureCount = _recoveries.size();
        }

        return _activityFailureCount;
    }

    public void setActivityFailureCount(int activityFailureCount) {
        _activityFailureCount = activityFailureCount;
    }

    public Date getActivityFailureDateTime() {
        return _lastRecovery;
    }

    public Collection<ActivityRecoveryDAO> getActivityRecoveries() {
        return _recoveries;
    }

    public CorrelationSetDAO getCorrelationSet(String name) {
        //    TODO: should this method be deprecated?

        //  Its not clear where the correlation set for the process is used
        //  or populated.

        throw new UnsupportedOperationException();

        //return null;
    }

    public Set<CorrelationSetDAO> getCorrelationSets() {
        //    TODO: should this method be deprecated?
        //  Its not clear where the correlation set for the process is used
        //  or populated.
        return new HashSet<CorrelationSetDAO>();
    }

    public Date getCreateTime() {
        return _dateCreated;
    }

    public EventsFirstLastCountTuple getEventsFirstLastCount() {
        // TODO Auto-generated method stub
        return null;
    }

    public byte[] getExecutionState() {
        return _executionState;
    }

    public FaultDAO getFault() {
        return _fault;
    }

    public Long getInstanceId() {
        return _instanceId;
    }

    public CorrelatorDAO getInstantiatingCorrelator() {
        return _instantiatingCorrelator;
    }

    public Date getLastActiveTime() {
        return _lastActive;
    }

    public short getPreviousState() {
        return _previousState;
    }

    public ProcessDAO getProcess() {
        return _process;
    }

    public ScopeDAO getRootScope() {
        return _rootScope;
    }

    public ScopeDAO getScope(Long scopeInstanceId) {
        return getEM().find(ScopeDAOImpl.class, scopeInstanceId);
    }

    public Collection<ScopeDAO> getScopes(String scopeName) {
        Collection<ScopeDAO> ret = new ArrayList<ScopeDAO>();

        for (ScopeDAO sElement : _scopes) {
            if ( sElement.getName().equals(scopeName)) ret.add(sElement);
        }
        return ret;
    }

    public Collection<ScopeDAO> getScopes() {
        return _scopes;
    }

    public short getState() {
        return _state;
    }

    public XmlDataDAO[] getVariables(String variableName, int scopeModelId) {

        //TODO: This method is not used and should be considered a deprecation candidate.

        List<XmlDataDAO> results = new ArrayList<XmlDataDAO>();

        for (ScopeDAO sElement : _scopes) {
            if ( sElement.getModelId() == scopeModelId) {
                XmlDataDAO var = sElement.getVariable(variableName);
                if ( var != null ) results.add(var);
            }
        }
        return results.toArray(new XmlDataDAO[results.size()]);
    }

    public void insertBpelEvent(ProcessInstanceEvent event) {
        getConn().insertBpelEvent(event, getProcess(), this);
    }

    public void setExecutionState(byte[] execState) {
        _executionState = execState;
    }

    public void setFault(FaultDAO fault) {
        _fault = (FaultDAOImpl)fault;
    }

    public void setFault(QName faultName, String explanation, int faultLineNo,
            int activityId, Element faultMessage) {
        _fault = new FaultDAOImpl(faultName,explanation,faultLineNo,activityId,faultMessage);
    }

    public void setLastActiveTime(Date dt) {
        _lastActive = dt;
    }

    public void setState(short state) {
        _previousState = _state;
        _state = state;
        if (state == ProcessState.STATE_TERMINATED) {
          deleteMessageRoutes();
        }
    }

    void removeRoutes(String routeGroupId) {
        _process.removeRoutes(routeGroupId, this);
    }

    public BpelDAOConnection getConnection() {
        return new BPELDAOConnectionImpl(getEM());
    }

    public Collection<String> getMessageExchangeIds() {
        Collection<String> c = new HashSet<String>();
        for (MessageExchangeDAO m : _messageExchanges) {
            c.add(m.getMessageExchangeId());
        }
        return c;
    }
}
