/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.sysds.runtime.controlprogram.context;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
import org.apache.sysds.runtime.controlprogram.Program;
import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType;
import org.apache.sysds.runtime.controlprogram.caching.TensorObject;
import org.apache.sysds.runtime.data.TensorBlock;
import org.apache.sysds.runtime.instructions.Instruction;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.instructions.cp.ListObject;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
import org.apache.sysds.runtime.instructions.cp.ScalarObjectFactory;
import org.apache.sysds.runtime.instructions.gpu.context.GPUContext;
import org.apache.sysds.runtime.instructions.gpu.context.GPUObject;
import org.apache.sysds.runtime.lineage.Lineage;
import org.apache.sysds.runtime.lineage.LineageItem;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.Pair;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.meta.MetaData;
import org.apache.sysds.runtime.meta.MetaDataFormat;
import org.apache.sysds.runtime.util.HDFSTool;
import org.apache.sysds.utils.Statistics;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Runtime context of a program execution. It bundles the program reference,
 * the symbol table of live variables, the optional lineage state, and the
 * list of {@link GPUContext}s used by GPU instructions, and offers convenience
 * accessors to obtain, pin, release, and clean up the data objects that are
 * bound to variable names.
 */
public class ExecutionContext {
	protected static final Log LOG = LogFactory.getLog(ExecutionContext.class.getName());

	//program reference (e.g., function repository)
	protected Program _prog = null;
	
	//symbol table
	protected LocalVariableMap _variables;

	//lineage map, cache, prepared dedup blocks
	protected Lineage _lineage;

	/**
	 * List of {@link GPUContext}s owned by this {@link ExecutionContext}
	 */
	protected List<GPUContext> _gpuContexts = new ArrayList<>();
	
	/**
	 * Creates a context with a fresh symbol table, no program, and a lineage
	 * object only if {@link DMLScript#LINEAGE} is set.
	 */
	protected ExecutionContext() {
		//protected constructor to force use of ExecutionContextFactory
		this( true, DMLScript.LINEAGE, null );
	}

	/**
	 * Creates a context with optional allocation of its internal state.
	 *
	 * @param allocateVariableMap if true, a new empty symbol table is created, otherwise it is left null
	 * @param allocateLineage if true, a new lineage object is created, otherwise it is left null
	 * @param prog program reference (may be null)
	 */
	protected ExecutionContext( boolean allocateVariableMap, boolean allocateLineage, Program prog ) {
		//protected constructor to force use of ExecutionContextFactory
		_variables = allocateVariableMap ? new LocalVariableMap() : null;
		_lineage = allocateLineage ? new Lineage() : null;
		_prog = prog;
	}

	/**
	 * Creates a context around an existing symbol table, without program and
	 * without lineage.
	 *
	 * @param vars symbol table to use
	 */
	public ExecutionContext(LocalVariableMap vars) {
		_variables = vars;
		_lineage = null;
		_prog = null;
	}

	public Program getProgram(){
		return _prog;
	}

	public void setProgram(Program prog) {
		_prog = prog;
	}
	
	public LocalVariableMap getVariables() {
		return _variables;
	}
	
	public void setVariables(LocalVariableMap vars) {
		_variables = vars;
	}

	public Lineage getLineage() {
		return _lineage;
	}

	public void setLineage(Lineage lineage) {
		_lineage = lineage;
	}

	/**
	 * Get the i-th GPUContext
	 * @param index index of the GPUContext
	 * @return a valid GPUContext or null if the indexed GPUContext does not exist.
	 */
	public GPUContext getGPUContext(int index) {
		//explicit bounds check instead of catching IndexOutOfBoundsException
		if( index < 0 || index >= _gpuContexts.size() )
			return null;
		return _gpuContexts.get(index);
	}

	/**
	 * Sets the list of GPUContexts
	 * @param gpuContexts a collection of GPUContexts
	 */
	public void setGPUContexts(List<GPUContext> gpuContexts){
		_gpuContexts = gpuContexts;
	}

	/**
	 * Gets the list of GPUContexts
	 * @return a list of GPUContexts
	 */
	public List<GPUContext> getGPUContexts() {
		return _gpuContexts;
	}

	/**
	 * Gets the number of GPUContexts
	 * @return number of GPUContexts
	 */
	public int getNumGPUContexts() {
		return _gpuContexts.size();
	}

