/*******************************************************************************
 * Copyright (C) 2007 The University of Manchester   
 * 
 *  Modifications to the initial code base are copyright of their
 *  respective authors, or their employers as appropriate.
 * 
 *  This program is free software; you can redistribute it and/or
 *  modify it under the terms of the GNU Lesser General Public License
 *  as published by the Free Software Foundation; either version 2.1 of
 *  the License, or (at your option) any later version.
 *    
 *  This program is distributed in the hope that it will be useful, but
 *  WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 *  Lesser General Public License for more details.
 *    
 *  You should have received a copy of the GNU Lesser General Public
 *  License along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 ******************************************************************************/
package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;

import static java.lang.System.currentTimeMillis;

import java.beans.XMLDecoder;
import java.beans.XMLEncoder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import net.sf.taverna.t2.invocation.Event;
import net.sf.taverna.t2.provenance.item.ActivityProvenanceItem;
import net.sf.taverna.t2.provenance.item.ErrorProvenanceItem;
import net.sf.taverna.t2.provenance.item.InputDataProvenanceItem;
import net.sf.taverna.t2.provenance.item.IterationProvenanceItem;
import net.sf.taverna.t2.provenance.item.OutputDataProvenanceItem;
import net.sf.taverna.t2.provenance.item.ProcessProvenanceItem;
import net.sf.taverna.t2.provenance.item.ProcessorProvenanceItem;
import net.sf.taverna.t2.provenance.item.ProvenanceItem;
import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter;
import net.sf.taverna.t2.reference.ReferenceService;
import net.sf.taverna.t2.workflowmodel.Dataflow;
import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivity;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;

import org.apache.log4j.Logger;

/**
 * Sits above the {@link Invoke} layer and collects information about the
 * current workflow run to be stored by the {@link ProvenanceConnector}.
 * 
 * @author Ian Dunlop
 * @author Stian Soiland-Reyes
 * 
 */
public class IntermediateProvenance extends AbstractDispatchLayer<String> {
	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/IntermediateProvenance";
	private static final Logger logger = Logger.getLogger(IntermediateProvenance.class);

	private ProvenanceReporter reporter;
	private Map<String, Map<String, IterationProvenanceItem>> processToIndexes = new HashMap<>();
	private Map<ActivityProvenanceItem, List<Object>> activityProvenanceItemMap = new HashMap<>();
	private Map<InputDataProvenanceItem, List<Object>> inputDataProvenanceItemMap = new HashMap<>();

	// private List<ActivityProvenanceItem> activityProvenanceItemList = new ArrayList<>();
	// private List<InputDataProvenanceItem> inputDataProvenanceItemList = new ArrayList<>();

	private WorkflowProvenanceItem workflowItem;

	@Override
	public void configure(String o) {
	}

	/**
	 * A set of provenance events for a particular owning process has been
	 * finished with so you can remove all the {@link IterationProvenanceItem}s
	 * from the map
	 */
	@Override
	public void finishedWith(String owningProcess) {
		processToIndexes.remove(owningProcess);
	}

	@Override
	public String getConfiguration() {
		return null;
	}

	protected Map<String, IterationProvenanceItem> getIndexesByProcess(
			String owningProcess) {
		synchronized (processToIndexes) {
			Map<String, IterationProvenanceItem> indexes = processToIndexes
					.get(owningProcess);
			if (indexes == null) {
				indexes = new HashMap<>();
				processToIndexes.put(owningProcess, indexes);
			}
			return indexes;
		}
	}

