/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.test.functions.frame;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.junit.Assert;
import org.junit.Test;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.ExecMode;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.instructions.spark.functions.CopyFrameBlockPairFunction;
import org.apache.sysds.runtime.instructions.spark.utils.FrameRDDConverterUtils;
import org.apache.sysds.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction;
import org.apache.sysds.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongWritableFrameToLongFrameFunction;
import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
import org.apache.sysds.runtime.io.FrameReader;
import org.apache.sysds.runtime.io.FrameReaderFactory;
import org.apache.sysds.runtime.io.FrameWriter;
import org.apache.sysds.runtime.io.FrameWriterFactory;
import org.apache.sysds.runtime.io.InputOutputInfo;
import org.apache.sysds.runtime.io.MatrixReader;
import org.apache.sysds.runtime.io.MatrixReaderFactory;
import org.apache.sysds.runtime.io.MatrixWriter;
import org.apache.sysds.runtime.io.MatrixWriterFactory;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.util.HDFSTool;
import org.apache.sysds.runtime.util.CollectionUtils;
import org.apache.sysds.runtime.util.UtilFunctions;
import org.apache.sysds.test.AutomatedTestBase;
import org.apache.sysds.test.TestConfiguration;
import org.apache.sysds.test.TestUtils;
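/**
 * Tests the Spark frame converters in {@link FrameRDDConverterUtils}: each test
 * round-trips generated frame data between binary frame blocks and one of the
 * supported representations (CSV, text cell, binary matrix blocks, DataFrame)
 * and verifies the result cell by cell.
 */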
public class FrameConverterTest extends AutomatedTestBase
{
private final static String TEST_DIR = "functions/frame/";
private final static String TEST_NAME = "FrameConv";
private final static String TEST_CLASS_DIR = TEST_DIR + FrameConverterTest.class.getSimpleName() + "/";
private final static int rows = 1593;
private final static ValueType[] schemaStrings = new ValueType[]{ValueType.STRING, ValueType.STRING, ValueType.STRING};
private final static ValueType[] schemaMixed = new ValueType[]{ValueType.STRING, ValueType.FP64, ValueType.INT64, ValueType.BOOLEAN};
private final static List<ValueType> schemaMixedLargeListStr = Collections.nCopies(200, ValueType.STRING);
private final static List<ValueType> schemaMixedLargeListDble = Collections.nCopies(200, ValueType.FP64);
private final static List<ValueType> schemaMixedLargeListInt = Collections.nCopies(200, ValueType.INT64);
private final static List<ValueType> schemaMixedLargeListBool = Collections.nCopies(200, ValueType.BOOLEAN);
private static final List<ValueType> schemaMixedLargeList = CollectionUtils.asList(
schemaMixedLargeListStr, schemaMixedLargeListDble, schemaMixedLargeListInt, schemaMixedLargeListBool);
private static final ValueType[] schemaMixedLarge = schemaMixedLargeList.toArray(new ValueType[0]);
//NOTE: moderate number of columns to work around https://issues.apache.org/jira/browse/SPARK-16845
private static final List<ValueType> schemaMixedLargeListDFrame = CollectionUtils.asList(
schemaMixedLargeListStr.subList(0, 100), schemaMixedLargeListDble.subList(0, 100),
schemaMixedLargeListInt.subList(0, 100), schemaMixedLargeListBool.subList(0, 100));
private static final ValueType[] schemaMixedLargeDFrame = schemaMixedLargeListDFrame.toArray(new ValueType[0]);
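//conversion directions under test, named source2target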
private enum ConvType {
CSV2BIN,
BIN2CSV,
TXTCELL2BIN,
BIN2TXTCELL,
MAT2BIN,
BIN2MAT,
DFRM2BIN,
BIN2DFRM,
}
private static final String separator = ",";
@Override
public void setUp() {
TestUtils.clearAssertionInformation();
addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"}));
}
@Test
public void testFrameStringsCsvBinSpark() {
runFrameConverterTest(schemaStrings, ConvType.CSV2BIN);
}
@Test
public void testFrameMixedCsvBinSpark() {
runFrameConverterTest(schemaMixed, ConvType.CSV2BIN);
}
@Test
public void testFrameStringsBinCsvSpark() {
runFrameConverterTest(schemaStrings, ConvType.BIN2CSV);
}
@Test
public void testFrameMixedBinCsvSpark() {
runFrameConverterTest(schemaMixed, ConvType.BIN2CSV);
}
@Test
public void testFrameStringsTxtCellBinSpark() {
runFrameConverterTest(schemaStrings, ConvType.TXTCELL2BIN);
}
@Test
public void testFrameMixedTxtCellBinSpark() {
runFrameConverterTest(schemaMixed, ConvType.TXTCELL2BIN);
}
@Test
public void testFrameStringsBinTxtCellSpark() {
runFrameConverterTest(schemaStrings, ConvType.BIN2TXTCELL);
}
@Test
public void testFrameMixedBinTxtCellSpark() {
runFrameConverterTest(schemaMixed, ConvType.BIN2TXTCELL);
}
@Test
public void testFrameStringsMatrixBinSpark() {
runFrameConverterTest(schemaStrings, ConvType.MAT2BIN);
}
@Test
public void testFrameMixedMatrixBinSpark() {
runFrameConverterTest(schemaMixed, ConvType.MAT2BIN);
}
@Test
public void testFrameStringsBinMatrixSpark() {
runFrameConverterTest(schemaStrings, ConvType.BIN2MAT);
}
@Test
public void testFrameMixedBinMatrixSpark() {
runFrameConverterTest(schemaMixed, ConvType.BIN2MAT);
}
@Test
public void testFrameMixedMultiColBlkMatrixBinSpark() {
runFrameConverterTest(schemaMixedLarge, ConvType.MAT2BIN);
}
@Test
public void testFrameMixedMultiColBlkBinMatrixSpark() {
runFrameConverterTest(schemaMixedLarge, ConvType.BIN2MAT);
}
@Test
public void testFrameMixedDFrameBinSpark() {
runFrameConverterTest(schemaMixedLargeDFrame, ConvType.DFRM2BIN);
}
@Test
public void testFrameMixedBinDFrameSpark() {
runFrameConverterTest(schemaMixedLargeDFrame, ConvType.BIN2DFRM);
}
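/**
 * Common test driver: forces Spark execution mode, generates random input data
 * for the given schema, selects the source/target file formats for the requested
 * conversion, and dispatches to the frame or matrix verification path.
 */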
private void runFrameConverterTest( ValueType[] schema, ConvType type)
{
ExecMode platformOld = rtplatform;
DMLScript.setGlobalExecMode(ExecMode.SPARK);
boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
DMLScript.USE_LOCAL_SPARK_CONFIG = true;
try
{
TestConfiguration config = getTestConfiguration(TEST_NAME);
loadTestConfiguration(config);
//data generation
double[][] A = getRandomMatrix(rows, schema.length, -10, 10, 0.9, 2373);
//prepare formats: oinfo is used to write the converter input, iinfo to read back the converter output
FileFormat oinfo = null;
FileFormat iinfo = null;
switch( type ) {
case CSV2BIN:
case DFRM2BIN:
oinfo = FileFormat.CSV;
iinfo = FileFormat.BINARY;
break;
case BIN2CSV:
oinfo = FileFormat.BINARY;
iinfo = FileFormat.CSV;
break;
case TXTCELL2BIN:
oinfo = FileFormat.TEXT;
iinfo = FileFormat.BINARY;
break;
case BIN2TXTCELL:
oinfo = FileFormat.BINARY;
iinfo = FileFormat.TEXT;
break;
case MAT2BIN:
case BIN2MAT:
case BIN2DFRM:
oinfo = FileFormat.BINARY;
iinfo = FileFormat.BINARY;
break;
default:
throw new RuntimeException("Unsuported converter type: "+type.toString());
}
if(type == ConvType.MAT2BIN || type == ConvType.BIN2MAT)
runMatrixConverterAndVerify(schema, A, type, iinfo, oinfo);
else
runConverterAndVerify(schema, A, type, iinfo, oinfo);
}
catch(Exception ex) {
ex.printStackTrace();
throw new RuntimeException(ex);
}
finally
{
DMLScript.setGlobalExecMode(platformOld);
DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
}
}
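/**
 * Writes the generated frame in the converter's source format, runs the converter
 * on Spark, reads the result back in the target format, and compares both frames
 * cell by cell.
 */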
private void runConverterAndVerify( ValueType[] schema, double[][] A, ConvType type, FileFormat iinfo, FileFormat oinfo )
throws IOException
{
try
{
//initialize the frame data.
FrameBlock frame1 = new FrameBlock(schema);
initFrameData(frame1, A, schema);
//write frame data to hdfs
FrameWriter writer = FrameWriterFactory.createFrameWriter(oinfo);
writer.writeFrameToHDFS(frame1, input("A"), rows, schema.length);
//run converter under test
MatrixCharacteristics mc = new MatrixCharacteristics(rows, schema.length, -1, -1);
runConverter(type, mc, null, Arrays.asList(schema), input("A"), output("B"));
//read frame data from hdfs
FrameReader reader = FrameReaderFactory.createFrameReader(iinfo);
FrameBlock frame2 = reader.readFrameFromHDFS(output("B"), rows, schema.length);
//verify input and output frame
verifyFrameData(frame1, frame2);
}
catch(Exception ex) {
ex.printStackTrace();
throw new RuntimeException(ex);
}
finally {
HDFSTool.deleteFileIfExistOnHDFS(input("A"));
HDFSTool.deleteFileIfExistOnHDFS(output("B"));
}
}
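/**
 * Variant for matrix-frame conversions: writes either a dense matrix (MAT2BIN)
 * or a frame (BIN2MAT) as converter input, runs the converter on Spark, and
 * compares the resulting frame and matrix cell by cell.
 */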
private void runMatrixConverterAndVerify( ValueType[] schema, double[][] A, ConvType type, FileFormat iinfo, FileFormat oinfo )
throws IOException
{
try
{
MatrixCharacteristics mcMatrix = new MatrixCharacteristics(rows, schema.length, 1000, -1);
MatrixCharacteristics mcFrame = new MatrixCharacteristics(rows, schema.length, -1, -1);
MatrixBlock matrixBlock1 = null;
FrameBlock frame1 = null;
if(type == ConvType.MAT2BIN) {
//initialize the matrix (dense) data.
matrixBlock1 = new MatrixBlock(rows, schema.length, false);
matrixBlock1.init(A, rows, schema.length);
//write matrix data to hdfs
MatrixWriter matWriter = MatrixWriterFactory.createMatrixWriter(oinfo);
matWriter.writeMatrixToHDFS(matrixBlock1, input("A"), rows, schema.length,
mcMatrix.getBlocksize(), mcMatrix.getNonZeros());
}
else {
//initialize the frame data.
frame1 = new FrameBlock(schema);
initFrameData(frame1, A, schema);
//write frame data to hdfs
FrameWriter writer = FrameWriterFactory.createFrameWriter(oinfo);
writer.writeFrameToHDFS(frame1, input("A"), rows, schema.length);
}
//run converter under test
runConverter(type, mcFrame, mcMatrix, Arrays.asList(schema), input("A"), output("B"));
if(type == ConvType.MAT2BIN) {
//read frame data from hdfs
FrameReader reader = FrameReaderFactory.createFrameReader(iinfo);
FrameBlock frame2 = reader.readFrameFromHDFS(output("B"), rows, schema.length);
//verify input and output frame/matrix
verifyFrameMatrixData(frame2, matrixBlock1);
}
else {
//read matrix data from hdfs
MatrixReader matReader = MatrixReaderFactory.createMatrixReader(iinfo);
MatrixBlock matrixBlock2 = matReader.readMatrixFromHDFS(output("B"), rows, schema.length,
mcMatrix.getBlocksize(), mcMatrix.getNonZeros());
//verify input and output frame/matrix
verifyFrameMatrixData(frame1, matrixBlock2);
}
}
catch(Exception ex) {
ex.printStackTrace();
throw new RuntimeException(ex);
}
finally {
HDFSTool.deleteFileIfExistOnHDFS(input("A"));
HDFSTool.deleteFileIfExistOnHDFS(output("B"));
}
}
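/**
 * Populates the frame row by row; each double is round-tripped through the
 * column's value type (doubleToObject/objectToDouble) so the in-memory test data
 * matches the frame contents exactly, e.g., for INT64 or BOOLEAN columns.
 */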
private static void initFrameData(FrameBlock frame, double[][] data, ValueType[] lschema) {
Object[] row1 = new Object[lschema.length];
for( int i=0; i<rows; i++ ) {
for( int j=0; j<lschema.length; j++ )
data[i][j] = UtilFunctions.objectToDouble(lschema[j],
row1[j] = UtilFunctions.doubleToObject(lschema[j], data[i][j]));
frame.appendRow(row1);
}
}
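/** Asserts cell-wise equality of two frames based on their string representations. */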
private static void verifyFrameData(FrameBlock frame1, FrameBlock frame2) {
for ( int i=0; i<frame1.getNumRows(); i++ )
for( int j=0; j<frame1.getNumColumns(); j++ ) {
String val1 = UtilFunctions.objectToString(frame1.get(i, j));
String val2 = UtilFunctions.objectToString(frame2.get(i, j));
if( UtilFunctions.compareTo(ValueType.STRING, val1, val2) != 0)
Assert.fail("The original data for cell ("+ i + "," + j + ") is " + val1 +
", not same as the converted value " + val2);
}
}
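/** Asserts cell-wise equality of a frame and a matrix, comparing values through the frame's schema types. */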
private static void verifyFrameMatrixData(FrameBlock frame, MatrixBlock matrix) {
for ( int i=0; i<frame.getNumRows(); i++ )
for( int j=0; j<frame.getNumColumns(); j++ ) {
Object val1 = UtilFunctions.doubleToObject(frame.getSchema()[j],
UtilFunctions.objectToDouble(frame.getSchema()[j], frame.get(i, j)));
Object val2 = UtilFunctions.doubleToObject(frame.getSchema()[j], matrix.getValue(i, j));
if( UtilFunctions.compareTo(frame.getSchema()[j], val1, val2) != 0 )
Assert.fail("Frame value for cell ("+ i + "," + j + ") is " + val1 +
", which does not match the matrix value " + val2);
}
}
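/**
 * Runs the actual converter on Spark: reads the input as an RDD in the source
 * format, applies the respective FrameRDDConverterUtils conversion, and writes
 * the result in the target format.
 */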
@SuppressWarnings({ "unchecked", "resource", "cast" })
private static void runConverter(ConvType type, MatrixCharacteristics mc, MatrixCharacteristics mcMatrix,
List<ValueType> schema, String fnameIn, String fnameOut) throws IOException
{
SparkExecutionContext sec = (SparkExecutionContext) ExecutionContextFactory.createContext();
JavaSparkContext sc = sec.getSparkContext();
ValueType[] lschema = schema.toArray(new ValueType[0]);
HDFSTool.deleteFileIfExistOnHDFS(fnameOut);
switch( type ) {
case CSV2BIN: {
InputOutputInfo iinfo = InputOutputInfo.get(DataType.FRAME, FileFormat.CSV);
InputOutputInfo oinfo = InputOutputInfo.get(DataType.FRAME, FileFormat.BINARY);
JavaPairRDD<LongWritable,Text> rddIn = (JavaPairRDD<LongWritable,Text>) sc
.hadoopFile(fnameIn, iinfo.inputFormatClass, iinfo.keyClass, iinfo.valueClass);
JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
.csvToBinaryBlock(sc, rddIn, mc, null, false, separator, false, 0, UtilFunctions.defaultNaString)
.mapToPair(new LongFrameToLongWritableFrameFunction());
rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
break;
}
case BIN2CSV: {
InputOutputInfo iinfo = InputOutputInfo.get(DataType.FRAME, FileFormat.BINARY);
JavaPairRDD<LongWritable, FrameBlock> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class);
JavaPairRDD<Long, FrameBlock> rddIn2 = rddIn.mapToPair(new CopyFrameBlockPairFunction(false));
FileFormatPropertiesCSV fprop = new FileFormatPropertiesCSV();
JavaRDD<String> rddOut = FrameRDDConverterUtils.binaryBlockToCsv(rddIn2, mc, fprop, true);
rddOut.saveAsTextFile(fnameOut);
break;
}
case TXTCELL2BIN: {
InputOutputInfo iinfo = InputOutputInfo.get(DataType.FRAME, FileFormat.TEXT);
InputOutputInfo oinfo = InputOutputInfo.get(DataType.FRAME, FileFormat.BINARY);
JavaPairRDD<LongWritable,Text> rddIn = (JavaPairRDD<LongWritable,Text>)
sc.hadoopFile(fnameIn, iinfo.inputFormatClass, iinfo.keyClass, iinfo.valueClass);
JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
.textCellToBinaryBlock(sc, rddIn, mc, lschema)
.mapToPair(new LongFrameToLongWritableFrameFunction());
rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
break;
}
case BIN2TXTCELL: {
InputOutputInfo iinfo = InputOutputInfo.get(DataType.FRAME, FileFormat.BINARY);
JavaPairRDD<LongWritable, FrameBlock> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class);
JavaPairRDD<Long, FrameBlock> rddIn2 = rddIn.mapToPair(new CopyFrameBlockPairFunction(false));
JavaRDD<String> rddOut = FrameRDDConverterUtils.binaryBlockToTextCell(rddIn2, mc);
rddOut.saveAsTextFile(fnameOut);
break;
}
case MAT2BIN: {
//the converter input is a binary matrix (MatrixIndexes/MatrixBlock), not a frame
InputOutputInfo iinfo = InputOutputInfo.get(DataType.MATRIX, FileFormat.BINARY);
InputOutputInfo oinfo = InputOutputInfo.get(DataType.FRAME, FileFormat.BINARY);
JavaPairRDD<MatrixIndexes,MatrixBlock> rddIn = (JavaPairRDD<MatrixIndexes,MatrixBlock>) sc
.hadoopFile(fnameIn, iinfo.inputFormatClass, iinfo.keyClass, iinfo.valueClass);
JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils.matrixBlockToBinaryBlock(sc, rddIn, mcMatrix);
rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
break;
}
case BIN2MAT: {
InputOutputInfo iinfo = InputOutputInfo.get(DataType.FRAME, FileFormat.BINARY);
//the converter output is a binary matrix (MatrixIndexes/MatrixBlock)
InputOutputInfo oinfo = InputOutputInfo.get(DataType.MATRIX, FileFormat.BINARY);
JavaPairRDD<Long, FrameBlock> rddIn = sc
.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class)
.mapToPair(new LongWritableFrameToLongFrameFunction());
JavaPairRDD<MatrixIndexes,MatrixBlock> rddOut = FrameRDDConverterUtils.binaryBlockToMatrixBlock(rddIn, mc, mcMatrix);
rddOut.saveAsHadoopFile(fnameOut, MatrixIndexes.class, MatrixBlock.class, oinfo.outputFormatClass);
break;
}
case DFRM2BIN: {
InputOutputInfo oinfo = InputOutputInfo.get(DataType.FRAME, FileFormat.BINARY);
//create DataFrame from the CSV input written above
SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
StructType dfSchema = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschema, false);
JavaRDD<Row> rowRDD = FrameRDDConverterUtils.csvToRowRDD(sc, fnameIn, separator, lschema);
Dataset<Row> df = sparkSession.createDataFrame(rowRDD, dfSchema);
JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
.dataFrameToBinaryBlock(sc, df, mc, false/*, columns*/)
.mapToPair(new LongFrameToLongWritableFrameFunction());
rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
break;
}
case BIN2DFRM: {
InputOutputInfo iinfo = InputOutputInfo.get(DataType.FRAME, FileFormat.BINARY);
InputOutputInfo oinfo = InputOutputInfo.get(DataType.FRAME, FileFormat.BINARY);
JavaPairRDD<Long, FrameBlock> rddIn = sc
.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class)
.mapToPair(new LongWritableFrameToLongFrameFunction());
SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
Dataset<Row> df = FrameRDDConverterUtils.binaryBlockToDataFrame(sparkSession, rddIn, mc, lschema);
//convert the DataFrame back to binary blocks, so the original binary input can be
//compared with the result of the binary -> DataFrame -> binary round trip
JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
.dataFrameToBinaryBlock(sc, df, mc, true)
.mapToPair(new LongFrameToLongWritableFrameFunction());
rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
break;
}
default:
throw new RuntimeException("Unsuported converter type: "+type.toString());
}
sec.close();
}
}