| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.sysds.runtime.instructions.spark.utils; |
| |
| import org.apache.hadoop.io.Text; |
| import org.apache.spark.SparkContext; |
| import org.apache.spark.api.java.JavaPairRDD; |
| import org.apache.spark.api.java.JavaRDD; |
| import org.apache.spark.api.java.JavaSparkContext; |
| import org.apache.spark.api.java.function.Function; |
| import org.apache.spark.api.java.function.PairFlatMapFunction; |
| import org.apache.spark.ml.linalg.Vector; |
| import org.apache.spark.ml.linalg.VectorUDT; |
| import org.apache.spark.ml.linalg.Vectors; |
| import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix; |
| import org.apache.spark.mllib.linalg.distributed.MatrixEntry; |
| import org.apache.spark.mllib.util.NumericParser; |
| import org.apache.spark.sql.Dataset; |
| import org.apache.spark.sql.Row; |
| import org.apache.spark.sql.RowFactory; |
| import org.apache.spark.sql.SparkSession; |
| import org.apache.spark.sql.types.DataTypes; |
| import org.apache.spark.sql.types.StructField; |
| import org.apache.spark.sql.types.StructType; |
| import org.apache.sysds.runtime.DMLRuntimeException; |
| import org.apache.sysds.runtime.instructions.spark.data.ReblockBuffer; |
| import org.apache.sysds.runtime.matrix.data.MatrixBlock; |
| import org.apache.sysds.runtime.matrix.data.MatrixCell; |
| import org.apache.sysds.runtime.matrix.data.MatrixIndexes; |
| import org.apache.sysds.runtime.meta.DataCharacteristics; |
| import org.apache.sysds.runtime.util.FastStringTokenizer; |
| import scala.Tuple2; |
| |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| |
| /** |
| * NOTE: These are experimental converter utils. Once thoroughly tested, they |
| * can be moved to RDDConverterUtils. |
| */ |
| @SuppressWarnings("unused") |
| public class RDDConverterUtilsExt |
| { |
| public enum RDDConverterTypes { |
| TEXT_TO_MATRIX_CELL, |
| MATRIXENTRY_TO_MATRIXCELL |
| } |
| |
| public static JavaPairRDD<MatrixIndexes, MatrixBlock> coordinateMatrixToBinaryBlock(JavaSparkContext sc, |
| CoordinateMatrix input, DataCharacteristics mcIn, boolean outputEmptyBlocks) |
| { |
| //convert matrix entry rdd to binary block rdd (w/ partial blocks) |
| JavaPairRDD<MatrixIndexes, MatrixBlock> out = input.entries().toJavaRDD() |
| .mapPartitionsToPair(new MatrixEntryToBinaryBlockFunction(mcIn)); |
| |
| //inject empty blocks (if necessary) |
| if( outputEmptyBlocks && mcIn.mightHaveEmptyBlocks() ) { |
| out = out.union( |
| SparkUtils.getEmptyBlockRDD(sc, mcIn) ); |
| } |
| |
| //aggregate partial matrix blocks |
| out = RDDAggregateUtils.mergeByKey(out, false); |
| |
| return out; |
| } |
| |
| @SuppressWarnings("resource") |
| public static JavaPairRDD<MatrixIndexes, MatrixBlock> coordinateMatrixToBinaryBlock(SparkContext sc, |
| CoordinateMatrix input, DataCharacteristics mcIn, boolean outputEmptyBlocks) { |
| return coordinateMatrixToBinaryBlock(new JavaSparkContext(sc), input, mcIn, true); |
| } |
| |
| public static Dataset<Row> projectColumns(Dataset<Row> df, ArrayList<String> columns) { |
| ArrayList<String> columnToSelect = new ArrayList<String>(); |
| for(int i = 1; i < columns.size(); i++) { |
| columnToSelect.add(columns.get(i)); |
| } |
| return df.select(columns.get(0), scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList()); |
| } |
| |
| public static void copyRowBlocks(MatrixBlock mb, int rowIndex, MatrixBlock ret, int numRowsPerBlock, int rlen, int clen) { |
| copyRowBlocks(mb, (long)rowIndex, ret, (long)numRowsPerBlock, (long)rlen, (long)clen); |
| } |
| public static void copyRowBlocks(MatrixBlock mb, long rowIndex, MatrixBlock ret, int numRowsPerBlock, int rlen, int clen) { |
| copyRowBlocks(mb, rowIndex, ret, (long)numRowsPerBlock, (long)rlen, (long)clen); |
| } |
| public static void copyRowBlocks(MatrixBlock mb, int rowIndex, MatrixBlock ret, long numRowsPerBlock, long rlen, long clen) { |
| copyRowBlocks(mb, (long)rowIndex, ret, numRowsPerBlock, rlen, clen); |
| } |
| public static void copyRowBlocks(MatrixBlock mb, long rowIndex, MatrixBlock ret, long numRowsPerBlock, long rlen, long clen) { |
| // TODO: Double-check if synchronization is required here. |
| // synchronized (RDDConverterUtilsExt.class) { |
| ret.copy((int)(rowIndex*numRowsPerBlock), (int)Math.min((rowIndex+1)*numRowsPerBlock-1, rlen-1), 0, (int)(clen-1), mb, false); |
| // } |
| } |
| |
| public static void postProcessAfterCopying(MatrixBlock ret) { |
| ret.recomputeNonZeros(); |
| ret.examSparsity(); |
| } |
| |
| public static class AddRowID implements Function<Tuple2<Row,Long>, Row> { |
| private static final long serialVersionUID = -3733816995375745659L; |
| |
| @Override |
| public Row call(Tuple2<Row, Long> arg0) throws Exception { |
| int oldNumCols = arg0._1.length(); |
| Object [] fields = new Object[oldNumCols + 1]; |
| for(int i = 0; i < oldNumCols; i++) { |
| fields[i] = arg0._1.get(i); |
| } |
| fields[oldNumCols] = new Double(arg0._2 + 1); |
| return RowFactory.create(fields); |
| } |
| |
| } |
| |
| /** |
| * Add element indices as new column to DataFrame |
| * |
| * @param df input data frame |
| * @param sparkSession the Spark Session |
| * @param nameOfCol name of index column |
| * @return new data frame |
| */ |
| public static Dataset<Row> addIDToDataFrame(Dataset<Row> df, SparkSession sparkSession, String nameOfCol) { |
| StructField[] oldSchema = df.schema().fields(); |
| StructField[] newSchema = new StructField[oldSchema.length + 1]; |
| for(int i = 0; i < oldSchema.length; i++) { |
| newSchema[i] = oldSchema[i]; |
| } |
| newSchema[oldSchema.length] = DataTypes.createStructField(nameOfCol, DataTypes.DoubleType, false); |
| // JavaRDD<Row> newRows = df.rdd().toJavaRDD().map(new AddRowID()); |
| JavaRDD<Row> newRows = df.rdd().toJavaRDD().zipWithIndex().map(new AddRowID()); |
| return sparkSession.createDataFrame(newRows, new StructType(newSchema)); |
| } |
| |
| private static class MatrixEntryToBinaryBlockFunction implements PairFlatMapFunction<Iterator<MatrixEntry>,MatrixIndexes,MatrixBlock> |
| { |
| private static final long serialVersionUID = 4907483236186747224L; |
| |
| private IJVToBinaryBlockFunctionHelper helper = null; |
| public MatrixEntryToBinaryBlockFunction(DataCharacteristics mc) { |
| helper = new IJVToBinaryBlockFunctionHelper(mc); |
| } |
| |
| @Override |
| public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<MatrixEntry> arg0) throws Exception { |
| return helper.convertToBinaryBlock(arg0, RDDConverterTypes.MATRIXENTRY_TO_MATRIXCELL).iterator(); |
| } |
| |
| } |
| |
| private static class IJVToBinaryBlockFunctionHelper implements Serializable { |
| private static final long serialVersionUID = -7952801318564745821L; |
| //internal buffer size (aligned w/ default matrix block size) |
| private static final int BUFFER_SIZE = 4 * 1000 * 1000; //4M elements (32MB) |
| private int _bufflen = -1; |
| |
| private long _rlen = -1; |
| private long _clen = -1; |
| private int _blen = -1; |
| |
| public IJVToBinaryBlockFunctionHelper(DataCharacteristics mc) { |
| if(!mc.dimsKnown()) |
| throw new DMLRuntimeException("The dimensions need to be known in given DataCharacteristics for given input RDD"); |
| _rlen = mc.getRows(); |
| _clen = mc.getCols(); |
| _blen = mc.getBlocksize(); |
| _blen = mc.getBlocksize(); |
| //determine upper bounded buffer len |
| _bufflen = (int) Math.min(_rlen*_clen, BUFFER_SIZE); |
| } |
| |
| // ---------------------------------------------------- |
| // Can extend this by having type hierarchy |
| public Tuple2<MatrixIndexes, MatrixCell> textToMatrixCell(Text txt) { |
| FastStringTokenizer st = new FastStringTokenizer(' '); |
| //get input string (ignore matrix market comments) |
| String strVal = txt.toString(); |
| if( strVal.startsWith("%") ) |
| return null; |
| |
| //parse input ijv triple |
| st.reset( strVal ); |
| long row = st.nextLong(); |
| long col = st.nextLong(); |
| double val = st.nextDouble(); |
| MatrixIndexes indx = new MatrixIndexes(row, col); |
| MatrixCell cell = new MatrixCell(val); |
| return new Tuple2<MatrixIndexes, MatrixCell>(indx, cell); |
| } |
| |
| public Tuple2<MatrixIndexes, MatrixCell> matrixEntryToMatrixCell(MatrixEntry entry) { |
| MatrixIndexes indx = new MatrixIndexes(entry.i(), entry.j()); |
| MatrixCell cell = new MatrixCell(entry.value()); |
| return new Tuple2<MatrixIndexes, MatrixCell>(indx, cell); |
| } |
| |
| // ---------------------------------------------------- |
| |
| Iterable<Tuple2<MatrixIndexes, MatrixBlock>> convertToBinaryBlock(Object arg0, RDDConverterTypes converter) throws Exception { |
| ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>(); |
| ReblockBuffer rbuff = new ReblockBuffer(_bufflen, _rlen, _clen, _blen); |
| |
| Iterator<?> iter = (Iterator<?>) arg0; |
| while( iter.hasNext() ) { |
| Tuple2<MatrixIndexes, MatrixCell> cell = null; |
| switch(converter) { |
| case MATRIXENTRY_TO_MATRIXCELL: |
| cell = matrixEntryToMatrixCell((MatrixEntry) iter.next()); |
| break; |
| |
| case TEXT_TO_MATRIX_CELL: |
| cell = textToMatrixCell((Text) iter.next()); |
| break; |
| |
| default: |
| throw new Exception("Invalid converter for IJV data:" + converter.toString()); |
| } |
| |
| if(cell == null) { |
| continue; |
| } |
| |
| //flush buffer if necessary |
| if( rbuff.getSize() >= rbuff.getCapacity() ) |
| flushBufferToList(rbuff, ret); |
| |
| //add value to reblock buffer |
| rbuff.appendCell(cell._1.getRowIndex(), cell._1.getColumnIndex(), cell._2.getValue()); |
| } |
| |
| //final flush buffer |
| flushBufferToList(rbuff, ret); |
| |
| return ret; |
| } |
| |
| private static void flushBufferToList( ReblockBuffer rbuff, ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret ) |
| throws IOException, DMLRuntimeException |
| { |
| rbuff.flushBufferToBinaryBlocks().stream() // prevent library dependencies |
| .map(b -> SparkUtils.fromIndexedMatrixBlock(b)).forEach(b -> ret.add(b)); |
| } |
| } |
| |
| /** |
| * Convert a dataframe of comma-separated string rows to a dataframe of |
| * ml.linalg.Vector rows. |
| * |
| * <p> |
| * Example input rows:<br> |
| * |
| * <code> |
| * ((1.2, 4.3, 3.4))<br> |
| * (1.2, 3.4, 2.2)<br> |
| * [[1.2, 34.3, 1.2, 1.25]]<br> |
| * [1.2, 3.4]<br> |
| * </code> |
| * |
| * @param sparkSession |
| * Spark Session |
| * @param inputDF |
| * dataframe of comma-separated row strings to convert to |
| * dataframe of ml.linalg.Vector rows |
| * @return dataframe of ml.linalg.Vector rows |
| */ |
| public static Dataset<Row> stringDataFrameToVectorDataFrame(SparkSession sparkSession, Dataset<Row> inputDF) |
| { |
| StructField[] oldSchema = inputDF.schema().fields(); |
| StructField[] newSchema = new StructField[oldSchema.length]; |
| for (int i = 0; i < oldSchema.length; i++) { |
| String colName = oldSchema[i].name(); |
| newSchema[i] = DataTypes.createStructField(colName, new VectorUDT(), true); |
| } |
| |
| // converter |
| class StringToVector implements Function<Tuple2<Row, Long>, Row> { |
| private static final long serialVersionUID = -4733816995375745659L; |
| |
| @Override |
| public Row call(Tuple2<Row, Long> arg0) throws Exception { |
| Row oldRow = arg0._1; |
| int oldNumCols = oldRow.length(); |
| if (oldNumCols > 1) { |
| throw new DMLRuntimeException("The row must have at most one column"); |
| } |
| |
| // parse the various strings. i.e |
| // ((1.2, 4.3, 3.4)) or (1.2, 3.4, 2.2) |
| // [[1.2, 34.3, 1.2, 1.2]] or [1.2, 3.4] |
| Object[] fields = new Object[oldNumCols]; |
| ArrayList<Object> fieldsArr = new ArrayList<Object>(); |
| for (int i = 0; i < oldRow.length(); i++) { |
| Object ci = oldRow.get(i); |
| if (ci == null) { |
| fieldsArr.add(null); |
| } else if (ci instanceof String) { |
| String cis = (String) ci; |
| StringBuffer sb = new StringBuffer(cis.trim()); |
| for (int nid = 0; i < 2; i++) { // remove two level |
| // nesting |
| if ((sb.charAt(0) == '(' && sb.charAt(sb.length() - 1) == ')') |
| || (sb.charAt(0) == '[' && sb.charAt(sb.length() - 1) == ']')) { |
| sb.deleteCharAt(0); |
| sb.setLength(sb.length() - 1); |
| } |
| } |
| // have the replace code |
| String ncis = "[" + sb.toString().replaceAll(" *, *", ",") + "]"; |
| |
| try { |
| // ncis [ ] will always result in double array return type |
| double[] doubles = (double[]) NumericParser.parse(ncis); |
| Vector dense = Vectors.dense(doubles); |
| fieldsArr.add(dense); |
| } catch (Exception e) { // can't catch SparkException here in Java apparently |
| throw new DMLRuntimeException("Error converting to double array. " + e.getMessage(), e); |
| } |
| |
| } else { |
| throw new DMLRuntimeException("Only String is supported"); |
| } |
| } |
| Row row = RowFactory.create(fieldsArr.toArray()); |
| return row; |
| } |
| } |
| |
| // output DF |
| JavaRDD<Row> newRows = inputDF.rdd().toJavaRDD().zipWithIndex().map(new StringToVector()); |
| Dataset<Row> outDF = sparkSession.createDataFrame(newRows.rdd(), DataTypes.createStructType(newSchema)); |
| return outDF; |
| } |
| } |