	protected IterationProvenanceItem getIterationProvItem(Event<?> event) {
		String owningProcess = event.getOwningProcess();
		int[] originalIndex = event.getIndex();
		int[] index = event.getIndex();
		String indexStr = indexStr(index);
		Map<String, IterationProvenanceItem> indexes = getIndexesByProcess(owningProcess);
		IterationProvenanceItem iterationProvenanceItem = null;
		synchronized (indexes) {
			// find the iteration item for the int index eg [1]
			iterationProvenanceItem = indexes.get(indexStr);
			// if it is null then strip the index and look again

			while (iterationProvenanceItem == null) {
				try {
					index = removeLastIndex(index);
					iterationProvenanceItem = indexes.get(indexStr(index));
					/*
					 * if we have a 'parent' iteration then create a new
					 * iteration for the original index and link it to the
					 * activity and the input data
					 * 
					 * FIXME should this be linked to the parent iteration
					 * instead?
					 */
					if (iterationProvenanceItem != null) {
						// set the index to the one from the event
						IterationProvenanceItem iterationProvenanceItem1 = new IterationProvenanceItem();
						iterationProvenanceItem1.setIteration(originalIndex);
						iterationProvenanceItem1.setProcessId(owningProcess);
						iterationProvenanceItem1.setIdentifier(UUID
								.randomUUID().toString());
						iterationProvenanceItem1.setWorkflowId(workflowItem.getParentId());
						iterationProvenanceItem1.setParentIterationItem(iterationProvenanceItem);
						iterationProvenanceItem1.setParentId(iterationProvenanceItem.getParentId());
						iterationProvenanceItem1.setInputDataItem(iterationProvenanceItem.getInputDataItem());

//						for (Entry<ActivityProvenanceItem, List<Object>> entrySet : activityProvenanceItemMap
//								.entrySet()) {
//							List<Object> value = entrySet.getValue();
//							int[] newIndex = (int[]) value.get(0);
//							String owner = (String) value.get(1);
//							String indexString = indexStr(newIndex);
//							String indexString2 = indexStr(index);
//
//							if (owningProcess.equalsIgnoreCase(owner)
//									&& indexString
//											.equalsIgnoreCase(indexString2))
//								iterationProvenanceItem1.setParentId(entrySet
//										.getKey().getIdentifier());
//						}
//						for (Entry<InputDataProvenanceItem, List<Object>> entrySet : inputDataProvenanceItemMap
//								.entrySet()) {
//							List<Object> value = entrySet.getValue();
//							int[] newIndex = (int[]) value.get(0);
//							String owner = (String) value.get(1);
//							String indexString = indexStr(newIndex);
//							String indexString2 = indexStr(index);
//							if (owningProcess.equalsIgnoreCase(owner)
//									&& indexString
//											.equalsIgnoreCase(indexString2))
//								iterationProvenanceItem1
//										.setInputDataItem(entrySet.getKey());
//						}

						// for (ActivityProvenanceItem item :
						// activityProvenanceItemList) {
						// if (owningProcess.equalsIgnoreCase(item
						// .getProcessId())) {
						// iterationProvenanceItem1.setParentId(item
						// .getIdentifier());
						// }
						// }
						// for (InputDataProvenanceItem item :
						// inputDataProvenanceItemList) {
						// if (owningProcess.equalsIgnoreCase(item
						// .getProcessId())) {
						// iterationProvenanceItem1.setInputDataItem(item);
						// }
						// indexes.put(indexStr, iterationProvenanceItem1);
						// return iterationProvenanceItem1;
						// // }
						// }

						// add this new iteration item to the map
						getIndexesByProcess(event.getOwningProcess()).put(
								indexStr(event.getIndex()),
								iterationProvenanceItem1);
						return iterationProvenanceItem1;
					}
					/*
					 * if we have not found an iteration items and the index is
					 * [] then something is wrong remove the last index in the
					 * int array before we go back through the while
					 */
				} catch (IllegalStateException e) {
					logger
							.warn("Cannot find a parent iteration with index [] for owning process: "
									+ owningProcess
									+ "Workflow invocation is in an illegal state");
					throw e;
				}
			}

			// if (iterationProvenanceItem == null) {
			// logger.info("Iteration item was null for: "
			// + event.getOwningProcess() + " " + event.getIndex());
			// System.out.println("Iteration item was null for: "
			// + event.getOwningProcess() + " " + event.getIndex());
			// iterationProvenanceItem = new IterationProvenanceItem(index);
			// iterationProvenanceItem.setProcessId(owningProcess);
			// iterationProvenanceItem.setIdentifier(UUID.randomUUID()
			// .toString());
			// // for (ActivityProvenanceItem
			// item:activityProvenanceItemList)
			// // {
			// // if (owningProcess.equalsIgnoreCase(item.getProcessId())) {
			// // iterationProvenanceItem.setParentId(item.getIdentifier());
			// // }
			// // }
			// // for (InputDataProvenanceItem item:
			// // inputDataProvenanceItemList) {
			// // if (owningProcess.equalsIgnoreCase(item.getProcessId())) {
			// // iterationProvenanceItem.setInputDataItem(item);
			// // }
			// // }
			// indexes.put(indexStr, iterationProvenanceItem);

		}
		return iterationProvenanceItem;
	}