	/* -------------------------------------------------------
	 * Methods to handle variables and associated data objects
	 * -------------------------------------------------------
	 */
	
	/**
	 * Looks up a variable in the symbol table.
	 *
	 * @param name variable name
	 * @return the result of the symbol table lookup for the given name
	 */
	public Data getVariable(String name) {
		return _variables.get(name);
	}
	
	/**
	 * Resolves an operand to its data object. Scalar operands are resolved via
	 * {@link #getScalarInput(CPOperand)} (which also handles literals), all
	 * other operands via a symbol table lookup by name.
	 *
	 * @param operand instruction operand
	 * @return data object of the operand
	 */
	public Data getVariable(CPOperand operand) {
		if( operand.getDataType().isScalar() )
			return getScalarInput(operand);
		return getVariable(operand.getName());
	}
	
	public void setVariable(String name, Data val) {
		_variables.put(name, val);
	}
	
	public boolean containsVariable(CPOperand operand) {
		return containsVariable(operand.getName());
	}
	
	public boolean containsVariable(String name) {
		return _variables.keySet().contains(name);
	}

	public Data removeVariable(String name) {
		return _variables.remove(name);
	}

	/**
	 * Replaces the metadata of an existing variable.
	 *
	 * @param fname variable name
	 * @param md new metadata
	 * @throws DMLRuntimeException if the variable does not exist in the symbol table
	 */
	public void setMetaData(String fname, MetaData md) {
		Data dat = _variables.get(fname);
		//fail with a descriptive error (consistent with getMetaData) instead of a bare NPE
		if( dat == null )
			throw new DMLRuntimeException(getNonExistingVarError(fname));
		dat.setMetaData(md);
	}
	
	/**
	 * Gets the metadata of an existing variable.
	 *
	 * @param varname variable name
	 * @return metadata of the variable
	 * @throws DMLRuntimeException if the variable does not exist in the symbol table
	 */
	public MetaData getMetaData(String varname) {
		Data tmp = _variables.get(varname);
		if( tmp == null )
			throw new DMLRuntimeException(getNonExistingVarError(varname));
		return tmp.getMetaData();
	}
	
	/**
	 * Indicates if the given variable exists and is bound to a matrix object.
	 *
	 * @param varname variable name
	 * @return true if the variable is a {@link MatrixObject}
	 */
	public boolean isMatrixObject(String varname) {
		//instanceof is false for null, no separate null check needed
		return getVariable(varname) instanceof MatrixObject;
	}
	
	public MatrixObject getMatrixObject(CPOperand input) {
		return getMatrixObject(input.getName());
	}

	/**
	 * Gets the matrix object bound to the given variable.
	 *
	 * @param varname variable name
	 * @return matrix object
	 * @throws DMLRuntimeException if the variable does not exist or is not a matrix
	 */
	public MatrixObject getMatrixObject(String varname) {
		Data dat = getVariable(varname);
		
		//error handling if non existing or no matrix
		if( dat == null )
			throw new DMLRuntimeException(getNonExistingVarError(varname));
		if( !(dat instanceof MatrixObject) )
			throw new DMLRuntimeException("Variable '"+varname+"' is not a matrix: "+dat.getClass().getName());
		
		return (MatrixObject) dat;
	}

	/**
	 * Gets the tensor object bound to the given variable.
	 *
	 * @param varname variable name
	 * @return tensor object
	 * @throws DMLRuntimeException if the variable does not exist or is not a tensor
	 */
	public TensorObject getTensorObject(String varname) {
		Data dat = getVariable(varname);

		//error handling if non existing or no tensor
		if( dat == null )
			throw new DMLRuntimeException(getNonExistingVarError(varname));
		if( !(dat instanceof TensorObject) )
			throw new DMLRuntimeException("Variable '"+varname+"' is not a tensor.");

		return (TensorObject) dat;
	}
	
	/**
	 * Indicates if the given variable exists and is bound to a frame object.
	 *
	 * @param varname variable name
	 * @return true if the variable is a {@link FrameObject}
	 */
	public boolean isFrameObject(String varname) {
		//instanceof is false for null, no separate null check needed
		return getVariable(varname) instanceof FrameObject;
	}
	
	public FrameObject getFrameObject(CPOperand input) {
		return getFrameObject(input.getName());
	}
	
