blob: 7407bfd5532d188007617edfbbfa8da1fd9b3db4 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.nemo.examples.beam;
import com.github.fommil.netlib.BLAS;
import com.github.fommil.netlib.LAPACK;
import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderProviders;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.commons.lang.ArrayUtils;
import org.netlib.util.intW;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
* Sample Alternating Least Square application.
public final class AlternatingLeastSquare {
private static final Logger LOG = LoggerFactory.getLogger(AlternatingLeastSquare.class.getName());
* Private constructor.
private AlternatingLeastSquare() {
* Method for parsing the input line.
public static final class ParseLine extends DoFn<String, KV<Integer, KV<int[], float[]>>> {
private final boolean isUserData;
* Constructor for Parseline DoFn class.
* @param isUserData flag that distinguishes user data from item data.
public ParseLine(final boolean isUserData) {
this.isUserData = isUserData;
* ProcessElement method for BEAM.
* @param c Process context.
* @throws Exception Exception on the way.
public void processElement(final ProcessContext c) throws Exception {
final String text = c.element().trim();
if (text.startsWith("#") || text.length() == 0) {
// comments and empty lines
final String[] split = text.split("\\s+|:");
final int userId = Integer.parseInt(split[0]);
final int itemId = Integer.parseInt(split[1]);
final float rating = Float.parseFloat(split[2]);
final int[] userAry = new int[1];
userAry[0] = userId;
final int[] itemAry = new int[1];
itemAry[0] = itemId;
final float[] ratingAry = new float[1];
ratingAry[0] = rating;
if (isUserData) {
c.output(KV.of(userId, KV.of(itemAry, ratingAry)));
} else {
c.output(KV.of(itemId, KV.of(userAry, ratingAry)));
* A DoFn that relays a single vector list.
public static final class UngroupSingleVectorList
extends DoFn<KV<Integer, Iterable<float[]>>, KV<Integer, float[]>> {
* ProcessElement method for BEAM.
* @param c ProcessContext.
* @throws Exception Exception on the way.
public void processElement(final ProcessContext c) throws Exception {
final KV<Integer, Iterable<float[]>> element = c.element();
final Iterator<float[]> floatIterator = element.getValue().iterator();
final float[] floatList =;
if (floatIterator.hasNext()) {
throw new RuntimeException("Only a single vector list is expected");
// Output the ungrouped single vector list
c.output(KV.of(element.getKey(), floatList));
* Combiner for the training data.
public static final class TrainingDataCombiner
extends Combine.CombineFn<KV<int[], float[]>, List<KV<int[], float[]>>, KV<int[], float[]>> {
public List<KV<int[], float[]>> createAccumulator() {
return new LinkedList<>();
public List<KV<int[], float[]>> addInput(final List<KV<int[], float[]>> accumulator,
final KV<int[], float[]> value) {
return accumulator;
public List<KV<int[], float[]>> mergeAccumulators(final Iterable<List<KV<int[], float[]>>> accumulators) {
final List<KV<int[], float[]>> merged = new LinkedList<>();
for (final List<KV<int[], float[]>> acc : accumulators) {
return merged;
public KV<int[], float[]> extractOutput(final List<KV<int[], float[]>> accumulator) {
int dimension = 0;
for (final KV<int[], float[]> kv : accumulator) {
dimension += kv.getKey().length;
final int[] intArr = new int[dimension];
final float[] floatArr = new float[dimension];
int itr = 0;
for (final KV<int[], float[]> kv : accumulator) {
final int[] ints = kv.getKey();
final float[] floats = kv.getValue();
for (int i = 0; i < ints.length; i++) {
intArr[itr] = ints[i];
floatArr[itr] = floats[i];
return KV.of(intArr, floatArr);
* DoFn for calculating next matrix at each iteration.
public static final class CalculateNextMatrix extends DoFn<KV<Integer, KV<int[], float[]>>, KV<Integer, float[]>> {
private static final LAPACK NETLIB_LAPACK = LAPACK.getInstance();
private static final BLAS NETLIB_BLAS = BLAS.getInstance();
private final int numFeatures;
private final double lambda;
private final PCollectionView<Map<Integer, float[]>> fixedMatrixView;
* Constructor for CalculateNextMatrix DoFn class.
* @param numFeatures number of features.
* @param lambda lambda.
* @param fixedMatrixView a PCollectionView of the fixed matrix (item / user matrix).
public CalculateNextMatrix(final int numFeatures,
final double lambda,
final PCollectionView<Map<Integer, float[]>> fixedMatrixView) {
this.numFeatures = numFeatures;
this.lambda = lambda;
this.fixedMatrixView = fixedMatrixView;
* ProcessElement method for BEAM.
* @param c ProcessContext.
* @throws Exception Exception on the way.
public void processElement(final ProcessContext c) throws Exception {
final double[] upperTriangularLeftMatrix = new double[numFeatures * (numFeatures + 1) / 2];
final Map<Integer, float[]> fixedMatrix = c.sideInput(fixedMatrixView);
final int[] indexArr = c.element().getValue().getKey();
final float[] ratingArr = c.element().getValue().getValue();
final int size = indexArr.length;
final float[] vector = new float[numFeatures];
final double[] rightSideVector = new double[numFeatures];
final double[] tmp = new double[numFeatures];
for (int i = 0; i < size; i++) {
final int ratingIndex = indexArr[i];
final float rating = ratingArr[i];
for (int j = 0; j < numFeatures; j++) {
tmp[j] = fixedMatrix.get(ratingIndex)[j];
NETLIB_BLAS.dspr("U", numFeatures, 1.0, tmp, 1, upperTriangularLeftMatrix);
if (rating != 0.0) {
NETLIB_BLAS.daxpy(numFeatures, rating, tmp, 1, rightSideVector, 1);
final double regParam = lambda * size;
int a = 0;
int b = 2;
while (a < upperTriangularLeftMatrix.length) {
upperTriangularLeftMatrix[a] += regParam;
a += b;
b += 1;
final intW info = new intW(0);
NETLIB_LAPACK.dppsv("U", numFeatures, 1, upperTriangularLeftMatrix, rightSideVector, numFeatures, info);
if (info.val != 0) {
throw new RuntimeException("returned info value : " + info.val);
for (int i = 0; i < vector.length; i++) {
vector[i] = (float) rightSideVector[i];
c.output(KV.of(c.element().getKey(), vector));
* Composite transform that wraps the transforms inside the loop.
* The loop updates the user matrix and the item matrix in each iteration.
public static final class UpdateUserAndItemMatrix
extends LoopCompositeTransform<PCollection<KV<Integer, float[]>>, PCollection<KV<Integer, float[]>>> {
private final Integer numFeatures;
private final double lambda;
private final PCollection<KV<Integer, KV<int[], float[]>>> parsedUserData;
private final PCollection<KV<Integer, KV<int[], float[]>>> parsedItemData;
* Constructor of UpdateUserAndItemMatrix CompositeTransform.
* @param numFeatures number of features.
* @param lambda lambda.
* @param parsedUserData PCollection of parsed user data.
* @param parsedItemData PCollection of parsed item data.
UpdateUserAndItemMatrix(final Integer numFeatures, final double lambda,
final PCollection<KV<Integer, KV<int[], float[]>>> parsedUserData,
final PCollection<KV<Integer, KV<int[], float[]>>> parsedItemData) {
this.numFeatures = numFeatures;
this.lambda = lambda;
this.parsedUserData = parsedUserData;
this.parsedItemData = parsedItemData;
public PCollection<KV<Integer, float[]>> expand(final PCollection<KV<Integer, float[]>> itemMatrix) {
// Make Item Matrix view.
final PCollectionView<Map<Integer, float[]>> itemMatrixView =
itemMatrix.apply(GroupByKey.create()).apply(ParDo.of(new UngroupSingleVectorList())).apply(View.asMap());
// Get new User Matrix
final PCollectionView<Map<Integer, float[]>> userMatrixView = parsedUserData
.apply(ParDo.of(new CalculateNextMatrix(numFeatures, lambda, itemMatrixView)).withSideInputs(itemMatrixView))
.apply(GroupByKey.create()).apply(ParDo.of(new UngroupSingleVectorList())).apply(View.asMap());
// return new Item Matrix
return parsedItemData.apply(ParDo.of(new CalculateNextMatrix(numFeatures, lambda, userMatrixView))
* A DoFn that creates an initial matrix.
public static final class CreateInitialMatrix extends DoFn<KV<Integer, KV<int[], float[]>>, KV<Integer, float[]>> {
private final int numFeatures;
private final boolean isDeterministic;
* @param numFeatures number of the features.
* @param isDeterministic whether or not to initialize the matrix in deterministic mode.
CreateInitialMatrix(final int numFeatures,
final boolean isDeterministic) {
this.numFeatures = numFeatures;
this.isDeterministic = isDeterministic;
* ProcessElement method for BEAM.
* @param c ProcessContext.
* @throws Exception Exception on the way.
public void processElement(final ProcessContext c) throws Exception {
final float[] result = new float[numFeatures];
final KV<Integer, KV<int[], float[]>> element = c.element();
final float[] ratings = element.getValue().getValue();
for (int i = 0; i < ratings.length; i++) {
result[0] += ratings[i];
result[0] /= ratings.length;
for (int i = 1; i < result.length; i++) {
if (isDeterministic) {
result[i] = (float) (0.5 * 0.01); // use a deterministic average value
} else {
result[i] = (float) (Math.random() * 0.01);
c.output(KV.of(element.getKey(), result));
* Main function for the ALS BEAM program.
* @param args arguments.
public static void main(final String[] args) {
final Long start = System.currentTimeMillis();;
final String inputFilePath = args[0];
final Integer numFeatures = Integer.parseInt(args[1]);
final Integer numItr = Integer.parseInt(args[2]);
final Double lambda;
if (args.length > 3) {
lambda = Double.parseDouble(args[3]);
} else {
lambda = 0.05;
final String outputFilePath;
boolean checkOutput = false;
if (args.length > 4) {
outputFilePath = args[4];
checkOutput = true;
} else {
outputFilePath = "";
final PipelineOptions options = PipelineOptionsFactory.create();
final Pipeline p = Pipeline.create(options);
p.getCoderRegistry().registerCoderProvider(CoderProviders.fromStaticMethods(int[].class, IntArrayCoder.class));
p.getCoderRegistry().registerCoderProvider(CoderProviders.fromStaticMethods(float[].class, FloatArrayCoder.class));
// Read raw data
final PCollection<String> rawData =, inputFilePath);
// Parse data for item
final PCollection<KV<Integer, KV<int[], float[]>>> parsedItemData = rawData
.apply(ParDo.of(new ParseLine(false)))
.apply(Combine.perKey(new TrainingDataCombiner()));
// Parse data for user
final PCollection<KV<Integer, KV<int[], float[]>>> parsedUserData = rawData
.apply(ParDo.of(new ParseLine(true)))
.apply(Combine.perKey(new TrainingDataCombiner()));
// Create Initial Item Matrix
PCollection<KV<Integer, float[]>> itemMatrix =
parsedItemData.apply(ParDo.of(new CreateInitialMatrix(numFeatures, checkOutput)));
// Iterations to update Item Matrix.
for (int i = 0; i < numItr; i++) {
// NOTE: a single composite transform for the iteration.
itemMatrix = itemMatrix.apply(new UpdateUserAndItemMatrix(numFeatures, lambda, parsedUserData, parsedItemData));
if (checkOutput) {
final PCollection<String> result = itemMatrix.apply(MapElements.<KV<Integer, float[]>, String>via(
new SimpleFunction<KV<Integer, float[]>, String>() {
public String apply(final KV<Integer, float[]> elem) {
final List<String> values = Stream.of(ArrayUtils.toObject(elem.getValue()))
return elem.getKey() + "," + String.join(",", values);
GenericSourceSink.write(result, outputFilePath);
};"JCT " + (System.currentTimeMillis() - start));