blob: fef99d7dd23781e6eda90e7edc301e3ef7f82619 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.controlprogram.parfor;
import java.util.LinkedList;
import java.util.List;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock;
import org.apache.sysds.runtime.controlprogram.parfor.Task.TaskType;
import org.apache.sysds.runtime.instructions.cp.IntObject;
/**
 * This factoring task partitioner virtually iterates over the given FOR loop (from, to, incr),
 * creates iterations and groups them into tasks. Task sizes are not taken from the configured
 * task size; instead, tasks are created in batches of {@code numThreads} equally-sized tasks,
 * where each batch covers a fixed fraction of the remaining iterations. Task sizes therefore
 * decrease over time, which gives good load balance for heterogeneous iterations.
 *
 * See the original paper for details:
 * [Susan Flynn Hummel, Edith Schonberg, Lawrence E. Flynn:
 * Factoring: a practical and robust method for scheduling parallel loops.
 * SC 1991: 610-632]
 */
public class TaskPartitionerFactoring extends TaskPartitioner
{
	/** Factoring parameter x of the original paper (which argues for x=2): each batch covers ~1/x of the remaining iterations. */
	private static final int FACTORING_X = 2;

	/** Range tasks (similar to run-length encoding) only pay off for task sizes larger than this. */
	private static final long MIN_RANGE_TASK_SIZE = 3;

	/** Number of parallel workers, i.e., number of tasks per batch. */
	private final int _numThreads;

	public TaskPartitionerFactoring( long taskSize, int numThreads, String iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal )
	{
		super(taskSize, iterVarName, fromVal, toVal, incrVal);
		_numThreads = numThreads;
	}

	/**
	 * Creates all tasks of the factoring schedule and returns them in creation order.
	 *
	 * @return list of created tasks
	 * @throws DMLRuntimeException if the number of threads is smaller than 1
	 */
	@Override
	public List<Task> createTasks()
	{
		int P = getValidatedNumThreads();
		LinkedList<Task> tasks = new LinkedList<>();
		generateTasks(P, tasks::addLast);
		return tasks;
	}

	/**
	 * Creates all tasks of the factoring schedule, enqueues them into the given queue as soon
	 * as each one is complete, and finally closes the queue input.
	 *
	 * @param queue task queue receiving the created tasks
	 * @return number of created tasks
	 * @throws DMLRuntimeException if the number of threads is smaller than 1 or task creation fails
	 */
	@Override
	public long createTasks(LocalTaskQueue<Task> queue)
	{
		int P = getValidatedNumThreads();
		try
		{
			long numCreatedTasks = generateTasks(P, queue::enqueueTask);
			// mark end of task input stream
			// NOTE(review): closeInput() is skipped if task creation fails; confirm that the
			// caller releases consumers of the queue in that case.
			queue.closeInput();
			return numCreatedTasks;
		}
		catch(Exception ex)
		{
			// preserve the interrupt status instead of silently dropping it while wrapping
			if( ex instanceof InterruptedException )
				Thread.currentThread().interrupt();
			throw new DMLRuntimeException(ex);
		}
	}

	/**
	 * Computes the task size (number of iterations per task) for the next numThreads tasks
	 * given the number of remaining iterations R, and the number of threads P.
	 *
	 * NOTE: x can be set to different values (see FACTORING_X), but the original paper argues for x=2.
	 *
	 * @param R number of remaining iterations; may be zero or negative once the last batch overshoots
	 * @param P number of parallel workers (expected to be at least 1)
	 * @return next batch task size, always at least 1
	 */
	protected long determineNextBatchSize(long R, int P)
	{
		if( R <= 0 ) //no remaining iterations (R may become negative after the last batch)
			return 1;
		long divisor = (long) FACTORING_X * P; //long arithmetic avoids int overflow of x*P
		//exact integer ceil(R / (x*P)) for R > 0; NOTE: round instead of ceil creates more tasks
		long K = (R - 1) / divisor + 1;
		return Math.max(K, 1); //account for non-positive P, never create empty tasks
	}

	/** Receiver of created tasks; abstracts over collecting into a list and enqueueing into a task queue. */
	@FunctionalInterface
	private interface TaskSink<E extends Exception>
	{
		void accept(Task task) throws E;
	}

	/**
	 * Returns the configured number of workers, failing fast for values below 1, which would
	 * otherwise make the task-creation loop never advance.
	 */
	private int getValidatedNumThreads()
	{
		if( _numThreads < 1 )
			throw new DMLRuntimeException("Factoring task partitioner requires at least one "
				+ "worker, but got numThreads=" + _numThreads + ".");
		return _numThreads;
	}

	/**
	 * Walks the iteration space in batches of P tasks and hands every task, fully populated
	 * and in creation order, to the given sink.
	 *
	 * @param P number of parallel workers (at least 1)
	 * @param sink receiver of the created tasks
	 * @return number of created tasks
	 * @throws E if the sink fails to accept a task
	 */
	private <E extends Exception> long generateTasks(int P, TaskSink<E> sink) throws E
	{
		long lFrom = _fromVal.getLongValue();
		long lTo = _toVal.getLongValue();
		long lIncr = _incrVal.getLongValue();
		long R = _numIter; // remaining number of iterations
		long numCreatedTasks = 0;

		long i = lFrom;
		while( i <= lTo )
		{
			long K = determineNextBatchSize(R, P); // size of each of the next P tasks
			R -= K * P;
			TaskType type = determineTaskType(K);
			//one task per logical processor, until the iteration space is exhausted
			for( int j = 0; j < P && i <= lTo; j++ )
			{
				Task lTask = new Task(_iterVarName, type);
				i = fillTask(lTask, type, i, K, lTo, lIncr);
				//hand over only after all iterations were added, to prevent race conditions
				//with consumers of the task
				sink.accept(lTask);
				numCreatedTasks++;
			}
		}
		return numCreatedTasks;
	}

	/** Chooses range tasks for large task sizes (if enabled) and value-set tasks otherwise. */
	private static TaskType determineTaskType(long K)
	{
		return (ParForProgramBlock.USE_RANGE_TASKS_IF_USEFUL && K > MIN_RANGE_TASK_SIZE) ?
			TaskType.RANGE : TaskType.SET;
	}

	/**
	 * Adds up to K iterations, starting at {@code from} and not exceeding {@code lTo}, to the
	 * given task and returns the first iteration value not covered by it.
	 */
	private static long fillTask(Task task, TaskType type, long from, long K, long lTo, long lIncr)
	{
		if( type == TaskType.SET )
		{
			//value based task: one entry per iteration
			long i = from;
			for( long k = 0; k < K && i <= lTo; k++, i += lIncr )
				task.addIteration(new IntObject(i));
			return i;
		}
		//range based task: encoded as (from, to, increment)
		long to = Math.min(from + (K - 1) * lIncr, lTo);
		task.addIteration(new IntObject(from)); //from
		task.addIteration(new IntObject(to));   //to
		task.addIteration(new IntObject(lIncr)); //increment
		return to + lIncr;
	}
}