blob: 071a2653ac8d049f332a3081bb015d9f01fba3e5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysml.hops.rewrite;
import java.util.ArrayList;
import org.apache.sysml.hops.DataOp;
import org.apache.sysml.hops.Hop;
import org.apache.sysml.hops.Hop.DataOpTypes;
import org.apache.sysml.hops.HopsException;
import org.apache.sysml.hops.OptimizerUtils;
/**
* Rule: BlockSizeAndReblock. For all statement blocks, determine
* "optimal" block size, and place reblock Hops. For now, we just
* use BlockSize 1K x 1K and do reblock after Persistent Reads and
* before Persistent Writes.
*/
public class RewriteInjectSparkPReadCheckpointing extends HopRewriteRule
{
@Override
public ArrayList<Hop> rewriteHopDAGs(ArrayList<Hop> roots, ProgramRewriteStatus state)
throws HopsException
{
if( !OptimizerUtils.isSparkExecutionMode() )
return roots;
if( roots == null )
return null;
//top-level hops never modified
for( Hop h : roots )
rInjectCheckpointAfterPRead(h);
return roots;
}
@Override
public Hop rewriteHopDAG(Hop root, ProgramRewriteStatus state)
throws HopsException
{
//not applicable to predicates (we do not allow persistent reads there)
return root;
}
/**
*
* @param hop
* @throws HopsException
*/
private void rInjectCheckpointAfterPRead( Hop hop )
throws HopsException
{
if(hop.getVisited() == Hop.VisitStatus.DONE)
return;
// The reblocking is performed after transform, and hence checkpoint only non-transformed reads.
if( (hop instanceof DataOp && ((DataOp)hop).getDataOpType()==DataOpTypes.PERSISTENTREAD && !HopRewriteUtils.hasTransformParents(hop))
|| (hop.requiresReblock())
)
{
//make given hop for checkpointing (w/ default storage level)
//note: we do not recursively process childs here in order to prevent unnecessary checkpoints
hop.setRequiresCheckpoint(true);
}
else
{
//process childs
if( hop.getInput() != null ) {
//process all childs (prevent concurrent modification by index access)
for( int i=0; i<hop.getInput().size(); i++ )
rInjectCheckpointAfterPRead( hop.getInput().get(i) );
}
}
hop.setVisited(Hop.VisitStatus.DONE);
}
}