blob: ffd6de1046e6aace6e594bf71697d47db58b6831 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.api.mlcontext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.meta.DataCharacteristics;
/**
* Frame encapsulates a SystemDS frame.
*
*/
public class Frame {
private FrameObject frameObject;
private SparkExecutionContext sparkExecutionContext;
private JavaPairRDD<Long, FrameBlock> binaryBlocks;
private FrameMetadata frameMetadata;
public Frame(FrameObject frameObject, SparkExecutionContext sparkExecutionContext) {
this.frameObject = frameObject;
this.sparkExecutionContext = sparkExecutionContext;
this.frameMetadata = new FrameMetadata(frameObject.getDataCharacteristics());
}
/**
* Convert a Spark DataFrame to a SystemDS binary-block representation.
*
* @param dataFrame
* the Spark DataFrame
* @param frameMetadata
* frame metadata, such as number of rows and columns
*/
public Frame(Dataset<Row> dataFrame, FrameMetadata frameMetadata) {
this.frameMetadata = frameMetadata;
binaryBlocks = MLContextConversionUtil.dataFrameToFrameBinaryBlocks(dataFrame, frameMetadata);
}
/**
* Convert a Spark DataFrame to a SystemDS binary-block representation,
* specifying the number of rows and columns.
*
* @param dataFrame
* the Spark DataFrame
* @param numRows
* the number of rows
* @param numCols
* the number of columns
*/
public Frame(Dataset<Row> dataFrame, long numRows, long numCols) {
this(dataFrame, new FrameMetadata(numRows, numCols, ConfigurationManager.getBlocksize()));
}
/**
* Convert a Spark DataFrame to a SystemDS binary-block representation.
*
* @param dataFrame
* the Spark DataFrame
*/
public Frame(Dataset<Row> dataFrame) {
this(dataFrame, new FrameMetadata());
}
/**
* Create a Frame, specifying the SystemDS binary-block frame and its
* metadata.
*
* @param binaryBlocks
* the {@code JavaPairRDD<Long, FrameBlock>} frame
* @param frameMetadata
* frame metadata, such as number of rows and columnss
*/
public Frame(JavaPairRDD<Long, FrameBlock> binaryBlocks, FrameMetadata frameMetadata) {
this.binaryBlocks = binaryBlocks;
this.frameMetadata = frameMetadata;
}
/**
* Obtain the frame as a SystemDS FrameObject.
*
* @return the frame as a SystemDS FrameObject
*/
public FrameObject toFrameObject() {
return frameObject;
}
/**
* Obtain the frame as a two-dimensional String array
*
* @return the frame as a two-dimensional String array
*/
public String[][] to2DStringArray() {
return MLContextConversionUtil.frameObjectTo2DStringArray(frameObject);
}
/**
* Obtain the frame as a {@code JavaRDD<String>} in IJV format
*
* @return the frame as a {@code JavaRDD<String>} in IJV format
*/
public JavaRDD<String> toJavaRDDStringIJV() {
return MLContextConversionUtil.frameObjectToJavaRDDStringIJV(frameObject);
}
/**
* Obtain the frame as a {@code JavaRDD<String>} in CSV format
*
* @return the frame as a {@code JavaRDD<String>} in CSV format
*/
public JavaRDD<String> toJavaRDDStringCSV() {
return MLContextConversionUtil.frameObjectToJavaRDDStringCSV(frameObject, ",");
}
/**
* Obtain the frame as a {@code RDD<String>} in CSV format
*
* @return the frame as a {@code RDD<String>} in CSV format
*/
public RDD<String> toRDDStringCSV() {
return MLContextConversionUtil.frameObjectToRDDStringCSV(frameObject, ",");
}
/**
* Obtain the frame as a {@code RDD<String>} in IJV format
*
* @return the frame as a {@code RDD<String>} in IJV format
*/
public RDD<String> toRDDStringIJV() {
return MLContextConversionUtil.frameObjectToRDDStringIJV(frameObject);
}
/**
* Obtain the frame as a {@code DataFrame}
*
* @return the frame as a {@code DataFrame}
*/
public Dataset<Row> toDF() {
return MLContextConversionUtil.frameObjectToDataFrame(frameObject, sparkExecutionContext);
}
/**
* Obtain the frame as a {@code JavaPairRDD<Long, FrameBlock>}
*
* @return the frame as a {@code JavaPairRDD<Long, FrameBlock>}
*/
public JavaPairRDD<Long, FrameBlock> toBinaryBlocks() {
if (binaryBlocks != null) {
return binaryBlocks;
} else if (frameObject != null) {
binaryBlocks = MLContextConversionUtil.frameObjectToBinaryBlocks(frameObject, sparkExecutionContext);
DataCharacteristics mc = frameObject.getDataCharacteristics();
frameMetadata = new FrameMetadata(mc);
return binaryBlocks;
}
throw new MLContextException("No binary blocks or FrameObject found");
}
/**
* Obtain the frame metadata
*
* @return the frame metadata
*/
public FrameMetadata getFrameMetadata() {
return frameMetadata;
}
@Override
public String toString() {
return frameObject.toString();
}
/**
* Whether or not this frame contains data as binary blocks
*
* @return {@code true} if data as binary blocks are present, {@code false}
* otherwise.
*/
public boolean hasBinaryBlocks() {
return (binaryBlocks != null);
}
/**
* Whether or not this frame contains data as a FrameObject
*
* @return {@code true} if data as binary blocks are present, {@code false}
* otherwise.
*/
public boolean hasFrameObject() {
return (frameObject != null);
}
}