	/**
	 * Gets the frame object bound to the given variable.
	 *
	 * @param varname variable name
	 * @return frame object
	 * @throws DMLRuntimeException if the variable does not exist or is not a frame
	 */
	public FrameObject getFrameObject(String varname) {
		Data dat = getVariable(varname);
		//error handling if non existing or no frame
		if( dat == null )
			throw new DMLRuntimeException(getNonExistingVarError(varname));
		if( !(dat instanceof FrameObject) )
			throw new DMLRuntimeException("Variable '"+varname+"' is not a frame.");
		return (FrameObject) dat;
	}

	public CacheableData<?> getCacheableData(CPOperand input) {
		return getCacheableData(input.getName());
	}
	
	/**
	 * Gets the cacheable data object bound to the given variable.
	 *
	 * @param varname variable name
	 * @return cacheable data object
	 * @throws DMLRuntimeException if the variable does not exist or is not cacheable data
	 */
	public CacheableData<?> getCacheableData(String varname) {
		Data dat = getVariable(varname);
		//error handling if non existing or no cacheable data
		if( dat == null )
			throw new DMLRuntimeException(getNonExistingVarError(varname));
		if( !(dat instanceof CacheableData<?>) )
			throw new DMLRuntimeException("Variable '"+varname+"' is not a matrix, tensor or frame.");
		return (CacheableData<?>) dat;
	}

	public void releaseCacheableData(String varname) {
		getCacheableData(varname).release();
	}

	public DataCharacteristics getDataCharacteristics(String varname) {
		return getMetaData(varname).getDataCharacteristics();
	}
	
	/**
	 * Pins a matrix variable into memory and returns the internal matrix block.
	 *
	 * @param varName variable name
	 * @return matrix block
	 */
	public MatrixBlock getMatrixInput(String varName) {
		return getMatrixObject(varName).acquireRead();
	}

	/**
	 * Pins a tensor variable into memory and returns the internal tensor block.
	 *
	 * @param varName variable name
	 * @return tensor block
	 */
	public TensorBlock getTensorInput(String varName) {
		return getTensorObject(varName).acquireRead();
	}

	/**
	 * Updates the dimensions of a matrix variable. If the matrix object already
	 * reports the given dimensions, nothing is changed. Otherwise its metadata
	 * is replaced by a new {@link MetaDataFormat} built from the given
	 * dimensions, the existing block size, and the existing file format.
	 *
	 * @param varName variable name of a matrix
	 * @param nrows number of rows
	 * @param ncols number of columns
	 * @throws DMLRuntimeException if the existing metadata is missing or not a {@link MetaDataFormat}
	 */
	public void setMetaData(String varName, long nrows, long ncols) {
		MatrixObject mo = getMatrixObject(varName);
		if(mo.getNumRows() == nrows && mo.getNumColumns() == ncols)
			return;
		MetaData oldMetaData = mo.getMetaData();
		//instanceof also covers the null case
		if( !(oldMetaData instanceof MetaDataFormat) )
			throw new DMLRuntimeException("Metadata not available");
		MatrixCharacteristics mc = new MatrixCharacteristics(nrows, ncols, (int) mo.getBlocksize());
		mo.setMetaData(new MetaDataFormat(mc, ((MetaDataFormat)oldMetaData).getFileFormat()));
	}
	
	/**
	 * Reconciles two candidate values d1 and d2 of the same dimension and
	 * returns the larger one, i.e., the known value if the other is negative (unknown).
	 * This method is useful when the dimensions are not known at compile time, but are known at runtime.
	 *
	 * @param d1 dimension1
	 * @param d2 dimension2
	 * @return maximum of d1 and d2
	 * @throws DMLRuntimeException if both values are non-negative but differ
	 */
	private static long validateDimensions(long d1, long d2) {
		if(d1 >= 0 && d2 >= 0 && d1 != d2) {
			throw new DMLRuntimeException("Incorrect dimensions:" + d1 + " != " + d2);
		}
		return Math.max(d1, d2);
	}

