blob: 6ba8ae1d854c1e51d6810e737883d14c31e40961 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.hops.rewrite;
import java.util.ArrayList;
import org.apache.sysds.common.Types.OpOpData;
import org.apache.sysds.hops.DataOp;
import org.apache.sysds.hops.Hop;
import org.apache.sysds.hops.OptimizerUtils;
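/**
 * Rule: InjectSparkPReadCheckpointing. In Spark execution mode, mark hops
 * for checkpointing:
 * <ul>
 *   <li>directly after persistent reads of matrices, and</li>
 *   <li>after reblocks, which may cause expensive shuffling.</li>
 * </ul>
 * Frame persistent reads in non-IJV formats are excluded, to avoid unnecessary
 * frame checkpoints. Predicates are not rewritten.
 */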
public class RewriteInjectSparkPReadCheckpointing extends HopRewriteRule
{
	/**
	 * Applies the rule to all DAG roots of a statement block. Outside of Spark
	 * execution mode (or for a null root list) nothing is done.
	 *
	 * @param roots DAG roots to process; may be {@code null}
	 * @param state rewrite status (not used by this rule)
	 * @return the given {@code roots} list itself; top-level hops are never replaced
	 */
	@Override
	public ArrayList<Hop> rewriteHopDAGs(ArrayList<Hop> roots, ProgramRewriteStatus state) {
		final boolean applicable = OptimizerUtils.isSparkExecutionMode() && roots != null;
		if( applicable ) {
			for( Hop root : roots )
				rInjectCheckpointAfterPRead(root);
		}
		return roots;
	}

	/**
	 * Leaves predicate DAGs untouched, since persistent reads are not allowed
	 * in predicates.
	 *
	 * @param root predicate DAG root
	 * @param state rewrite status (not used by this rule)
	 * @return {@code root}, unchanged
	 */
	@Override
	public Hop rewriteHopDAG(Hop root, ProgramRewriteStatus state) {
		return root;
	}

	/**
	 * Recursively walks the DAG below {@code hop} (skipping already visited
	 * hops) and marks checkpoint candidates. Once a hop is marked, its inputs
	 * are deliberately not traversed, to avoid unnecessary checkpoints below it.
	 *
	 * @param hop current hop of the traversal
	 */
	private void rInjectCheckpointAfterPRead( Hop hop )
	{
		if( hop.isVisited() )
			return;

		if( isCheckpointCandidate(hop) ) {
			// mark for checkpointing (w/ default storage level)
			hop.setRequiresCheckpoint(true);
		}
		else if( hop.getInput() != null ) {
			// descend into all children via index access (guards against
			// concurrent modification of the input list)
			for( int ix = 0; ix < hop.getInput().size(); ix++ )
				rInjectCheckpointAfterPRead(hop.getInput().get(ix));
		}

		hop.setVisited();
	}

	/**
	 * Decides whether a hop should be marked for checkpointing.
	 *
	 * A hop qualifies if it is either:
	 * <ul>
	 *   <li>a persistent read of a matrix, or</li>
	 *   <li>a hop that requires a reblock.</li>
	 * </ul>
	 * The reblock case excludes frame persistent reads in a non-IJV file format.
	 * Such reads (e.g., binary or csv) do not cause a shuffle. Checkpointing
	 * them would create many small string objects and thus possibly excessive
	 * garbage collection; serialized caching would be an alternative.
	 *
	 * @param hop hop to inspect
	 * @return true if the hop should be checkpointed
	 */
	private static boolean isCheckpointCandidate( Hop hop ) {
		final boolean matrix = hop.getDataType().isMatrix();
		final boolean persistentRead = hop instanceof DataOp
			&& ((DataOp) hop).getOp() == OpOpData.PERSISTENTREAD;
		final boolean frameException = hop.getDataType().isFrame()
			&& persistentRead && !((DataOp) hop).getFileFormat().isIJV();

		if( matrix && persistentRead )
			return true;
		return hop.requiresReblock() && !frameException;
	}
}