/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.instructions.spark;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Random;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math3.distribution.PoissonDistribution;
import org.apache.commons.math3.random.Well1024a;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.util.random.SamplingUtils;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.ExecMode;
import org.apache.sysds.common.Types.OpOpDG;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.DataGenOp;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.lops.DataGen;
import org.apache.sysds.lops.Lop;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysds.runtime.data.BasicTensorBlock;
import org.apache.sysds.runtime.data.TensorBlock;
import org.apache.sysds.runtime.data.TensorIndexes;
import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtils;
import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.lineage.LineageItem;
import org.apache.sysds.runtime.matrix.data.LibMatrixDatagen;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixCell;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.data.RandomMatrixGenerator;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.meta.TensorCharacteristics;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.runtime.util.UtilFunctions;
import org.apache.sysds.utils.Statistics;
import scala.Tuple2;
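/**
* Spark instruction for distributed data generation. Depending on the
* concrete method (rand, seq, sample), this instruction creates an RDD of
* per-block seeds or offsets and maps it to an RDD of matrix or tensor
* blocks, which avoids materializing the generated data on the driver.
*/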
public class RandSPInstruction extends UnarySPInstruction {
private static final Log LOG = LogFactory.getLog(RandSPInstruction.class.getName());
// internal configuration
private static final long INMEMORY_NUMBLOCKS_THRESHOLD = 1024 * 1024;
private OpOpDG _method = null;
private final CPOperand rows, cols, dims;
private int blocksize;
//private boolean minMaxAreDoubles;
private final double minValue, maxValue;
private final String minValueStr, maxValueStr;
private final double sparsity;
private final String pdf, pdfParams;
private long seed = 0;
private final String dir;
private final CPOperand seq_from, seq_to, seq_incr;
private Long runtimeSeed;
// sample specific attributes
private final boolean replace;
// seed positions
private static final int SEED_POSITION_RAND = 8;
private static final int SEED_POSITION_SAMPLE = 4;
private RandSPInstruction(Operator op, OpOpDG mthd, CPOperand in, CPOperand out, CPOperand rows,
CPOperand cols, CPOperand dims, int blen, String minValue, String maxValue,
double sparsity, long seed, String dir, String probabilityDensityFunction, String pdfParams,
CPOperand seqFrom, CPOperand seqTo, CPOperand seqIncr, boolean replace, String opcode, String istr)
{
super(SPType.Rand, op, in, out, opcode, istr);
this._method = mthd;
this.rows = rows;
this.cols = cols;
this.dims = dims;
this.blocksize = blen;
this.minValueStr = minValue;
this.maxValueStr = maxValue;
double minDouble, maxDouble;
try {
minDouble = !minValue.contains(Lop.VARIABLE_NAME_PLACEHOLDER) ?
Double.valueOf(minValue) : -1;
maxDouble = !maxValue.contains(Lop.VARIABLE_NAME_PLACEHOLDER) ?
Double.valueOf(maxValue) : -1;
//minMaxAreDoubles = true;
} catch (NumberFormatException e) {
// non-double values
if (!minValueStr.equals(maxValueStr)) {
throw new DMLRuntimeException("Rand instruction does not support " +
"non-numeric data types for range initialization.");
}
minDouble = -1;
maxDouble = -1;
//minMaxAreDoubles = false;
}
this.minValue = minDouble;
this.maxValue = maxDouble;
this.sparsity = sparsity;
this.seed = seed;
this.dir = dir;
this.pdf = probabilityDensityFunction;
this.pdfParams = pdfParams;
this.seq_from = seqFrom;
this.seq_to = seqTo;
this.seq_incr = seqIncr;
this.replace = replace;
}
private RandSPInstruction(Operator op, OpOpDG mthd, CPOperand in, CPOperand out, CPOperand rows,
CPOperand cols, CPOperand dims, int blen, String minValue, String maxValue, double sparsity, long seed,
String dir, String probabilityDensityFunction, String pdfParams, String opcode, String istr)
{
this(op, mthd, in, out, rows, cols, dims, blen, minValue, maxValue, sparsity, seed, dir,
probabilityDensityFunction, pdfParams, null, null, null, false, opcode, istr);
}
private RandSPInstruction(Operator op, OpOpDG mthd, CPOperand in, CPOperand out, CPOperand rows,
CPOperand cols, CPOperand dims, int blen, CPOperand seqFrom, CPOperand seqTo,
CPOperand seqIncr, String opcode, String istr) {
this(op, mthd, in, out, rows, cols, dims, blen, "-1", "-1", -1, -1, null,
null, null, seqFrom, seqTo, seqIncr, false, opcode, istr);
}
private RandSPInstruction(Operator op, OpOpDG mthd, CPOperand in, CPOperand out, CPOperand rows, CPOperand cols,
CPOperand dims, int blen, String maxValue, boolean replace, long seed, String opcode, String istr) {
this(op, mthd, in, out, rows, cols, dims, blen, "-1", maxValue, -1, seed, null,
null, null, null, null, null, replace, opcode, istr);
}
public long getRows() {
return rows.isLiteral() ? UtilFunctions.parseToLong(rows.getName()) : -1;
}
public long getCols() {
return cols.isLiteral() ? UtilFunctions.parseToLong(cols.getName()) : -1;
}
public int getBlocksize() {
return blocksize;
}
public double getMinValue() {
return minValue;
}
public double getMaxValue() {
return maxValue;
}
public double getSparsity() {
return sparsity;
}
public long getSeed() {
return seed;
}
public String getDims() { return dims.getName(); }
public String getPdf() {
return pdf;
}
public String getPdfParams() {
return pdfParams;
}
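/**
* Parses a rand/seq/sample instruction string into a RandSPInstruction.
* Note that the field counts passed to checkNumFields refer to the
* operands excluding the opcode, and that the output operand is always
* the last field.
*
* @param str instruction string to parse
* @return parsed RandSPInstruction
*/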
public static RandSPInstruction parseInstruction(String str) {
String[] s = InstructionUtils.getInstructionPartsWithValueType ( str );
String opcode = s[0];
OpOpDG method = null;
if ( opcode.equalsIgnoreCase(DataGen.RAND_OPCODE) ) {
method = OpOpDG.RAND;
InstructionUtils.checkNumFields ( str, 10, 11 );
}
else if ( opcode.equalsIgnoreCase(DataGen.SEQ_OPCODE) ) {
method = OpOpDG.SEQ;
// 7 operands: rows, cols, blen, from, to, incr, outvar
InstructionUtils.checkNumFields ( str, 7 );
}
else if ( opcode.equalsIgnoreCase(DataGen.SAMPLE_OPCODE) ) {
method = OpOpDG.SAMPLE;
// 6 operands: range, size, replace, seed, blen, outvar
InstructionUtils.checkNumFields ( str, 6 );
}
Operator op = null;
// output is specified by the last operand
CPOperand out = new CPOperand(s[s.length-1]);
if ( method == OpOpDG.RAND ) {
int missing; // number of missing params (rows & cols, or dims)
CPOperand rows = null, cols = null, dims = null;
if (s.length == 12) {
missing = 1;
rows = new CPOperand(s[1]);
cols = new CPOperand(s[2]);
}
else {
missing = 2;
dims = new CPOperand(s[1]);
}
int blen = Integer.parseInt(s[4 - missing]);
double sparsity = !s[7 - missing].contains(Lop.VARIABLE_NAME_PLACEHOLDER) ?
Double.parseDouble(s[7 - missing]) : -1;
long seed = !s[8 - missing].contains(Lop.VARIABLE_NAME_PLACEHOLDER) ?
Long.parseLong(s[8 - missing]) : -1;
String dir = s[9 - missing];
String pdf = s[10 - missing];
String pdfParams = !s[11 - missing].contains( Lop.VARIABLE_NAME_PLACEHOLDER) ?
s[11 - missing] : null;
return new RandSPInstruction(op, method, null, out, rows, cols, dims,
blen, s[5 - missing], s[6 - missing], sparsity, seed, dir, pdf, pdfParams, opcode, str);
}
else if ( method == OpOpDG.SEQ) {
int blen = Integer.parseInt(s[3]);
CPOperand from = new CPOperand(s[4]);
CPOperand to = new CPOperand(s[5]);
CPOperand incr = new CPOperand(s[6]);
CPOperand in = null;
return new RandSPInstruction(op, method, in, out, null,
null, null, blen, from, to, incr, opcode, str);
}
else if ( method == OpOpDG.SAMPLE)
{
String max = !s[1].contains(Lop.VARIABLE_NAME_PLACEHOLDER) ?
s[1] : "0";
CPOperand rows = new CPOperand(s[2]);
CPOperand cols = new CPOperand("1", ValueType.INT64, DataType.SCALAR);
boolean replace = (!s[3].contains(Lop.VARIABLE_NAME_PLACEHOLDER)
&& Boolean.valueOf(s[3]));
long seed = Long.parseLong(s[4]);
int blen = Integer.parseInt(s[5]);
return new RandSPInstruction(op, method, null, out, rows, cols,
null, blen, max, replace, seed, opcode, str);
}
else
throw new DMLRuntimeException("Unrecognized data generation method: " + method);
}
@Override
public void processInstruction( ExecutionContext ec ){
SparkExecutionContext sec = (SparkExecutionContext)ec;
//process specific datagen operator
switch( _method ) {
case RAND: generateRandData(sec); break;
case SEQ: generateSequence(sec); break;
case SAMPLE: generateSample(sec); break;
default:
throw new DMLRuntimeException("Invalid datagen method: "+_method);
}
}
private void generateRandData(SparkExecutionContext sec) {
if (output.getDataType() == DataType.MATRIX) {
generateRandDataMatrix(sec);
} else {
generateRandDataTensor(sec);
}
//reset runtime seed (e.g., when executed in loop)
runtimeSeed = null;
}
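/**
* Generates random matrix data as an RDD of (MatrixIndexes, MatrixBlock)
* pairs. If the output fits into the driver memory budget, the data is
* generated locally instead; otherwise one seed per block is created
* (in memory, or via a temporary seed file for very large block counts)
* and mapped to random blocks in parallel.
*
* @param sec spark execution context
*/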
@SuppressWarnings("resource")
private void generateRandDataMatrix(SparkExecutionContext sec) {
long lrows = sec.getScalarInput(rows).getLongValue();
long lcols = sec.getScalarInput(cols).getLongValue();
//step 1: generate pseudo-random seed (because not specified)
long lSeed = generateRandomSeed();
if( LOG.isTraceEnabled() )
LOG.trace("Process RandSPInstruction rand with seed = "+lSeed+".");
//step 2: potential in-memory rand operations if applicable
if( ConfigurationManager.isDynamicRecompilation()
&& isMemAvail(lrows, lcols, sparsity, minValue, maxValue)
&& DMLScript.getGlobalExecMode() != ExecMode.SPARK )
{
RandomMatrixGenerator rgen = LibMatrixDatagen.createRandomMatrixGenerator(
pdf, (int) lrows, (int) lcols, blocksize,
sparsity, minValue, maxValue, pdfParams);
MatrixBlock mb = MatrixBlock.randOperations(rgen, lSeed);
sec.setMatrixOutput(output.getName(), mb);
Statistics.decrementNoOfExecutedSPInst();
return;
}
//step 3: seed generation
JavaPairRDD<MatrixIndexes, Long> seedsRDD = null;
Well1024a bigrand = LibMatrixDatagen.setupSeedsForRand(lSeed);
double totalSize = OptimizerUtils.estimatePartitionedSizeExactSparsity( lrows, lcols, blocksize,
sparsity); //overestimate for on disk, ensures hdfs block per partition
double hdfsBlkSize = InfrastructureAnalyzer.getHDFSBlockSize();
DataCharacteristics tmp = new MatrixCharacteristics(lrows, lcols, blocksize);
long numBlocks = tmp.getNumBlocks();
long numColBlocks = tmp.getNumColBlocks();
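//e.g., a 20000 x 20000 matrix with blocksize 1000 yields 20 x 20 = 400
//blocks, each of which is assigned its own seed drawn from bigrand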
//a) in-memory seed rdd construction
if( numBlocks < INMEMORY_NUMBLOCKS_THRESHOLD )
{
ArrayList<Tuple2<MatrixIndexes, Long>> seeds = new ArrayList<>();
for( long i=0; i<numBlocks; i++ ) {
long r = 1 + i/numColBlocks;
long c = 1 + i%numColBlocks;
MatrixIndexes indx = new MatrixIndexes(r, c);
Long seedForBlock = bigrand.nextLong();
seeds.add(new Tuple2<>(indx, seedForBlock));
}
//for load balancing: degree of parallelism such that ~128MB per partition
int numPartitions = (int) Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1);
//create seeds rdd
seedsRDD = sec.getSparkContext().parallelizePairs(seeds, numPartitions);
}
//b) file-based seed rdd construction (for robustness wrt large number of blocks)
else
{
Path path = new Path(LibMatrixDatagen.generateUniqueSeedPath(dir));
PrintWriter pw = null;
try
{
FileSystem fs = IOUtilFunctions.getFileSystem(path);
pw = new PrintWriter(fs.create(path));
StringBuilder sb = new StringBuilder();
for( long i=0; i<numBlocks; i++ ) {
sb.append(1 + i/numColBlocks);
sb.append(',');
sb.append(1 + i%numColBlocks);
sb.append(',');
sb.append(bigrand.nextLong());
pw.println(sb.toString());
sb.setLength(0);
}
}
catch( IOException ex ) {
throw new DMLRuntimeException(ex);
}
finally {
IOUtilFunctions.closeSilently(pw);
}
//for load balancing: degree of parallelism such that ~128MB per partition
int numPartitions = (int) Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1);
//create seeds rdd
seedsRDD = sec.getSparkContext()
.textFile(path.toString(), numPartitions)
.mapToPair(new ExtractMatrixSeedTuple());
}
//step 4: execute rand instruction over seed input
JavaPairRDD<MatrixIndexes, MatrixBlock> out = seedsRDD
.mapToPair(new GenerateRandomBlock(lrows, lcols, blocksize,
sparsity, minValue, maxValue, pdf, pdfParams));
//step 5: output handling
DataCharacteristics mcOut = sec.getDataCharacteristics(output.getName());
if(!mcOut.dimsKnown(true)) {
//note: we cannot derive the exact nnz from the sparsity because generated
//blocks only approximate it, except for the extreme values 0 and 1.
//However, we keep the sparsity-based bound for more coarse-grained decisions.
long lnnz = (sparsity==0 || sparsity==1) ? (long) (sparsity*lrows*lcols) : -1;
mcOut.set(lrows, lcols, blocksize, lnnz);
if( !mcOut.nnzKnown() )
mcOut.setNonZerosBound((long) (sparsity*lrows*lcols));
}
sec.setRDDHandleForVariable(output.getName(), out);
}
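/**
* Generates random tensor data as an RDD of (TensorIndexes, TensorBlock)
* pairs, analogous to the matrix case: per-block seeds are created either
* in memory or via a temporary seed file, and the blocks are generated
* in parallel.
*
* @param sec spark execution context
*/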
@SuppressWarnings("resource")
private void generateRandDataTensor(SparkExecutionContext sec) {
int[] tDims = DataConverter.getTensorDimensions(sec, dims);
//step 1: generate pseudo-random seed (because not specified)
long lSeed = generateRandomSeed();
if( LOG.isTraceEnabled() )
LOG.trace("Process RandSPInstruction rand with seed = "+lSeed+".");
//step 2: TODO potential in-memory rand operations if applicable
//step 3: seed generation
JavaPairRDD<TensorIndexes, Long> seedsRDD;
Well1024a bigrand = LibMatrixDatagen.setupSeedsForRand(lSeed);
// TODO calculate totalSize
// TODO use real blocksize given by instruction (once correct)
blocksize = TensorCharacteristics.DEFAULT_BLOCK_SIZE[tDims.length - 2];
long[] longDims = new long[tDims.length];
long totalSize = 1;
long hdfsBlkSize = blocksize * tDims.length;
for (int i = 0; i < tDims.length; i++) {
longDims[i] = tDims[i];
totalSize *= tDims[i];
}
TensorCharacteristics tmp = new TensorCharacteristics(longDims, blocksize, 0);
long numBlocks = tmp.getNumBlocks();
//a) in-memory seed rdd construction
//for load balancing: degree of parallelism such that ~128MB per partition
int numPartitions = (int) Math.max(Math.min(totalSize / hdfsBlkSize, numBlocks), 1);
if( numBlocks < INMEMORY_NUMBLOCKS_THRESHOLD )
{
ArrayList<Tuple2<TensorIndexes, Long>> seeds = new ArrayList<>();
long[] ix = new long[tmp.getNumDims()];
Arrays.fill(ix, 1);
for( long i=0; i<numBlocks; i++ ) {
TensorIndexes indx = new TensorIndexes(ix);
Long seedForBlock = bigrand.nextLong();
seeds.add(new Tuple2<>(indx, seedForBlock));
UtilFunctions.computeNextTensorIndexes(tmp, ix);
}
//create seeds rdd
seedsRDD = sec.getSparkContext().parallelizePairs(seeds, numPartitions);
}
//b) file-based seed rdd construction (for robustness wrt large number of blocks)
else
{
Path path = new Path(LibMatrixDatagen.generateUniqueSeedPath(dir));
PrintWriter pw = null;
try
{
FileSystem fs = IOUtilFunctions.getFileSystem(path);
pw = new PrintWriter(fs.create(path));
StringBuilder sb = new StringBuilder();
long[] blockIx = new long[tmp.getNumDims()];
Arrays.fill(blockIx, 1);
for( long i=0; i<numBlocks; i++ ) {
for (int j = tmp.getNumDims() - 1; j >= 0; j--)
sb.append(blockIx[j]).append(',');
sb.append(bigrand.nextLong());
pw.println(sb.toString());
sb.setLength(0);
UtilFunctions.computeNextTensorIndexes(tmp, blockIx);
}
}
catch( IOException ex ) {
throw new DMLRuntimeException(ex);
}
finally {
IOUtilFunctions.closeSilently(pw);
}
//create seeds rdd
seedsRDD = sec.getSparkContext()
.textFile(path.toString(), numPartitions)
.mapToPair(new ExtractTensorSeedTuple());
}
//step 4: execute rand instruction over seed input
// TODO getDimLengthPerBlock accurate for each dimension
JavaPairRDD<TensorIndexes, TensorBlock> out = seedsRDD
.mapToPair(new GenerateRandomTensorBlock(output.getValueType(), tDims, blocksize,
sparsity, minValueStr, maxValueStr, pdf, pdfParams));
//step 5: output handling
DataCharacteristics mcOut = sec.getDataCharacteristics(output.getName());
if(!mcOut.dimsKnown(true)) {
mcOut.set(tmp);
}
sec.setRDDHandleForVariable(output.getName(), out);
}
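/**
* Generates a sequence as a column vector of blocks, where each block is
* created from its start offset. E.g., seq(1, 1000, 1) with blocksize 500
* creates ceil(1000/500) = 2 blocks with offsets 1 and 501.
*
* @param sec spark execution context
*/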
@SuppressWarnings("resource")
private void generateSequence(SparkExecutionContext sec) {
double lfrom = sec.getScalarInput(seq_from).getDoubleValue();
double lto = sec.getScalarInput(seq_to).getDoubleValue();
double lincr = sec.getScalarInput(seq_incr).getDoubleValue();
//sanity check valid increment
if( lincr == 0 ) {
throw new DMLRuntimeException("ERROR: While performing seq(" + lfrom + "," + lto + "," + lincr + ")");
}
//handle default 1 to -1 for special case of from>to
lincr = LibMatrixDatagen.updateSeqIncr(lfrom, lto, lincr);
if( LOG.isTraceEnabled() )
LOG.trace("Process RandSPInstruction seq with seqFrom="+lfrom+", seqTo="+lto+", seqIncr"+lincr);
//step 1: offset generation
JavaRDD<Double> offsetsRDD = null;
long nnz = UtilFunctions.getSeqLength(lfrom, lto, lincr);
double totalSize = OptimizerUtils.estimatePartitionedSizeExactSparsity( nnz, 1, blocksize,
nnz); //overestimate for on disk, ensures hdfs block per partition
double hdfsBlkSize = InfrastructureAnalyzer.getHDFSBlockSize();
long numBlocks = (long)Math.ceil(((double)nnz)/blocksize);
//a) in-memory offset rdd construction
if( numBlocks < INMEMORY_NUMBLOCKS_THRESHOLD )
{
ArrayList<Double> offsets = new ArrayList<>();
for( long i=0; i<numBlocks; i++ ) {
double off = lfrom + lincr*i*blocksize;
offsets.add(off);
}
//for load balancing: degree of parallelism such that ~128MB per partition
int numPartitions = (int) Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1);
//create offset rdd
offsetsRDD = sec.getSparkContext().parallelize(offsets, numPartitions);
}
//b) file-based offset rdd construction (for robustness wrt large number of blocks)
else
{
Path path = new Path(LibMatrixDatagen.generateUniqueSeedPath(dir));
PrintWriter pw = null;
try {
FileSystem fs = IOUtilFunctions.getFileSystem(path);
pw = new PrintWriter(fs.create(path));
for( long i=0; i<numBlocks; i++ ) {
double off = lfrom + lincr*i*blocksize;
pw.println(off);
}
}
catch( IOException ex ) {
throw new DMLRuntimeException(ex);
}
finally {
IOUtilFunctions.closeSilently(pw);
}
//for load balancing: degree of parallelism such that ~128MB per partition
int numPartitions = (int) Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1);
//create offsets rdd
offsetsRDD = sec.getSparkContext()
.textFile(path.toString(), numPartitions)
.map(new ExtractOffsetTuple());
}
//step 2: execute seq instruction over offset input
JavaPairRDD<MatrixIndexes, MatrixBlock> out = offsetsRDD
.mapToPair(new GenerateSequenceBlock(blocksize, lfrom, lto, lincr));
//step 3: output handling
DataCharacteristics mcOut = sec.getDataCharacteristics(output.getName());
if(!mcOut.dimsKnown()) {
mcOut.set(nnz, 1, blocksize, nnz);
}
sec.setRDDHandleForVariable(output.getName(), out);
}
/**
* Helper function to construct a random sample of the population
* range [1, maxValue], with or without replacement.
*
* @param sec spark execution context
*/
private void generateSample(SparkExecutionContext sec) {
long lrows = sec.getScalarInput(rows).getLongValue();
if ( maxValue < lrows && !replace )
throw new DMLRuntimeException("Sample (size=" + rows + ") larger than population (size=" + maxValue + ") can only be generated with replacement.");
if( LOG.isTraceEnabled() )
LOG.trace("Process RandSPInstruction sample with range="+ maxValue +", size="+ lrows +", replace="+ replace + ", seed=" + seed);
// sampling rate that guarantees a sample of size >= sampleSizeLowerBound 99.99% of the time.
double fraction = SamplingUtils.computeFractionForSampleSize((int)lrows, UtilFunctions.toLong(maxValue), replace);
Well1024a bigrand = LibMatrixDatagen.setupSeedsForRand(seed);
// divide the population range across numPartitions by creating SampleTasks
double hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize();
long outputSize = MatrixBlock.estimateSizeDenseInMemory(lrows,1);
int numPartitions = (int) Math.ceil(outputSize/hdfsBlockSize);
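// e.g., a sample of 10M rows (~80MB dense) with a 128MB HDFS block size
// yields a single partition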
long partitionSize = (long) Math.ceil(maxValue /numPartitions);
ArrayList<SampleTask> offsets = new ArrayList<>();
long st = 1;
while (st <= maxValue) {
SampleTask s = new SampleTask();
s.range_start = st;
s.seed = bigrand.nextLong();
offsets.add(s);
st = st + partitionSize;
}
JavaRDD<SampleTask> offsetRDD = sec.getSparkContext().parallelize(offsets, numPartitions);
// Construct the sample in a distributed manner
JavaRDD<Double> rdd = offsetRDD.flatMap( (new GenerateSampleBlock(replace, fraction, (long) maxValue, partitionSize)) );
// Randomize the sampled elements
JavaRDD<Double> randomizedRDD = rdd.mapToPair(new AttachRandom()).sortByKey().values();
// Trim the sampled list to required size & attach matrix indexes to randomized elements
JavaPairRDD<MatrixIndexes, MatrixCell> miRDD = randomizedRDD
.zipWithIndex()
.filter( new TrimSample(lrows) )
.mapToPair( new Double2MatrixCell() );
DataCharacteristics mcOut = new MatrixCharacteristics(lrows, 1, blocksize, lrows);
// Construct BinaryBlock representation
JavaPairRDD<MatrixIndexes, MatrixBlock> mbRDD =
RDDConverterUtils.binaryCellToBinaryBlock(sec.getSparkContext(), miRDD, mcOut, true);
sec.getDataCharacteristics(output.getName()).setNonZeros(lrows);
sec.setRDDHandleForVariable(output.getName(), mbRDD);
}
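/**
* Returns the specified seed, or a pseudo-random runtime seed if the
* instruction did not specify one. The runtime seed is cached so that
* lineage tracing and execution see the same value, and reset after use
* (see generateRandData) to obtain fresh seeds across loop iterations.
*/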
private long generateRandomSeed() {
long lSeed = seed; //seed per invocation
if (lSeed == DataGenOp.UNSPECIFIED_SEED) {
if (runtimeSeed == null)
runtimeSeed = DataGenOp.generateRandomSeed();
lSeed = runtimeSeed;
}
return lSeed;
}
/**
* Private class that defines a sampling task.
* The task produces a portion of the sample from the range [range_start, range_start+partitionSize].
*
*/
private static class SampleTask implements Serializable
{
private static final long serialVersionUID = -725284524434342939L;
long seed;
long range_start;
@Override
public String toString() { return "(" + seed + "," + range_start +")"; }
}
/**
* Main class to perform distributed sampling.
*
* Each invocation of this FlatMapFunction produces a portion of the
* sample to be included in the final output.
*
* The input range from which the sample is constructed is given by
* [range_start, range_start+partitionSize].
*
* When replace=TRUE, the sample is produced by generating Poisson
* distributed counts (denoting the number of occurrences) for each
* element in the input range.
*
* When replace=FALSE, the sample is produced by comparing a generated
* random number against the required sample fraction.
*
* In the special case of fraction=1.0, a permutation of the input
* range is computed simply by creating an RDD of all elements in the
* input range (the actual shuffling happens downstream).
*
*/
private static class GenerateSampleBlock implements FlatMapFunction<SampleTask, Double>
{
private static final long serialVersionUID = -8211490954143527232L;
private double _frac;
private boolean _replace;
private long _maxValue, _partitionSize;
GenerateSampleBlock(boolean replace, double frac, long max, long psize)
{
_replace = replace;
_frac = frac;
_maxValue = max;
_partitionSize = psize;
}
@Override
public Iterator<Double> call(SampleTask t)
throws Exception {
long st = t.range_start;
long end = Math.min(t.range_start+_partitionSize, _maxValue);
ArrayList<Double> retList = new ArrayList<>();
if ( _frac == 1.0 )
{
for(long i=st; i <= end; i++)
retList.add((double)i);
}
else
{
if(_replace)
{
PoissonDistribution pdist = new PoissonDistribution( (_frac > 0.0 ? _frac : 1.0) );
for(long i=st; i <= end; i++)
{
int count = pdist.sample();
while(count > 0) {
retList.add((double)i);
count--;
}
}
}
else
{
Random rnd = new Random(t.seed);
for(long i=st; i <=end; i++)
if ( rnd.nextDouble() < _frac )
retList.add((double) i);
}
}
return retList.iterator();
}
}
/**
* Function that filters the constructed sample down to the required number of elements.
*
*/
private static class TrimSample implements Function<Tuple2<Double,Long>, Boolean> {
private static final long serialVersionUID = 6773370625013346530L;
long _max;
TrimSample(long max) {
_max = max;
}
@Override
public Boolean call(Tuple2<Double, Long> v1) throws Exception {
return ( v1._2 < _max );
}
}
/**
* Function to convert JavaRDD of Doubles to {@code JavaPairRDD<MatrixIndexes, MatrixCell>}
*
*/
private static class Double2MatrixCell implements PairFunction<Tuple2<Double, Long>, MatrixIndexes, MatrixCell>
{
private static final long serialVersionUID = -2125669746624320536L;
@Override
public Tuple2<MatrixIndexes, MatrixCell> call(Tuple2<Double, Long> t)
throws Exception {
long rowID = t._2()+1;
MatrixIndexes mi = new MatrixIndexes(rowID, 1);
MatrixCell mc = new MatrixCell(t._1());
return new Tuple2<>(mi, mc);
}
}
/**
* Pair function to attach a random number as a key to input JavaRDD.
* The produced JavaPairRDD is subsequently used to randomize the sampled elements.
*
*/
private static class AttachRandom implements PairFunction<Double, Double, Double> {
private static final long serialVersionUID = -7508858192367406554L;
Random r = null;
AttachRandom() {
r = new Random();
}
@Override
public Tuple2<Double, Double> call(Double t) throws Exception {
return new Tuple2<>( r.nextDouble(), t );
}
}
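/**
* Parses one line of the seed file back into a (block index, seed) pair,
* e.g., the line "2,3,12345" becomes ((2,3), 12345).
*/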
private static class ExtractMatrixSeedTuple implements PairFunction<String, MatrixIndexes, Long> {
private static final long serialVersionUID = 3973794676854157101L;
@Override
public Tuple2<MatrixIndexes, Long> call(String arg)
throws Exception
{
String[] parts = IOUtilFunctions.split(arg, ",");
MatrixIndexes ix = new MatrixIndexes(
Long.parseLong(parts[0]), Long.parseLong(parts[1]));
return new Tuple2<>(ix,Long.parseLong(parts[2]));
}
}
private static class ExtractTensorSeedTuple implements PairFunction<String, TensorIndexes, Long> {
private static final long serialVersionUID = 3973794676854157101L;
@Override
public Tuple2<TensorIndexes, Long> call(String arg) throws Exception {
String[] parts = IOUtilFunctions.split(arg, ",");
long[] ix = new long[parts.length - 1];
for (int i = 0; i < parts.length - 1; i++)
ix[i] = Long.parseLong(parts[i]);
TensorIndexes to = new TensorIndexes(ix);
return new Tuple2<>(to,Long.parseLong(parts[parts.length - 1]));
}
}
private static class ExtractOffsetTuple implements Function<String, Double> {
private static final long serialVersionUID = -3980257526545002552L;
@Override
public Double call(String arg) throws Exception {
return Double.parseDouble(arg);
}
}
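/**
* Generates a single random matrix block from its (block index, seed)
* pair. The local block size is derived from the global dimensions, e.g.,
* for 2500 rows and blocksize 1000, the row blocks have 1000, 1000, and
* 500 rows, respectively.
*/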
private static class GenerateRandomBlock implements PairFunction<Tuple2<MatrixIndexes, Long>, MatrixIndexes, MatrixBlock>
{
private static final long serialVersionUID = 1616346120426470173L;
private long _rlen;
private long _clen;
private int _blen;
private double _sparsity;
private double _min;
private double _max;
private String _pdf;
private String _pdfParams;
public GenerateRandomBlock(long rlen, long clen, int blen, double sparsity, double min, double max, String pdf, String pdfParams) {
_rlen = rlen;
_clen = clen;
_blen = blen;
_sparsity = sparsity;
_min = min;
_max = max;
_pdf = pdf;
_pdfParams = pdfParams;
}
@Override
public Tuple2<MatrixIndexes, MatrixBlock> call(Tuple2<MatrixIndexes, Long> kv)
throws Exception
{
//compute local block size:
MatrixIndexes ix = kv._1();
long blockRowIndex = ix.getRowIndex();
long blockColIndex = ix.getColumnIndex();
int lrlen = UtilFunctions.computeBlockSize(_rlen, blockRowIndex, _blen);
int lclen = UtilFunctions.computeBlockSize(_clen, blockColIndex, _blen);
long seed = kv._2;
MatrixBlock blk = new MatrixBlock();
RandomMatrixGenerator rgen = LibMatrixDatagen
.createRandomMatrixGenerator(_pdf, lrlen, lclen,
_blen, _sparsity, _min, _max, _pdfParams);
blk.randOperationsInPlace(rgen, null, seed);
blk.examSparsity();
return new Tuple2<>(kv._1, blk);
}
}
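/**
* Generates a single random tensor block from its (block index, seed)
* pair. Numeric data is generated as a random matrix block of shape
* (dim0, prod(remaining dims)) and wrapped into a tensor block; if min
* equals max, the block is filled with that constant instead.
*/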
private static class GenerateRandomTensorBlock implements PairFunction<Tuple2<TensorIndexes, Long>, TensorIndexes, TensorBlock>
{
private static final long serialVersionUID = -512119897654170462L;
private ValueType _vt;
private int[] _dims;
private int _blen;
private double _sparsity;
private String _min;
private String _max;
private String _pdf;
private String _pdfParams;
public GenerateRandomTensorBlock(ValueType vt, int[] dims, int blen,
double sparsity, String min, String max, String pdf, String pdfParams) {
_vt = vt;
_dims = new int[dims.length];
System.arraycopy(dims, 0, _dims, 0, dims.length);
_blen = blen;
_sparsity = sparsity;
_min = min;
_max = max;
_pdf = pdf;
_pdfParams = pdfParams;
}
@Override
public Tuple2<TensorIndexes, TensorBlock> call(Tuple2<TensorIndexes, Long> kv)
throws Exception
{
//compute local block size:
TensorIndexes ix = kv._1();
// TODO: accurate block size computation
int[] blockDims = new int[_dims.length];
blockDims[0] = UtilFunctions.computeBlockSize(_dims[0], ix.getIndex(0), _blen);
for (int i = 1; i < _dims.length; i++) {
blockDims[i] = UtilFunctions.computeBlockSize(_dims[i], ix.getIndex(i), _blen);
}
int clen = (int) UtilFunctions.prod(blockDims, 1);
long seed = kv._2;
BasicTensorBlock tb = new BasicTensorBlock(_vt, blockDims);
// TODO implement sparse support
tb.allocateDenseBlock();
if (!_min.equals(_max)) {
if (_vt == ValueType.STRING) {
throw new DMLRuntimeException("Random string data can not be generated for tensors.");
}
MatrixBlock blk = new MatrixBlock();
RandomMatrixGenerator rgen = LibMatrixDatagen.createRandomMatrixGenerator(_pdf, blockDims[0], clen,
_blen, _sparsity, Double.parseDouble(_min), Double.parseDouble(_max), _pdfParams);
blk.randOperationsInPlace(rgen, null, seed);
blk.examSparsity();
tb.set(blk);
}
else {
switch (_vt) {
case STRING:
case BOOLEAN:
tb.set(_min);
break;
case INT64:
case INT32:
tb.set(Long.parseLong(_min));
break;
default:
tb.set(Double.parseDouble(_min));
break;
}
}
return new Tuple2<>(kv._1, new TensorBlock(tb));
}
}
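/**
* Generates one sequence block from its start offset and derives the
* block row index from the global sequence start. E.g., for seq(1, 5000, 1)
* with blocksize 1000, the offset 2001 maps to global row 2001 and hence
* block index 3, holding the values 2001..3000.
*/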
private static class GenerateSequenceBlock implements PairFunction<Double, MatrixIndexes, MatrixBlock>
{
private static final long serialVersionUID = 5779681055705756965L;
private final double _global_seq_start;
private final double _global_seq_end;
private final double _seq_incr;
private final int _blen;
public GenerateSequenceBlock(int blen, double global_seq_start, double global_seq_end, double seq_incr) {
_global_seq_start = global_seq_start;
_global_seq_end = global_seq_end;
_seq_incr = seq_incr;
_blen = blen;
}
@Override
public Tuple2<MatrixIndexes, MatrixBlock> call(Double seq_from) throws Exception {
double seq_to = (_seq_incr > 0) ?
Math.min(_global_seq_end, seq_from + _seq_incr*(_blen-1)) :
Math.max(_global_seq_end, seq_from + _seq_incr*(_blen-1));
long globalRow = Math.round((seq_from-_global_seq_start)/_seq_incr)+1;
long rowIndex = UtilFunctions.computeBlockIndex(globalRow, _blen);
MatrixIndexes indx = new MatrixIndexes(rowIndex, 1);
MatrixBlock blk = MatrixBlock.seqOperations(seq_from, seq_to, _seq_incr);
return new Tuple2<>(indx, blk);
}
}
/**
* Checks whether a matrix of the given dimensions and sparsity can be
* generated within the local memory budget.
*
* @param lrows number of rows
* @param lcols number of columns
* @param sparsity sparsity ratio
* @param min minimum value
* @param max maximum value
* @return true if the matrix fits into the local memory budget
*/
private static boolean isMemAvail(long lrows, long lcols, double sparsity, double min, double max) {
double size = (min == 0 && max == 0) ? OptimizerUtils.estimateSizeEmptyBlock(lrows, lcols):
OptimizerUtils.estimateSizeExactSparsity(lrows, lcols, sparsity);
return ( OptimizerUtils.isValidCPDimensions(lrows, lcols)
&& OptimizerUtils.isValidCPMatrixSize(lrows, lcols, sparsity)
&& size < OptimizerUtils.getLocalMemBudget() );
}
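/**
* Creates the lineage item of this instruction. For unspecified seeds,
* the generated runtime seed (and non-literal rows/cols operands) is
* patched into the instruction string so that the lineage trace captures
* the concrete values used at runtime.
*/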
@Override
public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
String tmpInstStr = instString;
if (getSeed() == DataGenOp.UNSPECIFIED_SEED) {
//generate pseudo-random seed (because not specified)
if (runtimeSeed == null)
runtimeSeed = (minValue == maxValue && sparsity == 1) ?
DataGenOp.UNSPECIFIED_SEED : DataGenOp.generateRandomSeed();
int position = (_method == OpOpDG.RAND) ? SEED_POSITION_RAND :
(_method == OpOpDG.SAMPLE) ? SEED_POSITION_SAMPLE : 0;
tmpInstStr = InstructionUtils.replaceOperand(
tmpInstStr, position, String.valueOf(runtimeSeed));
if( !rows.isLiteral() )
tmpInstStr = InstructionUtils.replaceOperand(tmpInstStr, 2,
new CPOperand(ec.getScalarInput(rows)).getLineageLiteral());
if( !cols.isLiteral() )
tmpInstStr = InstructionUtils.replaceOperand(tmpInstStr, 3,
new CPOperand(ec.getScalarInput(cols)).getLineageLiteral());
}
return Pair.of(output.getName(), new LineageItem(tmpInstStr, getOpcode()));
}
}