	/**
	 * Allocates a dense matrix on the GPU (for output)
	 * @param varName	name of the output matrix (known by this {@link ExecutionContext})
	 * @param numRows number of rows of matrix object
	 * @param numCols number of columns of matrix object
	 * @return a pair containing the wrapping {@link MatrixObject} and a boolean indicating whether a cuda memory allocation took place (as opposed to the space already being allocated)
	 */
	public Pair<MatrixObject, Boolean> getDenseMatrixOutputForGPUInstruction(String varName, long numRows, long numCols) {
		MatrixObject mo = allocateGPUMatrixObject(varName, numRows, numCols);
		boolean allocated = mo.getGPUObject(getGPUContext(0)).acquireDeviceModifyDense();
		//number of non-zeros of the output is unknown at this point
		mo.getDataCharacteristics().setNonZeros(-1);
		return new Pair<>(mo, allocated);
	}

	/**
	 * Allocates a sparse matrix in CSR format on the GPU.
	 * Assumes that mat.getNumRows() returns a valid number
	 *
	 * @param varName variable name
	 * @param numRows number of rows of matrix object
	 * @param numCols number of columns of matrix object
	 * @param nnz number of non zeroes
	 * @return a pair containing the wrapping {@link MatrixObject} and the boolean returned by the sparse device allocation
	 */
	public Pair<MatrixObject, Boolean> getSparseMatrixOutputForGPUInstruction(String varName, long numRows, long numCols, long nnz) {
		MatrixObject mo = allocateGPUMatrixObject(varName, numRows, numCols);
		//nnz is set before the sparse allocation
		mo.getDataCharacteristics().setNonZeros(nnz);
		boolean allocated = mo.getGPUObject(getGPUContext(0)).acquireDeviceModifySparse();
		return new Pair<>(mo, allocated);
	}

	/**
	 * Allocates the {@link GPUObject} for a given LOPS Variable (eg. _mVar3)
	 * @param varName variable name
	 * @param numRows number of rows of matrix object
	 * @param numCols number of columns of matrix object
	 * @return matrix object
	 */
	public MatrixObject allocateGPUMatrixObject(String varName, long numRows, long numCols) {
		MatrixObject mo = getMatrixObject(varName);
		GPUContext gCtx = getGPUContext(0);
		long dim1 = -1; long dim2 = -1;

		try {
			dim1 = validateDimensions(mo.getNumRows(), numRows);
			dim2 = validateDimensions(mo.getNumColumns(), numCols);
		}
		catch(DMLRuntimeException e) {
			throw new DMLRuntimeException("Incorrect dimensions given to allocateGPUMatrixObject: [" + numRows + "," +
					numCols + "], " + "[" + mo.getNumRows() + "," + mo.getNumColumns() + "]", e);
		}

		if(dim1 != mo.getNumRows() || dim2 != mo.getNumColumns()) {
			// Set unknown dimensions
			mo.getDataCharacteristics().setDimension(dim1, dim2);
		}

		// Lazily create the GPU object for the first GPU context
		if( mo.getGPUObject(gCtx) == null ) {
			GPUObject newGObj = gCtx.createGPUObject(mo);
			mo.setGPUObject(gCtx, newGObj);
		}

		// The lock is added here for an output block
		// so that any block currently in use is not deallocated by eviction on the GPU
		mo.getGPUObject(gCtx).addWriteLock();
		return mo;
	}

	/**
	 * Gets a matrix variable as input of a GPU instruction. The GPU object for
	 * the first GPU context is created if it does not exist yet, and a device
	 * read is acquired on it.
	 *
	 * @param varName variable name
	 * @param opcode opcode passed through to the device read
	 * @return matrix object
	 */
	public MatrixObject getMatrixInputForGPUInstruction(String varName, String opcode) {
		GPUContext gCtx = getGPUContext(0);
		MatrixObject mo = getMatrixObject(varName);
		// Defensive only: getMatrixObject of this class throws for missing variables
		if(mo == null) {
			throw new DMLRuntimeException("No matrix object available for variable:" + varName);
		}

		if( mo.getGPUObject(gCtx) == null ) {
			GPUObject newGObj = gCtx.createGPUObject(mo);
			mo.setGPUObject(gCtx, newGObj);
		}
		// No need to perform acquireRead here because it is performed in copyFromHostToDevice
		mo.getGPUObject(gCtx).acquireDeviceRead(opcode);
		return mo;
	}
	
	/**
	 * Unpins a currently pinned matrix variable.
	 *
	 * @param varName variable name
	 */
	public void releaseMatrixInput(String varName) {
		getMatrixObject(varName).release();
	}
	