	private String indexStr(int[] index) {
		StringBuilder indexStr = new StringBuilder();
		for (int ind : index)
			indexStr.append(":").append(ind);
		return indexStr.toString();
	}

	/**
	 * Remove the last index of the int array in the form 1:2:3 etc
	 * 
	 * @param index
	 * @return
	 */
	@SuppressWarnings("unused")
	private String[] stripLastIndex(int[] index) {
		// will be in form :1:2:3
		return indexStr(index).split(":");
	}

	/**
	 * Remove the last value in the int array
	 * 
	 * @param index
	 * @return
	 */
	private int[] removeLastIndex(int[] index) {
		if (index.length == 0)
			throw new IllegalStateException(
					"There is no parent iteration of index [] for this result");
		int[] newIntArray = new int[index.length - 1];
		for (int i = 0; i < index.length - 1; i++)
			newIntArray[i] = index[i];
		return newIntArray;
	}

	private static String uuid() {
		return UUID.randomUUID().toString();
	}

	/**
	 * Create an {@link ErrorProvenanceItem} and send across to the
	 * {@link ProvenanceConnector}
	 */
	@Override
	public void receiveError(DispatchErrorEvent errorEvent) {
		IterationProvenanceItem iterationProvItem = getIterationProvItem(errorEvent);
		// get using errorEvent.getOwningProcess();
		
		ErrorProvenanceItem errorItem = new ErrorProvenanceItem();
		errorItem.setCause(errorEvent
				.getCause());
		errorItem.setErrorType(errorEvent
				.getFailureType().toString());
		errorItem.setMessage(errorEvent.getMessage());
		
		errorItem.setProcessId(errorEvent.getOwningProcess());
		errorItem.setIdentifier(uuid());
		errorItem.setParentId(iterationProvItem.getIdentifier());
		// iterationProvItem.setErrorItem(errorItem);
		// FIXME don't need to add to the processor item earlier
		getReporter().addProvenanceItem(errorItem);
		super.receiveError(errorEvent);
	}

