blob: b2b6e0cb79c48b2b1a10d443d8fb29292fd34bd7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.lops;
import java.util.HashMap;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.OpOpData;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.lops.LopProperties.ExecType;
import org.apache.sysds.parser.DataExpression;
/**
* Lop to represent data objects. Data objects represent matrices, vectors,
* variables, literals. Can be for both input and output.
*/
public class Data extends Lop
{
public static final String PREAD_PREFIX = "pREAD";
private final FileFormat formatType;
private final OpOpData _op;
private final boolean literal_var;
private HashMap<String, Lop> _inputParams;
/**
* Method to create literal LOPs.
*
* @param vt value type
* @param literalValue literal value
* @return literal low-level operator
*/
public static Data createLiteralLop(ValueType vt, String literalValue) {
// All literals have default format type of TEXT
return new Data(OpOpData.PERSISTENTREAD, null, null, null, literalValue, DataType.SCALAR, vt, FileFormat.BINARY);
}
/**
* Constructor to setup read or write LOP
* In case of write: <code>input</code> must be provided. This will always be added as the first element in <code>input</code> array.
* For literals: this function is invoked through the static method <code>createLiteralLop</code>.
*
* @param op operation type
* @param input low-level operator
* @param inputParametersLops input lops
* @param name string name
* @param literal string literal
* @param dt data type
* @param vt value type
* @param fmt file format
*/
public Data(OpOpData op, Lop input, HashMap<String, Lop>
inputParametersLops, String name, String literal, DataType dt, ValueType vt, FileFormat fmt)
{
super(Lop.Type.Data, dt, vt);
_op = op;
literal_var = (literal != null);
// Either <code>name</code> or <code>literal</code> can be non-null.
if(literal_var){
if ( _op.isTransient() )
throw new LopsException("Invalid parameter values while setting up a Data LOP -- transient flag is invalid for a literal.");
getOutputParameters().setLabel(literal);
}
else if(name != null) {
if ( _op.isTransient() )
getOutputParameters().setLabel(name); // tvar+name
else {
String code = _op == OpOpData.FUNCTIONOUTPUT ? "" :
_op.isRead() ? "pREAD" : "pWRITE";
getOutputParameters().setLabel(code+name);
}
}
else {
throw new LopsException("Invalid parameter values while setting up a Data LOP -- the lop must have either literal value or a name.");
}
// WRITE operation must have an input Lops, we always put this
// input Lops as the first element of WRITE input. The parameters of
// WRITE operation are then put as the following input elements.
if(input != null && op.isWrite()) {
addInput(input);
input.addOutput(this);
}
_inputParams = inputParametersLops;
if ( _inputParams != null ) {
for (Lop lop : inputParametersLops.values()) {
addInput(lop);
lop.addOutput(this);
}
if ( inputParametersLops.get(DataExpression.IO_FILENAME)!= null
&& inputParametersLops.get(DataExpression.IO_FILENAME) instanceof Data )
{
OutputParameters outParams = ((Data)inputParametersLops.get(DataExpression.IO_FILENAME)).getOutputParameters();
String fName = outParams.getLabel();
this.getOutputParameters().setFile_name(fName);
}
}
//set output format
formatType = fmt;
outParams.setFormat(fmt);
setLopProperties();
}
private void setLopProperties() {
lps.setProperties ( inputs, ExecType.INVALID);
}
/**
* Data-Lop-specific method to set the execution type for persistent write.
* TODO: split lops into MR/CP lop.
*
* @param et execution type
*/
public void setExecType( ExecType et ) {
lps.execType = et;
}
/**
* method to get format type for input, output files.
* @return file format
*/
public FileFormat getFileFormatType() {
return formatType ;
}
@Override
public String toString() {
return getID() + ":" + "File_Name: " + getOutputParameters().getFile_name() + " "
+ "Label: " + getOutputParameters().getLabel() + " " + "Operation: = " + _op + " "
+ "Format: " + outParams.getFormat() + " Datatype: " + getDataType() + " Valuetype: " + getValueType()
+ " num_rows = " + getOutputParameters().getNumRows() + " num_cols = " + getOutputParameters().getNumCols()
+ " UpdateInPlace: " + getOutputParameters().getUpdateType();
}
/**
* method to get operation type, i.e. read/write.
* @return operation type
*/
public OpOpData getOperationType() {
return _op;
}
/**
* method to get inputParams
* @return input parameters
*/
public HashMap<String, Lop> getInputParams(){
return _inputParams;
}
public Lop getNamedInputLop(String name) {
return _inputParams.get(name);
}
public Lop getNamedInputLop(String name, String defaultVal) {
if( _inputParams.containsKey(name) )
return _inputParams.get(name);
else
return Data.createLiteralLop(ValueType.STRING, defaultVal);
}
/**
* method to check if this data lop represents a literal.
* @return true if data lop is a literal
*/
public boolean isLiteral() {
return literal_var;
}
public boolean getBooleanValue() {
if(literal_var) {
return Boolean.parseBoolean(getOutputParameters().getLabel());
}
else
throw new LopsException("Cannot obtain the value of a non-literal variable at compile time.");
}
public double getDoubleValue() {
if(literal_var) {
return Double.parseDouble(getOutputParameters().getLabel());
}
else
throw new LopsException("Cannot obtain the value of a non-literal variable at compile time.");
}
public long getLongValue() {
if(literal_var) {
ValueType vt = getValueType();
switch(vt) {
case INT64:
return Long.parseLong(getOutputParameters().getLabel());
case FP64:
return (long) Double.parseDouble(getOutputParameters().getLabel());
default:
throw new LopsException("Encountered a non-numeric value " + (vt) + ", while a numeric value is expected.");
}
}
else
throw new LopsException("Can not obtain the value of a non-literal variable at compile time.");
}
public String getStringValue() {
if(literal_var) {
return getOutputParameters().getLabel();
}
else
throw new LopsException("Cannot obtain the value of a non-literal variable at compile time.");
}
public boolean isPersistentWrite() {
return _op == OpOpData.PERSISTENTWRITE;
}
public boolean isPersistentRead() {
return _op == OpOpData.PERSISTENTREAD
&& !literal_var;
}
/**
* Method to get CP instructions for reading/writing scalars and matrices from/to HDFS.
* This method generates CP read/write instructions.
*/
@Override
public String getInstructions(String input1, String input2)
{
if ( getOutputParameters().getFile_name() == null && _op.isRead() )
throw new LopsException(this.printErrorLocation() + "Data.getInstructions(): Exepecting a SCALAR data type, encountered " + getDataType());
StringBuilder sb = new StringBuilder();
if(this.getExecType() == ExecType.SPARK)
sb.append( "SPARK" );
else
sb.append( "CP" );
sb.append( OPERAND_DELIMITOR );
if (_op.isRead()) {
sb.append( "read" );
sb.append( OPERAND_DELIMITOR );
sb.append ( this.prepInputOperand(input1) );
}
else if (_op.isWrite()) {
sb.append( "write" );
sb.append( OPERAND_DELIMITOR );
sb.append ( getInputs().get(0).prepInputOperand(input1) );
}
else
throw new LopsException(this.printErrorLocation() + "In Data Lop, Unknown operation: " + _op);
sb.append( OPERAND_DELIMITOR );
Lop fnameLop = _inputParams.get(DataExpression.IO_FILENAME);
boolean literal = (fnameLop instanceof Data && ((Data)fnameLop).isLiteral());
sb.append ( prepOperand(input2, DataType.SCALAR, ValueType.STRING, literal) );
// attach outputInfo in case of matrices
OutputParameters oparams = getOutputParameters();
if ( _op.isWrite() ) {
sb.append( OPERAND_DELIMITOR );
String fmt = null;
if ( getDataType() == DataType.MATRIX || getDataType() == DataType.FRAME )
fmt = oparams.getFormat().toString();
else // scalars will always be written in text format
fmt = FileFormat.TEXT.toString();
sb.append( prepOperand(fmt, DataType.SCALAR, ValueType.STRING, true));
if(oparams.getFormat() == FileFormat.CSV) {
Data headerLop = (Data) getNamedInputLop(DataExpression.DELIM_HAS_HEADER_ROW);
Data delimLop = (Data) getNamedInputLop(DataExpression.DELIM_DELIMITER);
Data sparseLop = (Data) getNamedInputLop(DataExpression.DELIM_SPARSE);
if (headerLop.isVariable())
throw new LopsException(this.printErrorLocation()
+ "Parameter " + DataExpression.DELIM_HAS_HEADER_ROW
+ " must be a literal for a seq operation.");
if (delimLop.isVariable())
throw new LopsException(this.printErrorLocation()
+ "Parameter " + DataExpression.DELIM_DELIMITER
+ " must be a literal for a seq operation.");
if (sparseLop.isVariable())
throw new LopsException(this.printErrorLocation()
+ "Parameter " + DataExpression.DELIM_SPARSE
+ " must be a literal for a seq operation.");
sb.append(OPERAND_DELIMITOR);
sb.append(headerLop.getBooleanValue());
sb.append(OPERAND_DELIMITOR);
sb.append(delimLop.getStringValue());
sb.append(OPERAND_DELIMITOR);
sb.append(sparseLop.getBooleanValue());
if ( this.getExecType() == ExecType.SPARK )
{
sb.append(OPERAND_DELIMITOR);
sb.append(true); //isInputMatrixBlock
}
}
if(oparams.getFormat() == FileFormat.LIBSVM) {
Data sparseLop = (Data) getNamedInputLop(DataExpression.DELIM_SPARSE);
if (sparseLop.isVariable())
throw new LopsException(this.printErrorLocation()
+ "Parameter " + DataExpression.DELIM_SPARSE
+ " must be a literal for a seq operation.");
sb.append(OPERAND_DELIMITOR);
sb.append(sparseLop.getBooleanValue());
}
}
if (_op.isWrite()) {
sb.append(OPERAND_DELIMITOR);
Lop descriptionLop = getInputParams().get(DataExpression.DESCRIPTIONPARAM);
if (descriptionLop != null) {
boolean descLiteral = (descriptionLop instanceof Data && ((Data) descriptionLop).isLiteral());
sb.append(prepOperand(descriptionLop.getOutputParameters().getLabel(), DataType.SCALAR,
ValueType.STRING, descLiteral));
} else {
sb.append(prepOperand("", DataType.SCALAR, ValueType.STRING, true));
}
}
return sb.toString();
}
/**
* Method to generate createvar instruction that updates symbol table with metadata, hdfsfile name, etc.
*
*/
@Override
public String getInstructions() {
return getCreateVarInstructions(getOutputParameters().getFile_name(), getOutputParameters().getLabel());
}
@Override
public String getInstructions(String outputFileName) {
return getCreateVarInstructions(outputFileName, getOutputParameters().getLabel() );
}
public String getCreateVarInstructions(String outputFileName, String outputLabel) {
if ( getDataType() == DataType.MATRIX || getDataType() == DataType.FRAME ) {
if ( _op.isTransient() )
throw new LopsException("getInstructions() should not be called for transient nodes.");
OutputParameters oparams = getOutputParameters();
StringBuilder sb = new StringBuilder();
sb.append( "CP" );
sb.append( OPERAND_DELIMITOR );
sb.append( "createvar" );
sb.append( OPERAND_DELIMITOR );
sb.append( outputLabel );
sb.append( OPERAND_DELIMITOR );
sb.append( outputFileName );
sb.append( OPERAND_DELIMITOR );
sb.append( false );
sb.append( OPERAND_DELIMITOR );
sb.append( getDataType() );
sb.append( OPERAND_DELIMITOR ); // only persistent reads come here!
sb.append( oparams.getFormat().toString() );
sb.append( OPERAND_DELIMITOR );
sb.append( oparams.getNumRows() );
sb.append( OPERAND_DELIMITOR );
sb.append( oparams.getNumCols() );
sb.append( OPERAND_DELIMITOR );
sb.append( oparams.getBlocksize() );
sb.append( OPERAND_DELIMITOR );
sb.append( oparams.getNnz() );
sb.append( OPERAND_DELIMITOR );
sb.append( oparams.getUpdateType().toString().toLowerCase() );
// Format-specific properties
if ( oparams.getFormat() == FileFormat.CSV ) {
sb.append( OPERAND_DELIMITOR );
sb.append( createVarCSVHelper() );
}
// Format-specific properties
if ( oparams.getFormat() == FileFormat.LIBSVM ) {
sb.append( createVarLIBSVMHelper() );
}
// Frame-specific properties
if( getDataType()==DataType.FRAME ) {
Data schema = (Data) getNamedInputLop(DataExpression.SCHEMAPARAM);
sb.append( OPERAND_DELIMITOR );
sb.append( (schema!=null) ? schema.prepScalarLabel() : "*" );
}
return sb.toString();
}
else {
throw new LopsException(this.printErrorLocation() + "In Data Lop, Unexpected data type " + getDataType());
}
}
/**
* Helper function that attaches CSV format-specific properties to createvar instruction.
* The set of properties that are attached for a READ operation is different from that for a WRITE operation.
*
* @return instruction with csv format properties appended
*/
private String createVarCSVHelper() {
StringBuilder sb = new StringBuilder();
if ( _op.isRead() ) {
Data headerLop = (Data) getNamedInputLop(DataExpression.DELIM_HAS_HEADER_ROW);
Data delimLop = (Data) getNamedInputLop(DataExpression.DELIM_DELIMITER);
Data fillLop = (Data) getNamedInputLop(DataExpression.DELIM_FILL);
Data fillValueLop = (Data) getNamedInputLop(DataExpression.DELIM_FILL_VALUE);
Lop naLop = getNamedInputLop(DataExpression.DELIM_NA_STRINGS);
sb.append(headerLop.getBooleanValue());
sb.append(OPERAND_DELIMITOR);
sb.append(delimLop.getStringValue());
sb.append(OPERAND_DELIMITOR);
sb.append(fillLop.getBooleanValue());
sb.append(OPERAND_DELIMITOR);
sb.append(fillValueLop.getDoubleValue());
if ( naLop != null ) {
sb.append(OPERAND_DELIMITOR);
if(naLop instanceof Nary){
Nary naLops = (Nary) naLop;
for(Lop na : naLops.getInputs()){
sb.append(((Data)na).getStringValue());
sb.append(DataExpression.DELIM_NA_STRING_SEP);
}
} else if (naLop instanceof Data){
sb.append(((Data)naLop).getStringValue());
}
}
}
else { // (operation == OperationTypes.WRITE)
Data headerLop = (Data) getNamedInputLop(DataExpression.DELIM_HAS_HEADER_ROW);
Data delimLop = (Data) getNamedInputLop(DataExpression.DELIM_DELIMITER);
Data sparseLop = (Data) getNamedInputLop(DataExpression.DELIM_SPARSE);
if (headerLop.isVariable())
throw new LopsException(this.printErrorLocation()
+ "Parameter " + DataExpression.DELIM_HAS_HEADER_ROW
+ " must be a literal for a seq operation.");
if (delimLop.isVariable())
throw new LopsException(this.printErrorLocation()
+ "Parameter " + DataExpression.DELIM_DELIMITER
+ " must be a literal for a seq operation.");
if (sparseLop.isVariable())
throw new LopsException(this.printErrorLocation()
+ "Parameter " + DataExpression.DELIM_SPARSE
+ " must be a literal for a seq operation.");
sb.append(headerLop.getBooleanValue());
sb.append(OPERAND_DELIMITOR);
sb.append(delimLop.getStringValue());
sb.append(OPERAND_DELIMITOR);
sb.append(sparseLop.getBooleanValue());
}
return sb.toString();
}
private String createVarLIBSVMHelper() {
StringBuilder sb = new StringBuilder();
if ( _op.isRead() ) {
Data naLop = (Data) getNamedInputLop(DataExpression.DELIM_NA_STRINGS);
if ( naLop != null ) {
sb.append(OPERAND_DELIMITOR);
sb.append(naLop.getStringValue());
}
}
else { // (operation == OperationTypes.WRITE)
Data sparseLop = (Data) getNamedInputLop(DataExpression.DELIM_SPARSE);
if (sparseLop.isVariable())
throw new LopsException(this.printErrorLocation()
+ "Parameter " + DataExpression.DELIM_SPARSE
+ " must be a literal for a seq operation.");
sb.append(OPERAND_DELIMITOR);
sb.append(sparseLop.getBooleanValue());
}
return sb.toString();
}
}