	/**
	 * Unpins multiple currently pinned matrix variables, in the given order.
	 *
	 * @param varNames variable names
	 */
	public void releaseMatrixInput(String... varNames) {
		for( String varName : varNames )
			releaseMatrixInput(varName);
	}
	
	public void releaseMatrixInputForGPUInstruction(String varName) {
		getMatrixObject(varName).getGPUObject(getGPUContext(0)).releaseInput();
	}
	
	/**
	 * Pins a frame variable into memory and returns the internal frame block.
	 *
	 * @param varName variable name
	 * @return frame block
	 */
	public FrameBlock getFrameInput(String varName) {
		return getFrameObject(varName).acquireRead();
	}
	
	/**
	 * Unpins a currently pinned frame variable.
	 *
	 * @param varName variable name
	 */
	public void releaseFrameInput(String varName) {
		getFrameObject(varName).release();
	}

	/**
	 * Unpins a currently pinned tensor variable.
	 *
	 * @param varName variable name
	 */
	public void releaseTensorInput(String varName) {
		getTensorObject(varName).release();
	}

	/**
	 * Unpins multiple currently pinned tensor variables, in the given order.
	 *
	 * @param varNames variable names
	 */
	public void releaseTensorInput(String... varNames) {
		for( String varName : varNames )
			releaseTensorInput(varName);
	}

	/**
	 * Gets the scalar object of an operand, which is either its literal or the
	 * scalar variable of the operand's name.
	 *
	 * @param input scalar operand
	 * @return scalar object
	 */
	public ScalarObject getScalarInput(CPOperand input) {
		if( input.isLiteral() )
			return input.getLiteral();
		return getScalarInput(input.getName(), input.getValueType(), false);
	}
	
	/**
	 * Gets a scalar object either by creating it from a literal or by looking
	 * up a scalar variable.
	 *
	 * @param name literal value (if isLiteral) or variable name
	 * @param vt value type, only used for literals
	 * @param isLiteral if true, name is interpreted as literal value
	 * @return scalar object
	 * @throws DMLRuntimeException if the variable does not exist or is not a scalar
	 */
	public ScalarObject getScalarInput(String name, ValueType vt, boolean isLiteral) {
		if ( isLiteral )
			return ScalarObjectFactory.createScalarObject(vt, name);
		
		Data obj = getVariable(name);
		if (obj == null)
			throw new DMLRuntimeException("Unknown variable: " + name);
		//descriptive error instead of a ClassCastException, consistent with other getters
		if (!(obj instanceof ScalarObject))
			throw new DMLRuntimeException("Variable '" + name + "' is not a scalar: " + obj.getClass().getName());
		return (ScalarObject) obj;
	}

	public void setScalarOutput(String varName, ScalarObject so) {
		setVariable(varName, so);
	}

	public ListObject getListObject(CPOperand input) {
		return getListObject(input.getName());
	}
	
	/**
	 * Gets the list object bound to the given variable.
	 *
	 * @param name variable name
	 * @return list object
	 * @throws DMLRuntimeException if the variable does not exist or is not a list
	 */
	public ListObject getListObject(String name) {
		Data dat = getVariable(name);
		//error handling if non existing or no list
		if (dat == null)
			throw new DMLRuntimeException(getNonExistingVarError(name));
		if (!(dat instanceof ListObject))
			throw new DMLRuntimeException("Variable '" + name + "' is not a list.");
		return (ListObject) dat;
	}
	
	/**
	 * Collects all matrix objects of a list in iteration order, descending
	 * recursively into nested lists.
	 *
	 * @param lo list object
	 * @return flat list of matrix objects
	 * @throws DMLRuntimeException if an entry is neither a matrix nor a list
	 */
	private List<MatrixObject> getMatricesFromList(ListObject lo) {
		List<MatrixObject> ret = new ArrayList<>();
		for (Data e : lo.getData()) {
			if (e instanceof MatrixObject)
				ret.add((MatrixObject)e);
			else if (e instanceof ListObject)
				ret.addAll(getMatricesFromList((ListObject)e));
			else
				throw new DMLRuntimeException("List must contain only matrices or lists for rbind/cbind.");
		}
		return ret;
	}

