blob: e0b595e29f7b5cb0fdd599549032579dc2344af1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.lineage;
import java.util.Map;
import jcuda.Pointer;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
import org.apache.sysds.runtime.instructions.gpu.context.GPUObject;
import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCacheStatus;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
/**
 * A single entry of the lineage cache, identified by a {@link LineageItem} key.
 *
 * <p>An entry can hold a matrix block, a scalar, a GPU pointer, an RDD handle,
 * or serialized bytes. While the status is {@code EMPTY}, the blocking value
 * getters ({@link #getMBValue()}, {@link #getSOValue()}, {@link #getRDDObject()},
 * {@link #getSerializedBytes()}) wait on this object's monitor; the setters and
 * {@link #removeAndNotify()} change the status and call {@code notifyAll()} to
 * release the waiting threads.
 *
 * <p>All state-mutating methods are {@code synchronized} on the entry itself.
 */
public class LineageCacheEntry {
	protected final LineageItem _key;
	protected final DataType _dt;
	protected MatrixBlock _MBval;
	protected ScalarObject _SOval;
	protected byte[] _serialBytes; // serialized bytes of a federated response
	protected long _computeTime;
	protected long _timestamp = 0; // 0 means "not yet stamped", see setTimestamp()
	protected LineageCacheStatus _status;
	protected LineageCacheEntry _nextEntry;
	protected LineageItem _origItem;
	private String _outfile = null;
	protected double score;
	protected Pointer _gpuPointer;
	protected RDDObject _rddObject;

	/**
	 * Creates an entry for the given key. The initial status is {@code EMPTY}
	 * if neither a matrix nor a scalar value is supplied, otherwise {@code CACHED}.
	 *
	 * @param key         lineage item identifying this entry
	 * @param dt          data type of the cached value
	 * @param Mval        matrix value, may be {@code null}
	 * @param Sval        scalar value, may be {@code null}
	 * @param computetime compute time recorded for the value
	 */
	public LineageCacheEntry(LineageItem key, DataType dt, MatrixBlock Mval, ScalarObject Sval, long computetime) {
		_key = key;
		_dt = dt;
		_MBval = Mval;
		_SOval = Sval;
		_computeTime = computetime;
		_status = statusOrEmpty(LineageCacheStatus.CACHED);
		_nextEntry = null;
		_origItem = null;
		_outfile = null;
		_gpuPointer = null;
	}

	/**
	 * Overwrites the cache status. Does not notify waiting threads.
	 *
	 * @param st new status
	 */
	protected synchronized void setCacheStatus(LineageCacheStatus st) {
		_status = st;
	}

	/**
	 * Returns the matrix value, blocking while the entry is {@code EMPTY}.
	 *
	 * @return the matrix block; may be {@code null} if the entry left the
	 *         {@code EMPTY} state without a matrix value being set
	 * @throws DMLRuntimeException if the waiting thread is interrupted
	 */
	public synchronized MatrixBlock getMBValue() {
		awaitNonEmpty();
		//comes here if data is placed or the entry is removed by the running thread
		return _MBval;
	}

	/**
	 * Returns the scalar value, blocking while the entry is {@code EMPTY}.
	 *
	 * @return the scalar object; may be {@code null} if the entry left the
	 *         {@code EMPTY} state without a scalar value being set
	 * @throws DMLRuntimeException if the waiting thread is interrupted
	 */
	public synchronized ScalarObject getSOValue() {
		awaitNonEmpty();
		//comes here if data is placed or the entry is removed by the running thread
		return _SOval;
	}

	/**
	 * Returns the RDD handle, blocking while the entry is {@code EMPTY}.
	 *
	 * @return the RDD object; may be {@code null} if the entry left the
	 *         {@code EMPTY} state without an RDD being set
	 * @throws DMLRuntimeException if the waiting thread is interrupted
	 */
	public synchronized RDDObject getRDDObject() {
		awaitNonEmpty();
		//comes here if data is placed or the entry is removed by the running thread
		return _rddObject;
	}

