blob: 3f52929fa7e02d00d23fe359b1150134713437cf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.api.mlcontext;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.instructions.spark.data.DatasetObject;
import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
import org.apache.sysds.runtime.instructions.spark.functions.ConvertStringToLongTextPair;
import org.apache.sysds.runtime.instructions.spark.functions.CopyTextInputFunction;
import org.apache.sysds.runtime.instructions.spark.utils.FrameRDDConverterUtils;
import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtils;
import org.apache.sysds.runtime.instructions.spark.utils.SparkUtils;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.IJV;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.data.Pair;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.meta.MetaDataFormat;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.runtime.util.UtilFunctions;
import scala.collection.JavaConversions;
import scala.reflect.ClassTag;
/**
* Utility class containing methods to perform data conversions.
*
*/
public class MLContextConversionUtil {
/**
 * Wrap a two-dimensional double array in a {@code MatrixObject} without
 * supplying matrix metadata.
 *
 * @param variableName
 *            name of the variable associated with the matrix
 * @param doubleMatrix
 *            matrix of double values
 * @return the result of the three-argument overload called with
 *         {@code null} metadata
 */
public static MatrixObject doubleMatrixToMatrixObject(String variableName, double[][] doubleMatrix) {
	final MatrixMetadata noMetadata = null;
	return doubleMatrixToMatrixObject(variableName, doubleMatrix, noMetadata);
}
/**
 * Wrap a two-dimensional double array in a {@code MatrixObject}.
 * <p>
 * The array is first converted to a {@code MatrixBlock}. When no metadata is
 * given, the matrix characteristics are built from the block's row and
 * column counts and the configured block size.
 *
 * @param variableName
 *            name of the variable associated with the matrix (not read by
 *            this method)
 * @param doubleMatrix
 *            matrix of double values
 * @param matrixMetadata
 *            the matrix metadata, or {@code null}
 * @return a binary-format {@code MatrixObject} holding the converted block
 * @throws MLContextException
 *             if a {@code DMLRuntimeException} occurs during the conversion
 */
public static MatrixObject doubleMatrixToMatrixObject(String variableName, double[][] doubleMatrix,
		MatrixMetadata matrixMetadata) {
	try {
		MatrixBlock block = DataConverter.convertToMatrixBlock(doubleMatrix);
		MatrixCharacteristics characteristics;
		if (matrixMetadata == null) {
			// NOTE(review): the block size is passed as both the third and the
			// fourth constructor argument - confirm that the fourth argument
			// is not expected to be something else (e.g. a non-zero count).
			characteristics = new MatrixCharacteristics(block.getNumRows(), block.getNumColumns(),
				ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize());
		}
		else {
			characteristics = matrixMetadata.asMatrixCharacteristics();
		}
		String tempFileName = OptimizerUtils.getUniqueTempFileName();
		MetaDataFormat metaData = new MetaDataFormat(characteristics, FileFormat.BINARY);
		MatrixObject wrapped = new MatrixObject(ValueType.FP64, tempFileName, metaData);
		// hand the in-memory block over to the matrix object, then unpin it
		wrapped.acquireModify(block);
		wrapped.release();
		return wrapped;
	}
	catch (DMLRuntimeException e) {
		throw new MLContextException("Exception converting double[][] array to MatrixObject", e);
	}
}
/**
 * Convert a matrix at a URL to a {@code MatrixObject}.
 * <p>
 * The content of the URL is read line by line (decoded as UTF-8),
 * parallelized into a {@code JavaRDD<String>} and handed to the CSV or IJV
 * conversion. When no metadata is given, CSV is assumed.
 *
 * @param url
 *            the URL to a matrix (in CSV or IJV format)
 * @param matrixMetadata
 *            the matrix metadata, or {@code null} to assume CSV
 * @return the matrix at a URL converted to a {@code MatrixObject}
 * @throws MLContextException
 *             if the metadata specifies a format other than CSV or IJV (or no
 *             format at all), or if reading or converting the data fails
 */
public static MatrixObject urlToMatrixObject(URL url, MatrixMetadata matrixMetadata) {
	// absent metadata is interpreted as CSV
	final MatrixFormat format = (matrixMetadata == null) ? MatrixFormat.CSV : matrixMetadata.getMatrixFormat();
	// fail loudly instead of handing a null MatrixObject back to the caller
	if (format != MatrixFormat.CSV && format != MatrixFormat.IJV) {
		throw new MLContextException(
			"Unsupported matrix format for URL input (expected CSV or IJV): " + format);
	}
	try (InputStream is = url.openStream()) {
		// explicit charset: the single-argument readLines uses the platform default
		List<String> lines = IOUtils.readLines(is, java.nio.charset.StandardCharsets.UTF_8);
		JavaRDD<String> javaRDD = jsc().parallelize(lines);
		return (format == MatrixFormat.CSV) ?
			javaRDDStringCSVToMatrixObject(javaRDD, matrixMetadata) :
			javaRDDStringIJVToMatrixObject(javaRDD, matrixMetadata);
	}
	catch (Exception e) {
		throw new MLContextException("Exception converting URL to MatrixObject", e);
	}
}
/**
 * Wrap a {@code MatrixBlock} in a {@code MatrixObject}.
 *
 * @param variableName
 *            name of the variable associated with the matrix (not read by
 *            this method)
 * @param matrixBlock
 *            matrix as a MatrixBlock
 * @param matrixMetadata
 *            the matrix metadata, or {@code null} to use default matrix
 *            characteristics
 * @return a binary-format {@code MatrixObject} holding the given block
 * @throws MLContextException
 *             if a {@code DMLRuntimeException} occurs during the conversion
 */
public static MatrixObject matrixBlockToMatrixObject(String variableName, MatrixBlock matrixBlock,
		MatrixMetadata matrixMetadata) {
	try {
		MatrixCharacteristics characteristics;
		if (matrixMetadata == null) {
			characteristics = new MatrixCharacteristics();
		}
		else {
			characteristics = matrixMetadata.asMatrixCharacteristics();
		}
		String tempFileName = OptimizerUtils.getUniqueTempFileName();
		MetaDataFormat metaData = new MetaDataFormat(characteristics, FileFormat.BINARY);
		MatrixObject wrapped = new MatrixObject(ValueType.FP64, tempFileName, metaData);
		// hand the in-memory block over to the matrix object, then unpin it
		wrapped.acquireModify(matrixBlock);
		wrapped.release();
		return wrapped;
	}
	catch (DMLRuntimeException e) {
		throw new MLContextException("Exception converting MatrixBlock to MatrixObject", e);
	}
}
/**
 * Convert a {@code FrameBlock} to a {@code FrameObject}.
 * <p>
 * The schema of the resulting frame object is taken from the metadata when
 * the metadata carries a frame schema; otherwise the schema of the frame
 * block itself is used.
 *
 * @param variableName
 *            name of the variable associated with the frame (not read by
 *            this method)
 * @param frameBlock
 *            frame as a FrameBlock
 * @param frameMetadata
 *            the frame metadata, or {@code null}
 * @return the {@code FrameBlock} converted to a {@code FrameObject}
 * @throws MLContextException
 *             if a {@code DMLRuntimeException} occurs during the conversion
 */
public static FrameObject frameBlockToFrameObject(String variableName, FrameBlock frameBlock, FrameMetadata frameMetadata) {
	try {
		MatrixCharacteristics mc = (frameMetadata != null) ?
			frameMetadata.asMatrixCharacteristics() : null;
		if (mc == null) {
			mc = new MatrixCharacteristics();
		}
		MetaDataFormat mtd = new MetaDataFormat(mc, FileFormat.BINARY);
		// the metadata (or its schema) may be absent; fall back to the block's
		// own schema instead of dereferencing null
		ValueType[] schema = (frameMetadata != null && frameMetadata.getFrameSchema() != null) ?
			frameMetadata.getFrameSchema().getSchema().toArray(new ValueType[0]) :
			frameBlock.getSchema();
		FrameObject frameObject = new FrameObject(OptimizerUtils.getUniqueTempFileName(), mtd, schema);
		// hand the in-memory block over to the frame object, then unpin it
		frameObject.acquireModify(frameBlock);
		frameObject.release();
		return frameObject;
	}
	catch (DMLRuntimeException e) {
		throw new MLContextException("Exception converting FrameBlock to FrameObject", e);
	}
}
/**
 * Wrap a binary-block matrix RDD in a {@code MatrixObject} without supplying
 * matrix metadata.
 *
 * @param binaryBlocks
 *            {@code JavaPairRDD<MatrixIndexes, MatrixBlock>} representation
 *            of a binary-block matrix
 * @return the result of the two-argument overload called with {@code null}
 *         metadata
 */
public static MatrixObject binaryBlocksToMatrixObject(
		JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlocks) {
	final MatrixMetadata noMetadata = null;
	return binaryBlocksToMatrixObject(binaryBlocks, noMetadata);
}
/**
 * Wrap a binary-block matrix RDD in a {@code MatrixObject}, requesting a
 * copy of the blocks.
 *
 * @param binaryBlocks
 *            {@code JavaPairRDD<MatrixIndexes, MatrixBlock>} representation
 *            of a binary-block matrix
 * @param matrixMetadata
 *            the matrix metadata
 * @return the {@code MatrixObject} produced by the internal conversion with
 *         the copy flag set to {@code true}
 */
public static MatrixObject binaryBlocksToMatrixObject(
		JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlocks, MatrixMetadata matrixMetadata) {
	final boolean copyBlocks = true;
	return binaryBlocksToMatrixObject(binaryBlocks, matrixMetadata, copyBlocks);
}
/**
 * Create a {@code MatrixObject} whose RDD handle refers to the given binary
 * blocks (as returned by {@code SparkUtils.copyBinaryBlockMatrix}).
 *
 * @param binaryBlocks
 *            binary-block matrix RDD
 * @param matrixMetadata
 *            the matrix metadata, or {@code null} to use default matrix
 *            characteristics
 * @param copy
 *            flag forwarded to {@code SparkUtils.copyBinaryBlockMatrix}
 * @return a binary-format {@code MatrixObject} backed by the RDD
 */
private static MatrixObject binaryBlocksToMatrixObject(JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlocks,
		MatrixMetadata matrixMetadata, boolean copy) {
	MatrixCharacteristics characteristics;
	if (matrixMetadata == null) {
		characteristics = new MatrixCharacteristics();
	}
	else {
		characteristics = matrixMetadata.asMatrixCharacteristics();
	}
	JavaPairRDD<MatrixIndexes, MatrixBlock> blocks = SparkUtils.copyBinaryBlockMatrix(binaryBlocks, copy);
	String tempFileName = OptimizerUtils.getUniqueTempFileName();
	MetaDataFormat metaData = new MetaDataFormat(characteristics, FileFormat.BINARY);
	MatrixObject result = new MatrixObject(ValueType.FP64, tempFileName, metaData);
	result.setRDDHandle(new RDDObject(blocks));
	return result;
}
/**
 * Collect a binary-block matrix RDD into a single {@code MatrixBlock}.
 * <p>
 * The dimensions, block size and number of non-zeros are read from the
 * metadata, so the metadata must not be {@code null}. Row and column counts
 * are narrowed to {@code int}.
 *
 * @param binaryBlocks
 *            {@code JavaPairRDD<MatrixIndexes, MatrixBlock>} representation
 *            of a binary-block matrix
 * @param matrixMetadata
 *            the matrix metadata
 * @return the binary-block matrix converted to a {@code MatrixBlock}
 * @throws MLContextException
 *             if a {@code DMLRuntimeException} occurs during the conversion
 */
public static MatrixBlock binaryBlocksToMatrixBlock(JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlocks,
		MatrixMetadata matrixMetadata) {
	try {
		final int numRows = matrixMetadata.getNumRows().intValue();
		final int numColumns = matrixMetadata.getNumColumns().intValue();
		return SparkExecutionContext.toMatrixBlock(binaryBlocks, numRows, numColumns,
			matrixMetadata.getBlocksize(), matrixMetadata.getNumNonZeros());
	}
	catch (DMLRuntimeException e) {
		throw new MLContextException("Exception converting binary blocks to MatrixBlock", e);
	}
}
/**
 * Wrap a binary-block frame RDD in a {@code FrameObject} without supplying
 * frame metadata.
 *
 * @param binaryBlocks
 *            {@code JavaPairRDD<Long, FrameBlock>} representation of a
 *            binary-block frame
 * @return the result of the two-argument overload called with {@code null}
 *         metadata
 */
public static FrameObject binaryBlocksToFrameObject(JavaPairRDD<Long, FrameBlock> binaryBlocks) {
	final FrameMetadata noMetadata = null;
	return binaryBlocksToFrameObject(binaryBlocks, noMetadata);
}
/**
 * Convert a {@code JavaPairRDD<Long, FrameBlock>} to a {@code FrameObject}.
 * <p>
 * The schema is taken from the metadata when it carries a frame schema.
 * Otherwise (no metadata, or metadata without a schema) an all-string schema
 * sized by the column count of the matrix characteristics is used.
 *
 * @param binaryBlocks
 *            {@code JavaPairRDD<Long, FrameBlock>} representation of a
 *            binary-block frame
 * @param frameMetadata
 *            the frame metadata, or {@code null}
 * @return the {@code JavaPairRDD<Long, FrameBlock>} frame converted to a
 *         {@code FrameObject}
 */
public static FrameObject binaryBlocksToFrameObject(JavaPairRDD<Long, FrameBlock> binaryBlocks,
		FrameMetadata frameMetadata) {
	MatrixCharacteristics mc = (frameMetadata != null) ?
		frameMetadata.asMatrixCharacteristics() : null;
	if (mc == null) {
		mc = new MatrixCharacteristics();
	}
	// metadata created without a schema must not cause a NullPointerException;
	// it gets the same all-string default as missing metadata
	boolean hasDeclaredSchema = (frameMetadata != null && frameMetadata.getFrameSchema() != null);
	ValueType[] schema = hasDeclaredSchema ?
		frameMetadata.getFrameSchema().getSchema().toArray(new ValueType[0]) :
		UtilFunctions.nCopies((int)mc.getCols(), ValueType.STRING);
	FrameObject frameObject = new FrameObject(OptimizerUtils.getUniqueTempFileName(),
		new MetaDataFormat(mc, FileFormat.BINARY), schema);
	frameObject.setRDDHandle(new RDDObject(binaryBlocks));
	return frameObject;
}
/**
 * Convert a {@code DataFrame} to a {@code MatrixObject} without supplying
 * matrix metadata.
 *
 * @param dataFrame
 *            the Spark {@code DataFrame}
 * @return the result of the two-argument overload called with {@code null}
 *         metadata
 */
public static MatrixObject dataFrameToMatrixObject(Dataset<Row> dataFrame) {
	final MatrixMetadata noMetadata = null;
	return dataFrameToMatrixObject(dataFrame, noMetadata);
}
/**
 * Convert a {@code DataFrame} to a {@code MatrixObject}.
 * <p>
 * The data frame is converted to binary blocks, which are wrapped without
 * an additional copy. The original data frame is registered as a lineage
 * child of the RDD handle.
 *
 * @param dataFrame
 *            the Spark {@code DataFrame}
 * @param matrixMetadata
 *            the matrix metadata, or {@code null} to start from empty
 *            metadata
 * @return the {@code DataFrame} matrix converted to a {@code MatrixObject}
 */
public static MatrixObject dataFrameToMatrixObject(Dataset<Row> dataFrame,
		MatrixMetadata matrixMetadata) {
	final MatrixMetadata metadata = (matrixMetadata == null) ? new MatrixMetadata() : matrixMetadata;
	JavaPairRDD<MatrixIndexes, MatrixBlock> blocks = dataFrameToMatrixBinaryBlocks(dataFrame, metadata);
	MatrixObject matrixObject = binaryBlocksToMatrixObject(blocks, metadata, false);
	// keep lineage of original dataset to allow bypassing binary block
	// conversion if possible
	boolean withIdColumn = isDataFrameWithIDColumn(metadata);
	boolean vectorBased = isVectorBasedDataFrame(metadata);
	DatasetObject source = new DatasetObject(dataFrame, withIdColumn, vectorBased);
	matrixObject.getRDDHandle().addLineageChild(source);
	return matrixObject;
}
/**
 * Convert a {@code DataFrame} to a {@code FrameObject} without supplying
 * frame metadata.
 *
 * @param dataFrame
 *            the Spark {@code DataFrame}
 * @return the result of the two-argument overload called with {@code null}
 *         metadata
 */
public static FrameObject dataFrameToFrameObject(Dataset<Row> dataFrame) {
	final FrameMetadata noMetadata = null;
	return dataFrameToFrameObject(dataFrame, noMetadata);
}
/**
 * Convert a {@code DataFrame} to a {@code FrameObject}.
 * <p>
 * The frame format is determined if the metadata does not specify it, the
 * data frame is converted to binary blocks, and the value types returned by
 * the conversion are stored as the frame schema of the metadata.
 *
 * @param dataFrame
 *            the Spark {@code DataFrame}
 * @param frameMetadata
 *            the frame metadata, or {@code null} to start from empty
 *            metadata
 * @return the {@code DataFrame} frame converted to a {@code FrameObject}
 * @throws MLContextException
 *             if a {@code DMLRuntimeException} occurs during the conversion
 */
public static FrameObject dataFrameToFrameObject(Dataset<Row> dataFrame,
		FrameMetadata frameMetadata) {
	try {
		// set up the metadata
		FrameMetadata metadata = (frameMetadata != null) ? frameMetadata : new FrameMetadata();
		determineFrameFormatIfNeeded(dataFrame, metadata);
		boolean hasIdColumn = isDataFrameWithIDColumn(metadata);
		MatrixCharacteristics characteristics = metadata.asMatrixCharacteristics();
		if (characteristics == null) {
			characteristics = new MatrixCharacteristics();
		}

		// convert data frame and obtain column names / schema
		// TODO extend frame schema by column names (right now dropped)
		Pair<String[], ValueType[]> namesAndTypes = new Pair<>();
		JavaPairRDD<Long, FrameBlock> blocks = FrameRDDConverterUtils.dataFrameToBinaryBlock(
			jsc(), dataFrame, characteristics, hasIdColumn, namesAndTypes);
		metadata.setFrameSchema(new FrameSchema(Arrays.asList(namesAndTypes.getValue())));
		// required due to meta data copy
		metadata.setMatrixCharacteristics(characteristics);
		return binaryBlocksToFrameObject(blocks, metadata);
	}
	catch (DMLRuntimeException e) {
		throw new MLContextException("Exception converting DataFrame to FrameObject", e);
	}
}
/**
 * Convert a {@code DataFrame} to a
 * {@code JavaPairRDD<MatrixIndexes, MatrixBlock>} binary-block matrix
 * without supplying matrix metadata.
 *
 * @param dataFrame
 *            the Spark {@code DataFrame}
 * @return the result of the two-argument overload called with {@code null}
 *         metadata
 */
public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToMatrixBinaryBlocks(Dataset<Row> dataFrame) {
	final MatrixMetadata noMetadata = null;
	return dataFrameToMatrixBinaryBlocks(dataFrame, noMetadata);
}
/**
 * Convert a {@code DataFrame} to a
 * {@code JavaPairRDD<MatrixIndexes, MatrixBlock>} binary-block matrix.
 * <p>
 * If metadata is given, its matrix format is determined when missing, and
 * the matrix characteristics used for the conversion are written back to
 * it afterwards.
 *
 * @param dataFrame
 *            the Spark {@code DataFrame}
 * @param matrixMetadata
 *            the matrix metadata, or {@code null}
 * @return the {@code DataFrame} matrix converted to a binary-block matrix
 */
public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToMatrixBinaryBlocks(Dataset<Row> dataFrame,
		MatrixMetadata matrixMetadata) {
	// handle meta data
	determineMatrixFormatIfNeeded(dataFrame, matrixMetadata);
	MatrixCharacteristics characteristics = null;
	if (matrixMetadata != null) {
		characteristics = matrixMetadata.asMatrixCharacteristics();
	}
	if (characteristics == null) {
		characteristics = new MatrixCharacteristics();
	}
	boolean hasIdColumn = isDataFrameWithIDColumn(matrixMetadata);
	boolean vectorBased = isVectorBasedDataFrame(matrixMetadata);

	// convert data frame to binary block matrix
	JavaPairRDD<MatrixIndexes, MatrixBlock> blocks = RDDConverterUtils.dataFrameToBinaryBlock(
		jsc(), dataFrame, characteristics, hasIdColumn, vectorBased);

	// update determined matrix characteristics
	if (matrixMetadata != null) {
		matrixMetadata.setMatrixCharacteristics(characteristics);
	}
	return blocks;
}
/**
 * Placeholder for converting a {@code DataFrame} to a
 * {@code JavaPairRDD<Long, FrameBlock>} binary-block frame. The conversion
 * is not implemented; every call throws.
 *
 * @param dataFrame
 *            the Spark {@code DataFrame}
 * @param frameMetadata
 *            the frame metadata
 * @return never returns normally
 * @throws MLContextException
 *             always, because the method is unimplemented
 */
public static JavaPairRDD<Long, FrameBlock> dataFrameToFrameBinaryBlocks(Dataset<Row> dataFrame,
		FrameMetadata frameMetadata) {
	final String reason = "dataFrameToFrameBinaryBlocks is unimplemented";
	throw new MLContextException(reason);
}
/**
 * If the MatrixFormat of the DataFrame has not been explicitly specified,
 * attempt to determine the proper MatrixFormat.
 * <p>
 * The format is derived from whether the schema contains the ID column
 * ({@code RDDConverterUtils.DF_ID_COLUMN}) and whether the first data column
 * has a vector type. Nothing happens when the metadata is {@code null} or
 * already specifies a format.
 *
 * @param dataFrame
 *            the Spark {@code DataFrame}
 * @param matrixMetadata
 *            the matrix metadata, if available
 * @throws MLContextException
 *             if the DataFrame has no data column to inspect
 */
public static void determineMatrixFormatIfNeeded(Dataset<Row> dataFrame, MatrixMetadata matrixMetadata) {
	if (matrixMetadata == null || matrixMetadata.getMatrixFormat() != null) {
		return;
	}
	StructType schema = dataFrame.schema();
	// plain membership test instead of catching the exception of fieldIndex()
	boolean hasID = Arrays.asList(schema.fieldNames()).contains(RDDConverterUtils.DF_ID_COLUMN);
	StructField[] fields = schema.fields();
	// as before, an ID column is expected to precede the data, so the first
	// data column sits at position 1 when an ID column exists
	int firstDataColumn = hasID ? 1 : 0;
	if (fields.length <= firstDataColumn) {
		// previously an ArrayIndexOutOfBoundsException escaped here
		throw new MLContextException("DataFrame format not recognized as an accepted SystemDS MatrixFormat");
	}
	boolean vectorBased = fields[firstDataColumn].dataType() instanceof VectorUDT;
	MatrixFormat mf;
	if (hasID) {
		mf = vectorBased ? MatrixFormat.DF_VECTOR_WITH_INDEX : MatrixFormat.DF_DOUBLES_WITH_INDEX;
	}
	else {
		mf = vectorBased ? MatrixFormat.DF_VECTOR : MatrixFormat.DF_DOUBLES;
	}
	matrixMetadata.setMatrixFormat(mf);
}
/**
 * If the FrameFormat of the DataFrame has not been explicitly specified,
 * attempt to determine the proper FrameFormat.
 * <p>
 * The format is {@code DF_WITH_INDEX} when the schema contains the ID column
 * ({@code RDDConverterUtils.DF_ID_COLUMN}) and {@code DF} otherwise. Nothing
 * happens when the metadata is {@code null} or already specifies a format.
 *
 * @param dataFrame
 *            the Spark {@code DataFrame}
 * @param frameMetadata
 *            the frame metadata, if available
 */
public static void determineFrameFormatIfNeeded(Dataset<Row> dataFrame, FrameMetadata frameMetadata) {
	// tolerate missing metadata, like the matrix counterpart does
	if (frameMetadata == null || frameMetadata.getFrameFormat() != null) {
		return;
	}
	StructType schema = dataFrame.schema();
	// plain membership test instead of catching the exception of fieldIndex()
	boolean hasID = Arrays.asList(schema.fieldNames()).contains(RDDConverterUtils.DF_ID_COLUMN);
	FrameFormat ff = hasID ? FrameFormat.DF_WITH_INDEX : FrameFormat.DF;
	frameMetadata.setFrameFormat(ff);
}
/**
 * Report whether the matrix format in the metadata denotes a DataFrame with
 * an ID column.
 *
 * @param matrixMetadata
 *            the matrix metadata
 * @return {@code true} if metadata and matrix format are present and the
 *         format has an ID column, {@code false} otherwise.
 */
public static boolean isDataFrameWithIDColumn(MatrixMetadata matrixMetadata) {
	if (matrixMetadata == null) {
		return false;
	}
	MatrixFormat format = matrixMetadata.getMatrixFormat();
	return format != null && format.hasIDColumn();
}
/**
 * Report whether the frame format in the metadata denotes a DataFrame with
 * an ID column.
 *
 * @param frameMetadata
 *            the frame metadata
 * @return {@code true} if metadata and frame format are present and the
 *         format has an ID column, {@code false} otherwise.
 */
public static boolean isDataFrameWithIDColumn(FrameMetadata frameMetadata) {
	if (frameMetadata == null) {
		return false;
	}
	FrameFormat format = frameMetadata.getFrameFormat();
	return format != null && format.hasIDColumn();
}
/**
 * Report whether the matrix format in the metadata denotes a vector-based
 * DataFrame.
 *
 * @param matrixMetadata
 *            the matrix metadata
 * @return {@code true} if metadata and matrix format are present and the
 *         format is vector-based, {@code false} otherwise.
 */
public static boolean isVectorBasedDataFrame(MatrixMetadata matrixMetadata) {
	if (matrixMetadata == null) {
		return false;
	}
	MatrixFormat format = matrixMetadata.getMatrixFormat();
	return format != null && format.isVectorBased();
}
/**
 * Convert a {@code JavaRDD<String>} in CSV format to a {@code MatrixObject}
 * without supplying matrix metadata.
 *
 * @param javaRDD
 *            the Java RDD of strings
 * @return the result of the two-argument overload called with {@code null}
 *         metadata
 */
public static MatrixObject javaRDDStringCSVToMatrixObject(JavaRDD<String> javaRDD) {
	final MatrixMetadata noMetadata = null;
	return javaRDDStringCSVToMatrixObject(javaRDD, noMetadata);
}
/**
 * Convert a {@code JavaRDD<String>} in CSV format to a {@code MatrixObject}.
 * <p>
 * The strings are mapped to {@code (LongWritable, Text)} pairs, passed
 * through {@code CopyTextInputFunction}, and attached as the RDD handle of a
 * CSV-format matrix object.
 *
 * @param javaRDD
 *            the Java RDD of strings
 * @param matrixMetadata
 *            matrix metadata, or {@code null} to use default matrix
 *            characteristics
 * @return the {@code JavaRDD<String>} converted to a {@code MatrixObject}
 */
public static MatrixObject javaRDDStringCSVToMatrixObject(JavaRDD<String> javaRDD,
		MatrixMetadata matrixMetadata) {
	JavaPairRDD<LongWritable, Text> textPairs = javaRDD
		.mapToPair(new ConvertStringToLongTextPair())
		.mapToPair(new CopyTextInputFunction());
	DataCharacteristics characteristics;
	if (matrixMetadata == null) {
		characteristics = new MatrixCharacteristics();
	}
	else {
		characteristics = matrixMetadata.asMatrixCharacteristics();
	}
	String tempFileName = OptimizerUtils.getUniqueTempFileName();
	MetaDataFormat metaData = new MetaDataFormat(characteristics, FileFormat.CSV);
	MatrixObject result = new MatrixObject(ValueType.FP64, tempFileName, metaData);
	result.setRDDHandle(new RDDObject(textPairs));
	return result;
}
/**
 * Convert a {@code JavaRDD<String>} in CSV format to a {@code FrameObject}
 * without supplying frame metadata.
 *
 * @param javaRDD
 *            the Java RDD of strings
 * @return the result of the two-argument overload called with {@code null}
 *         metadata
 */
public static FrameObject javaRDDStringCSVToFrameObject(JavaRDD<String> javaRDD) {
	final FrameMetadata noMetadata = null;
	return javaRDDStringCSVToFrameObject(javaRDD, noMetadata);
}
/**
 * Convert a {@code JavaRDD<String>} in CSV format to a {@code FrameObject}.
 * <p>
 * The schema is taken from the metadata when it carries a frame schema. If
 * no schema is available but the matrix characteristics report a positive
 * column count, an all-string schema of that width is used.
 *
 * @param javaRDD
 *            the Java RDD of strings
 * @param frameMetadata
 *            frame metadata, or {@code null}
 * @return the {@code JavaRDD<String>} converted to a {@code FrameObject}
 * @throws MLContextException
 *             if neither a frame schema nor a column count is available
 */
public static FrameObject javaRDDStringCSVToFrameObject(JavaRDD<String> javaRDD,
		FrameMetadata frameMetadata) {
	JavaPairRDD<LongWritable, Text> javaPairRDD = javaRDD.mapToPair(new ConvertStringToLongTextPair());
	MatrixCharacteristics mc = (frameMetadata != null) ? frameMetadata.asMatrixCharacteristics() : null;
	if (mc == null) {
		mc = new MatrixCharacteristics();
	}
	JavaPairRDD<LongWritable, Text> javaPairRDDText = javaPairRDD.mapToPair(new CopyTextInputFunction());

	// resolve the schema without dereferencing missing metadata or a missing
	// schema (the single-argument overload passes null metadata)
	ValueType[] schema;
	if (frameMetadata != null && frameMetadata.getFrameSchema() != null) {
		schema = frameMetadata.getFrameSchema().getSchema().toArray(new ValueType[0]);
	}
	else if (mc.getCols() > 0) {
		schema = UtilFunctions.nCopies((int) mc.getCols(), ValueType.STRING);
	}
	else {
		throw new MLContextException("Frame metadata with a schema or a known number of columns "
			+ "is required to convert a CSV RDD to a FrameObject");
	}

	FrameObject frameObject = new FrameObject(OptimizerUtils.getUniqueTempFileName(),
		new MetaDataFormat(mc, FileFormat.BINARY), schema);
	JavaPairRDD<Long, FrameBlock> rdd;
	rdd = FrameRDDConverterUtils.csvToBinaryBlock(jsc(), javaPairRDDText, mc, frameObject.getSchema(), false,
		",", false, -1, UtilFunctions.defaultNaString);
	frameObject.setRDDHandle(new RDDObject(rdd));
	return frameObject;
}
/**
 * Convert a {@code JavaRDD<String>} in IJV format to a {@code MatrixObject}.
 * Note that metadata is required for IJV format.
 * <p>
 * The strings are mapped to {@code (LongWritable, Text)} pairs, passed
 * through {@code CopyTextInputFunction}, and attached as the RDD handle of a
 * text-format matrix object.
 *
 * @param javaRDD
 *            the Java RDD of strings
 * @param matrixMetadata
 *            matrix metadata (default matrix characteristics are used when
 *            it is {@code null})
 * @return the {@code JavaRDD<String>} converted to a {@code MatrixObject}
 */
public static MatrixObject javaRDDStringIJVToMatrixObject(JavaRDD<String> javaRDD,
		MatrixMetadata matrixMetadata) {
	JavaPairRDD<LongWritable, Text> textPairs = javaRDD
		.mapToPair(new ConvertStringToLongTextPair())
		.mapToPair(new CopyTextInputFunction());
	MatrixCharacteristics characteristics;
	if (matrixMetadata == null) {
		characteristics = new MatrixCharacteristics();
	}
	else {
		characteristics = matrixMetadata.asMatrixCharacteristics();
	}
	String tempFileName = OptimizerUtils.getUniqueTempFileName();
	MetaDataFormat metaData = new MetaDataFormat(characteristics, FileFormat.TEXT);
	MatrixObject result = new MatrixObject(ValueType.FP64, tempFileName, metaData);
	result.setRDDHandle(new RDDObject(textPairs));
	return result;
}
/**
 * Convert a {@code JavaRDD<String>} in IJV format to a {@code FrameObject}.
 * Note that metadata is required for IJV format.
 * <p>
 * The cells are converted to binary blocks using an all-string schema sized
 * by the column count of the metadata. The frame object itself is created
 * with the schema of the metadata when one is present, otherwise with that
 * all-string schema.
 *
 * @param javaRDD
 *            the Java RDD of strings
 * @param frameMetadata
 *            frame metadata (required)
 * @return the {@code JavaRDD<String>} converted to a {@code FrameObject}
 * @throws MLContextException
 *             if the metadata is {@code null} or the conversion fails
 */
public static FrameObject javaRDDStringIJVToFrameObject(JavaRDD<String> javaRDD,
		FrameMetadata frameMetadata) {
	if (frameMetadata == null) {
		throw new MLContextException("Frame metadata is required to convert an IJV RDD to a FrameObject");
	}
	JavaPairRDD<LongWritable, Text> javaPairRDD = javaRDD.mapToPair(new ConvertStringToLongTextPair());
	MatrixCharacteristics mc = frameMetadata.asMatrixCharacteristics();
	if (mc == null) {
		mc = new MatrixCharacteristics();
	}
	JavaPairRDD<LongWritable, Text> javaPairRDDText = javaPairRDD.mapToPair(new CopyTextInputFunction());

	// schema used for the cell conversion: every column is read as a string
	ValueType[] lschema = UtilFunctions.nCopies((int) mc.getCols(), ValueType.STRING);
	// NOTE(review): the frame object keeps the declared schema while the
	// blocks are built with the all-string schema - confirm this is intended.
	ValueType[] schema = (frameMetadata.getFrameSchema() != null) ?
		frameMetadata.getFrameSchema().getSchema().toArray(new ValueType[0]) : lschema;
	FrameObject frameObject = new FrameObject(OptimizerUtils.getUniqueTempFileName(),
		new MetaDataFormat(mc, FileFormat.BINARY), schema);
	JavaPairRDD<Long, FrameBlock> rdd;
	try {
		rdd = FrameRDDConverterUtils.textCellToBinaryBlock(jsc(), javaPairRDDText, mc, lschema);
	}
	catch (DMLRuntimeException e) {
		// propagate the failure instead of printing it and returning null
		throw new MLContextException("Exception converting IJV RDD to FrameObject", e);
	}
	frameObject.setRDDHandle(new RDDObject(rdd));
	return frameObject;
}
/**
 * Convert a {@code RDD<String>} in CSV format to a {@code MatrixObject}
 * without supplying matrix metadata.
 *
 * @param rdd
 *            the RDD of strings
 * @return the result of the two-argument overload called with {@code null}
 *         metadata
 */
public static MatrixObject rddStringCSVToMatrixObject(RDD<String> rdd) {
	final MatrixMetadata noMetadata = null;
	return rddStringCSVToMatrixObject(rdd, noMetadata);
}
/**
 * Convert a {@code RDD<String>} in CSV format to a {@code MatrixObject} by
 * wrapping it as a {@code JavaRDD<String>} and delegating to the Java RDD
 * conversion.
 *
 * @param rdd
 *            the RDD of strings
 * @param matrixMetadata
 *            matrix metadata
 * @return the {@code RDD<String>} converted to a {@code MatrixObject}
 */
public static MatrixObject rddStringCSVToMatrixObject(RDD<String> rdd,
		MatrixMetadata matrixMetadata) {
	// class tag required by the Scala API to wrap the RDD
	final ClassTag<String> stringTag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
	return javaRDDStringCSVToMatrixObject(JavaRDD.fromRDD(rdd, stringTag), matrixMetadata);
}
/**
 * Convert a {@code RDD<String>} in CSV format to a {@code FrameObject}
 * without supplying frame metadata.
 *
 * @param rdd
 *            the RDD of strings
 * @return the result of the two-argument overload called with {@code null}
 *         metadata
 */
public static FrameObject rddStringCSVToFrameObject(RDD<String> rdd) {
	final FrameMetadata noMetadata = null;
	return rddStringCSVToFrameObject(rdd, noMetadata);
}
/**
 * Convert a {@code RDD<String>} in CSV format to a {@code FrameObject} by
 * wrapping it as a {@code JavaRDD<String>} and delegating to the Java RDD
 * conversion.
 *
 * @param rdd
 *            the RDD of strings
 * @param frameMetadata
 *            frame metadata
 * @return the {@code RDD<String>} converted to a {@code FrameObject}
 */
public static FrameObject rddStringCSVToFrameObject(RDD<String> rdd,
		FrameMetadata frameMetadata) {
	// class tag required by the Scala API to wrap the RDD
	final ClassTag<String> stringTag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
	return javaRDDStringCSVToFrameObject(JavaRDD.fromRDD(rdd, stringTag), frameMetadata);
}
/**
 * Convert a {@code RDD<String>} in IJV format to a {@code MatrixObject} by
 * wrapping it as a {@code JavaRDD<String>} and delegating to the Java RDD
 * conversion. Note that metadata is required for IJV format.
 *
 * @param rdd
 *            the RDD of strings
 * @param matrixMetadata
 *            matrix metadata
 * @return the {@code RDD<String>} converted to a {@code MatrixObject}
 */
public static MatrixObject rddStringIJVToMatrixObject(RDD<String> rdd,
		MatrixMetadata matrixMetadata) {
	// class tag required by the Scala API to wrap the RDD
	final ClassTag<String> stringTag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
	return javaRDDStringIJVToMatrixObject(JavaRDD.fromRDD(rdd, stringTag), matrixMetadata);
}
/**
 * Convert a {@code RDD<String>} in IJV format to a {@code FrameObject} by
 * wrapping it as a {@code JavaRDD<String>} and delegating to the Java RDD
 * conversion. Note that metadata is required for IJV format.
 *
 * @param rdd
 *            the RDD of strings
 * @param frameMetadata
 *            frame metadata
 * @return the {@code RDD<String>} converted to a {@code FrameObject}
 */
public static FrameObject rddStringIJVToFrameObject(RDD<String> rdd,
		FrameMetadata frameMetadata) {
	// class tag required by the Scala API to wrap the RDD
	final ClassTag<String> stringTag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
	return javaRDDStringIJVToFrameObject(JavaRDD.fromRDD(rdd, stringTag), frameMetadata);
}
/**
 * Convert a {@code MatrixObject} to a {@code JavaRDD<String>} in CSV
 * format. The matrix is first rendered to a list of CSV lines, which is
 * then parallelized.
 *
 * @param matrixObject
 *            the {@code MatrixObject}
 * @return the {@code MatrixObject} converted to a {@code JavaRDD<String>}
 */
public static JavaRDD<String> matrixObjectToJavaRDDStringCSV(MatrixObject matrixObject) {
	final List<String> csvLines = matrixObjectToListStringCSV(matrixObject);
	final JavaRDD<String> distributed = jsc().parallelize(csvLines);
	return distributed;
}
/**
 * Convert a {@code FrameObject} to a {@code JavaRDD<String>} in CSV format.
 * The frame is first rendered to a list of CSV lines, which is then
 * parallelized.
 *
 * @param frameObject
 *            the {@code FrameObject}
 * @param delimiter
 *            the delimiter
 * @return the {@code FrameObject} converted to a {@code JavaRDD<String>}
 */
public static JavaRDD<String> frameObjectToJavaRDDStringCSV(FrameObject frameObject, String delimiter) {
	final List<String> csvLines = frameObjectToListStringCSV(frameObject, delimiter);
	final JavaRDD<String> distributed = jsc().parallelize(csvLines);
	return distributed;
}
/**
 * Convert a {@code MatrixObject} to a {@code JavaRDD<String>} in IJV
 * format. The matrix is first rendered to a list of IJV lines, which is
 * then parallelized.
 *
 * @param matrixObject
 *            the {@code MatrixObject}
 * @return the {@code MatrixObject} converted to a {@code JavaRDD<String>}
 */
public static JavaRDD<String> matrixObjectToJavaRDDStringIJV(MatrixObject matrixObject) {
	final List<String> ijvLines = matrixObjectToListStringIJV(matrixObject);
	final JavaRDD<String> distributed = jsc().parallelize(ijvLines);
	return distributed;
}
/**
 * Convert a {@code FrameObject} to a {@code JavaRDD<String>} in IJV format.
 * The frame is first rendered to a list of IJV lines, which is then
 * parallelized.
 *
 * @param frameObject
 *            the {@code FrameObject}
 * @return the {@code FrameObject} converted to a {@code JavaRDD<String>}
 */
public static JavaRDD<String> frameObjectToJavaRDDStringIJV(FrameObject frameObject) {
	final List<String> ijvLines = frameObjectToListStringIJV(frameObject);
	final JavaRDD<String> distributed = jsc().parallelize(ijvLines);
	return distributed;
}
/**
 * Convert a {@code MatrixObject} to a {@code RDD<String>} in IJV format.
 *
 * @param matrixObject
 *            the {@code MatrixObject}
 * @return the {@code MatrixObject} converted to a {@code RDD<String>}
 */
public static RDD<String> matrixObjectToRDDStringIJV(MatrixObject matrixObject) {
	// Going through the JavaSparkContext, i.e.
	//
	// JavaRDD<String> javaRDD = jsc.parallelize(list);
	// RDD<String> rdd = JavaRDD.toRDD(javaRDD);
	//
	// works when called from Java, but not from the Spark Shell (when
	// collect() is called on the RDD<String>). The Scala SparkContext is
	// therefore used directly to produce the RDD<String>.
	final ClassTag<String> stringTag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
	final List<String> ijvLines = matrixObjectToListStringIJV(matrixObject);
	return sc().parallelize(JavaConversions.asScalaBuffer(ijvLines), sc().defaultParallelism(), stringTag);
}
/**
 * Convert a {@code FrameObject} to a {@code RDD<String>} in IJV format.
 *
 * @param frameObject
 *            the {@code FrameObject}
 * @return the {@code FrameObject} converted to a {@code RDD<String>}
 */
public static RDD<String> frameObjectToRDDStringIJV(FrameObject frameObject) {
	// Going through the JavaSparkContext, i.e.
	//
	// JavaRDD<String> javaRDD = jsc.parallelize(list);
	// RDD<String> rdd = JavaRDD.toRDD(javaRDD);
	//
	// works when called from Java, but not from the Spark Shell (when
	// collect() is called on the RDD<String>). The Scala SparkContext is
	// therefore used directly to produce the RDD<String>.
	final ClassTag<String> stringTag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
	final List<String> ijvLines = frameObjectToListStringIJV(frameObject);
	return sc().parallelize(JavaConversions.asScalaBuffer(ijvLines), sc().defaultParallelism(), stringTag);
}
/**
 * Convert a {@code MatrixObject} to a {@code RDD<String>} in CSV format.
 *
 * @param matrixObject
 *            the {@code MatrixObject}
 * @return the {@code MatrixObject} converted to a {@code RDD<String>}
 */
public static RDD<String> matrixObjectToRDDStringCSV(MatrixObject matrixObject) {
	// Going through the JavaSparkContext, i.e.
	//
	// JavaRDD<String> javaRDD = jsc.parallelize(list);
	// RDD<String> rdd = JavaRDD.toRDD(javaRDD);
	//
	// works when called from Java, but not from the Spark Shell (when
	// collect() is called on the RDD<String>). The Scala SparkContext is
	// therefore used directly to produce the RDD<String>.
	final ClassTag<String> stringTag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
	final List<String> csvLines = matrixObjectToListStringCSV(matrixObject);
	return sc().parallelize(JavaConversions.asScalaBuffer(csvLines), sc().defaultParallelism(), stringTag);
}
/**
 * Convert a {@code FrameObject} to a {@code RDD<String>} in CSV format.
 *
 * @param frameObject
 *            the {@code FrameObject}
 * @param delimiter
 *            the delimiter
 * @return the {@code FrameObject} converted to a {@code RDD<String>}
 */
public static RDD<String> frameObjectToRDDStringCSV(FrameObject frameObject, String delimiter) {
	// Going through the JavaSparkContext, i.e.
	//
	// JavaRDD<String> javaRDD = jsc.parallelize(list);
	// RDD<String> rdd = JavaRDD.toRDD(javaRDD);
	//
	// works when called from Java, but not from the Spark Shell (when
	// collect() is called on the RDD<String>). The Scala SparkContext is
	// therefore used directly to produce the RDD<String>.
	final ClassTag<String> stringTag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
	final List<String> csvLines = frameObjectToListStringCSV(frameObject, delimiter);
	return sc().parallelize(JavaConversions.asScalaBuffer(csvLines), sc().defaultParallelism(), stringTag);
}
/**
 * Convert a {@code MatrixObject} to a {@code List<String>} in CSV format.
 * <p>
 * Each list element is one comma-separated line. If the underlying block
 * reports itself as empty ({@code isEmptyBlock(false)}), an empty list is
 * returned.
 * <p>
 * NOTE(review): for a block in sparse format only the cells returned by the
 * sparse block iterator are written (presumably the non-zero cells), so a
 * line can contain fewer fields than the matrix has columns and rows without
 * any returned cell produce no line at all. In dense format every cell of
 * every row is written. This difference is existing behavior and is kept
 * unchanged here; confirm whether callers rely on it.
 *
 * @param matrixObject
 *            the {@code MatrixObject}
 * @return the {@code MatrixObject} converted to a {@code List<String>}
 */
public static List<String> matrixObjectToListStringCSV(MatrixObject matrixObject) {
	MatrixBlock mb = matrixObject.acquireRead();
	// release() sits in a finally block so that the object acquired above is
	// released even when the conversion fails with an exception.
	try {
		List<String> list = new ArrayList<>();
		if (mb.isEmptyBlock(false)) {
			return list;
		}
		if (mb.isInSparseFormat()) {
			Iterator<IJV> iter = mb.getSparseBlockIterator();
			int prevCellRow = -1;
			StringBuilder sb = null;
			while (iter.hasNext()) {
				IJV cell = iter.next();
				int i = cell.getI();
				double v = cell.getV();
				if (i > prevCellRow) {
					// first cell of a new row: flush the line of the
					// previous row (if any) and start a new one
					if (sb != null) {
						list.add(sb.toString());
					}
					sb = new StringBuilder();
					sb.append(v);
					prevCellRow = i;
				} else if (i == prevCellRow) {
					// further cell of the current row
					sb.append(",");
					sb.append(v);
				}
			}
			// flush the line of the last row that had any cell
			if (sb != null) {
				list.add(sb.toString());
			}
		} else {
			int rows = mb.getNumRows();
			int cols = mb.getNumColumns();
			for (int i = 0; i < rows; i++) {
				StringBuilder sb = new StringBuilder();
				for (int j = 0; j < cols; j++) {
					if (j > 0) {
						sb.append(",");
					}
					sb.append(mb.getValueDenseUnsafe(i, j));
				}
				list.add(sb.toString());
			}
		}
		return list;
	} finally {
		matrixObject.release();
	}
}
/**
 * Convert a {@code FrameObject} to a {@code List<String>} in CSV format.
 * <p>
 * One line is produced per frame row. The fields of a line are separated by
 * {@code delimiter}; a cell whose value is {@code null} is written as an
 * empty field.
 *
 * @param frameObject
 *            the {@code FrameObject}
 * @param delimiter
 *            the delimiter placed between the fields of a line
 * @return the {@code FrameObject} converted to a {@code List<String>}
 */
public static List<String> frameObjectToListStringCSV(FrameObject frameObject, String delimiter) {
	FrameBlock fb = frameObject.acquireRead();
	// release() sits in a finally block so that the object acquired above is
	// released even when the conversion fails with an exception.
	try {
		int rows = fb.getNumRows();
		int cols = fb.getNumColumns();
		List<String> list = new ArrayList<>();
		for (int i = 0; i < rows; i++) {
			StringBuilder sb = new StringBuilder();
			for (int j = 0; j < cols; j++) {
				if (j > 0) {
					sb.append(delimiter);
				}
				// look the cell up once instead of once for the null check
				// and once more for the append
				Object value = fb.get(i, j);
				if (value != null) {
					sb.append(value);
				}
			}
			list.add(sb.toString());
		}
		return list;
	} finally {
		frameObject.release();
	}
}
/**
 * Convert a {@code MatrixObject} to a {@code List<String>} in IJV format.
 * <p>
 * Each list element has the form {@code "<row> <column> <value>"} with
 * 1-based row and column indexes. If the block reports no non-zero values
 * ({@code getNonZeros() == 0}), an empty list is returned.
 * <p>
 * NOTE(review): for a block in sparse format one line is written per cell
 * returned by the sparse block iterator, whereas in dense format a line is
 * written for every cell, including cells whose value is zero. This
 * difference is existing behavior and is kept unchanged here.
 *
 * @param matrixObject
 *            the {@code MatrixObject}
 * @return the {@code MatrixObject} converted to a {@code List<String>}
 */
public static List<String> matrixObjectToListStringIJV(MatrixObject matrixObject) {
	MatrixBlock mb = matrixObject.acquireRead();
	// release() sits in a finally block so that the object acquired above is
	// released even when the conversion fails with an exception.
	try {
		List<String> list = new ArrayList<>();
		if (mb.getNonZeros() <= 0) {
			return list;
		}
		if (mb.isInSparseFormat()) {
			Iterator<IJV> iter = mb.getSparseBlockIterator();
			while (iter.hasNext()) {
				IJV cell = iter.next();
				// cell indexes are 0-based, IJV output is 1-based
				list.add((cell.getI() + 1) + " " + (cell.getJ() + 1) + " " + cell.getV());
			}
		} else {
			int rows = mb.getNumRows();
			int cols = mb.getNumColumns();
			for (int i = 0; i < rows; i++) {
				for (int j = 0; j < cols; j++) {
					list.add((i + 1) + " " + (j + 1) + " " + mb.getValueDenseUnsafe(i, j));
				}
			}
		}
		return list;
	} finally {
		matrixObject.release();
	}
}
/**
 * Convert a {@code FrameObject} to a {@code List<String>} in IJV format.
 * <p>
 * Each list element has the form {@code "<row> <column> <value>"} with
 * 1-based row and column indexes. Cells whose value is {@code null} are
 * skipped.
 *
 * @param frameObject
 *            the {@code FrameObject}
 * @return the {@code FrameObject} converted to a {@code List<String>}
 */
public static List<String> frameObjectToListStringIJV(FrameObject frameObject) {
	FrameBlock fb = frameObject.acquireRead();
	// release() sits in a finally block so that the object acquired above is
	// released even when the conversion fails with an exception.
	try {
		int rows = fb.getNumRows();
		int cols = fb.getNumColumns();
		List<String> list = new ArrayList<>();
		for (int i = 0; i < rows; i++) {
			for (int j = 0; j < cols; j++) {
				// look the cell up once instead of once for the null check
				// and once more for the output
				Object value = fb.get(i, j);
				if (value != null) {
					list.add((i + 1) + " " + (j + 1) + " " + value);
				}
			}
		}
		return list;
	} finally {
		frameObject.release();
	}
}
/**
 * Convert a {@code MatrixObject} to a two-dimensional double array.
 * <p>
 * The block obtained via {@code acquireRead()} is converted with
 * {@code DataConverter.convertToDoubleMatrix}.
 *
 * @param matrixObject
 *            the {@code MatrixObject}
 * @return the {@code MatrixObject} converted to a {@code double[][]}
 */
public static double[][] matrixObjectTo2DDoubleArray(MatrixObject matrixObject) {
	MatrixBlock mb = matrixObject.acquireRead();
	// release() sits in a finally block so that the object acquired above is
	// released even when the conversion fails with an exception.
	try {
		return DataConverter.convertToDoubleMatrix(mb);
	} finally {
		matrixObject.release();
	}
}
/**
 * Convert a {@code MatrixObject} to a {@code DataFrame}.
 * <p>
 * The binary-block RDD handle of the matrix is requested from the Spark
 * execution context and passed, together with the data characteristics of
 * the matrix, to {@code RDDConverterUtils.binaryBlockToDataFrame}.
 *
 * @param matrixObject
 *            the {@code MatrixObject}
 * @param sparkExecutionContext
 *            the Spark execution context
 * @param isVectorDF
 *            is the DataFrame a vector DataFrame?
 * @return the {@code MatrixObject} converted to a {@code DataFrame}
 */
public static Dataset<Row> matrixObjectToDataFrame(MatrixObject matrixObject,
	SparkExecutionContext sparkExecutionContext, boolean isVectorDF) {
	try {
		@SuppressWarnings("unchecked")
		JavaPairRDD<MatrixIndexes, MatrixBlock> blockRdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
			sparkExecutionContext.getRDDHandleForMatrixObject(matrixObject, FileFormat.BINARY);
		DataCharacteristics characteristics = matrixObject.getDataCharacteristics();
		SparkSession session = spark();
		Dataset<Row> dataFrame = RDDConverterUtils.binaryBlockToDataFrame(session, blockRdd, characteristics,
			isVectorDF);
		return dataFrame;
	} catch (DMLRuntimeException ex) {
		// surface runtime failures as an MLContext-level exception, keeping the cause
		throw new MLContextException("DMLRuntimeException while converting matrix object to DataFrame", ex);
	}
}
/**
 * Convert a {@code FrameObject} to a {@code DataFrame}.
 * <p>
 * The binary-block RDD handle of the frame is requested from the Spark
 * execution context and passed, together with the data characteristics and
 * the schema of the frame, to
 * {@code FrameRDDConverterUtils.binaryBlockToDataFrame}.
 *
 * @param frameObject
 *            the {@code FrameObject}
 * @param sparkExecutionContext
 *            the Spark execution context
 * @return the {@code FrameObject} converted to a {@code DataFrame}
 */
public static Dataset<Row> frameObjectToDataFrame(FrameObject frameObject,
	SparkExecutionContext sparkExecutionContext) {
	try {
		@SuppressWarnings("unchecked")
		JavaPairRDD<Long, FrameBlock> blockRdd = (JavaPairRDD<Long, FrameBlock>)
			sparkExecutionContext.getRDDHandleForFrameObject(frameObject, FileFormat.BINARY);
		DataCharacteristics characteristics = frameObject.getDataCharacteristics();
		Dataset<Row> dataFrame = FrameRDDConverterUtils.binaryBlockToDataFrame(spark(), blockRdd,
			characteristics, frameObject.getSchema());
		return dataFrame;
	} catch (DMLRuntimeException ex) {
		// surface runtime failures as an MLContext-level exception, keeping the cause
		throw new MLContextException("DMLRuntimeException while converting frame object to DataFrame", ex);
	}
}
/**
 * Convert a {@code MatrixObject} to a
 * {@code JavaPairRDD<MatrixIndexes, MatrixBlock>}.
 * <p>
 * The result is the binary-format RDD handle that the Spark execution
 * context returns for the matrix.
 *
 * @param matrixObject
 *            the {@code MatrixObject}
 * @param sparkExecutionContext
 *            the Spark execution context
 * @return the {@code MatrixObject} converted to a
 *         {@code JavaPairRDD<MatrixIndexes, MatrixBlock>}
 */
public static JavaPairRDD<MatrixIndexes, MatrixBlock> matrixObjectToBinaryBlocks(MatrixObject matrixObject,
	SparkExecutionContext sparkExecutionContext) {
	try {
		// the cast is unchecked, hence the narrowly scoped suppression
		@SuppressWarnings("unchecked")
		JavaPairRDD<MatrixIndexes, MatrixBlock> blockRdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
			sparkExecutionContext.getRDDHandleForMatrixObject(matrixObject, FileFormat.BINARY);
		return blockRdd;
	} catch (DMLRuntimeException ex) {
		// surface runtime failures as an MLContext-level exception, keeping the cause
		throw new MLContextException("DMLRuntimeException while converting matrix object to binary blocks", ex);
	}
}
/**
 * Convert a {@code FrameObject} to a {@code JavaPairRDD<Long, FrameBlock>}.
 * <p>
 * The result is the binary-format RDD handle that the Spark execution
 * context returns for the frame.
 *
 * @param frameObject
 *            the {@code FrameObject}
 * @param sparkExecutionContext
 *            the Spark execution context
 * @return the {@code FrameObject} converted to a
 *         {@code JavaPairRDD<Long, FrameBlock>}
 */
public static JavaPairRDD<Long, FrameBlock> frameObjectToBinaryBlocks(FrameObject frameObject,
	SparkExecutionContext sparkExecutionContext) {
	try {
		// the cast is unchecked, hence the narrowly scoped suppression
		@SuppressWarnings("unchecked")
		JavaPairRDD<Long, FrameBlock> blockRdd = (JavaPairRDD<Long, FrameBlock>)
			sparkExecutionContext.getRDDHandleForFrameObject(frameObject, FileFormat.BINARY);
		return blockRdd;
	} catch (DMLRuntimeException ex) {
		// surface runtime failures as an MLContext-level exception, keeping the cause
		throw new MLContextException("DMLRuntimeException while converting frame object to binary blocks", ex);
	}
}
/**
 * Convert a {@code FrameObject} to a two-dimensional string array.
 * <p>
 * The block obtained via {@code acquireRead()} is converted with
 * {@code DataConverter.convertToStringFrame}.
 *
 * @param frameObject
 *            the {@code FrameObject}
 * @return the {@code FrameObject} converted to a {@code String[][]}
 */
public static String[][] frameObjectTo2DStringArray(FrameObject frameObject) {
	FrameBlock fb = frameObject.acquireRead();
	// release() sits in a finally block so that the object acquired above is
	// released even when the conversion fails with an exception.
	try {
		return DataConverter.convertToStringFrame(fb);
	} finally {
		frameObject.release();
	}
}
/**
 * Look up the {@code JavaSparkContext} via
 * {@code MLContextUtil.getJavaSparkContextFromProxy()}.
 *
 * @return the Java Spark Context
 */
public static JavaSparkContext jsc() {
	final JavaSparkContext javaSparkContext = MLContextUtil.getJavaSparkContextFromProxy();
	return javaSparkContext;
}
/**
 * Look up the {@code SparkContext} via
 * {@code MLContextUtil.getSparkContextFromProxy()}.
 *
 * @return the Spark Context
 */
public static SparkContext sc() {
	final SparkContext sparkContext = MLContextUtil.getSparkContextFromProxy();
	return sparkContext;
}
/**
 * Look up the {@code SparkSession} via
 * {@code MLContextUtil.getSparkSessionFromProxy()}.
 *
 * @return the Spark Session
 */
public static SparkSession spark() {
	final SparkSession sparkSession = MLContextUtil.getSparkSessionFromProxy();
	return sparkSession;
}
}