	/**
	 * Releases the output of a GPU instruction: sets binary-format metadata
	 * based on the current data characteristics of the matrix object and
	 * releases the output on its GPU object.
	 *
	 * @param varName variable name
	 * @throws DMLRuntimeException if no GPU object exists or it is not allocated
	 */
	public void releaseMatrixOutputForGPUInstruction(String varName) {
		MatrixObject mo = getMatrixObject(varName);
		GPUContext gCtx = getGPUContext(0);
		if(mo.getGPUObject(gCtx) == null || !mo.getGPUObject(gCtx).isAllocated()) {
			throw new DMLRuntimeException("No output is allocated on GPU");
		}
		setMetaData(varName, new MetaDataFormat(mo.getDataCharacteristics(), FileFormat.BINARY));
		mo.getGPUObject(gCtx).releaseOutput();
	}
	
	/**
	 * Binds a matrix block as the new data of an existing matrix variable.
	 *
	 * @param varName variable name
	 * @param outputData matrix block
	 */
	public void setMatrixOutput(String varName, MatrixBlock outputData) {
		MatrixObject mo = getMatrixObject(varName);
		mo.acquireModify(outputData);
		mo.release();
		setVariable(varName, mo);
	}

	/**
	 * Binds a matrix block as the new data of an existing matrix variable and,
	 * for in-place update types, records the update type on the matrix object.
	 *
	 * @param varName variable name
	 * @param outputData matrix block
	 * @param flag update type
	 */
	public void setMatrixOutput(String varName, MatrixBlock outputData, UpdateType flag) {
		if( flag.isInPlace() ) {
			//modify metadata to carry update status
			getMatrixObject(varName).setUpdateType( flag );
		}
		setMatrixOutput(varName, outputData);
	}

	/**
	 * Same as {@link #setMatrixOutput(String, MatrixBlock, UpdateType)}; the
	 * opcode argument is currently ignored.
	 *
	 * @param varName variable name
	 * @param outputData matrix block
	 * @param flag update type
	 * @param opcode instruction opcode (unused)
	 */
	public void setMatrixOutput(String varName, MatrixBlock outputData, UpdateType flag, String opcode) {
		setMatrixOutput(varName, outputData, flag);
	}

	/**
	 * Binds a tensor block as the new data of an existing tensor variable.
	 *
	 * @param varName variable name
	 * @param outputData tensor block
	 */
	public void setTensorOutput(String varName, TensorBlock outputData) {
		TensorObject to = getTensorObject(varName);
		to.acquireModify(outputData);
		to.release();
		setVariable(varName, to);
	}
	
	/**
	 * Binds a frame block as the new data of an existing frame variable.
	 *
	 * @param varName variable name
	 * @param outputData frame block
	 */
	public void setFrameOutput(String varName, FrameBlock outputData) {
		FrameObject fo = getFrameObject(varName);
		fo.acquireModify(outputData);
		fo.release();
		setVariable(varName, fo);
	}
	
	public List<MatrixBlock> getMatrixInputs(CPOperand[] inputs) {
		return getMatrixInputs(inputs, false);
	}
	
	/**
	 * Pins all matrix operands and returns their matrix blocks. If requested,
	 * the matrices contained in list operands (including nested lists) are
	 * pinned as well and appended after the blocks of the matrix operands.
	 *
	 * @param inputs instruction operands
	 * @param includeList if true, matrices inside list operands are included
	 * @return list of pinned matrix blocks
	 */
	public List<MatrixBlock> getMatrixInputs(CPOperand[] inputs, boolean includeList) {
		List<MatrixBlock> ret = Arrays.stream(inputs).filter(in -> in.isMatrix())
			.map(in -> getMatrixInput(in.getName())).collect(Collectors.toList());
		
		if (includeList) {
			//resolve all list operands first, then pin their matrices
			List<ListObject> lolist = Arrays.stream(inputs).filter(in -> in.isList())
				.map(in -> getListObject(in.getName())).collect(Collectors.toList());
			for (ListObject lo : lolist)
				ret.addAll( getMatricesFromList(lo).stream()
					.map(mo -> mo.acquireRead()).collect(Collectors.toList()));
		}
		
		return ret;
	}
	
	/**
	 * Gets the scalar objects of all scalar operands, in operand order.
	 *
	 * @param inputs instruction operands
	 * @return list of scalar objects
	 */
	public List<ScalarObject> getScalarInputs(CPOperand[] inputs) {
		return Arrays.stream(inputs).filter(in -> in.isScalar())
			.map(in -> getScalarInput(in)).collect(Collectors.toList());
	}
	