	/**
	 * Returns the serialized bytes, blocking while the entry is {@code EMPTY}.
	 *
	 * @return the serialized bytes; may be {@code null} if the entry left the
	 *         {@code EMPTY} state without bytes being set
	 * @throws DMLRuntimeException if the waiting thread is interrupted
	 */
	public synchronized byte[] getSerializedBytes() {
		awaitNonEmpty();
		//comes here if data is placed or the entry is removed by the running thread
		return _serialBytes;
	}

	/**
	 * @return the current cache status (non-blocking)
	 */
	public synchronized LineageCacheStatus getCacheStatus() {
		return _status;
	}

	/**
	 * If the entry is still {@code EMPTY}, marks it {@code NOTCACHED} and wakes
	 * up all threads blocked in the value getters. Entries in any other state
	 * are left untouched.
	 */
	protected synchronized void removeAndNotify() {
		if (_status != LineageCacheStatus.EMPTY)
			return;
		_status = LineageCacheStatus.NOTCACHED;
		notifyAll();
	}

	/**
	 * Sums the sizes of the matrix block, the scalar object and the GPU
	 * pointer, counting only those that are present.
	 *
	 * @return accumulated size, 0 if none of the three values is set
	 */
	public synchronized long getSize() {
		// NOTE(review): _serialBytes and _rddObject do not contribute to the
		// size here -- confirm this is intended.
		long size = 0;
		if (_MBval != null)
			size += _MBval.getInMemorySize();
		if (_SOval != null)
			size += _SOval.getSize();
		if (_gpuPointer != null)
			size += LineageGPUCacheEviction.getPointerSize(_gpuPointer);
		return size;
	}

	/**
	 * @return {@code true} if the entry holds no value of any kind
	 */
	public boolean isNullVal() {
		return(_MBval == null && _SOval == null && _gpuPointer == null && _serialBytes == null && _rddObject == null);
	}

	/**
	 * @return {@code true} if the data type is matrix and no RDD is attached
	 */
	public boolean isMatrixValue() {
		return _dt.isMatrix() && _rddObject == null;
	}

	/**
	 * @return {@code true} if the data type is scalar and no RDD is attached
	 */
	public boolean isScalarValue() {
		return _dt.isScalar() && _rddObject == null;
	}

	/**
	 * @return {@code true} if an RDD object is attached to this entry
	 */
	public boolean isRDDPersist() {
		return _rddObject != null;
	}

	/**
	 * @return {@code true} if a GPU pointer is attached to this entry
	 */
	public boolean isGPUObject() {
		return _gpuPointer != null;
	}

	/**
	 * @return {@code true} if the data type is unknown and the key's opcode
	 *         equals {@code LineageItemUtils.SERIALIZATION_OPCODE}
	 */
	public boolean isSerializedBytes() {
		return _dt.isUnknown() && _key.getOpcode().equals(LineageItemUtils.SERIALIZATION_OPCODE);
	}

	/**
	 * Stores a matrix value, clears the GPU pointer, updates the status and
	 * wakes up all waiting threads.
	 *
	 * @param val         matrix block to cache
	 * @param computetime compute time recorded for the value
	 */
	public synchronized void setValue(MatrixBlock val, long computetime) {
		_MBval = val;
		_gpuPointer = null; //Matrix block and gpu pointer cannot coexist
		_computeTime = computetime;
		_status = statusOrEmpty(LineageCacheStatus.CACHED);
		//resume all threads waiting for val
		notifyAll();
	}

	/**
	 * Stores a matrix value while keeping the previously recorded compute time.
	 *
	 * @param val matrix block to cache
	 */
	public synchronized void setValue(MatrixBlock val) {
		setValue(val, _computeTime);
	}

