blob: cdec954624f5257e0399eeb308e6ce8af7abb444 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.mahout.math
import org.apache.mahout.math.drm._
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.scalabindings._
import scala.reflect.ClassTag
import org.apache.mahout.math.drm.logical.OpAewUnaryFunc
import collection._
package object drm {
/** Drm row-wise tuple */
type DrmTuple[K] = (K, Vector)
/** Drm block-wise tuple: Array of row keys and the matrix block. */
type BlockifiedDrmTuple[K] = (Array[K], _ <: Matrix)
/** Block-map func */
type BlockMapFunc[S, R] = BlockifiedDrmTuple[S]BlockifiedDrmTuple[R]
type BlockMapFunc2[S] = BlockifiedDrmTuple[S]Matrix
type BlockReduceFunc = (Matrix, Matrix)Matrix
/** CacheHint type */
// type CacheHint = CacheHint.CacheHint
def safeToNonNegInt(x: Long): Int = {
assert(x == x << -31 >>> -31, "transformation from long to Int is losing significant bits, or is a negative number")
/** Broadcast support API */
def drmBroadcast(m:Matrix)(implicit ctx:DistributedContext):BCast[Matrix] = ctx.drmBroadcast(m)
/** Broadcast support API */
def drmBroadcast(v:Vector)(implicit ctx:DistributedContext):BCast[Vector] = ctx.drmBroadcast(v)
/** Load DRM from hdfs (as in Mahout DRM format) */
def drmDfsRead (path: String)(implicit ctx: DistributedContext): CheckpointedDrm[_] = ctx.drmDfsRead(path)
/** Shortcut to parallelizing matrices with indices, ignore row labels. */
def drmParallelize(m: Matrix, numPartitions: Int = 1)
(implicit sc: DistributedContext): CheckpointedDrm[Int] = drmParallelizeWithRowIndices(m, numPartitions)(sc)
/** Parallelize in-core matrix as a distributed matrix, using row ordinal indices as data set keys. */
def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
(implicit ctx: DistributedContext): CheckpointedDrm[Int] = ctx.drmParallelizeWithRowIndices(m, numPartitions)
/** Parallelize in-core matrix as a distributed matrix, using row labels as a data set keys. */
def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
(implicit ctx: DistributedContext): CheckpointedDrm[String] = ctx.drmParallelizeWithRowLabels(m, numPartitions)
/** This creates an empty DRM with specified number of partitions and cardinality. */
def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
(implicit ctx: DistributedContext): CheckpointedDrm[Int] = ctx.drmParallelizeEmpty(nrow, ncol, numPartitions)
/** Creates empty DRM with non-trivial height */
def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
(implicit ctx: DistributedContext): CheckpointedDrm[Long] = ctx.drmParallelizeEmptyLong(nrow, ncol, numPartitions)
/** Implicit broadcast -> value conversion. */
implicit def bcast2val[T](bcast: BCast[T]): T = bcast.value
/** Just throw all engine operations into context as well. */
implicit def ctx2engine(ctx: DistributedContext): DistributedEngine = ctx.engine
implicit def drm2drmCpOps[K](drm: CheckpointedDrm[K]): CheckpointedOps[K] =
new CheckpointedOps[K](drm)
* We assume that whenever computational action is invoked without explicit checkpoint, the user
* doesn't imply caching
implicit def drm2Checkpointed[K](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint(CacheHint.NONE)
/** Implicit conversion to in-core with NONE caching of the result. */
implicit def drm2InCore[K](drm: DrmLike[K]): Matrix = drm.collect
/** Do vertical concatenation of collection of blockified tuples */
private[mahout] def rbind[K:ClassTag](blocks: Iterable[BlockifiedDrmTuple[K]]): BlockifiedDrmTuple[K] = {
assert(blocks.nonEmpty, "rbind: 0 blocks passed in")
if (blocks.size == 1) {
// No coalescing required.
} else {
// compute total number of rows in a new block
val m =
val n = blocks.head._2.ncol
val coalescedBlock =, n)
val coalescedKeys = new Array[K](m)
var row = 0
for (elem <- blocks.view) {
val block = elem._2
val rowEnd = row + block.nrow
coalescedBlock(row until rowEnd, ::) := block
elem._1.copyToArray(coalescedKeys, row)
row = rowEnd
coalescedKeys -> coalescedBlock
* Convert arbitrarily-keyed matrix to int-keyed matrix. Some algebra will accept only int-numbered
* row matrices. So this method is to help.
* @param drmX input to be transcoded
* @param computeMap collect `old key -> int key` map to front-end?
* @tparam K key type
* @return Sequentially keyed matrix + (optionally) map from non-int key to [[Int]] key. If the
* key type is actually Int, then we just return the argument with None for the map,
* regardless of computeMap parameter.
def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) =
drmX.context.engine.drm2IntKeyed(drmX, computeMap)
* (Optional) Sampling operation. Consistent with Spark semantics of the same.
* @param drmX
* @param fraction
* @param replacement
* @tparam K
* @return samples
def drmSampleRows[K](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] =
drmX.context.engine.drmSampleRows(drmX, fraction, replacement)
def drmSampleKRows[K](drmX: DrmLike[K], numSamples: Int, replacement: Boolean = false): Matrix =
drmX.context.engine.drmSampleKRows(drmX, numSamples, replacement)
* Convert a DRM sample into a Tab Separated Vector (TSV) to be loaded into an R-DataFrame
* for plotting and sketching
* @param drmX - DRM
* @param samplePercent - Percentage of Sample elements from the DRM to be fished out for plotting
* @tparam K
* @return TSV String
def drmSampleToTSV[K](drmX: DrmLike[K], samplePercent: Double = 1): String = {
val drmSize = drmX.checkpoint().numRows()
val sampleRatio: Double = 1.0 * samplePercent / 100
val numSamples: Int = (drmSize * sampleRatio).toInt
val plotMatrix = drmSampleKRows(drmX, numSamples, replacement = false)
// Plot Matrix rows
val matrixRows = plotMatrix.numRows()
val matrixCols = plotMatrix.numCols()
// Convert the Plot Matrix Rows to TSV
var str = ""
for (i <- 0 until matrixRows) {
for (j <- 0 until matrixCols) {
str += plotMatrix(i, j)
if (j <= matrixCols - 2) {
str += '\t'
str += '\n'
// Elementwise unary functions on distributed operands.
def dexp[K](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.exp, true)
def dlog[K](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.log, true)
def dabs[K](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.abs)
def dsqrt[K](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.sqrt)
def dsignum[K](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.signum)
// Misc. math utilities.
* Compute column wise means and variances -- distributed version.
* @param drmA Note: will pin input to cache if not yet pinned.
* @tparam K
* @return colMeans → colVariances
def dcolMeanVars[K](drmA: DrmLike[K]): (Vector, Vector) = {
import RLikeDrmOps._
val drmAcp = drmA.checkpoint()
val mu = drmAcp colMeans
// Compute variance using mean(x^2) - mean(x)^2
val variances = (drmAcp ^ 2 colMeans) -=: mu * mu
mu → variances
* Compute column wise means and standard deviations -- distributed version.
* @param drmA note: input will be pinned to cache if not yet pinned
* @return colMeans → colStdevs
def dcolMeanStdevs[K](drmA: DrmLike[K]): (Vector, Vector) = {
val (mu, vars) = dcolMeanVars(drmA)
mu → (vars ::= math.sqrt _)
* Thin column-wise mean and covariance matrix computation. Same as [[dcolMeanCov()]] but suited for
* thin and tall inputs where covariance matrix can be reduced and finalized in driver memory.
* @param drmA note: will pin input to cache if not yet pinned.
* @return mean → covariance matrix (in core)
def dcolMeanCovThin[K: ClassTag](drmA: DrmLike[K]):(Vector, Matrix) = {
import RLikeDrmOps._
val drmAcp = drmA.checkpoint()
val mu = drmAcp colMeans
val mxCov = (drmAcp.t %*% drmAcp).collect /= drmAcp.nrow -= (mu cross mu)
mu → mxCov
* Compute COV(X) matrix and mean of row-wise data set. X is presented as row-wise input matrix A.
* This is a "wide" procedure, covariance matrix is returned as a DRM.
* @param drmA note: will pin input into cache if not yet pinned.
* @return mean → covariance DRM
def dcolMeanCov[K: ClassTag](drmA: DrmLike[K]): (Vector, DrmLike[Int]) = {
import RLikeDrmOps._
implicit val ctx = drmA.context
val drmAcp = drmA.checkpoint()
val bcastMu = drmBroadcast(drmAcp colMeans)
// We use multivaraite analogue COV(X)=E(XX')-mu*mu'. In our case E(XX') = (A'A)/A.nrow.
// Compute E(XX')
val drmSigma = (drmAcp.t %*% drmAcp / drmAcp.nrow)
// Subtract mu*mu'. In this case we assume mu*mu' may still be big enough to be treated by
// driver alone, so we redistribute this operation as well. Hence it may look a bit cryptic.
.mapBlock() { case (keys, block)
// Pin mu as vector reference to memory.
val mu:Vector = bcastMu
keys → (block := { (r, c, v) ⇒ v - mu(keys(r)) * mu(c) })
// return (mu, cov(X) ("bigSigma")).
(bcastMu: Vector) → drmSigma
/** Distributed Squared distance matrix computation. */
def dsqDist(drmX: DrmLike[Int]): DrmLike[Int] = {
// This is a specific case of pairwise distances of X and Y.
import RLikeDrmOps._
// Context needed
implicit val ctx = drmX.context
// Pin to cache if hasn't been pinned yet
val drmXcp = drmX.checkpoint()
// Compute column sum of squares
val s = drmXcp ^ 2 rowSums
val sBcast = drmBroadcast(s)
(drmXcp %*% drmXcp.t)
// Apply second part of the formula as per in-core algorithm
.mapBlock() { case (keys, block)
// Slurp broadcast to memory
val s = sBcast: Vector
// Update in-place
block := { (r, c, x) ⇒ s(keys(r)) + s(c) - 2 * x}
keys → block
* Compute fold-in distances (distributed version). Here, we use pretty much the same math as with
* squared distances.
* D_sq = s*1' + 1*t' - 2*X*Y'
* where s is row sums of hadamard product(X, X), and, similarly,
* s is row sums of Hadamard product(Y, Y).
* @param drmX m x d row-wise dataset. Pinned to cache if not yet pinned.
* @param drmY n x d row-wise dataset. Pinned to cache if not yet pinned.
* @return m x d pairwise squared distance matrix (between rows of X and Y)
def dsqDist(drmX: DrmLike[Int], drmY: DrmLike[Int]): DrmLike[Int] = {
import RLikeDrmOps._
implicit val ctx = drmX.context
val drmXcp = drmX.checkpoint()
val drmYcp = drmY.checkpoint()
val sBcast = drmBroadcast(drmXcp ^ 2 rowSums)
val tBcast = drmBroadcast(drmYcp ^ 2 rowSums)
(drmX %*% drmY.t)
// Apply the rest of the formula
.mapBlock() { case (keys, block) =>
// Cache broadcast representations in local task variable
val s = sBcast: Vector
val t = tBcast: Vector
block := { (r, c, x) => s(keys(r)) + t(c) - 2 * x}
keys → block
package object indexeddataset {
/** Load IndexedDataset from text delimited files */
def indexedDatasetDFSRead(src: String,
schema: Schema = DefaultIndexedDatasetReadSchema,
existingRowIDs: Option[BiDictionary] = None)
(implicit ctx: DistributedContext):
IndexedDataset = ctx.indexedDatasetDFSRead(src, schema, existingRowIDs)
def indexedDatasetDFSReadElements(src: String,
schema: Schema = DefaultIndexedDatasetReadSchema,
existingRowIDs: Option[BiDictionary] = None)
(implicit ctx: DistributedContext):
IndexedDataset = ctx.indexedDatasetDFSReadElements(src, schema, existingRowIDs)