blob: 0978a5ab8d853bdc7604c4e33681eae152d1e926 [file] [log] [blame]
package org.qcri.rheem.apps.crocopr
import org.qcri.rheem.core.function.ExecutionContext
import org.qcri.rheem.core.function.FunctionDescriptor.ExtendedSerializableFunction
import org.qcri.rheem.core.util.RheemCollections
/**
* Creates initial page ranks.
*/
class CreateInitialPageRanks(numPagesBroadcast: String)
extends ExtendedSerializableFunction[(Long, Iterable[Long]), (Long, Double)] {
private var initialRank: Double = _
override def open(executionCtx: ExecutionContext): Unit = {
val numPages = RheemCollections.getSingle(executionCtx.getBroadcast[Long](numPagesBroadcast))
this.initialRank = 1d / numPages
}
override def apply(in: (Long, Iterable[Long])) = (in._1, this.initialRank)
}