/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.test.functions.mlcontext;

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.linalg.DenseVector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.ExecMode;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.instructions.spark.utils.FrameRDDConverterUtils;
import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtils;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.runtime.util.UtilFunctions;
import org.apache.sysds.test.AutomatedTestBase;
import org.apache.sysds.test.TestConfiguration;
import org.apache.sysds.test.TestUtils;
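
/**
 * Tests the DataFrame-to-frame conversion for schemas that include a single
 * ML vector column (VectorUDT): a random matrix is wrapped into a DataFrame
 * (the vector column covering colsVector matrix columns), converted into a
 * binary-block frame via FrameRDDConverterUtils.dataFrameToBinaryBlock, and
 * the result is compared against the original data. The test methods cover
 * all combinations of {row-ID column, dense/sparse data, known/unknown
 * dimensions} for four different schemas.
 */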
public class DataFrameVectorFrameConversionTest extends AutomatedTestBase
{
private final static String TEST_DIR = "functions/mlcontext/";
private final static String TEST_NAME = "DataFrameConversion";
private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameVectorFrameConversionTest.class.getSimpleName() + "/";
	//schema restriction: each schema contains exactly one vector column, encoded as ValueType.UNKNOWN
private final static ValueType[] schemaStrings = new ValueType[]{ValueType.UNKNOWN, ValueType.STRING, ValueType.STRING, ValueType.STRING};
private final static ValueType[] schemaDoubles = new ValueType[]{ValueType.FP64, ValueType.FP64, ValueType.UNKNOWN, ValueType.FP64};
private final static ValueType[] schemaMixed1 = new ValueType[]{ValueType.UNKNOWN, ValueType.INT64, ValueType.STRING, ValueType.FP64, ValueType.INT64};
private final static ValueType[] schemaMixed2 = new ValueType[]{ValueType.STRING, ValueType.UNKNOWN, ValueType.FP64};
private final static int rows1 = 2245;
private final static int colsVector = 7;
private final static double sparsity1 = 0.9;
private final static double sparsity2 = 0.1;
	private final static double eps = 1e-10;
private static SparkSession spark;
	private static JavaSparkContext sc;

@BeforeClass
public static void setUpClass() {
spark = createSystemDSSparkSession("DataFrameVectorFrameConversionTest", "local");
sc = new JavaSparkContext(spark.sparkContext());
	}

@Override
public void setUp() {
addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
	}

@Test
public void testVectorStringsConversionIDDenseUnknown() {
testDataFrameConversion(schemaStrings, true, false, true);
}
@Test
public void testVectorDoublesConversionIDDenseUnknown() {
testDataFrameConversion(schemaDoubles, true, false, true);
}
@Test
public void testVectorMixed1ConversionIDDenseUnknown() {
testDataFrameConversion(schemaMixed1, true, false, true);
}
@Test
public void testVectorMixed2ConversionIDDenseUnknown() {
testDataFrameConversion(schemaMixed2, true, false, true);
}
@Test
public void testVectorStringsConversionIDDense() {
testDataFrameConversion(schemaStrings, true, false, false);
}
@Test
public void testVectorDoublesConversionIDDense() {
testDataFrameConversion(schemaDoubles, true, false, false);
}
@Test
public void testVectorMixed1ConversionIDDense() {
testDataFrameConversion(schemaMixed1, true, false, false);
}
@Test
public void testVectorMixed2ConversionIDDense() {
testDataFrameConversion(schemaMixed2, true, false, false);
}
@Test
public void testVectorStringsConversionIDSparseUnknown() {
testDataFrameConversion(schemaStrings, true, true, true);
}
@Test
public void testVectorDoublesConversionIDSparseUnknown() {
testDataFrameConversion(schemaDoubles, true, true, true);
}
@Test
public void testVectorMixed1ConversionIDSparseUnknown() {
testDataFrameConversion(schemaMixed1, true, true, true);
}
@Test
public void testVectorMixed2ConversionIDSparseUnknown() {
testDataFrameConversion(schemaMixed2, true, true, true);
}
@Test
public void testVectorStringsConversionIDSparse() {
testDataFrameConversion(schemaStrings, true, true, false);
}
@Test
public void testVectorDoublesConversionIDSparse() {
testDataFrameConversion(schemaDoubles, true, true, false);
}
@Test
public void testVectorMixed1ConversionIDSparse() {
testDataFrameConversion(schemaMixed1, true, true, false);
}
@Test
public void testVectorMixed2ConversionIDSparse() {
testDataFrameConversion(schemaMixed2, true, true, false);
}
@Test
public void testVectorStringsConversionDenseUnknown() {
testDataFrameConversion(schemaStrings, false, false, true);
}
@Test
public void testVectorDoublesConversionDenseUnknown() {
testDataFrameConversion(schemaDoubles, false, false, true);
}
@Test
public void testVectorMixed1ConversionDenseUnknown() {
testDataFrameConversion(schemaMixed1, false, false, true);
}
@Test
public void testVectorMixed2ConversionDenseUnknown() {
testDataFrameConversion(schemaMixed2, false, false, true);
}
@Test
public void testVectorStringsConversionDense() {
testDataFrameConversion(schemaStrings, false, false, false);
}
@Test
public void testVectorDoublesConversionDense() {
testDataFrameConversion(schemaDoubles, false, false, false);
}
@Test
public void testVectorMixed1ConversionDense() {
testDataFrameConversion(schemaMixed1, false, false, false);
}
@Test
public void testVectorMixed2ConversionDense() {
testDataFrameConversion(schemaMixed2, false, false, false);
}
@Test
public void testVectorStringsConversionSparseUnknown() {
testDataFrameConversion(schemaStrings, false, true, true);
}
@Test
public void testVectorDoublesConversionSparseUnknown() {
testDataFrameConversion(schemaDoubles, false, true, true);
}
@Test
public void testVectorMixed1ConversionSparseUnknown() {
testDataFrameConversion(schemaMixed1, false, true, true);
}
@Test
public void testVectorMixed2ConversionSparseUnknown() {
testDataFrameConversion(schemaMixed2, false, true, true);
}
@Test
public void testVectorStringsConversionSparse() {
testDataFrameConversion(schemaStrings, false, true, false);
}
@Test
public void testVectorDoublesConversionSparse() {
testDataFrameConversion(schemaDoubles, false, true, false);
}
@Test
public void testVectorMixed1ConversionSparse() {
testDataFrameConversion(schemaMixed1, false, true, false);
}
@Test
public void testVectorMixed2ConversionSparse() {
testDataFrameConversion(schemaMixed2, false, true, false);
}
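
	/**
	 * Runs a single conversion test: creates a DataFrame for the given schema,
	 * converts it to a binary-block frame, and compares the frame contents
	 * against the original matrix.
	 *
	 * @param schema value types of the data frame columns (UNKNOWN marks the vector column)
	 * @param containsID whether the data frame contains a row-ID column
	 * @param dense whether to generate dense (sparsity1) or sparse (sparsity2) data
	 * @param unknownDims whether the conversion is called without known matrix characteristics
	 */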
private void testDataFrameConversion(ValueType[] schema, boolean containsID, boolean dense, boolean unknownDims) {
boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG;
ExecMode oldPlatform = DMLScript.getGlobalExecMode();
try
{
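			//force a local Spark config and hybrid CP/Spark execution for this in-memory test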
DMLScript.USE_LOCAL_SPARK_CONFIG = true;
DMLScript.setGlobalExecMode(ExecMode.HYBRID);
			//generate input data and set up metadata
			//(cols counts logical matrix columns: the vector column expands to colsVector doubles)
			int cols = schema.length + colsVector - 1;
double sparsity = dense ? sparsity1 : sparsity2;
double[][] A = TestUtils.round(getRandomMatrix(rows1, cols, -10, 1000, sparsity, 2373));
MatrixBlock mbA = DataConverter.convertToMatrixBlock(A);
int blksz = ConfigurationManager.getBlocksize();
MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, mbA.getNonZeros());
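			//mc2 either carries the full metadata or simulates unknown dimensions,
			//in which case the converter must derive them from the data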
MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
//create input data frame
Dataset<Row> df = createDataFrame(spark, mbA, containsID, schema);
//dataframe - frame conversion
JavaPairRDD<Long,FrameBlock> out = FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, containsID);
//get output frame block
FrameBlock fbB = SparkExecutionContext.toFrameBlock(out,
UtilFunctions.nCopies(cols, ValueType.FP64), rows1, cols);
//compare frame blocks
MatrixBlock mbB = DataConverter.convertToMatrixBlock(fbB);
double[][] B = DataConverter.convertToDoubleMatrix(mbB);
TestUtils.compareMatrices(A, B, rows1, cols, eps);
}
catch( Exception ex ) {
throw new RuntimeException(ex);
}
finally {
DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
DMLScript.setGlobalExecMode(oldPlatform);
}
}
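
	/**
	 * Wraps the given matrix into an in-memory DataFrame according to the schema:
	 * scalar columns take single cell values, the UNKNOWN column packs the next
	 * colsVector cells into a DenseVector, and an optional 1-based row-ID column
	 * is prepended.
	 */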
@SuppressWarnings("resource")
private static Dataset<Row> createDataFrame(SparkSession sparkSession, MatrixBlock mb, boolean containsID, ValueType[] schema) {
//create in-memory list of rows
List<Row> list = new ArrayList<>();
int off = (containsID ? 1 : 0);
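		//number of data frame columns: schema length (+ optional ID column),
		//since the vector column covers colsVector matrix columns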
int clen = mb.getNumColumns() + off - colsVector + 1;
for( int i=0; i<mb.getNumRows(); i++ ) {
Object[] row = new Object[clen];
			if( containsID )
				row[0] = (double)i+1; //1-based row index
for( int j=0, j2=0; j<mb.getNumColumns(); j++, j2++ ) {
if( schema[j2] != ValueType.UNKNOWN ) {
row[j2+off] = UtilFunctions
.doubleToObject(schema[j2], mb.quickGetValue(i, j));
}
else {
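					//UNKNOWN marks the vector column: pack the next colsVector
					//matrix cells into a DenseVector and advance the column index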
double[] tmp = DataConverter.convertToDoubleVector(
mb.slice(i, i, j, j+colsVector-1, new MatrixBlock()), false);
row[j2+off] = new DenseVector(tmp);
j += colsVector-1;
}
}
list.add(RowFactory.create(row));
}
//create data frame schema
List<StructField> fields = new ArrayList<>();
if( containsID )
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN,
DataTypes.DoubleType, true));
for( int j=0; j<schema.length; j++ ) {
DataType dt = null;
switch(schema[j]) {
case STRING: dt = DataTypes.StringType; break;
case FP64: dt = DataTypes.DoubleType; break;
case INT64: dt = DataTypes.LongType; break;
case UNKNOWN: dt = new VectorUDT(); break;
default: throw new RuntimeException("Unsupported value type.");
}
fields.add(DataTypes.createStructField("C"+(j+1), dt, true));
}
StructType dfSchema = DataTypes.createStructType(fields);
//create rdd and data frame
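		//note: wraps the shared spark context, which must not be closed here
		//(hence the resource warning suppression on this method)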
		JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());
		JavaRDD<Row> rowRDD = jsc.parallelize(list);
return sparkSession.createDataFrame(rowRDD, dfSchema);
	}

@AfterClass
public static void tearDownClass() {
		// stop the underlying Spark context to allow single-JVM tests (otherwise
		// the next test that tries to create a SparkContext would fail)
spark.stop();
sc = null;
spark = null;
}
}