/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.instructions.spark;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
import org.apache.sysds.runtime.instructions.spark.data.LazyIterableIterator;
import org.apache.sysds.runtime.instructions.spark.data.PartitionedBroadcast;
import org.apache.sysds.runtime.instructions.spark.utils.SparkUtils;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.data.OperationsOnMatrixValues;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Iterator;
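
/**
 * Spark map-side matrix append (mappend): broadcasts the smaller rhs input
 * and appends it block-wise to the lhs matrix RDD, column-wise (cbind) or
 * row-wise (rbind), without shuffling the lhs data.
 */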
public class MatrixAppendMSPInstruction extends AppendMSPInstruction {
protected MatrixAppendMSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand offset, CPOperand out,
boolean cbind, String opcode, String istr) {
super(op, in1, in2, offset, out, cbind, opcode, istr);
}
@Override
public void processInstruction(ExecutionContext ec) {
		// map-only append: the rhs must be a single block-row/column in the
		// append dimension (e.g., a vector) and fit into broadcast memory
SparkExecutionContext sec = (SparkExecutionContext)ec;
checkBinaryAppendInputCharacteristics(sec, _cbind, false, false);
DataCharacteristics mc1 = sec.getDataCharacteristics(input1.getName());
DataCharacteristics mc2 = sec.getDataCharacteristics(input2.getName());
int blen = mc1.getBlocksize();
JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryMatrixBlockRDDHandleForVariable( input1.getName() );
PartitionedBroadcast<MatrixBlock> in2 = sec.getBroadcastForVariable( input2.getName() );
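		//offset = length of the lhs in the append dimension, i.e., ncol(lhs) for
		//cbind and nrow(lhs) for rbind; used below to locate the boundary block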
		long off = sec.getScalarInput(_offset).getLongValue();
//execute map-append operations (partitioning preserving if #in-blocks = #out-blocks)
JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
if( preservesPartitioning(mc1, mc2, _cbind) ) {
out = in1.mapPartitionsToPair(
new MapSideAppendPartitionFunction(in2, _cbind, off, blen), true);
}
else {
out = in1.flatMapToPair(
new MapSideAppendFunction(in2, _cbind, off, blen));
}
		//update output statistics and put output RDD handle into symbol table
updateBinaryAppendOutputDataCharacteristics(sec, _cbind);
sec.setRDDHandleForVariable(output.getName(), out);
sec.addLineageRDD(output.getName(), input1.getName());
sec.addLineageBroadcast(output.getName(), input2.getName());
}
private static boolean preservesPartitioning(DataCharacteristics mcIn1, DataCharacteristics mcIn2, boolean cbind )
{
//determine if append is partitioning-preserving based on number of input and output blocks
//with awareness of zero number of rows or columns
long ncblksIn1 = cbind ? mcIn1.getNumColBlocks() : mcIn1.getNumRowBlocks();
long ncblksOut = cbind ?
Math.max((long)Math.ceil(((double)mcIn1.getCols()+mcIn2.getCols())/mcIn1.getBlocksize()),1) :
Math.max((long)Math.ceil(((double)mcIn1.getRows()+mcIn2.getRows())/mcIn1.getBlocksize()),1);
//mappend is partitioning-preserving if in-block append (e.g., common case of colvector append)
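		//for example, with blen=1000: cbind of a 1500x900 lhs with a 1500x1
		//vector gives ceil(901/1000)=1 output column block, matching the single
		//input column block (preserving); a 1500x1000 lhs gives ceil(1001/1000)=2
		//output blocks, which forces the flatMap code path instead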
return (ncblksIn1 == ncblksOut);
}
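
	/**
	 * Map-side append via flatMap: non-boundary blocks pass through unchanged;
	 * the boundary block is either passed through together with the rhs block
	 * (if already full) or merged with it, emitting a second block only if the
	 * appended values spill over the block size.
	 */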
private static class MapSideAppendFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, MatrixBlock>
{
private static final long serialVersionUID = 2738541014432173450L;
private final PartitionedBroadcast<MatrixBlock> _pm;
private final boolean _cbind;
private final int _blen;
private final long _lastBlockColIndex;
public MapSideAppendFunction(PartitionedBroadcast<MatrixBlock> binput, boolean cbind, long offset, int blen)
{
_pm = binput;
_cbind = cbind;
_blen = blen;
			//1-based index of the lhs boundary block in the append dimension
			//(column blocks for cbind, row blocks for rbind)
			_lastBlockColIndex = Math.max((long)Math.ceil((double)offset/blen), 1);
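			//e.g., offset=900, blen=1000 -> boundary block 1; offset=2500 -> block 3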
}
@Override
public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> kv)
throws Exception
{
ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<>();
IndexedMatrixValue in1 = SparkUtils.toIndexedMatrixBlock(kv);
MatrixIndexes ix = in1.getIndexes();
//case 1: pass through of non-boundary blocks
if( (_cbind?ix.getColumnIndex():ix.getRowIndex())!=_lastBlockColIndex )
{
ret.add( kv );
}
//case 2: pass through full input block and rhs block
			else if( (_cbind && in1.getValue().getNumColumns() == _blen)
				|| (!_cbind && in1.getValue().getNumRows() == _blen) )
{
//output lhs block
ret.add( kv );
//output shallow copy of rhs block
if( _cbind ) {
ret.add( new Tuple2<>(new MatrixIndexes(ix.getRowIndex(), ix.getColumnIndex()+1),
_pm.getBlock((int)ix.getRowIndex(), 1)) );
}
else { //rbind
ret.add( new Tuple2<>(new MatrixIndexes(ix.getRowIndex()+1, ix.getColumnIndex()),
_pm.getBlock(1, (int)ix.getColumnIndex())) );
}
}
//case 3: append operation on boundary block
else
{
				//allocate output list: the updated boundary block plus, if the
				//appended data exceeds the block size, a new spill-over block
ArrayList<IndexedMatrixValue> outlist=new ArrayList<>(2);
IndexedMatrixValue first = new IndexedMatrixValue(new MatrixIndexes(ix), new MatrixBlock());
outlist.add(first);
MatrixBlock value_in2 = null;
if( _cbind ) {
value_in2 = _pm.getBlock((int)ix.getRowIndex(), 1);
if(in1.getValue().getNumColumns()+value_in2.getNumColumns()>_blen) {
IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
second.getIndexes().setIndexes(ix.getRowIndex(), ix.getColumnIndex()+1);
outlist.add(second);
}
}
else { //rbind
value_in2 = _pm.getBlock(1, (int)ix.getColumnIndex());
if(in1.getValue().getNumRows()+value_in2.getNumRows()>_blen) {
IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
second.getIndexes().setIndexes(ix.getRowIndex()+1, ix.getColumnIndex());
outlist.add(second);
}
}
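				//split the rhs values across the boundary block and the optional
				//spill-over block, respecting the block size blen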
OperationsOnMatrixValues.performAppend(in1.getValue(), value_in2, outlist, _blen, _cbind, true, 0);
ret.addAll(SparkUtils.fromIndexedMatrixBlock(outlist));
}
return ret.iterator();
}
}
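
	/**
	 * Partitioning-preserving map-side append via mapPartitions: emits exactly
	 * one output block per input block (pure in-block append), which allows
	 * Spark to retain the existing partitioner of the lhs RDD.
	 */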
private static class MapSideAppendPartitionFunction implements PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes,MatrixBlock>>, MatrixIndexes, MatrixBlock>
{
private static final long serialVersionUID = 5767240739761027220L;
		private final PartitionedBroadcast<MatrixBlock> _pm;
		private final boolean _cbind;
		private final long _lastBlockColIndex;
public MapSideAppendPartitionFunction(PartitionedBroadcast<MatrixBlock> binput, boolean cbind, long offset, int blen)
{
_pm = binput;
_cbind = cbind;
			//1-based index of the lhs boundary block in the append dimension
			//(column blocks for cbind, row blocks for rbind)
			_lastBlockColIndex = Math.max((long)Math.ceil((double)offset/blen), 1);
}
@Override
public LazyIterableIterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> arg0)
throws Exception
{
return new MapAppendPartitionIterator(arg0);
}
		/**
		 * Lazy mappend iterator to prevent materialization of the entire partition
		 * output in memory. The implementation via mapPartitions is required to
		 * preserve partitioning information, which is important for performance.
		 */
private class MapAppendPartitionIterator extends LazyIterableIterator<Tuple2<MatrixIndexes, MatrixBlock>>
{
public MapAppendPartitionIterator(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> in) {
super(in);
}
@Override
protected Tuple2<MatrixIndexes, MatrixBlock> computeNext(Tuple2<MatrixIndexes, MatrixBlock> arg)
throws Exception
{
MatrixIndexes ix = arg._1();
MatrixBlock in1 = arg._2();
//case 1: pass through of non-boundary blocks
if( (_cbind?ix.getColumnIndex():ix.getRowIndex()) != _lastBlockColIndex ) {
return arg;
}
				//case 3: append operation on boundary block (case 2 of
				//MapSideAppendFunction cannot occur here because partitioning
				//preservation guarantees a pure in-block append)
else {
int rowix = _cbind ? (int)ix.getRowIndex() : 1;
int colix = _cbind ? 1 : (int)ix.getColumnIndex();
MatrixBlock in2 = _pm.getBlock(rowix, colix);
MatrixBlock out = in1.append(in2, new MatrixBlock(), _cbind);
return new Tuple2<>(ix, out);
}
}
}
}
}