blob: 220226031c69fd84791f0a06808c706e5d8f1eab [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.lineage;
import java.util.Map;
import jcuda.Pointer;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCacheStatus;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.meta.MetaData;
/**
 * A single entry of the lineage cache, keyed by a {@link LineageItem}.
 *
 * An entry can hold a matrix block, a scalar, a GPU pointer, a persisted RDD and/or the
 * serialized bytes of a federated response, together with the bookkeeping (compute time,
 * timestamp, score) used by the eviction policy.
 *
 * An entry with status EMPTY acts as a placeholder while another thread computes the value.
 * The value getters block until that thread places a value (setValue, setGPUValue,
 * setRDDValue, copyValueFrom) or gives up (removeAndNotify). All mutators and the value
 * getters synchronize on the entry itself.
 */
public class LineageCacheEntry {
	protected final LineageItem _key;
	protected final DataType _dt;
	protected MatrixBlock _MBval;
	protected ScalarObject _SOval;
	protected byte[] _serialBytes; // serialized bytes of a federated response
	protected long _computeTime;
	protected long _timestamp = 0;
	protected LineageCacheStatus _status;
	protected LineageCacheEntry _nextEntry;
	protected LineageItem _origItem;
	private String _outfile = null;
	protected double score;
	protected GPUPointer _gpuPointer;
	protected RDDObject _rddObject;

	/**
	 * Creates an entry for the given lineage key.
	 *
	 * @param key         lineage item identifying the cached intermediate
	 * @param dt          data type of the cached value
	 * @param Mval        matrix value, or null
	 * @param Sval        scalar value, or null
	 * @param computetime time spent computing the value
	 */
	public LineageCacheEntry(LineageItem key, DataType dt, MatrixBlock Mval, ScalarObject Sval, long computetime) {
		_key = key;
		_dt = dt;
		_MBval = Mval;
		_SOval = Sval;
		_computeTime = computetime;
		// with neither value given, the entry is a placeholder (EMPTY) to be filled later
		_status = isNullVal() ? LineageCacheStatus.EMPTY : LineageCacheStatus.CACHED;
		_nextEntry = null;
		_origItem = null;
		_outfile = null;
		_gpuPointer = null;
	}

	/**
	 * Sets the cache status and wakes all threads waiting on this entry.
	 *
	 * @param st new status
	 */
	protected synchronized void setCacheStatus(LineageCacheStatus st) {
		_status = st;
		// Value getters wait while the status is EMPTY. Wake them so that a transition
		// away from EMPTY is never missed; waiters re-check the status in a loop.
		notifyAll();
	}

	/**
	 * Blocks the calling thread while this entry is EMPTY.
	 *
	 * The wait avoids redundant computation: another thread is computing the value. It returns
	 * once a value has been placed or the entry has been removed by the computing thread.
	 *
	 * @throws DMLRuntimeException if the thread is interrupted while waiting; the thread's
	 *                             interrupt flag is restored before throwing
	 */
	private synchronized void waitWhileEmpty() {
		try {
			while(_status == LineageCacheStatus.EMPTY)
				wait();
		}
		catch( InterruptedException ex ) {
			// restore the interrupt flag so callers further up the stack can observe it
			Thread.currentThread().interrupt();
			throw new DMLRuntimeException(ex);
		}
	}

	/**
	 * Returns the cached matrix block, blocking while the entry is EMPTY.
	 *
	 * @return the matrix block; may be null if the entry was removed before a value was placed
	 */
	public synchronized MatrixBlock getMBValue() {
		waitWhileEmpty();
		return _MBval;
	}

	/**
	 * Returns the cached scalar, blocking while the entry is EMPTY.
	 *
	 * @return the scalar; may be null if the entry was removed before a value was placed
	 */
	public synchronized ScalarObject getSOValue() {
		waitWhileEmpty();
		return _SOval;
	}

	/**
	 * Returns the cached RDD object, blocking while the entry is EMPTY.
	 *
	 * @return the RDD object; may be null if the entry was removed before a value was placed
	 */
	public synchronized RDDObject getRDDObject() {
		waitWhileEmpty();
		return _rddObject;
	}

	/**
	 * Returns the cached serialized federated response, blocking while the entry is EMPTY.
	 *
	 * @return the serialized bytes; may be null if the entry was removed before a value was placed
	 */
	public synchronized byte[] getSerializedBytes() {
		waitWhileEmpty();
		return _serialBytes;
	}

	/**
	 * Returns the cached GPU pointer, blocking while the entry is EMPTY.
	 *
	 * @return the GPU pointer, or null if no GPU value is held (e.g. the entry was removed
	 *         before a value was placed)
	 */
	public synchronized Pointer getGPUPointer() {
		waitWhileEmpty();
		// the entry may have been removed (NOTCACHED) without a GPU value being placed
		return _gpuPointer != null ? _gpuPointer.getPointer() : null;
	}

	/** @return the current cache status of this entry */
	public synchronized LineageCacheStatus getCacheStatus() {
		return _status;
	}

	/**
	 * Marks a still-EMPTY placeholder as NOTCACHED and wakes all waiting threads.
	 *
	 * Waiting threads then observe a null value. This is a no-op if the entry is not EMPTY.
	 */
	protected synchronized void removeAndNotify() {
		if (_status != LineageCacheStatus.EMPTY)
			return;
		_status = LineageCacheStatus.NOTCACHED;
		notifyAll();
	}

	/**
	 * Returns the total size of the values held by this entry.
	 *
	 * For a persisted RDD, this is the size currently cached in the Spark executors (0 if not
	 * yet persisted). NOTE(review): serialized bytes are not included; confirm this is intended.
	 *
	 * @return size estimate of the held values
	 */
	public synchronized long getSize() {
		long size = 0;
		if (_MBval != null)
			size += _MBval.getInMemorySize();
		if (_SOval != null)
			size += _SOval.getSize();
		if (_gpuPointer != null)
			size += _gpuPointer.getPointerSize();
		if (_rddObject != null)
			//Return total cached size in the executors
			size += SparkExecutionContext.getMemCachedRDDSize(_rddObject.getRDD().id());
		return size;
	}

	/** @return true if this entry holds no value of any kind */
	public boolean isNullVal() {
		return(_MBval == null && _SOval == null && _gpuPointer == null && _serialBytes == null && _rddObject == null);
	}

	/** @return true for a matrix entry that is neither an RDD nor a GPU object */
	public boolean isMatrixValue() {
		return _dt.isMatrix() && _rddObject == null && _gpuPointer == null;
	}

	/** @return true for a scalar entry that is neither an RDD nor a GPU object */
	public boolean isScalarValue() {
		return _dt.isScalar() && _rddObject == null && _gpuPointer == null;
	}

	/** @return true if the value lives in local (driver) memory, i.e. is a local matrix or scalar */
	public boolean isLocalObject() {
		return isMatrixValue() || isScalarValue();
	}

	/** @return true if this entry holds an RDD object */
	public boolean isRDDPersist() {
		return _rddObject != null;
	}

	/** @return true if this entry holds a GPU pointer */
	public boolean isGPUObject() {
		return _gpuPointer != null;
	}

	/** @return true if this entry holds a dense GPU pointer */
	public synchronized boolean isDensePointer() {
		if (!isGPUObject())
			return false;
		return _gpuPointer.isDensepointer();
	}

	/**
	 * Checks whether this entry caches a serialized federated response.
	 *
	 * @return true if the data type is unknown and the key's opcode is the serialization opcode
	 */
	public boolean isSerializedBytes() {
		return _dt.isUnknown() && _key.getOpcode().equals(LineageItemUtils.SERIALIZATION_OPCODE);
	}

	/**
	 * Places a matrix value, drops any GPU pointer and wakes all waiting threads.
	 *
	 * @param val         matrix value (may be null)
	 * @param computetime time spent computing the value
	 */
	public synchronized void setValue(MatrixBlock val, long computetime) {
		_MBval = val;
		_gpuPointer = null; //Matrix block and gpu object cannot coexist
		_computeTime = computetime;
		_status = isNullVal() ? LineageCacheStatus.EMPTY : LineageCacheStatus.CACHED;
		//resume all threads waiting for val
		notifyAll();
	}

	/**
	 * Places a matrix value and keeps the current compute time.
	 *
	 * @param val matrix value (may be null)
	 */
	public synchronized void setValue(MatrixBlock val) {
		setValue(val, _computeTime);
	}

	/**
	 * Places a scalar value, drops any GPU pointer and wakes all waiting threads.
	 *
	 * @param val         scalar value (may be null)
	 * @param computetime time spent computing the value
	 */
	public synchronized void setValue(ScalarObject val, long computetime) {
		_SOval = val;
		_gpuPointer = null; //scalar and gpu object cannot coexist
		_computeTime = computetime;
		_status = isNullVal() ? LineageCacheStatus.EMPTY : LineageCacheStatus.CACHED;
		//resume all threads waiting for val
		notifyAll();
	}

	/**
	 * Places a GPU pointer, sets the status to GPUCACHED and wakes all waiting threads.
	 *
	 * @param ptr         device pointer
	 * @param size        allocated size of the pointer in bytes
	 * @param md          metadata describing the cached data
	 * @param computetime time spent computing the value
	 */
	public synchronized void setGPUValue(Pointer ptr, long size, MetaData md, long computetime) {
		_gpuPointer = new GPUPointer(ptr, size, md);
		_computeTime = computetime;
		_status = isNullVal() ? LineageCacheStatus.EMPTY : LineageCacheStatus.GPUCACHED;
		//resume all threads waiting for val
		notifyAll();
	}

	/**
	 * Places an RDD object, sets the status to TOPERSISTRDD and wakes all waiting threads.
	 *
	 * @param rdd         RDD object
	 * @param computetime time spent computing the value
	 */
	public synchronized void setRDDValue(RDDObject rdd, long computetime) {
		_rddObject = rdd;
		_computeTime = computetime;
		_status = isNullVal() ? LineageCacheStatus.EMPTY : LineageCacheStatus.TOPERSISTRDD;
		//resume all threads waiting for val
		notifyAll();
	}

	/**
	 * Places the serialized bytes of a federated response and wakes all waiting threads.
	 *
	 * @param serialBytes serialized response (may be null)
	 * @param computetime time spent computing the value
	 */
	public synchronized void setValue(byte[] serialBytes, long computetime) {
		_serialBytes = serialBytes;
		_computeTime = computetime;
		_status = isNullVal() ? LineageCacheStatus.EMPTY : LineageCacheStatus.CACHED;
		// resume all threads waiting for val
		notifyAll();
	}

	/**
	 * Copies all values from another entry and wakes all waiting threads.
	 *
	 * The values are shared by reference, not deep-copied.
	 *
	 * @param src         entry to copy the values from
	 * @param computetime currently unused (see note below)
	 */
	public synchronized void copyValueFrom(LineageCacheEntry src, long computetime) {
		_MBval = src._MBval;
		_SOval = src._SOval;
		// copy serialized bytes too, otherwise a bytes-only source would leave this entry EMPTY
		_serialBytes = src._serialBytes;
		_gpuPointer = src._gpuPointer;
		_rddObject = src._rddObject;
		// NOTE(review): the computetime argument is ignored and the source entry's compute
		// time is copied instead; confirm this is intended.
		_computeTime = src._computeTime;
		_status = isNullVal() ? LineageCacheStatus.EMPTY : LineageCacheStatus.CACHED;
		// resume all threads waiting for val
		notifyAll();
	}

	/**
	 * Returns the data characteristics of the cached GPU value.
	 *
	 * Only valid for GPU entries; throws a NullPointerException if no GPU pointer is held.
	 *
	 * @return the data characteristics stored with the GPU pointer
	 */
	public synchronized DataCharacteristics getDataCharacteristics() {
		return _gpuPointer.getDataCharacteristics();
	}

	/**
	 * Drops the matrix, scalar and serialized values and resets the status to EMPTY.
	 *
	 * GPU pointer and RDD references are left untouched.
	 */
	protected synchronized void setNullValues() {
		_MBval = null;
		_SOval = null;
		_serialBytes = null;
		_status = LineageCacheStatus.EMPTY;
	}

	protected synchronized void setOutfile(String outfile) {
		_outfile = outfile;
	}

	protected synchronized String getOutfile() {
		return _outfile;
	}

	/**
	 * Sets the timestamp and computes the initial score.
	 *
	 * Only takes effect the first time; a no-op once a timestamp has been set.
	 */
	protected synchronized void setTimestamp() {
		if (_timestamp != 0)
			return;
		refreshTimestamp();
		recomputeScore();
	}

	/** Unconditionally refreshes the timestamp and recomputes the score. */
	protected synchronized void updateTimestamp() {
		refreshTimestamp();
		recomputeScore();
	}

	/**
	 * Sets the timestamp, computes the initial score and, in cost-and-size mode, boosts the
	 * score of local objects by the number of recorded misses in {@code removeList}.
	 *
	 * @param removeList miss counts of previously removed keys
	 */
	protected synchronized void computeScore(Map<LineageItem, Integer> removeList) {
		// Set timestamp and compute initial score
		setTimestamp();
		// Update score to emulate computeTime scaling by #misses
		if (removeList.containsKey(_key) && LineageCacheConfig.isCostNsize()) {
			double w1 = LineageCacheConfig.WEIGHTS[0];
			int missCount = 1 + removeList.get(_key);
			if (isLocalObject())
				score = score + (w1*(((double)_computeTime)/getSize()) * missCount);
		}
	}

	/**
	 * Sets the timestamp and computes the initial score of an RDD entry.
	 *
	 * The score is based on an estimated size, because the RDD may not be persisted yet.
	 * Expects an RDD object to be held.
	 *
	 * @param removeList    currently unused
	 * @param estimatedSize estimated size of the RDD
	 */
	protected synchronized void initiateScoreSpark(Map<LineageItem, Integer> removeList, long estimatedSize) {
		refreshTimestamp();
		int computeGroup = LineageCacheConfig.getComputeGroup(_key.getOpcode());
		int refCount = Math.max(_rddObject.getMaxReferenceCount(), 1);
		score = weightedScore(((double)computeGroup*refCount)/estimatedSize);
	}

	/**
	 * Adds (or subtracts) the cost-per-size term to/from the score.
	 *
	 * This emulates compute-time scaling by a cache hit.
	 *
	 * @param add true to add the term, false to subtract it
	 */
	protected synchronized void updateScore(boolean add) {
		double w1 = LineageCacheConfig.WEIGHTS[0];
		long size = getSize();
		int sign = add ? 1: -1;
		if(isLocalObject())
			score = score + sign * w1 * (((double) _computeTime) / size);
		if(isRDDPersist() && size != 0) { //size == 0 means not persisted yet
			int computeGroup = LineageCacheConfig.getComputeGroup(_key.getOpcode());
			score = score + sign * w1 * (((double) computeGroup) / size);
		}
	}

	protected synchronized long getTimestamp() {
		return _timestamp;
	}

	/** @return height of this entry's lineage DAG */
	protected synchronized long getDagHeight() {
		return _key.getHeight();
	}

	/** @return compute time divided by the current size */
	protected synchronized double getCostNsize() {
		return ((double)_computeTime)/getSize();
	}

	/**
	 * Sets the timestamp to the time elapsed since the start of eviction bookkeeping.
	 *
	 * @throws DMLRuntimeException if the resulting timestamp is negative
	 */
	private void refreshTimestamp() {
		_timestamp = System.currentTimeMillis() - LineageCacheEviction.getStartTimestamp();
		if (_timestamp < 0)
			throw new DMLRuntimeException ("Execution timestamp shouldn't be -ve. Key: "+_key);
	}

	/**
	 * Combines a cost term, the timestamp and the inverse DAG height into a score.
	 *
	 * The three components are weighted by the configured weights.
	 */
	private double weightedScore(double costTerm) {
		double w1 = LineageCacheConfig.WEIGHTS[0];
		double w2 = LineageCacheConfig.WEIGHTS[1];
		double w3 = LineageCacheConfig.WEIGHTS[2];
		return w1*costTerm + w2*getTimestamp() + w3*(((double)1)/getDagHeight());
	}

	/**
	 * Recomputes the score from scratch for local objects and for persisted RDDs.
	 *
	 * Other entries keep their score.
	 */
	private void recomputeScore() {
		long size = getSize();
		if (isLocalObject())
			score = weightedScore(((double)_computeTime)/size);
		if (isRDDPersist() && size != 0) { //size == 0 means not persisted yet
			int computeGroup = LineageCacheConfig.getComputeGroup(_key.getOpcode());
			int refCount = Math.max(_rddObject.getMaxReferenceCount(), 1);
			score = weightedScore(((double)computeGroup*refCount)/size);
		}
	}

	/** A cached device pointer together with its allocated size and metadata. */
	static class GPUPointer {
		private final Pointer _pointer;
		private final long _allocatedSize; //bytes
		private final MetaData _metadata;

		public GPUPointer(Pointer pointer, long size, MetaData metadata) {
			_pointer = pointer;
			_allocatedSize = size;
			_metadata = metadata;
		}

		protected long getPointerSize() {
			return _allocatedSize;
		}

		protected Pointer getPointer() {
			return _pointer;
		}

		protected DataCharacteristics getDataCharacteristics() {
			return _metadata.getDataCharacteristics();
		}

		protected boolean isDensepointer() {
			// TODO: Support sparse pointer caching
			return true;
		}
	}
}