| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.sysds.runtime.controlprogram.context; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.sysds.api.DMLScript; |
| import org.apache.sysds.common.Types; |
| import org.apache.sysds.common.Types.FileFormat; |
| import org.apache.sysds.common.Types.ValueType; |
| import org.apache.sysds.conf.ConfigurationManager; |
| import org.apache.sysds.hops.OptimizerUtils; |
| import org.apache.sysds.runtime.DMLRuntimeException; |
| import org.apache.sysds.runtime.controlprogram.LocalVariableMap; |
| import org.apache.sysds.runtime.controlprogram.Program; |
| import org.apache.sysds.runtime.controlprogram.caching.CacheBlock; |
| import org.apache.sysds.runtime.controlprogram.caching.CacheableData; |
| import org.apache.sysds.runtime.controlprogram.caching.FrameObject; |
| import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; |
| import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType; |
| import org.apache.sysds.runtime.controlprogram.caching.TensorObject; |
| import org.apache.sysds.runtime.data.TensorBlock; |
| import org.apache.sysds.runtime.instructions.Instruction; |
| import org.apache.sysds.runtime.instructions.cp.CPOperand; |
| import org.apache.sysds.runtime.instructions.cp.Data; |
| import org.apache.sysds.runtime.instructions.cp.ListObject; |
| import org.apache.sysds.runtime.instructions.cp.ScalarObject; |
| import org.apache.sysds.runtime.instructions.cp.ScalarObjectFactory; |
| import org.apache.sysds.runtime.instructions.gpu.context.GPUContext; |
| import org.apache.sysds.runtime.instructions.gpu.context.GPUObject; |
| import org.apache.sysds.runtime.lineage.Lineage; |
| import org.apache.sysds.runtime.lineage.LineageItem; |
| import org.apache.sysds.runtime.matrix.data.FrameBlock; |
| import org.apache.sysds.runtime.matrix.data.MatrixBlock; |
| import org.apache.sysds.runtime.matrix.data.Pair; |
| import org.apache.sysds.runtime.meta.DataCharacteristics; |
| import org.apache.sysds.runtime.meta.MatrixCharacteristics; |
| import org.apache.sysds.runtime.meta.MetaData; |
| import org.apache.sysds.runtime.meta.MetaDataFormat; |
| import org.apache.sysds.runtime.util.HDFSTool; |
| import org.apache.sysds.utils.Statistics; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.stream.Collectors; |
| |
| public class ExecutionContext { |
| protected static final Log LOG = LogFactory.getLog(ExecutionContext.class.getName()); |
| |
| //program reference (e.g., function repository) |
| protected Program _prog = null; |
| |
| //symbol table |
| protected LocalVariableMap _variables; |
| protected boolean _autoCreateVars; |
| |
| //lineage map, cache, prepared dedup blocks |
| protected Lineage _lineage; |
| |
| /** |
| * List of {@link GPUContext}s owned by this {@link ExecutionContext} |
| */ |
| protected List<GPUContext> _gpuContexts = new ArrayList<>(); |
| |
| protected ExecutionContext() { |
| //protected constructor to force use of ExecutionContextFactory |
| this( true, DMLScript.LINEAGE, null ); |
| } |
| |
| protected ExecutionContext( boolean allocateVariableMap, boolean allocateLineage, Program prog ) { |
| //protected constructor to force use of ExecutionContextFactory |
| _variables = allocateVariableMap ? new LocalVariableMap() : null; |
| _autoCreateVars = false; |
| _lineage = allocateLineage ? new Lineage() : null; |
| _prog = prog; |
| } |
| |
| public ExecutionContext(LocalVariableMap vars) { |
| _variables = vars; |
| _autoCreateVars = false; |
| _lineage = null; |
| _prog = null; |
| } |
| |
| public Program getProgram(){ |
| return _prog; |
| } |
| |
| public void setProgram(Program prog) { |
| _prog = prog; |
| } |
| |
| public LocalVariableMap getVariables() { |
| return _variables; |
| } |
| |
| public void setVariables(LocalVariableMap vars) { |
| _variables = vars; |
| } |
| |
| public Lineage getLineage() { |
| return _lineage; |
| } |
| |
| public void setLineage(Lineage lineage) { |
| _lineage = lineage; |
| } |
| |
| public boolean isAutoCreateVars() { |
| return _autoCreateVars; |
| } |
| |
| public void setAutoCreateVars(boolean flag) { |
| _autoCreateVars = flag; |
| } |
| |
| /** |
| * Get the i-th GPUContext |
| * @param index index of the GPUContext |
| * @return a valid GPUContext or null if the indexed GPUContext does not exist. |
| */ |
| public GPUContext getGPUContext(int index) { |
| try { |
| return _gpuContexts.get(index); |
| } catch (IndexOutOfBoundsException e){ |
| return null; |
| } |
| } |
| |
| /** |
| * Sets the list of GPUContexts |
| * @param gpuContexts a collection of GPUContexts |
| */ |
| public void setGPUContexts(List<GPUContext> gpuContexts){ |
| _gpuContexts = gpuContexts; |
| } |
| |
| /** |
| * Gets the list of GPUContexts |
| * @return a list of GPUContexts |
| */ |
| public List<GPUContext> getGPUContexts() { |
| return _gpuContexts; |
| } |
| |
| /** |
| * Gets the number of GPUContexts |
| * @return number of GPUContexts |
| */ |
| public int getNumGPUContexts() { |
| return _gpuContexts.size(); |
| } |
| |
| /* ------------------------------------------------------- |
| * Methods to handle variables and associated data objects |
| * ------------------------------------------------------- |
| */ |
| |
| public Data getVariable(String name) { |
| return _variables.get(name); |
| } |
| |
| public Data getVariable(CPOperand operand) { |
| return operand.getDataType().isScalar() ? |
| getScalarInput(operand) : getVariable(operand.getName()); |
| } |
| |
| public void setVariable(String name, Data val) { |
| _variables.put(name, val); |
| } |
| |
| public boolean containsVariable(CPOperand operand) { |
| return containsVariable(operand.getName()); |
| } |
| |
| public boolean containsVariable(String name) { |
| return _variables.keySet().contains(name); |
| } |
| |
| public Data removeVariable(String name) { |
| return _variables.remove(name); |
| } |
| |
| public void setMetaData(String fname, MetaData md) { |
| _variables.get(fname).setMetaData(md); |
| } |
| |
| public MetaData getMetaData(String varname) { |
| Data tmp = _variables.get(varname); |
| if( tmp == null ) |
| throw new DMLRuntimeException(getNonExistingVarError(varname)); |
| return tmp.getMetaData(); |
| } |
| |
| public boolean isMatrixObject(String varname) { |
| Data dat = getVariable(varname); |
| return (dat!= null && dat instanceof MatrixObject); |
| } |
| |
| public MatrixObject getMatrixObject(CPOperand input) { |
| return getMatrixObject(input.getName()); |
| } |
| |
| public MatrixObject getMatrixObject(String varname) { |
| Data dat = getVariable(varname); |
| |
| //error handling if non existing or no matrix |
| if( dat == null ) |
| throw new DMLRuntimeException(getNonExistingVarError(varname)); |
| if( !(dat instanceof MatrixObject) ) |
| throw new DMLRuntimeException("Variable '"+varname+"' is not a matrix: "+dat.getClass().getName()); |
| |
| return (MatrixObject) dat; |
| } |
| |
| public TensorObject getTensorObject(String varname) { |
| Data dat = getVariable(varname); |
| |
| //error handling if non existing or no matrix |
| if( dat == null ) |
| throw new DMLRuntimeException(getNonExistingVarError(varname)); |
| if( !(dat instanceof TensorObject) ) |
| throw new DMLRuntimeException("Variable '"+varname+"' is not a tensor."); |
| |
| return (TensorObject) dat; |
| } |
| |
| public boolean isFrameObject(String varname) { |
| Data dat = getVariable(varname); |
| return (dat!= null && dat instanceof FrameObject); |
| } |
| |
| public FrameObject getFrameObject(CPOperand input) { |
| return getFrameObject(input.getName()); |
| } |
| |
| public FrameObject getFrameObject(String varname) { |
| Data dat = getVariable(varname); |
| //error handling if non existing or no matrix |
| if( dat == null ) |
| throw new DMLRuntimeException(getNonExistingVarError(varname)); |
| if( !(dat instanceof FrameObject) ) |
| throw new DMLRuntimeException("Variable '"+varname+"' is not a frame."); |
| return (FrameObject) dat; |
| } |
| |
| public CacheableData<?> getCacheableData(CPOperand input) { |
| return getCacheableData(input.getName()); |
| } |
| |
| public CacheableData<?> getCacheableData(String varname) { |
| Data dat = getVariable(varname); |
| //error handling if non existing or no matrix |
| if( dat == null ) |
| throw new DMLRuntimeException(getNonExistingVarError(varname)); |
| if( !(dat instanceof CacheableData<?>) ) |
| throw new DMLRuntimeException("Variable '"+varname+"' is not a matrix, tensor or frame."); |
| return (CacheableData<?>) dat; |
| } |
| |
| public void releaseCacheableData(String varname) { |
| getCacheableData(varname).release(); |
| } |
| |
| public DataCharacteristics getDataCharacteristics(String varname) { |
| return getMetaData(varname).getDataCharacteristics(); |
| } |
| |
| /** |
| * Pins a matrix variable into memory and returns the internal matrix block. |
| * |
| * @param varName variable name |
| * @return matrix block |
| */ |
| public MatrixBlock getMatrixInput(String varName) { |
| return getMatrixObject(varName).acquireRead(); |
| } |
| |
| /** |
| * Pins a matrix variable into memory and returns the internal matrix block. |
| * |
| * @param varName variable name |
| * @return matrix block |
| */ |
| public TensorBlock getTensorInput(String varName) { |
| return getTensorObject(varName).acquireRead(); |
| } |
| |
| public void setMetaData(String varName, long nrows, long ncols) { |
| MatrixObject mo = getMatrixObject(varName); |
| if(mo.getNumRows() == nrows && mo.getNumColumns() == ncols) |
| return; |
| MetaData oldMetaData = mo.getMetaData(); |
| if( oldMetaData == null || !(oldMetaData instanceof MetaDataFormat) ) |
| throw new DMLRuntimeException("Metadata not available"); |
| MatrixCharacteristics mc = new MatrixCharacteristics(nrows, ncols, (int) mo.getBlocksize()); |
| mo.setMetaData(new MetaDataFormat(mc, ((MetaDataFormat)oldMetaData).getFileFormat())); |
| } |
| |
| /** |
| * Compares two potential dimensions d1 and d2 and return the one which is not -1. |
| * This method is useful when the dimensions are not known at compile time, but are known at runtime. |
| * |
| * @param d1 dimension1 |
| * @param d2 dimension1 |
| * @return valid d1 or d2 |
| */ |
| private static long validateDimensions(long d1, long d2) { |
| if(d1 >= 0 && d2 >= 0 && d1 != d2) { |
| throw new DMLRuntimeException("Incorrect dimensions:" + d1 + " != " + d2); |
| } |
| return Math.max(d1, d2); |
| } |
| |
| /** |
| * Allocates a dense matrix on the GPU (for output) |
| * @param varName name of the output matrix (known by this {@link ExecutionContext}) |
| * @param numRows number of rows of matrix object |
| * @param numCols number of columns of matrix object |
| * @return a pair containing the wrapping {@link MatrixObject} and a boolean indicating whether a cuda memory allocation took place (as opposed to the space already being allocated) |
| */ |
| public Pair<MatrixObject, Boolean> getDenseMatrixOutputForGPUInstruction(String varName, long numRows, long numCols) { |
| MatrixObject mo = allocateGPUMatrixObject(varName, numRows, numCols); |
| boolean allocated = mo.getGPUObject(getGPUContext(0)).acquireDeviceModifyDense(); |
| mo.getDataCharacteristics().setNonZeros(-1); |
| return new Pair<>(mo, allocated); |
| } |
| |
| /** |
| * Allocates a sparse matrix in CSR format on the GPU. |
| * Assumes that mat.getNumRows() returns a valid number |
| * |
| * @param varName variable name |
| * @param numRows number of rows of matrix object |
| * @param numCols number of columns of matrix object |
| * @param nnz number of non zeroes |
| * @return matrix object |
| */ |
| public Pair<MatrixObject, Boolean> getSparseMatrixOutputForGPUInstruction(String varName, long numRows, long numCols, long nnz) { |
| MatrixObject mo = allocateGPUMatrixObject(varName, numRows, numCols); |
| mo.getDataCharacteristics().setNonZeros(nnz); |
| boolean allocated = mo.getGPUObject(getGPUContext(0)).acquireDeviceModifySparse(); |
| return new Pair<>(mo, allocated); |
| } |
| |
| /** |
| * Allocates the {@link GPUObject} for a given LOPS Variable (eg. _mVar3) |
| * @param varName variable name |
| * @param numRows number of rows of matrix object |
| * @param numCols number of columns of matrix object |
| * @return matrix object |
| */ |
| public MatrixObject allocateGPUMatrixObject(String varName, long numRows, long numCols) { |
| MatrixObject mo = getMatrixObject(varName); |
| long dim1 = -1; long dim2 = -1; |
| |
| try { |
| dim1 = validateDimensions(mo.getNumRows(), numRows); |
| dim2 = validateDimensions(mo.getNumColumns(), numCols); |
| } |
| catch(DMLRuntimeException e) { |
| throw new DMLRuntimeException("Incorrect dimensions given to allocateGPUMatrixObject: [" + numRows + "," + |
| numCols + "], " + "[" + mo.getNumRows() + "," + mo.getNumColumns() + "]", e); |
| } |
| |
| if(dim1 != mo.getNumRows() || dim2 != mo.getNumColumns()) { |
| // Set unknown dimensions |
| mo.getDataCharacteristics().setDimension(dim1, dim2); |
| } |
| |
| if( mo.getGPUObject(getGPUContext(0)) == null ) { |
| GPUObject newGObj = getGPUContext(0).createGPUObject(mo); |
| mo.setGPUObject(getGPUContext(0), newGObj); |
| } |
| |
| // The lock is added here for an output block |
| // so that any block currently in use is not deallocated by eviction on the GPU |
| mo.getGPUObject(getGPUContext(0)).addWriteLock(); |
| return mo; |
| } |
| |
| public MatrixObject getMatrixInputForGPUInstruction(String varName, String opcode) { |
| GPUContext gCtx = getGPUContext(0); |
| MatrixObject mo = getMatrixObject(varName); |
| if(mo == null) { |
| throw new DMLRuntimeException("No matrix object available for variable:" + varName); |
| } |
| |
| if( mo.getGPUObject(gCtx) == null ) { |
| GPUObject newGObj = gCtx.createGPUObject(mo); |
| mo.setGPUObject(gCtx, newGObj); |
| } |
| // No need to perform acquireRead here because it is performed in copyFromHostToDevice |
| mo.getGPUObject(gCtx).acquireDeviceRead(opcode); |
| return mo; |
| } |
| |
| /** |
| * Unpins a currently pinned matrix variable and update fine-grained statistics. |
| * |
| * @param varName variable name |
| */ |
| public void releaseMatrixInput(String varName) { |
| getMatrixObject(varName).release(); |
| } |
| |
| public void releaseMatrixInput(String... varNames) { |
| for( String varName : varNames ) |
| releaseMatrixInput(varName); |
| } |
| |
| public void releaseMatrixInputForGPUInstruction(String varName) { |
| getMatrixObject(varName).getGPUObject(getGPUContext(0)).releaseInput(); |
| } |
| |
| /** |
| * Pins a frame variable into memory and returns the internal frame block. |
| * |
| * @param varName variable name |
| * @return frame block |
| */ |
| public FrameBlock getFrameInput(String varName) { |
| return getFrameObject(varName).acquireRead(); |
| } |
| |
| /** |
| * Unpins a currently pinned frame variable. |
| * |
| * @param varName variable name |
| */ |
| public void releaseFrameInput(String varName) { |
| getFrameObject(varName).release(); |
| } |
| |
| public void releaseTensorInput(String varName) { |
| getTensorObject(varName).release(); |
| } |
| |
| public void releaseTensorInput(String... varNames) { |
| for( String varName : varNames ) |
| releaseTensorInput(varName); |
| } |
| |
| public ScalarObject getScalarInput(CPOperand input) { |
| return input.isLiteral() ? input.getLiteral() : |
| getScalarInput(input.getName(), input.getValueType(), false); |
| } |
| |
| public ScalarObject getScalarInput(String name, ValueType vt, boolean isLiteral) { |
| if ( isLiteral ) { |
| return ScalarObjectFactory.createScalarObject(vt, name); |
| } |
| else { |
| Data obj = getVariable(name); |
| if (obj == null) |
| throw new DMLRuntimeException("Unknown variable: " + name); |
| return (ScalarObject) obj; |
| } |
| } |
| |
| public void setScalarOutput(String varName, ScalarObject so) { |
| setVariable(varName, so); |
| } |
| |
| public ListObject getListObject(CPOperand input) { |
| return getListObject(input.getName()); |
| } |
| |
| public ListObject getListObject(String name) { |
| Data dat = getVariable(name); |
| //error handling if non existing or no list |
| if (dat == null) |
| throw new DMLRuntimeException(getNonExistingVarError(name)); |
| if (!(dat instanceof ListObject)) |
| throw new DMLRuntimeException("Variable '" + name + "' is not a list."); |
| return (ListObject) dat; |
| } |
| |
| private List<MatrixObject> getMatricesFromList(ListObject lo) { |
| List<MatrixObject> ret = new ArrayList<>(); |
| for (Data e : lo.getData()) { |
| if (e instanceof MatrixObject) |
| ret.add((MatrixObject)e); |
| else if (e instanceof ListObject) |
| ret.addAll(getMatricesFromList((ListObject)e)); |
| else |
| throw new DMLRuntimeException("List must contain only matrices or lists for rbind/cbind."); |
| } |
| return ret; |
| } |
| |
| public void releaseMatrixOutputForGPUInstruction(String varName) { |
| MatrixObject mo = getMatrixObject(varName); |
| if(mo.getGPUObject(getGPUContext(0)) == null || !mo.getGPUObject(getGPUContext(0)).isAllocated()) { |
| throw new DMLRuntimeException("No output is allocated on GPU"); |
| } |
| setMetaData(varName, new MetaDataFormat(mo.getDataCharacteristics(), FileFormat.BINARY)); |
| mo.getGPUObject(getGPUContext(0)).releaseOutput(); |
| } |
| |
| public void setMatrixOutput(String varName, MatrixBlock outputData) { |
| if( isAutoCreateVars() && !containsVariable(varName) ) |
| setVariable(varName, createMatrixObject(outputData)); |
| MatrixObject mo = getMatrixObject(varName); |
| mo.acquireModify(outputData); |
| mo.release(); |
| setVariable(varName, mo); |
| } |
| |
| public void setMatrixOutput(String varName, MatrixBlock outputData, UpdateType flag) { |
| if( isAutoCreateVars() && !containsVariable(varName) ) |
| setVariable(varName, createMatrixObject(outputData)); |
| if( flag.isInPlace() ) { |
| //modify metadata to carry update status |
| MatrixObject mo = getMatrixObject(varName); |
| mo.setUpdateType( flag ); |
| } |
| setMatrixOutput(varName, outputData); |
| } |
| |
| public void setTensorOutput(String varName, TensorBlock outputData) { |
| TensorObject to = getTensorObject(varName); |
| to.acquireModify(outputData); |
| to.release(); |
| setVariable(varName, to); |
| } |
| |
| public void setFrameOutput(String varName, FrameBlock outputData) { |
| if( isAutoCreateVars() && !containsVariable(varName) ) |
| setVariable(varName, createFrameObject(outputData)); |
| FrameObject fo = getFrameObject(varName); |
| fo.acquireModify(outputData); |
| fo.release(); |
| setVariable(varName, fo); |
| } |
| |
| public static CacheableData<?> createCacheableData(CacheBlock cb) { |
| if( cb instanceof MatrixBlock ) |
| return createMatrixObject((MatrixBlock) cb); |
| else if( cb instanceof FrameBlock ) |
| return createFrameObject((FrameBlock) cb); |
| return null; |
| } |
| |
| public static MatrixObject createMatrixObject(MatrixBlock mb) { |
| MatrixObject ret = new MatrixObject(Types.ValueType.FP64, |
| OptimizerUtils.getUniqueTempFileName()); |
| ret.acquireModify(mb); |
| ret.setMetaData(new MetaDataFormat(new MatrixCharacteristics( |
| mb.getNumRows(), mb.getNumColumns()), FileFormat.BINARY)); |
| ret.getMetaData().getDataCharacteristics() |
| .setBlocksize(ConfigurationManager.getBlocksize()); |
| ret.release(); |
| return ret; |
| } |
| |
| public static FrameObject createFrameObject(FrameBlock fb) { |
| FrameObject ret = new FrameObject(OptimizerUtils.getUniqueTempFileName()); |
| ret.acquireModify(fb); |
| ret.setMetaData(new MetaDataFormat(new MatrixCharacteristics( |
| fb.getNumRows(), fb.getNumColumns()), FileFormat.BINARY)); |
| ret.release(); |
| return ret; |
| } |
| |
| public List<MatrixBlock> getMatrixInputs(CPOperand[] inputs) { |
| return getMatrixInputs(inputs, false); |
| } |
| |
| public List<MatrixBlock> getMatrixInputs(CPOperand[] inputs, boolean includeList) { |
| List<MatrixBlock> ret = Arrays.stream(inputs).filter(in -> in.isMatrix()) |
| .map(in -> getMatrixInput(in.getName())).collect(Collectors.toList()); |
| |
| if (includeList) { |
| List<ListObject> lolist = Arrays.stream(inputs).filter(in -> in.isList()) |
| .map(in -> getListObject(in.getName())).collect(Collectors.toList()); |
| for (ListObject lo : lolist) |
| ret.addAll( getMatricesFromList(lo).stream() |
| .map(mo -> mo.acquireRead()).collect(Collectors.toList())); |
| } |
| |
| return ret; |
| } |
| |
| public List<ScalarObject> getScalarInputs(CPOperand[] inputs) { |
| return Arrays.stream(inputs).filter(in -> in.isScalar()) |
| .map(in -> getScalarInput(in)).collect(Collectors.toList()); |
| } |
| |
| public void releaseMatrixInputs(CPOperand[] inputs) { |
| releaseMatrixInputs(inputs, false); |
| } |
| |
| public void releaseMatrixInputs(CPOperand[] inputs, boolean includeList) { |
| Arrays.stream(inputs).filter(in -> in.isMatrix()) |
| .forEach(in -> releaseMatrixInput(in.getName())); |
| |
| if (includeList) { |
| List<ListObject> lolist = Arrays.stream(inputs).filter(in -> in.isList()) |
| .map(in -> getListObject(in.getName())).collect(Collectors.toList()); |
| for (ListObject lo : lolist) |
| getMatricesFromList(lo).stream().forEach(mo -> mo.release()); |
| } |
| } |
| |
| /** |
| * Pin a given list of variables i.e., set the "clean up" state in |
| * corresponding matrix objects, so that the cached data inside these |
| * objects is not cleared and the corresponding HDFS files are not |
| * deleted (through rmvar instructions). |
| * |
| * This is necessary for: function input variables, parfor result variables, |
| * parfor shared inputs that are passed to functions. |
| * |
| * The function returns the OLD "clean up" state of matrix objects. |
| * |
| * @param varList variable list |
| * @return indicator vector of old cleanup state of matrix objects |
| */ |
| public boolean[] pinVariables(List<String> varList) |
| { |
| //analyze list variables |
| int nlist = 0; |
| int nlistItems = 0; |
| for( int i=0; i<varList.size(); i++ ) { |
| Data dat = _variables.get(varList.get(i)); |
| if( dat instanceof ListObject ) { |
| nlistItems += ((ListObject)dat).getNumCacheableData(); |
| nlist++; |
| } |
| } |
| |
| //2-pass approach since multiple vars might refer to same matrix object |
| boolean[] varsState = new boolean[varList.size()-nlist+nlistItems]; |
| |
| //step 1) get current information |
| for( int i=0, pos=0; i<varList.size(); i++ ) { |
| Data dat = _variables.get(varList.get(i)); |
| if( dat instanceof CacheableData<?> ) |
| varsState[pos++] = ((CacheableData<?>)dat).isCleanupEnabled(); |
| else if( dat instanceof ListObject ) |
| for( Data dat2 : ((ListObject)dat).getData() ) |
| if( dat2 instanceof CacheableData<?> ) |
| varsState[pos++] = ((CacheableData<?>)dat2).isCleanupEnabled(); |
| } |
| |
| //step 2) pin variables |
| for( int i=0; i<varList.size(); i++ ) { |
| Data dat = _variables.get(varList.get(i)); |
| if( dat instanceof CacheableData<?> ) |
| ((CacheableData<?>)dat).enableCleanup(false); |
| else if( dat instanceof ListObject ) |
| for( Data dat2 : ((ListObject)dat).getData() ) |
| if( dat2 instanceof CacheableData<?> ) |
| ((CacheableData<?>)dat2).enableCleanup(false); |
| } |
| |
| return varsState; |
| } |
| |
| /** |
| * Unpin the a given list of variables by setting their "cleanup" status |
| * to the values specified by <code>varsStats</code>. |
| * |
| * Typical usage: |
| * <code> |
| * oldStatus = pinVariables(varList); |
| * ... |
| * unpinVariables(varList, oldStatus); |
| * </code> |
| * |
| * i.e., a call to unpinVariables() is preceded by pinVariables(). |
| * |
| * @param varList variable list |
| * @param varsState variable state |
| */ |
| public void unpinVariables(List<String> varList, boolean[] varsState) { |
| for( int i=0, pos=0; i<varList.size(); i++ ) { |
| Data dat = _variables.get(varList.get(i)); |
| if( dat instanceof CacheableData<?> ) |
| ((CacheableData<?>)dat).enableCleanup(varsState[pos++]); |
| else if( dat instanceof ListObject ) |
| for( Data dat2 : ((ListObject)dat).getData() ) |
| if( dat2 instanceof CacheableData<?> ) |
| ((CacheableData<?>)dat2).enableCleanup(varsState[pos++]); |
| } |
| } |
| |
| /** |
| * NOTE: No order guaranteed, so keep same list for pin and unpin. |
| * |
| * @return list of all variable names. |
| */ |
| public ArrayList<String> getVarList() { |
| return new ArrayList<>(_variables.keySet()); |
| } |
| |
| /** |
| * NOTE: No order guaranteed, so keep same list for pin and unpin. |
| * |
| * @return list of all variable names of partitioned matrices. |
| */ |
| public ArrayList<String> getVarListPartitioned() { |
| ArrayList<String> ret = new ArrayList<>(); |
| for( String var : _variables.keySet() ) { |
| Data dat = _variables.get(var); |
| if( dat instanceof MatrixObject |
| && ((MatrixObject)dat).isPartitioned() ) |
| ret.add(var); |
| } |
| return ret; |
| } |
| |
| public final void cleanupDataObject(Data dat) { |
| if( dat == null ) return; |
| if ( dat instanceof CacheableData ) |
| cleanupCacheableData( (CacheableData<?>)dat ); |
| else if( dat instanceof ListObject ) |
| for( Data dat2 : ((ListObject)dat).getData() ) |
| if( dat2 instanceof CacheableData<?> ) |
| cleanupCacheableData( (CacheableData<?>)dat2 ); |
| } |
| |
| public void cleanupCacheableData(CacheableData<?> mo) { |
| if (DMLScript.JMLC_MEM_STATISTICS) |
| Statistics.removeCPMemObject(System.identityHashCode(mo)); |
| //early abort w/o scan of symbol table if no cleanup required |
| boolean fileExists = (mo.isHDFSFileExists() && mo.getFileName() != null); |
| if( !CacheableData.isCachingActive() && !fileExists ) |
| return; |
| |
| try { |
| //compute ref count only if matrix cleanup actually necessary |
| if ( mo.isCleanupEnabled() && !getVariables().hasReferences(mo) ) { |
| mo.clearData(); //clean cached data |
| if( fileExists ) { |
| HDFSTool.deleteFileIfExistOnHDFS(mo.getFileName()); |
| HDFSTool.deleteFileIfExistOnHDFS(mo.getFileName()+".mtd"); |
| } |
| } |
| } |
| catch(Exception ex) { |
| throw new DMLRuntimeException(ex); |
| } |
| } |
| |
| public void traceLineage(Instruction inst) { |
| if( _lineage == null ) |
| throw new DMLRuntimeException("Lineage Trace unavailable."); |
| _lineage.trace(inst, this); |
| } |
| |
| public LineageItem getLineageItem(CPOperand input) { |
| if( _lineage == null ) |
| throw new DMLRuntimeException("Lineage Trace unavailable."); |
| return _lineage.get(input); |
| } |
| |
| public LineageItem getOrCreateLineageItem(CPOperand input) { |
| if( _lineage == null ) |
| throw new DMLRuntimeException("Lineage Trace unavailable."); |
| return _lineage.getOrCreate(input); |
| } |
| |
| private static String getNonExistingVarError(String varname) { |
| return "Variable '" + varname + "' does not exist in the symbol table."; |
| } |
| |
| @Override |
| public String toString(){ |
| StringBuilder sb = new StringBuilder(); |
| sb.append(super.toString()); |
| if(_prog != null) |
| sb.append("\nProgram: " + _prog.toString()); |
| if(_variables != null) |
| sb.append("\nLocalVariableMap: " + _variables.toString()); |
| if(_lineage != null) |
| sb.append("\nLineage: " + _lineage.toString()); |
| return sb.toString(); |
| } |
| } |