blob: ccf61c8fa78c3381d3af5151b22282c3c66b5e88 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.test.functions.mlcontext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.ExecMode;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.controlprogram.caching.LazyWriteBuffer;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtils;
import org.apache.sysds.runtime.matrix.data.LibMatrixReorg;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.test.AutomatedTestBase;
import org.apache.sysds.test.TestConfiguration;
import org.apache.sysds.test.TestUtils;
public class DataFrameMatrixConversionTest extends AutomatedTestBase
{
private final static String TEST_DIR = "functions/mlcontext/";
private final static String TEST_NAME = "DataFrameConversion";
private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameMatrixConversionTest.class.getSimpleName() + "/";
private final static int rows1 = 2245;
private final static int rows3 = 7;
private final static int cols1 = 745;
private final static int cols2 = 1264;
private final static int cols3 = 10038;
private final static double sparsity1 = 0.9;
private final static double sparsity2 = 0.1;
private final static double eps=0.0000000001;
private static SparkSession spark;
private static JavaSparkContext sc;
@BeforeClass
public static void setUpClass() {
spark = createSystemDSSparkSession("DataFrameMatrixConversionTest", "local");
sc = new JavaSparkContext(spark.sparkContext());
LazyWriteBuffer.init();
}
@Override
public void setUp() {
addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
}
@Test
public void testVectorConversionSingleDense() {
testDataFrameConversion(true, cols1, true, false);
}
@Test
public void testVectorConversionSingleDenseUnknown() {
testDataFrameConversion(true, cols1, true, true);
}
@Test
public void testVectorConversionSingleSparse() {
testDataFrameConversion(true, cols1, false, false);
}
@Test
public void testVectorConversionSingleSparseUnknown() {
testDataFrameConversion(true, cols1, false, true);
}
@Test
public void testVectorConversionMultiDense() {
testDataFrameConversion(true, cols2, true, false);
}
@Test
public void testVectorConversionMultiDenseUnknown() {
testDataFrameConversion(true, cols2, true, true);
}
@Test
public void testVectorConversionMultiSparse() {
testDataFrameConversion(true, cols2, false, false);
}
@Test
public void testVectorConversionMultiSparseUnknown() {
testDataFrameConversion(true, cols2, false, true);
}
@Test
public void testRowConversionSingleDense() {
testDataFrameConversion(false, cols1, true, false);
}
@Test
public void testRowConversionSingleDenseUnknown() {
testDataFrameConversion(false, cols1, true, true);
}
@Test
public void testRowConversionSingleSparse() {
testDataFrameConversion(false, cols1, false, false);
}
@Test
public void testRowConversionSingleSparseUnknown() {
testDataFrameConversion(false, cols1, false, true);
}
@Test
public void testRowConversionMultiDense() {
testDataFrameConversion(false, cols2, true, false);
}
@Test
public void testRowConversionMultiDenseUnknown() {
testDataFrameConversion(false, cols2, true, true);
}
@Test
public void testRowConversionMultiSparse() {
testDataFrameConversion(false, cols2, false, false);
}
@Test
public void testRowConversionMultiSparseUnknown() {
testDataFrameConversion(false, cols2, false, true);
}
@Test
public void testVectorConversionWideDense() {
testDataFrameConversion(true, cols3, true, false);
}
@Test
public void testVectorConversionWideDenseUnknown() {
testDataFrameConversion(true, cols3, true, true);
}
@Test
public void testVectorConversionWideSparse() {
testDataFrameConversion(true, cols3, false, false);
}
@Test
public void testVectorConversionWideSparseUnknown() {
testDataFrameConversion(true, cols3, false, true);
}
@Test
public void testVectorConversionMultiUltraSparse() {
testDataFrameConversionUltraSparse(true, false);
}
@Test
public void testVectorConversionMultiUltraSparseUnknown() {
testDataFrameConversionUltraSparse(true, true);
}
@Test
public void testRowConversionMultiUltraSparse() {
testDataFrameConversionUltraSparse(false, false);
}
@Test
public void testRowConversionMultiUltraSparseUnknown() {
testDataFrameConversionUltraSparse(false, true);
}
private void testDataFrameConversion(boolean vector, int cols, boolean dense, boolean unknownDims) {
boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG;
ExecMode oldPlatform = DMLScript.getGlobalExecMode();
try
{
DMLScript.USE_LOCAL_SPARK_CONFIG = true;
DMLScript.setGlobalExecMode(ExecMode.HYBRID);
//generate input data and setup metadata
int rows = (cols == cols3) ? rows3 : rows1;
double sparsity = dense ? sparsity1 : sparsity2;
double[][] A = getRandomMatrix(rows, cols, -10, 10, sparsity, 2373);
MatrixBlock mbA = DataConverter.convertToMatrixBlock(A);
int blksz = ConfigurationManager.getBlocksize();
MatrixCharacteristics mc1 = new MatrixCharacteristics(rows, cols, blksz, mbA.getNonZeros());
MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
//get binary block input rdd
JavaPairRDD<MatrixIndexes,MatrixBlock> in = SparkExecutionContext.toMatrixJavaPairRDD(sc, mbA, blksz);
//matrix - dataframe - matrix conversion
Dataset<Row> df = RDDConverterUtils.binaryBlockToDataFrame(spark, in, mc1, vector);
df = ( rows==rows3 ) ? df.repartition(rows) : df;
JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector);
//get output matrix block
MatrixBlock mbB = SparkExecutionContext.toMatrixBlock(out, rows, cols, blksz, -1);
//compare matrix blocks
double[][] B = DataConverter.convertToDoubleMatrix(mbB);
TestUtils.compareMatrices(A, B, rows, cols, eps);
}
catch( Exception ex ) {
throw new RuntimeException(ex);
}
finally {
DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
DMLScript.setGlobalExecMode(oldPlatform);
}
}
private void testDataFrameConversionUltraSparse(boolean vector, boolean unknownDims) {
boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG;
ExecMode oldPlatform = DMLScript.getGlobalExecMode();
try
{
DMLScript.USE_LOCAL_SPARK_CONFIG = true;
DMLScript.setGlobalExecMode(ExecMode.HYBRID);
//generate input data and setup metadata
double[][] A = getRandomMatrix(rows1, 1, -10, 10, 0.7, 2373);
MatrixBlock mbA0 = DataConverter.convertToMatrixBlock(A);
MatrixBlock mbA = LibMatrixReorg.diag(mbA0, new MatrixBlock(rows1, rows1, true));
int blksz = ConfigurationManager.getBlocksize();
MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, rows1, blksz, mbA.getNonZeros());
MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
//get binary block input rdd
JavaPairRDD<MatrixIndexes,MatrixBlock> in = SparkExecutionContext.toMatrixJavaPairRDD(sc, mbA, blksz);
//matrix - dataframe - matrix conversion
Dataset<Row> df = RDDConverterUtils.binaryBlockToDataFrame(spark, in, mc1, vector);
JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector);
//get output matrix block
MatrixBlock mbB0 = SparkExecutionContext.toMatrixBlock(out, rows1, rows1, blksz, -1);
MatrixBlock mbB = LibMatrixReorg.diag(mbB0, new MatrixBlock(rows1, 1, false));
//compare matrix blocks
double[][] B = DataConverter.convertToDoubleMatrix(mbB);
TestUtils.compareMatrices(A, B, rows1, 1, eps);
}
catch( Exception ex ) {
throw new RuntimeException(ex);
}
finally {
DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
DMLScript.setGlobalExecMode(oldPlatform);
}
}
@AfterClass
public static void tearDownClass() {
// stop underlying spark context to allow single jvm tests (otherwise the
// next test that tries to create a SparkContext would fail)
spark.stop();
sc = null;
spark = null;
LazyWriteBuffer.cleanup();
}
}