/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.taverna.platform.execution.impl.local;

import static java.util.logging.Level.SEVERE;
import static org.apache.taverna.platform.execution.impl.local.T2ReferenceConverter.convertPathToObject;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.logging.Logger;

import org.apache.taverna.facade.ResultListener;
import org.apache.taverna.facade.WorkflowInstanceFacade;
import org.apache.taverna.invocation.InvocationContext;
import org.apache.taverna.invocation.TokenOrderException;
import org.apache.taverna.invocation.WorkflowDataToken;
import org.apache.taverna.monitor.MonitorManager;
import org.apache.taverna.provenance.reporter.ProvenanceReporter;
import org.apache.taverna.reference.ReferenceService;
import org.apache.taverna.reference.T2Reference;
import org.apache.taverna.workflowmodel.Dataflow;
import org.apache.taverna.workflowmodel.DataflowInputPort;
import org.apache.taverna.workflowmodel.Edits;
import org.apache.taverna.workflowmodel.InvalidDataflowException;

import org.apache.taverna.robundle.Bundle;

import org.apache.taverna.databundle.DataBundles;
import org.apache.taverna.platform.capability.api.ActivityService;
import org.apache.taverna.platform.capability.api.DispatchLayerService;
import org.apache.taverna.platform.execution.api.AbstractExecution;
import org.apache.taverna.platform.execution.api.InvalidWorkflowException;
import org.apache.taverna.platform.report.ActivityReport;
import org.apache.taverna.platform.report.ProcessorReport;
import org.apache.taverna.platform.report.WorkflowReport;
import org.apache.taverna.scufl2.api.container.WorkflowBundle;
import org.apache.taverna.scufl2.api.core.Workflow;
import org.apache.taverna.scufl2.api.profiles.Profile;

/**
 * An {@link org.apache.taverna.platform.execution.api.Execution Execution} for
 * executing Taverna workflows on a local Taverna Dataflow Engine.
 * 
 * @author David Withers
 */
public class LocalExecution extends AbstractExecution implements ResultListener {

	private static Logger logger = Logger.getLogger(LocalExecution.class
			.getName());

	private final WorkflowToDataflowMapper mapping;

	private final WorkflowInstanceFacade facade;

	private final LocalExecutionMonitor executionMonitor;

	private final ReferenceService referenceService;

	private final Map<String, DataflowInputPort> inputPorts = new HashMap<String, DataflowInputPort>();

	/**
	 * Constructs an Execution for executing Taverna workflows on a local
	 * Taverna Dataflow Engine.
	 * 
	 * @param workflowBundle
	 *            the <code>WorkflowBundle</code> containing the
	 *            <code>Workflow</code>s required for execution
	 * @param workflow
	 *            the <code>Workflow</code> to execute
	 * @param profile
	 *            the <code>Profile</code> to use when executing the
	 *            <code>Workflow</code>
	 * @param dataBundle
	 *            the <code>Bundle</code> containing the data values for the
	 *            <code>Workflow</code>
	 * @param referenceService
	 *            the <code>ReferenceService</code> used to register inputs,
	 *            outputs and intermediate values
	 * @throws InvalidWorkflowException
	 *             if the specified workflow is invalid
	 */
	public LocalExecution(WorkflowBundle workflowBundle, Workflow workflow,
			Profile profile, Bundle dataBundle,
			ReferenceService referenceService, Edits edits,
			ActivityService activityService,
			DispatchLayerService dispatchLayerService)
			throws InvalidWorkflowException {
		super(workflowBundle, workflow, profile, dataBundle);
		this.referenceService = referenceService;
		try {
			mapping = new WorkflowToDataflowMapper(workflowBundle, profile,
					edits, activityService, dispatchLayerService);
			Dataflow dataflow = mapping.getDataflow(workflow);
			for (DataflowInputPort dataflowInputPort : dataflow.getInputPorts())
				inputPorts.put(dataflowInputPort.getName(), dataflowInputPort);
			facade = edits.createWorkflowInstanceFacade(dataflow,
					createContext(), "");
			executionMonitor = new LocalExecutionMonitor(getWorkflowReport(),
					getDataBundle(), mapping, facade.getIdentifier());
		} catch (InvalidDataflowException e) {
			throw new InvalidWorkflowException(e);
		}
	}

	@Override
	public void delete() {
		cancel();
	}

	@Override
	public void start() {
		MonitorManager.getInstance().addObserver(executionMonitor);
		/*
		 * have to add a result listener otherwise facade doesn't record when
		 * workflow is finished
		 */
		facade.addResultListener(this);
		facade.fire();
		try {
			if (DataBundles.hasInputs(getDataBundle())) {
				Path inputs = DataBundles.getInputs(getDataBundle());
				for (Entry<String, DataflowInputPort> inputPort : inputPorts
						.entrySet()) {
					String portName = inputPort.getKey();
					Path path = DataBundles.getPort(inputs, portName);
					if (!DataBundles.isMissing(path)) {
						T2Reference identifier = referenceService.register(
								convertPathToObject(path), inputPort.getValue()
										.getDepth(), true, null);
						int[] index = new int[] {};
						WorkflowDataToken token = new WorkflowDataToken("",
								index, identifier, facade.getContext());
						try {
							facade.pushData(token, portName);
						} catch (TokenOrderException e) {
							logger.log(SEVERE, "Unable to push data for input "
									+ portName, e);
						}
					}
				}
			}
		} catch (IOException e) {
			logger.log(SEVERE, "Error getting input data", e);
		}
	}

	@Override
	public void pause() {
		facade.pauseWorkflowRun();
	}

	@Override
	public void resume() {
		facade.resumeWorkflowRun();
	}

	@Override
	public void cancel() {
		facade.cancelWorkflowRun();
		facade.removeResultListener(this);
		MonitorManager.getInstance().removeObserver(executionMonitor);
	}

	@Override
	protected WorkflowReport createWorkflowReport(Workflow workflow) {
		return new WorkflowReport(workflow);
	}

	@Override
	public ProcessorReport createProcessorReport(
			org.apache.taverna.scufl2.api.core.Processor processor) {
		return new LocalProcessorReport(processor);
	}

	@Override
	public ActivityReport createActivityReport(
			org.apache.taverna.scufl2.api.activity.Activity activity) {
		return new ActivityReport(activity);
	}

	private InvocationContext createContext() {
		InvocationContext context = new InvocationContext() {
			private List<Object> entities = Collections
					.synchronizedList(new ArrayList<Object>());

			@Override
			public <T> List<T> getEntities(Class<T> entityType) {
				List<T> entitiesOfType = new ArrayList<>();
				synchronized (entities) {
					for (Object entity : entities)
						if (entityType.isInstance(entity))
							entitiesOfType.add(entityType.cast(entity));
				}
				return entitiesOfType;
			}

			@Override
			public void addEntity(Object entity) {
				entities.add(entity);
			}

			@Override
			public ReferenceService getReferenceService() {
				return referenceService;
			}

			@Override
			public ProvenanceReporter getProvenanceReporter() {
				return null;
			}

		};
		return context;
	}

	@Override
	public void resultTokenProduced(WorkflowDataToken token, String portName) {
	}
}