	/**
	 * Stores a scalar value, clears the GPU pointer, updates the status and
	 * wakes up all waiting threads.
	 *
	 * @param val         scalar object to cache
	 * @param computetime compute time recorded for the value
	 */
	public synchronized void setValue(ScalarObject val, long computetime) {
		_SOval = val;
		_gpuPointer = null; //scalar and gpu pointer cannot coexist
		_computeTime = computetime;
		_status = statusOrEmpty(LineageCacheStatus.CACHED);
		//resume all threads waiting for val
		notifyAll();
	}

	/**
	 * Stores a GPU pointer, sets the status to {@code GPUCACHED} (or
	 * {@code EMPTY} if the entry still holds nothing) and wakes up all
	 * waiting threads.
	 *
	 * @param ptr         GPU pointer to cache
	 * @param computetime compute time recorded for the value
	 */
	public synchronized void setGPUValue(Pointer ptr, long computetime) {
		_gpuPointer = ptr;
		_computeTime = computetime;
		_status = statusOrEmpty(LineageCacheStatus.GPUCACHED);
		//resume all threads waiting for val
		notifyAll();
	}

	/**
	 * Stores an RDD handle, updates the status and wakes up all waiting threads.
	 *
	 * @param rdd         RDD object to cache
	 * @param computetime compute time recorded for the value
	 */
	public synchronized void setRDDValue(RDDObject rdd, long computetime) {
		_rddObject = rdd;
		_computeTime = computetime;
		_status = statusOrEmpty(LineageCacheStatus.CACHED);
		//resume all threads waiting for val
		notifyAll();
	}

	/**
	 * Stores serialized bytes, updates the status and wakes up all waiting threads.
	 *
	 * @param serialBytes serialized bytes to cache
	 * @param computetime compute time recorded for the value
	 */
	public synchronized void setValue(byte[] serialBytes, long computetime) {
		_serialBytes = serialBytes;
		_computeTime = computetime;
		_status = statusOrEmpty(LineageCacheStatus.CACHED);
		//resume all threads waiting for val
		notifyAll();
	}

	/**
	 * @return the GPU pointer, or {@code null} if none is set (non-blocking)
	 */
	public synchronized Pointer getGPUPointer() {
		return _gpuPointer;
	}

	/**
	 * Drops the matrix, scalar and serialized values and resets the status to
	 * {@code EMPTY}. Does not notify waiting threads.
	 */
	protected synchronized void setNullValues() {
		// NOTE(review): _gpuPointer and _rddObject are left untouched here --
		// confirm that callers release them separately.
		_MBval = null;
		_SOval = null;
		_serialBytes = null;
		_status = LineageCacheStatus.EMPTY;
	}

	/**
	 * @param outfile file name to associate with this entry
	 */
	protected synchronized void setOutfile(String outfile) {
		_outfile = outfile;
	}

	/**
	 * @return the file name associated with this entry, or {@code null}
	 */
	protected synchronized String getOutfile() {
		return _outfile;
	}

	/**
	 * Stamps the entry with the time elapsed since the eviction start
	 * timestamp and recomputes the score, but only if no timestamp has been
	 * set yet (i.e. it is still 0).
	 *
	 * @throws DMLRuntimeException if the computed timestamp is negative
	 */
	protected synchronized void setTimestamp() {
		if (_timestamp != 0)
			return;
		refreshTimestamp();
	}

	/**
	 * Unconditionally re-stamps the entry with the time elapsed since the
	 * eviction start timestamp and recomputes the score.
	 *
	 * @throws DMLRuntimeException if the computed timestamp is negative
	 */
	protected synchronized void updateTimestamp() {
		refreshTimestamp();
	}