	public void releaseMatrixInputs(CPOperand[] inputs) {
		releaseMatrixInputs(inputs, false);
	}

	/**
	 * Unpins all matrix operands and, if requested, the matrices contained in
	 * list operands (including nested lists). Counterpart of
	 * {@link #getMatrixInputs(CPOperand[], boolean)}.
	 *
	 * @param inputs instruction operands
	 * @param includeList if true, matrices inside list operands are released too
	 */
	public void releaseMatrixInputs(CPOperand[] inputs, boolean includeList) {
		Arrays.stream(inputs).filter(in -> in.isMatrix())
			.forEach(in -> releaseMatrixInput(in.getName()));

		if (includeList) {
			List<ListObject> lolist = Arrays.stream(inputs).filter(in -> in.isList())
				.map(in -> getListObject(in.getName())).collect(Collectors.toList());
			for (ListObject lo : lolist)
				for (MatrixObject mo : getMatricesFromList(lo))
					mo.release();
		}
	}
	
	/**
	 * Pin a given list of variables i.e., set the "clean up" state in
	 * corresponding matrix objects, so that the cached data inside these
	 * objects is not cleared and the corresponding HDFS files are not
	 * deleted (through rmvar instructions).
	 *
	 * This is necessary for: function input variables, parfor result variables,
	 * parfor shared inputs that are passed to functions.
	 *
	 * The function returns the OLD "clean up" state of matrix objects.
	 *
	 * @param varList variable list
	 * @return indicator vector of old cleanup state of matrix objects
	 */
	public boolean[] pinVariables(List<String> varList)
	{
		//analyze list variables
		int nlist = 0;
		int nlistItems = 0;
		for( int i=0; i<varList.size(); i++ ) {
			Data dat = _variables.get(varList.get(i));
			if( dat instanceof ListObject ) {
				nlistItems += ((ListObject)dat).getNumCacheableData();
				nlist++;
			}
		}
		
		//2-pass approach since multiple vars might refer to same matrix object
		//(one state slot per non-list variable plus one per cacheable list item)
		boolean[] varsState = new boolean[varList.size()-nlist+nlistItems];
		
		//step 1) get current information
		for( int i=0, pos=0; i<varList.size(); i++ ) {
			Data dat = _variables.get(varList.get(i));
			if( dat instanceof CacheableData<?>  )
				varsState[pos++] = ((CacheableData<?>)dat).isCleanupEnabled();
			else if( dat instanceof ListObject )
				for( Data dat2 : ((ListObject)dat).getData() )
					if( dat2 instanceof CacheableData<?> )
						varsState[pos++] = ((CacheableData<?>)dat2).isCleanupEnabled();
		}
		
		//step 2) pin variables
		for( int i=0; i<varList.size(); i++ ) {
			Data dat = _variables.get(varList.get(i));
			if( dat instanceof CacheableData<?> )
				((CacheableData<?>)dat).enableCleanup(false);
			else if( dat instanceof ListObject )
				for( Data dat2 : ((ListObject)dat).getData() )
					if( dat2 instanceof CacheableData<?> )
						((CacheableData<?>)dat2).enableCleanup(false);
		}
		
		return varsState;
	}
	
	/**
	 * Unpin a given list of variables by setting their "cleanup" status
	 * to the values specified by <code>varsState</code>.
	 *
	 * Typical usage:
	 *    <code>
	 *    oldStatus = pinVariables(varList);
	 *    ...
	 *    unpinVariables(varList, oldStatus);
	 *    </code>
	 *
	 * i.e., a call to unpinVariables() is preceded by pinVariables().
	 *
	 * @param varList variable list
	 * @param varsState variable state
	 */
	public void unpinVariables(List<String> varList, boolean[] varsState) {
		//same traversal order as step 1 of pinVariables, so positions line up
		for( int i=0, pos=0; i<varList.size(); i++ ) {
			Data dat = _variables.get(varList.get(i));
			if( dat instanceof CacheableData<?> )
				((CacheableData<?>)dat).enableCleanup(varsState[pos++]);
			else if( dat instanceof ListObject )
				for( Data dat2 : ((ListObject)dat).getData() )
					if( dat2 instanceof CacheableData<?> )
						((CacheableData<?>)dat2).enableCleanup(varsState[pos++]);
		}
	}
	
