| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.sysds.runtime.instructions.spark; |
| |
| import org.apache.commons.lang3.ArrayUtils; |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.mapred.SequenceFileOutputFormat; |
| import org.apache.spark.api.java.JavaPairRDD; |
| import org.apache.spark.api.java.JavaRDD; |
| import org.apache.spark.util.LongAccumulator; |
| import org.apache.sysds.common.Types.DataType; |
| import org.apache.sysds.common.Types.FileFormat; |
| import org.apache.sysds.common.Types.ValueType; |
| import org.apache.sysds.runtime.DMLRuntimeException; |
| import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; |
| import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext; |
| import org.apache.sysds.runtime.instructions.InstructionUtils; |
| import org.apache.sysds.runtime.instructions.cp.CPOperand; |
| import org.apache.sysds.runtime.instructions.spark.functions.ComputeBinaryBlockNnzFunction; |
| import org.apache.sysds.runtime.instructions.spark.utils.FrameRDDConverterUtils; |
| import org.apache.sysds.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction; |
| import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtils; |
| import org.apache.sysds.runtime.io.FileFormatProperties; |
| import org.apache.sysds.runtime.io.FileFormatPropertiesCSV; |
| import org.apache.sysds.runtime.lineage.LineageItem; |
| import org.apache.sysds.runtime.lineage.LineageItemUtils; |
| import org.apache.sysds.runtime.lineage.LineageTraceable; |
| import org.apache.sysds.runtime.matrix.data.FrameBlock; |
| import org.apache.sysds.runtime.matrix.data.MatrixBlock; |
| import org.apache.sysds.runtime.matrix.data.MatrixIndexes; |
| import org.apache.sysds.runtime.meta.DataCharacteristics; |
| import org.apache.sysds.runtime.util.HDFSTool; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Random; |
| |
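| /** |
| * Spark instruction for persisting a matrix or frame RDD to HDFS in a given |
| * file format (text, mm, csv, or binary), including the corresponding |
| * metadata (.mtd) file. |
| */ |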
| public class WriteSPInstruction extends SPInstruction implements LineageTraceable { |
| public CPOperand input1 = null; |
| private CPOperand input2 = null; |
| private CPOperand input3 = null; |
| private CPOperand input4 = null; |
| private FileFormatProperties formatProperties; |
| |
| private WriteSPInstruction(CPOperand in1, CPOperand in2, CPOperand in3, String opcode, String str) { |
| super(SPType.Write, opcode, str); |
| input1 = in1; |
| input2 = in2; |
| input3 = in3; |
| formatProperties = null; // set during parsing (csv properties or defaults) |
| } |
| |
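| /** |
| * Parses a write instruction string into a WriteSPInstruction. The string |
| * consists of delimited parts: the opcode, the input, filename, format, and |
| * description operands, and, for csv outputs, additional format parameters. |
| * |
| * @param str the instruction string |
| * @return the parsed WriteSPInstruction |
| */ |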
| public static WriteSPInstruction parseInstruction(String str) { |
| String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); |
| String opcode = parts[0]; |
| |
| if( !opcode.equals("write") ) { |
| throw new DMLRuntimeException("Unsupported opcode"); |
| } |
| |
| // All write instructions have 4 operands (input, filename, format, description), |
| // i.e., 5 instruction parts. Write instructions for csv files additionally |
| // encode the format parameters hasHeader, delimiter, and sparse (9 parts). |
| if ( parts.length != 5 && parts.length != 9 ) { |
| throw new DMLRuntimeException("Invalid number of operands in write instruction: " + str); |
| } |
| |
| // operand encoding: name·datatype·valuetype, e.g., _mVar2·MATRIX·DOUBLE |
| CPOperand in1 = new CPOperand(parts[1]); |
| CPOperand in2 = new CPOperand(parts[2]); |
| CPOperand in3 = new CPOperand(parts[3]); |
| |
| WriteSPInstruction inst = new WriteSPInstruction(in1, in2, in3, opcode, str); |
| |
| if ( in3.getName().equalsIgnoreCase("csv") ) { |
| boolean hasHeader = Boolean.parseBoolean(parts[4]); |
| String delim = parts[5]; |
| boolean sparse = Boolean.parseBoolean(parts[6]); |
| FileFormatProperties formatProperties = new FileFormatPropertiesCSV(hasHeader, delim, sparse); |
| inst.setFormatProperties(formatProperties); |
| CPOperand in4 = new CPOperand(parts[8]); |
| inst.input4 = in4; |
| } else { |
| FileFormatProperties ffp = new FileFormatProperties(); |
| |
| CPOperand in4 = new CPOperand(parts[4]); |
| inst.input4 = in4; |
| inst.setFormatProperties(ffp); |
| } |
| return inst; |
| } |
| |
| |
| public FileFormatProperties getFormatProperties() { |
| return formatProperties; |
| } |
| |
| public void setFormatProperties(FileFormatProperties prop) { |
| formatProperties = prop; |
| } |
| |
| public CPOperand getInput1() { |
| return input1; |
| } |
| |
| public CPOperand getInput2() { |
| return input2; |
| } |
| |
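| /** |
| * Resolves the target filename and description, removes any existing output |
| * file on HDFS, and dispatches to the matrix- or frame-specific write path. |
| */ |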
| @Override |
| public void processInstruction(ExecutionContext ec) { |
| SparkExecutionContext sec = (SparkExecutionContext) ec; |
| |
| //get filename (literal or variable expression) |
| String fname = ec.getScalarInput(input2.getName(), ValueType.STRING, input2.isLiteral()).getStringValue(); |
| String desc = ec.getScalarInput(input4.getName(), ValueType.STRING, input4.isLiteral()).getStringValue(); |
| formatProperties.setDescription(desc); |
| |
| ValueType[] schema = (input1.getDataType()==DataType.FRAME) ? |
| sec.getFrameObject(input1.getName()).getSchema() : null; |
| |
| try |
| { |
| //if the file already exists on HDFS, remove it. |
| HDFSTool.deleteFileIfExistOnHDFS( fname ); |
| |
| //prepare output info according to meta data |
| FileFormat fmt = FileFormat.safeValueOf(input3.getName()); |
| |
| //core matrix/frame write |
| if( input1.getDataType()==DataType.MATRIX ) |
| processMatrixWriteInstruction(sec, fname, fmt); |
| else |
| processFrameWriteInstruction(sec, fname, fmt, schema); |
| } |
| catch(IOException ex) |
| { |
| throw new DMLRuntimeException("Failed to process write instruction", ex); |
| } |
| } |
| |
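| /** |
| * Writes a binary-block matrix RDD to HDFS in the requested format. If the |
| * number of non-zeros is not yet known, its computation is piggybacked on |
| * the write via a Spark accumulator. |
| */ |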
| protected void processMatrixWriteInstruction(SparkExecutionContext sec, String fname, FileFormat fmt) |
| throws IOException |
| { |
| //get input rdd |
| JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryMatrixBlockRDDHandleForVariable( input1.getName() ); |
| DataCharacteristics mc = sec.getDataCharacteristics(input1.getName()); |
| |
| if( fmt == FileFormat.MM || fmt == FileFormat.TEXT ) |
| { |
| //piggyback nnz maintenance on write |
| LongAccumulator aNnz = null; |
| if( !mc.nnzKnown() ) { |
| aNnz = sec.getSparkContext().sc().longAccumulator("nnz"); |
| in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz)); |
| } |
| |
| JavaRDD<String> header = null; |
| if( fmt == FileFormat.MM ) { |
| ArrayList<String> headerContainer = new ArrayList<>(1); |
| // construct the MM header, including dimensions and the nnz |
| // as currently known from the matrix metadata |
| String headerStr = "%%MatrixMarket matrix coordinate real general\n" |
| + mc.getRows() + " " + mc.getCols() + " " + mc.getNonZeros(); |
| headerContainer.add(headerStr); |
| header = sec.getSparkContext().parallelize(headerContainer); |
| } |
| |
| JavaRDD<String> ijv = RDDConverterUtils.binaryBlockToTextCell(in1, mc); |
| if(header != null) |
| customSaveTextFile(header.union(ijv), fname, true); |
| else |
| customSaveTextFile(ijv, fname, false); |
| |
| if( !mc.nnzKnown() ) |
| mc.setNonZeros( aNnz.value() ); |
| } |
| else if( fmt == FileFormat.CSV ) |
| { |
| if( mc.getRows() == 0 || mc.getCols() == 0 ) { |
| throw new IOException("Write of matrices with zero rows or columns" |
| + " not supported ("+mc.getRows()+"x"+mc.getCols()+")."); |
| } |
| |
| LongAccumulator aNnz = null; |
| |
| //piggyback nnz computation on actual write |
| if( !mc.nnzKnown() ) { |
| aNnz = sec.getSparkContext().sc().longAccumulator("nnz"); |
| in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz)); |
| } |
| |
| JavaRDD<String> out = RDDConverterUtils.binaryBlockToCsv( |
| in1, mc, (FileFormatPropertiesCSV) formatProperties, true); |
| |
| customSaveTextFile(out, fname, false); |
| |
| if( !mc.nnzKnown() ) |
| mc.setNonZeros(aNnz.value().longValue()); |
| } |
| else if( fmt == FileFormat.BINARY ) { |
| //piggyback nnz computation on actual write |
| LongAccumulator aNnz = null; |
| if( !mc.nnzKnown() ) { |
| aNnz = sec.getSparkContext().sc().longAccumulator("nnz"); |
| in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz)); |
| } |
| |
| //save binary block rdd on hdfs |
| in1.saveAsHadoopFile(fname, MatrixIndexes.class, MatrixBlock.class, SequenceFileOutputFormat.class); |
| |
| if( !mc.nnzKnown() ) |
| mc.setNonZeros(aNnz.value().longValue()); |
| } |
| else { |
| //unsupported formats: binarycell (not externalized) |
| throw new DMLRuntimeException("Unexpected data format: " + fmt.toString()); |
| } |
| |
| // write meta data file |
| HDFSTool.writeMetaDataFile(fname + ".mtd", ValueType.FP64, mc, fmt, formatProperties); |
| } |
| |
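| /** |
| * Writes a binary-block frame RDD to HDFS in the requested format (text, |
| * csv, or binary), using the frame schema for the metadata file. |
| */ |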
| protected void processFrameWriteInstruction(SparkExecutionContext sec, String fname, FileFormat fmt, ValueType[] schema) |
| throws IOException |
| { |
| //get input rdd |
| JavaPairRDD<Long,FrameBlock> in1 = sec |
| .getFrameBinaryBlockRDDHandleForVariable(input1.getName()); |
| DataCharacteristics mc = sec.getDataCharacteristics(input1.getName()); |
| |
| switch(fmt) { |
| case TEXT: { |
| JavaRDD<String> out = FrameRDDConverterUtils.binaryBlockToTextCell(in1, mc); |
| customSaveTextFile(out, fname, false); |
| break; |
| } |
| case CSV: { |
| FileFormatPropertiesCSV props = (formatProperties != null) ? (FileFormatPropertiesCSV) formatProperties : null; |
| JavaRDD<String> out = FrameRDDConverterUtils.binaryBlockToCsv(in1, mc, props, true); |
| customSaveTextFile(out, fname, false); |
| break; |
| } |
| case BINARY: { |
| JavaPairRDD<LongWritable,FrameBlock> out = in1.mapToPair(new LongFrameToLongWritableFrameFunction()); |
| out.saveAsHadoopFile(fname, LongWritable.class, FrameBlock.class, SequenceFileOutputFormat.class); |
| break; |
| } |
| default: |
| throw new DMLRuntimeException("Unexpected data format: " + fmt.toString()); |
| } |
| |
| // write meta data file |
| HDFSTool.writeMetaDataFile(fname + ".mtd", input1.getValueType(), schema, DataType.FRAME, mc, fmt, formatProperties); |
| } |
| |
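| /** |
| * Saves a text RDD to HDFS. If a single output file is requested, the RDD |
| * is first written under a randomized temporary name and then merged into |
| * the target file; the temporary output is deleted afterwards. |
| */ |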
| private static void customSaveTextFile(JavaRDD<String> rdd, String fname, boolean inSingleFile) { |
| if(inSingleFile) { |
| Random rand = new Random(); |
| String randFName = fname + "_" + rand.nextLong() + "_" + rand.nextLong(); |
| try { |
| while(HDFSTool.existsFileOnHDFS(randFName)) { |
| randFName = fname + "_" + rand.nextLong() + "_" + rand.nextLong(); |
| } |
| |
| rdd.saveAsTextFile(randFName); |
| HDFSTool.mergeIntoSingleFile(randFName, fname); // merge part files into a single output |
| |
| // slower alternative: coalesce into a single partition and copy the part file |
| // rdd.coalesce(1, true).saveAsTextFile(randFName); |
| // HDFSTool.copyFileOnHDFS(randFName + "/part-00000", fname); |
| } catch (IOException e) { |
| throw new DMLRuntimeException("Cannot merge the output into single file: " + e.getMessage()); |
| } |
| finally { |
| try { |
| // make sure that we do not leave temporary files on HDFS |
| HDFSTool.deleteFileIfExistOnHDFS( randFName ); |
| } catch (IOException e) { |
| throw new DMLRuntimeException("Cannot merge the output into single file: " + e.getMessage()); |
| } |
| } |
| } |
| else { |
| rdd.saveAsTextFile(fname); |
| } |
| } |
| |
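| /** |
| * Builds the lineage item of this write, appending the optional format |
| * description as an additional lineage input if present. |
| */ |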
| @Override |
| public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) { |
| LineageItem[] ret = LineageItemUtils.getLineage(ec, input1, input2, input3, input4); |
| if (formatProperties != null && formatProperties.getDescription() != null && !formatProperties.getDescription().isEmpty()) |
| ret = ArrayUtils.add(ret, new LineageItem(formatProperties.getDescription())); |
| return Pair.of(input1.getName(), new LineageItem(getOpcode(), ret)); |
| } |
| } |