	/**
	 * Create the {@link ProvenanceItem}s and send them all across to the
	 * {@link ProvenanceConnector} except for the
	 * {@link IterationProvenanceItem}, this one is told what it's inputs are
	 * but has to wait until the results are received before being sent across.
	 * Each item has a unique identifier and also knows who its parent item is
	 */
	@Override
	public void receiveJob(DispatchJobEvent jobEvent) {
			try {
			// FIXME do we need this ProcessProvenanceItem?
			ProcessProvenanceItem provenanceItem;
			String[] split = jobEvent.getOwningProcess().split(":");
			provenanceItem = new ProcessProvenanceItem();
			String parentDataflowId = workflowItem.getParentId();
			provenanceItem.setWorkflowId(parentDataflowId);
			provenanceItem.setFacadeID(split[0]);
			provenanceItem.setDataflowID(split[1]);
			provenanceItem.setProcessId(jobEvent.getOwningProcess());
			provenanceItem.setIdentifier(uuid());
			provenanceItem.setParentId(workflowItem.getIdentifier());
			ProcessorProvenanceItem processorProvItem;
			processorProvItem = new ProcessorProvenanceItem();
			processorProvItem.setWorkflowId(parentDataflowId);
			processorProvItem.setProcessId(jobEvent
					.getOwningProcess());
			processorProvItem.setIdentifier(uuid());
			processorProvItem.setParentId(provenanceItem.getIdentifier());
			provenanceItem.setProcessId(jobEvent.getOwningProcess());
			getReporter().addProvenanceItem(provenanceItem);
			getReporter().addProvenanceItem(processorProvItem);
	
			IterationProvenanceItem iterationProvItem = null;
			iterationProvItem = new IterationProvenanceItem();
			iterationProvItem.setWorkflowId(parentDataflowId);
			iterationProvItem.setIteration(jobEvent.getIndex());
			iterationProvItem.setIdentifier(uuid());
			
			ReferenceService referenceService = jobEvent.getContext()
					.getReferenceService();
	
			InputDataProvenanceItem inputDataItem = new InputDataProvenanceItem();
			inputDataItem.setDataMap(jobEvent.getData());
			inputDataItem.setReferenceService(referenceService);
			inputDataItem.setIdentifier(uuid());
			inputDataItem.setParentId(iterationProvItem.getIdentifier());
			inputDataItem.setProcessId(jobEvent.getOwningProcess());
	
			List<Object> inputIndexOwnerList = new ArrayList<>();
			inputIndexOwnerList.add(jobEvent.getIndex());
			inputIndexOwnerList.add(jobEvent.getOwningProcess());
			inputDataProvenanceItemMap.put(inputDataItem, inputIndexOwnerList);
	
			// inputDataProvenanceItemList.add(inputDataItem);
			iterationProvItem.setInputDataItem(inputDataItem);
			iterationProvItem.setIteration(jobEvent.getIndex());
			iterationProvItem.setProcessId(jobEvent.getOwningProcess());
	
			for (Activity<?> activity : jobEvent.getActivities())
				if (activity instanceof AsynchronousActivity) {
					ActivityProvenanceItem activityProvItem = new ActivityProvenanceItem();
					activityProvItem.setWorkflowId(parentDataflowId);
					activityProvItem.setIdentifier(uuid());
					iterationProvItem.setParentId(activityProvItem.getIdentifier());
					// getConnector().addProvenanceItem(iterationProvItem);
					activityProvItem.setParentId(processorProvItem.getIdentifier());
					// processorProvItem.setActivityProvenanceItem(activityProvItem);
					activityProvItem.setProcessId(jobEvent.getOwningProcess());
					List<Object> activityIndexOwnerList = new ArrayList<>();
					activityIndexOwnerList.add(jobEvent.getOwningProcess());
					activityIndexOwnerList.add(jobEvent.getIndex());
					activityProvenanceItemMap.put(activityProvItem,
							inputIndexOwnerList);
					// activityProvenanceItemList.add(activityProvItem);
					// activityProvItem.setIterationProvenanceItem(iterationProvItem);
					getReporter().addProvenanceItem(activityProvItem);
					break;
				}
			getIndexesByProcess(jobEvent.getOwningProcess()).put(
					indexStr(jobEvent.getIndex()), iterationProvItem);
			iterationProvItem.setEnactmentStarted(new Timestamp(currentTimeMillis()));
			getReporter().addProvenanceItem(iterationProvItem);
		} catch (RuntimeException ex) {
			logger.error("Could not store provenance for " + jobEvent, ex);
		}
		
		super.receiveJob(jobEvent);
	}