	/**
	 * NOTE: No order guaranteed, so keep same list for pin and unpin.
	 *
	 * @return list of all variable names.
	 */
	public ArrayList<String> getVarList() {
		return new ArrayList<>(_variables.keySet());
	}
	
	/**
	 * NOTE: No order guaranteed, so keep same list for pin and unpin.
	 *
	 * @return list of all variable names of partitioned matrices.
	 */
	public ArrayList<String> getVarListPartitioned() {
		ArrayList<String> ret = new ArrayList<>();
		for( String var : _variables.keySet() ) {
			Data dat = _variables.get(var);
			if( dat instanceof MatrixObject
				&& ((MatrixObject)dat).isPartitioned() )
				ret.add(var);
		}
		return ret;
	}
	
	/**
	 * Cleans up a data object: cacheable data directly, and for lists every
	 * directly contained cacheable data item. Null and other data objects
	 * are ignored.
	 *
	 * @param dat data object (may be null)
	 */
	public final void cleanupDataObject(Data dat) {
		if( dat == null ) return;
		if ( dat instanceof CacheableData<?> )
			cleanupCacheableData( (CacheableData<?>)dat );
		else if( dat instanceof ListObject )
			for( Data dat2 : ((ListObject)dat).getData() )
				if( dat2 instanceof CacheableData<?> )
					cleanupCacheableData( (CacheableData<?>)dat2 );
	}
	
	/**
	 * Clears the cached data of the given object and deletes its file and
	 * the accompanying ".mtd" file, but only if cleanup is enabled for the
	 * object and the symbol table holds no references to it.
	 *
	 * @param mo cacheable data object
	 * @throws DMLRuntimeException wrapping any exception raised during cleanup
	 */
	public void cleanupCacheableData(CacheableData<?> mo) {
		if (DMLScript.JMLC_MEM_STATISTICS)
			Statistics.removeCPMemObject(System.identityHashCode(mo));
		//early abort w/o scan of symbol table if no cleanup required
		boolean fileExists = (mo.isHDFSFileExists() && mo.getFileName() != null);
		if( !CacheableData.isCachingActive() && !fileExists )
			return;
		
		try {
			//compute ref count only if matrix cleanup actually necessary
			if ( mo.isCleanupEnabled() && !getVariables().hasReferences(mo) )  {
				mo.clearData(); //clean cached data
				if( fileExists ) {
					HDFSTool.deleteFileIfExistOnHDFS(mo.getFileName());
					HDFSTool.deleteFileIfExistOnHDFS(mo.getFileName()+".mtd");
				}
			}
		}
		catch(Exception ex) {
			throw new DMLRuntimeException(ex);
		}
	}

	/**
	 * Traces the given instruction in the lineage of this context.
	 *
	 * @param inst instruction to trace
	 * @throws DMLRuntimeException if this context has no lineage object
	 */
	public void traceLineage(Instruction inst) {
		if( _lineage == null )
			throw new DMLRuntimeException("Lineage Trace unavailable.");
		_lineage.trace(inst, this);
	}

	/**
	 * Gets the lineage item of the given operand.
	 *
	 * @param input operand
	 * @return result of the lineage lookup
	 * @throws DMLRuntimeException if this context has no lineage object
	 */
	public LineageItem getLineageItem(CPOperand input) {
		if( _lineage == null )
			throw new DMLRuntimeException("Lineage Trace unavailable.");
		return _lineage.get(input);
	}

	/**
	 * Gets or creates the lineage item of the given operand.
	 *
	 * @param input operand
	 * @return lineage item
	 * @throws DMLRuntimeException if this context has no lineage object
	 */
	public LineageItem getOrCreateLineageItem(CPOperand input) {
		if( _lineage == null )
			throw new DMLRuntimeException("Lineage Trace unavailable.");
		return _lineage.getOrCreate(input);
	}
	
	private static String getNonExistingVarError(String varname) {
		return "Variable '" + varname + "' does not exist in the symbol table.";
	}

	@Override
	public String toString(){
		StringBuilder sb = new StringBuilder();
		sb.append(super.toString());
		if(_prog != null)
			sb.append("\nProgram: ").append(_prog.toString());
		if(_variables != null)
			sb.append("\nLocalVariableMap: ").append(_variables.toString());
		if(_lineage != null)
			sb.append("\nLineage: ").append(_lineage.toString());
		return sb.toString();
	}
}