	/**
	 * Ensures a timestamp and base score exist, then, if the key is present in
	 * {@code removeList} and the cost-and-size policy is active, raises the
	 * score by the weighted cost/size ratio times (1 + recorded count).
	 *
	 * @param removeList map from lineage item to a count; read via
	 *                   {@code containsKey}/{@code get} only
	 */
	protected synchronized void computeScore(Map<LineageItem, Integer> removeList) {
		// Set timestamp and compute initial score
		setTimestamp();
		// Update score to emulate computeTime scaling by #misses
		if (removeList.containsKey(_key) && LineageCacheConfig.isCostNsize()) {
			double w1 = LineageCacheConfig.WEIGHTS[0];
			int missCount = 1 + removeList.get(_key);
			score = score + (w1*costPerSize() * missCount);
		}
	}

	/**
	 * Raises the score by the weighted cost/size ratio; emulates scaling of
	 * the compute time by a cache hit.
	 */
	protected synchronized void updateScore() {
		double w1 = LineageCacheConfig.WEIGHTS[0];
		score = score + w1*costPerSize();
	}

	/**
	 * @return the stored timestamp (0 if never stamped)
	 */
	protected synchronized long getTimestamp() {
		return _timestamp;
	}

	/**
	 * @return the height reported by the key lineage item
	 */
	protected synchronized long getDagHeight() {
		return _key.getHeight();
	}

	/**
	 * @return compute time divided by {@link #getSize()}, as a double
	 */
	protected synchronized double getCostNsize() {
		return costPerSize();
	}

	/**
	 * Blocks on this object's monitor while the status is {@code EMPTY}, so
	 * that a concurrent producer can finish instead of the value being
	 * recomputed. Must be called with the monitor held (from a
	 * {@code synchronized} method), as required by {@code Object.wait()}.
	 *
	 * @throws DMLRuntimeException if interrupted while waiting; the thread's
	 *         interrupt flag is restored before the exception is thrown so
	 *         that callers up the stack can still observe the interruption
	 */
	private void awaitNonEmpty() {
		try {
			//wait until other thread completes operation
			//in order to avoid redundant computation
			while(_status == LineageCacheStatus.EMPTY) {
				wait();
			}
		}
		catch( InterruptedException ex ) {
			// wait() clears the interrupt flag when it throws; set it again
			// because the checked exception is converted to an unchecked one.
			Thread.currentThread().interrupt();
			throw new DMLRuntimeException(ex);
		}
	}

	/**
	 * Picks the status after a value change: {@code EMPTY} if the entry holds
	 * nothing, otherwise the supplied status.
	 */
	private LineageCacheStatus statusOrEmpty(LineageCacheStatus whenFilled) {
		return isNullVal() ? LineageCacheStatus.EMPTY : whenFilled;
	}

	/**
	 * Sets the timestamp relative to the eviction start timestamp, validates
	 * it and recomputes the score. Shared by setTimestamp/updateTimestamp.
	 */
	private void refreshTimestamp() {
		_timestamp = System.currentTimeMillis() - LineageCacheEviction.getStartTimestamp();
		if (_timestamp < 0)
			throw new DMLRuntimeException ("Execution timestamp shouldn't be -ve. Key: "+_key);
		recomputeScore();
	}

	/**
	 * Ratio of compute time to current size, in floating point.
	 */
	private double costPerSize() {
		// NOTE(review): a size of 0 yields Infinity (or NaN when the compute
		// time is also 0) rather than an exception -- confirm callers expect this.
		return ((double)_computeTime)/getSize();
	}

	/**
	 * Recomputes the score as the weighted sum of cost/size ratio, timestamp
	 * and inverse DAG height, using the three configured weights.
	 */
	private void recomputeScore() {
		// Gather the weights for scoring components
		double w1 = LineageCacheConfig.WEIGHTS[0];
		double w2 = LineageCacheConfig.WEIGHTS[1];
		double w3 = LineageCacheConfig.WEIGHTS[2];
		// Generate scores
		// NOTE(review): a DAG height of 0 makes the third term non-finite -- confirm
		// that cached keys always have a positive height.
		score = w1*costPerSize() + w2*getTimestamp() + w3*(((double)1)/getDagHeight());
	}
}