	@Override
	public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
		super.receiveJobQueue(jobQueueEvent);
	}

	/**
	 * Populate an {@link OutputDataProvenanceItem} with the results and attach
	 * it to the appropriate {@link IterationProvenanceItem}. Then send the
	 * {@link IterationProvenanceItem} across to the {@link ProvenanceConnector}
	 */
	@Override
	public void receiveResult(DispatchResultEvent resultEvent) {
		try {
			// FIXME use the connector from the result event context
			IterationProvenanceItem iterationProvItem = getIterationProvItem(resultEvent);
			iterationProvItem.setEnactmentEnded(new Timestamp(currentTimeMillis()));
			
			ReferenceService referenceService = resultEvent.getContext()
					.getReferenceService();

			OutputDataProvenanceItem outputDataItem = new OutputDataProvenanceItem();
			outputDataItem.setDataMap(resultEvent.getData());
			outputDataItem.setReferenceService(referenceService);
			outputDataItem.setIdentifier(uuid());
			outputDataItem.setProcessId(resultEvent.getOwningProcess());
			outputDataItem.setParentId(iterationProvItem.getIdentifier());
			iterationProvItem.setOutputDataItem(outputDataItem);
			
			getReporter().addProvenanceItem(iterationProvItem);
			// getConnector().addProvenanceItem(outputDataItem);
	
			// PM -- testing
			// add xencoding of data value here??
	//		Map<String, T2Reference> inputDataMap = iterationProvItem.getInputDataItem().getDataMap();
	//		for(Map.Entry<String, T2Reference> entry:inputDataMap.entrySet()) {
	//			// create a simpler bean that we can serialize?
	//			
	//			T2Reference ref = entry.getValue();
	//			
	//			SimplerT2Reference t2RefBean = new SimplerT2Reference();
	//			t2RefBean.setReferenceType(ref.getReferenceType());
	//			t2RefBean.setDepth(ref.getDepth());
	//			t2RefBean.setLocalPart(ref.getLocalPart());
	//			t2RefBean.setNamespacePart(ref.getNamespacePart());
	//						
	//			System.out.println("data ref: "+ref);
	//			String serializedInput = SerializeParam(t2RefBean);
	//			System.out.println("serialized reference:" + serializedInput);
	//			
	//			System.out.println(referenceService.renderIdentifier(entry.getValue(), String.class, resultEvent.getContext()));
//		}
		} catch (Exception ex) {
			logger.error("Could not store provenance for "
					+ resultEvent.getOwningProcess() + " "
					+ Arrays.toString(resultEvent.getIndex()), ex);
			// But don't break super.receiveResult() !!
		}
		super.receiveResult(resultEvent);
	}

	@Override
	public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
		super.receiveResultCompletion(completionEvent);
	}

	/**
	 * Tell this layer what {@link ProvenanceConnector} implementation is being
	 * used to capture the {@link ProvenanceItem}s. NOTE: should probably use
	 * the connector from the result events context where possible
	 * 
	 * @param connector
	 */
	public void setReporter(ProvenanceReporter connector) {
		this.reporter = connector;
	}

	public ProvenanceReporter getReporter() {
		return reporter;
	}

	/**
	 * So that the {@link ProvenanceItem}s know which {@link Dataflow} has been
	 * enacted this layer has to know about the {@link WorkflowProvenanceItem}
	 * 
	 * @param workflowItem
	 */
	public void setWorkflow(WorkflowProvenanceItem workflowItem) {
		this.workflowItem = workflowItem;
	}

	// TODO is this unused?
	public static String SerializeParam(Object ParamValue) {
		ByteArrayOutputStream BStream = new ByteArrayOutputStream();
		XMLEncoder encoder = new XMLEncoder(BStream);
		encoder.writeObject(ParamValue);
		encoder.close();
		return BStream.toString();
	}

	// TODO is this unused?
	public static Object DeserializeParam(String SerializedParam) {
		InputStream IStream = new ByteArrayInputStream(
				SerializedParam.getBytes());
		XMLDecoder decoder = new XMLDecoder(IStream);
		Object output = decoder.readObject();
		decoder.close();
		return output;
	}
}
