blob: cc24919064a4d537aa7a7b9b9980eaefc9ff2dbc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.hops.rewrite;
import java.util.ArrayList;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.ExecMode;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.OpOpData;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.DataOp;
import org.apache.sysds.hops.FunctionOp;
import org.apache.sysds.hops.Hop;
import org.apache.sysds.hops.HopsException;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.common.Types.DataType;
/**
* Rule: BlockSizeAndReblock. For all statement blocks, determine
* "optimal" block size, and place reblock Hops. For now, we just
* use BlockSize 1K x 1K and do reblock after Persistent Reads and
* before Persistent Writes.
*/
public class RewriteBlockSizeAndReblock extends HopRewriteRule
{
@Override
public ArrayList<Hop> rewriteHopDAGs(ArrayList<Hop> roots, ProgramRewriteStatus state)
{
if( roots == null )
return null;
//maintain rewrite status
if( isReblockValid() )
state.setBlocksize(ConfigurationManager.getBlocksize());
//perform reblock and blocksize rewrite
for( Hop h : roots )
rule_BlockSizeAndReblock(h, ConfigurationManager.getBlocksize());
return roots;
}
@Override
public Hop rewriteHopDAG(Hop root, ProgramRewriteStatus state)
{
if( root == null )
return null;
//maintain rewrite status
if( isReblockValid() )
state.setBlocksize(ConfigurationManager.getBlocksize());
//perform reblock and blocksize rewrite
rule_BlockSizeAndReblock(root, ConfigurationManager.getBlocksize());
return root;
}
private void rule_BlockSizeAndReblock(Hop hop, final int blocksize)
{
// Go to the source(s) of the DAG
for (Hop hi : hop.getInput()) {
if (!hi.isVisited())
rule_BlockSizeAndReblock(hi, blocksize);
}
boolean canReblock = isReblockValid();
if (hop instanceof DataOp)
{
DataOp dop = (DataOp) hop;
// if block size does not match
if( canReblock &&
( (dop.getDataType() == DataType.MATRIX && (dop.getBlocksize() != blocksize))
||(dop.getDataType() == DataType.FRAME && OptimizerUtils.isSparkExecutionMode() && (dop.getInputFormatType()==FileFormat.TEXT
|| dop.getInputFormatType()==FileFormat.CSV))) )
{
if( dop.getOp() == OpOpData.PERSISTENTREAD)
{
// insert reblock after the hop
dop.setRequiresReblock(true);
dop.setBlocksize(blocksize);
}
else if( dop.getOp() == OpOpData.PERSISTENTWRITE )
{
if (dop.getBlocksize() == -1)
{
// if this dataop is for cell output, then no reblock is needed
// as (A) all jobtypes can produce block2cell and cell2cell and
// (B) we don't generate an explicit instruction for it (the info
// is conveyed through OutputInfo.
}
else if (dop.getInput().get(0).requiresReblock() && dop.getInput().get(0).getParent().size() == 1)
{
// if a reblock is feeding into this, then use it if this is
// the only parent, otherwise new Reblock
dop.getInput().get(0).setBlocksize(dop.getBlocksize());
}
else
{
// insert reblock after the hop
dop.setRequiresReblock(true);
dop.setBlocksize(blocksize);
}
}
else if (dop.getOp().isTransient()) {
if ( DMLScript.getGlobalExecMode() == ExecMode.SINGLE_NODE ) {
// simply copy the values from its input
dop.setBlocksize(hop.getInput().get(0).getBlocksize());
}
else {
// by default, all transient reads and writes are in blocked format
dop.setBlocksize(blocksize);
}
}
else if (dop.getOp() == OpOpData.FEDERATED) {
dop.setBlocksize(blocksize);
}
else {
throw new HopsException(hop.printErrorLocation() + "unexpected non-scalar Data HOP in reblock.\n");
}
}
}
else //NO DATAOP
{
// TODO: following two lines are commented, and the subsequent hack is used instead!
//set_rows_per_block(GLOBAL_BLOCKSIZE);
//set_cols_per_block(GLOBAL_BLOCKSIZE);
/*
* Handle hops whose output dimensions are unknown!
*
* Constraint C1:
* Currently, only ctable() and groupedAggregate() fall into this category.
* The MR jobs for both these functions run in "cell" mode and hence make their
* blocking dimensions to (-1,-1).
*
* Constraint C2:
* Blocking dimensions are not applicable for hops that produce scalars.
* CMCOV and GroupedAgg jobs always run in "cell" mode, and hence they
* produce output in cell format.
*
* Constraint C3:
* Remaining hops will get their blocking dimensions from their input hops.
*/
if ( hop.requiresReblock() ) {
hop.setBlocksize(blocksize);
}
// Constraint C1:
// Constraint C2:
else if ( hop.getDataType() == DataType.SCALAR ) {
hop.setBlocksize(-1);
}
// Constraint C3:
else {
if ( !canReblock ) {
hop.setBlocksize(-1);
}
else {
hop.setBlocksize(blocksize);
// Functions may return multiple outputs, as defined in array of outputs in FunctionOp.
// Reblock properties need to be set for each output.
if ( hop instanceof FunctionOp ) {
FunctionOp fop = (FunctionOp) hop;
if ( fop.getOutputs() != null) {
for(Hop out : fop.getOutputs()) {
out.setBlocksize(blocksize);
}
}
}
}
// if any input is not blocked then the output of current Hop should not be blocked
for ( Hop h : hop.getInput() ) {
if ( h.getDataType() == DataType.MATRIX && h.getBlocksize() == -1) {
hop.setBlocksize(-1);
break;
}
}
}
}
hop.setVisited();
}
private static boolean isReblockValid() {
return ( DMLScript.getGlobalExecMode() != ExecMode.SINGLE_NODE);
}
}