/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.executor.jpa;

import java.sql.Timestamp;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.persistence.EntityManager;
import javax.persistence.NoResultException;
import javax.persistence.Query;

import org.apache.oozie.BulkResponseInfo;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.StringBlob;
import org.apache.oozie.client.BundleJob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.rest.BulkResponseImpl;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.DateUtils;

/**
 * The query executor class for bulk monitoring queries, i.e. debugging bundle coord actions directly.
 * <p>
 * Execution happens in three steps: the bundle job(s) matching the bundle filter are fetched, the coordinator
 * actions of each of those bundles are fetched (filtered by coordinator, action status and time ranges), and
 * finally the total number of matching actions across all the bundles is counted.
 * <p>
 * NOTE(review): the JPQL text is obtained through {@code Query.toString()} on the named queries and then edited
 * as a string. This assumes the persistence provider returns the query text from {@code toString()} and that the
 * named action query contains the fragments {@code WHERE}, {@code AND c.bundleId = :bundleId} and {@code ORDER}
 * - confirm against the named query definitions before changing either side.
 */
public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> {
    // The two halves of the "AND c.bundleId = :bundleId" condition, split at the first "bundleId" token.
    // countQuery() uses their lengths to cut that condition out of the WHERE clause.
    private static final String BUNDLE_CONDITION_HEAD = "AND c.";
    private static final String BUNDLE_CONDITION_TAIL = "bundleId = :bundleId";

    private final Map<String, List<String>> bulkFilter;
    // treated as a 1-based offset: (start - 1) is used as the first result index
    private final int start;
    // maximum number of rows fetched per action query
    private final int len;

    private enum PARAM_TYPE {
        ID, NAME
    }

    /**
     * @param bulkFilter filter name to filter values map; must not be null
     * @param start offset of the first record to return (treated as 1-based)
     * @param len maximum number of records to return per action query
     */
    public BulkJPAExecutor(Map<String, List<String>> bulkFilter, int start, int len) {
        this.bulkFilter = Objects.requireNonNull(bulkFilter, "bulkFilter cannot be null");
        this.start = start;
        this.len = len;
    }

    @Override
    public String getName() {
        return "BulkJPAExecutor";
    }

    /**
     * Run the bundle, action and count queries and assemble the bulk response.
     * @param em entity manager used to create and run the queries
     * @return the matching actions together with start, len and the total count
     * @throws JPAExecutorException (error code E0603) if the bundle filter is missing or matches nothing, or if
     * any of the queries fails
     */
    @Override
    public BulkResponseInfo execute(EntityManager em) throws JPAExecutorException {
        List<BulkResponseImpl> responseList = new ArrayList<BulkResponseImpl>();
        Map<String, Timestamp> actionTimes = new HashMap<String, Timestamp>();

        try {
            List<String> coords = bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD);
            List<String> statuses = bulkFilter.get(BulkResponseImpl.BULK_FILTER_STATUS);
            // filled by actionQuery() with the coordinator filter values, re-used by countQuery()
            List<String> params = new ArrayList<String>();

            // Lightweight Query 1 on Bundle level to fetch the bundle job(s)
            // corresponding to names or ids
            List<BundleJobBean> bundleBeans = bundleQuery(em);

            // Join query between coordinator job and coordinator action tables
            // to get entries for specific bundleId only
            String conditions = actionQuery(coords, params, statuses, em, bundleBeans, actionTimes, responseList);

            // Query to get the count of records
            long total = countQuery(statuses, params, conditions, em, bundleBeans, actionTimes);

            return new BulkResponseInfo(responseList, start, len, total);
        }
        catch (JPAExecutorException e) {
            // already carries the error code and a descriptive message - do not wrap it a second time
            throw e;
        }
        catch (Exception e) {
            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
        }
    }

    /**
     * Build and run the bundle level query to get bundle beans for the specified ids or appnames.
     * Whether the values are ids or appnames is decided from the first value only.
     * @param em entity manager used to create and run the query
     * @return non-empty list of BundleJobBeans
     * @throws JPAExecutorException if no bundle filter value was given, no bundle matches, or a returned
     * bundle id is null
     */
    @SuppressWarnings("unchecked")
    private List<BundleJobBean> bundleQuery(EntityManager em) throws JPAExecutorException {
        List<String> bundles = bulkFilter.get(BulkResponseImpl.BULK_FILTER_BUNDLE);
        if (bundles == null || bundles.isEmpty()) {
            // without bundles the action query has nothing to iterate over; fail with a clear message instead
            throw new JPAExecutorException(ErrorCode.E0603,
                    "No bundle id or name specified - cannot retrieve bulk results");
        }

        Query namedQuery = em.createNamedQuery("BULK_MONITOR_BUNDLE_QUERY");
        StringBuilder bundleQuery = new StringBuilder(namedQuery.toString());

        String column = (getParamType(bundles.get(0), 'B') == PARAM_TYPE.ID) ? "id" : "appName";
        StringBuilder whereClause = inClause(bundles.size(), column, 'b', "bundles");

        // Query: select <columns> from BundleJobBean b where b.id IN (...) _or_ b.appName IN (...)
        // inClause() always starts with " AND "; as this is the only condition it has to become " WHERE "
        int andIndex = whereClause.indexOf("AND");
        bundleQuery.append(whereClause.replace(andIndex, andIndex + 3, "WHERE"));
        Query query = em.createQuery(bundleQuery.toString());
        fillParameters(query, "bundles", bundles);

        List<Object[]> bundleObjs = (List<Object[]>) query.getResultList();
        if (bundleObjs.isEmpty()) {
            final String message = "No entries found for given bundle(s)";
            throw new JPAExecutorException(ErrorCode.E0603, message, new NoResultException(message));
        }

        List<BundleJobBean> bundleBeans = new ArrayList<BundleJobBean>();
        for (Object[] bundleElem : bundleObjs) {
            bundleBeans.add(constructBundleBean(bundleElem));
        }
        return bundleBeans;
    }

    /**
     * Determine whether passed param is job-id or appname. A job-id is 7 digits, a dash, 15 digits, a dash,
     * the system id, a dash and the job type character; anything else is considered an appname.
     * @param id value to classify
     * @param job job type character expected at the end of a job-id (e.g. 'B' or 'C')
     * @return PARAM_TYPE.ID if the value matches the job-id format, PARAM_TYPE.NAME otherwise
     */
    private PARAM_TYPE getParamType(String id, char job) {
        // the system id is literal text, not a regular expression: quote it so that metacharacters in it
        // cannot alter the match
        Pattern p = Pattern.compile("\\d{7}-\\d{15}-" + Pattern.quote(Services.get().getSystemId()) + "-" + job);
        Matcher m = p.matcher(id);
        return m.matches() ? PARAM_TYPE.ID : PARAM_TYPE.NAME;
    }

    /**
     * Compose the coord action level query comprising bundle id filter, coordinator id/appname filter
     * (if specified), coord action status filter and start-time or nominal-time filter (if specified),
     * then run it once per bundle and collect the results. The same pagination window (start, len) is
     * applied to the query of every bundle.
     * @param coords coordinator ids or appnames to filter by; null or empty means no coordinator filter
     * @param params output list, receives the coordinator filter values that were bound to the query
     * @param statuses action statuses to filter by; null or empty means the default KILLED/FAILED filter
     * @param em entity manager used to create and run the queries
     * @param bundles bundles whose actions are fetched
     * @param times output map, receives the time parameter names and values bound to the query
     * @param responseList output list, receives one entry per fetched action
     * @return Query string that was executed
     * @throws ParseException if a time filter value cannot be parsed
     */
    @SuppressWarnings("unchecked")
    private String actionQuery(final List<String> coords, final List<String> params, List<String> statuses, EntityManager em,
                               List<BundleJobBean> bundles, Map<String, Timestamp> times, List<BulkResponseImpl> responseList)
                               throws ParseException {
        Query q = em.createNamedQuery("BULK_MONITOR_ACTIONS_QUERY");
        StringBuilder getActions = new StringBuilder(q.toString());
        StringBuilder conditionClause = new StringBuilder();

        // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id
        // AND c.bundleId = :bundleId AND c.appName/id IN (...)
        if (coords != null && !coords.isEmpty()) {
            String column = (getParamType(coords.get(0), 'C') == PARAM_TYPE.ID) ? "id" : "appName";
            conditionClause.append(inClause(coords.size(), column, 'c', "param"));
            params.addAll(coords);
        }

        // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id
        // AND c.bundleId = :bundleId AND c.appName/id IN (...) AND a.statusStr IN (...)
        conditionClause.append(statusClause(statuses));

        // all extra conditions go right in front of the blank that precedes ORDER BY
        int offset = getActions.indexOf("ORDER");
        getActions.insert(offset - 1, conditionClause);

        // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id
        // AND c.bundleId = :bundleId AND c.appName/id IN (...) AND a.statusStr IN (...)
        // AND a.createdTimestamp >= startCreated _or_ a.createdTimestamp <= endCreated
        // AND a.nominalTimestamp >= startNominal _or_ a.nominalTimestamp <= endNominal
        timesClause(getActions, offset, times);

        final String queryString = getActions.toString();
        q = em.createQuery(queryString);
        for (Entry<String, Timestamp> time : times.entrySet()) {
            q.setParameter(time.getKey(), time.getValue());
        }

        // pagination; a negative first result is rejected by JPA, so never go below zero
        q.setFirstResult(Math.max(start - 1, 0));
        q.setMaxResults(len);

        if (coords != null) {
            fillParameters(q, "param", coords);
        }
        if (statuses != null) {
            fillParameters(q, "status", statuses);
        }

        // repeatedly execute above query for each bundle
        for (BundleJobBean bundle : bundles) {
            q.setParameter("bundleId", bundle.getId());
            List<Object[]> response = q.getResultList();
            for (Object[] r : response) {
                responseList.add(getResponseFromObject(bundle, r));
            }
        }
        return queryString;
    }

    /**
     * Get total number of records for use with offset and len in API.
     * @param statuses action statuses bound to the action query; may be null
     * @param params coordinator filter values bound to the action query; may be null
     * @param clause the action query string as returned by actionQuery()
     * @param em entity manager used to create and run the query
     * @param bundles bundles whose actions are counted
     * @param times time parameter names and values bound to the action query
     * @return total count of coord actions
     */
    private long countQuery(List<String> statuses, List<String> params, String clause, EntityManager em,
                            List<BundleJobBean> bundles, Map<String, Timestamp> times) {
        Query q = em.createNamedQuery("BULK_MONITOR_COUNT_QUERY");
        StringBuilder getTotal = new StringBuilder(q.toString() + " ");
        // Query: select COUNT(a) from CoordinatorActionBean a, CoordinatorJobBean c
        // get entire WHERE clause from above i.e. actionQuery() for all conditions on coordinator job
        // and action status and times
        getTotal.append(clause.substring(clause.indexOf("WHERE"), clause.indexOf("ORDER")));

        List<String> bundleIds = new ArrayList<String>();
        for (BundleJobBean bundle : bundles) {
            bundleIds.add(bundle.getId());
        }

        // Query: select COUNT(a) from CoordinatorActionBean a, CoordinatorJobBean c WHERE ...
        // AND c.bundleId IN (... list of bundle ids) i.e. replace single :bundleId with list.
        // offset points at the first "bundleId" token, i.e. the one in "AND c.bundleId = :bundleId"
        int offset = getTotal.indexOf("bundleId");
        int conditionStart = offset - BUNDLE_CONDITION_HEAD.length();
        int conditionEnd = offset + BUNDLE_CONDITION_TAIL.length();
        getTotal.replace(conditionStart, conditionEnd, inClause(bundleIds.size(), "bundleId", 'c', "count").toString());
        q = em.createQuery(getTotal.toString());

        fillParameters(q, "count", bundleIds);
        if (statuses != null) {
            fillParameters(q, "status", statuses);
        }
        if (params != null) {
            fillParameters(q, "param", params);
        }
        for (Entry<String, Timestamp> time : times.entrySet()) {
            q.setParameter(time.getKey(), time.getValue());
        }

        return ((Long) q.getSingleResult()).longValue();
    }

    /**
     * Form a condition of the shape " AND t.col IN (:prefix0, :prefix1, ...) " with one named parameter
     * per value.
     * @param noOfValues number of named parameters to generate
     * @param col column (bean field) name
     * @param type alias of the bean in the query
     * @param paramPrefix prefix of the generated parameter names; the index of the value is appended
     * @return the condition, or an empty builder if noOfValues is not positive
     */
    private StringBuilder inClause(int noOfValues, String col, char type, String paramPrefix) {
        StringBuilder sb = new StringBuilder();
        if (noOfValues <= 0) {
            return sb;
        }

        sb.append(" AND ").append(type).append('.').append(col).append(" IN (");
        for (int i = 0; i < noOfValues; i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(':').append(paramPrefix).append(i);
        }
        sb.append(") ");
        return sb;
    }

    /**
     * Form the where clause to filter by coord action status.
     * @param statuses statuses to filter by
     * @return an IN condition with one "status" parameter per value, or the default KILLED/FAILED condition
     * if statuses is null or empty
     */
    private StringBuilder statusClause(List<String> statuses) {
        StringBuilder sb = (statuses == null)
                ? new StringBuilder()
                : inClause(statuses.size(), "statusStr", 'a', "status");

        if (sb.length() == 0) { // no statuses given. adding default
            sb.append(" AND a.statusStr IN ('KILLED', 'FAILED') ");
        }
        return sb;
    }

    /**
     * Add the created-time and nominal-time range conditions for the time filters that are present.
     * @param sb query being built
     * @param offset index of ORDER in the query; the conditions are inserted one character before it
     * @param eachTime output map, receives the parameter name and value of every condition added
     * @throws ParseException if a time filter value cannot be parsed
     */
    private void timesClause(StringBuilder sb, int offset, Map<String, Timestamp> eachTime) throws ParseException {
        addTimeCondition(sb, offset, eachTime, BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH,
                "createdTimestamp", ">=", "startCreated");
        addTimeCondition(sb, offset, eachTime, BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH,
                "createdTimestamp", "<=", "endCreated");
        addTimeCondition(sb, offset, eachTime, BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH,
                "nominalTimestamp", ">=", "startNominal");
        addTimeCondition(sb, offset, eachTime, BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH,
                "nominalTimestamp", "<=", "endNominal");
    }

    /**
     * Add a single " AND a.column op :paramName" condition if the given filter has a value. Only the first
     * value of the filter is used.
     * @param sb query being built
     * @param offset index of ORDER in the query; the condition is inserted one character before it
     * @param eachTime output map, receives paramName and the parsed timestamp
     * @param filterName name of the filter to look up
     * @param column action bean field the condition applies to
     * @param operator comparison operator
     * @param paramName name of the query parameter
     * @throws ParseException if the filter value cannot be parsed
     */
    private void addTimeCondition(StringBuilder sb, int offset, Map<String, Timestamp> eachTime, String filterName,
                                  String column, String operator, String paramName) throws ParseException {
        List<String> times = bulkFilter.get(filterName);
        if (times == null || times.isEmpty()) {
            return;
        }
        // parse first so that nothing is modified when the value is malformed
        Timestamp ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
        sb.insert(offset - 1, " AND a." + column + " " + operator + " :" + paramName);
        eachTime.put(paramName, ts);
    }

    /**
     * Convert one row of the action query into a response entry. Null columns are skipped.
     * @param bundleBean bundle the row was fetched for
     * @param arr row: columns 0-9 are set on the action bean, 10-12 on the coordinator job bean
     * (column 10 is also set as the action's job id)
     * @return response entry holding the bundle, the coordinator job and the action
     */
    private BulkResponseImpl getResponseFromObject(BundleJobBean bundleBean, Object[] arr) {
        BulkResponseImpl bean = new BulkResponseImpl();
        CoordinatorJobBean coordBean = new CoordinatorJobBean();
        CoordinatorActionBean actionBean = new CoordinatorActionBean();
        if (arr[0] != null) {
            actionBean.setId((String) arr[0]);
        }
        if (arr[1] != null) {
            actionBean.setActionNumber((Integer) arr[1]);
        }
        if (arr[2] != null) {
            actionBean.setErrorCode((String) arr[2]);
        }
        if (arr[3] != null) {
            actionBean.setErrorMessage((String) arr[3]);
        }
        if (arr[4] != null) {
            actionBean.setExternalId((String) arr[4]);
        }
        if (arr[5] != null) {
            actionBean.setExternalStatus((String) arr[5]);
        }
        if (arr[6] != null) {
            actionBean.setStatus(CoordinatorAction.Status.valueOf((String) arr[6]));
        }
        if (arr[7] != null) {
            actionBean.setCreatedTime(DateUtils.toDate((Timestamp) arr[7]));
        }
        if (arr[8] != null) {
            actionBean.setNominalTime(DateUtils.toDate((Timestamp) arr[8]));
        }
        if (arr[9] != null) {
            actionBean.setMissingDependenciesBlob((StringBlob) arr[9]);
        }
        if (arr[10] != null) {
            coordBean.setId((String) arr[10]);
            actionBean.setJobId((String) arr[10]);
        }
        if (arr[11] != null) {
            coordBean.setAppName((String) arr[11]);
        }
        if (arr[12] != null) {
            coordBean.setStatus(CoordinatorJob.Status.valueOf((String) arr[12]));
        }
        bean.setBundle(bundleBean);
        bean.setCoordinator(coordBean);
        bean.setAction(actionBean);
        return bean;
    }

    /**
     * Convert one row of the bundle query into a bundle bean.
     * @param barr row: id, appname, status, user
     * @return bundle bean with the non-null columns set
     * @throws JPAExecutorException if the id column is null
     */
    private BundleJobBean constructBundleBean(Object[] barr) throws JPAExecutorException {
        if (barr[0] == null) {
            throw new JPAExecutorException(ErrorCode.E0603,
                    "bundleId returned by query is null - cannot retrieve bulk results");
        }
        BundleJobBean bean = new BundleJobBean();
        bean.setId((String) barr[0]);
        if (barr[1] != null) {
            bean.setAppName((String) barr[1]);
        }
        if (barr[2] != null) {
            bean.setStatus(BundleJob.Status.valueOf((String) barr[2]));
        }
        if (barr[3] != null) {
            bean.setUser((String) barr[3]);
        }
        return bean;
    }

    /**
     * Bind values to the named parameters prefix0, prefix1, ... of the query, in list order.
     * @param query query to bind the parameters on
     * @param prefix parameter name prefix, as used with inClause()
     * @param values values to bind
     */
    private void fillParameters(Query query, String prefix, List<?> values) {
        for (int i = 0; i < values.size(); i++) {
            query.setParameter(prefix + i, values.get(i));
        }
    }

    /**
     * null safeguard
     * @param input list that may be null
     * @return input itself if it is not null, otherwise an empty list
     */
    public static List<String> nullToEmpty(List<String> input) {
        return input == null ? Collections.<String> emptyList() : input;
    }

}
