/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.sysds.test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.data.TensorBlock;
import org.apache.sysds.runtime.io.FrameWriter;
import org.apache.sysds.runtime.io.FrameWriterFactory;
import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixCell;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.runtime.util.UtilFunctions;
import org.junit.Assert;


/**
 * <p>
 * Provides methods to easily create tests. Implemented methods can be used for
 * </p>
 * <ul>
 * <li>data comparison</li>
 * <li>test data generation</li>
 * <li>writing files</li>
 * <li>reading files</li>
 * <li>clean up</li>
 * </ul>
 */
public class TestUtils 
{

	private static final Log LOG = LogFactory.getLog(TestUtils.class.getName());

	/** job configuration used for file system access */
	public static Configuration conf = new Configuration();

	/** global random generator for default seed */
	public static Random random = new Random(System.currentTimeMillis());

	/** internal buffer to store assertion information */
	private static ArrayList<String> _AssertInfos = new ArrayList<>();
	private static boolean _AssertOccured = false;

	/* Compare expected scalar generated by Java with actual scalar generated by DML */
	@SuppressWarnings("resource")
	public static void compareDMLScalarWithJavaScalar(String expectedFile, String actualFile, double epsilon) {
		try {
			String lineExpected = null;
			String lineActual = null;
			
			Path compareFile = new Path(expectedFile);
			FileSystem fs = IOUtilFunctions.getFileSystem(compareFile, conf);
			FSDataInputStream fsin = fs.open(compareFile);
			try( BufferedReader compareIn = new BufferedReader(new InputStreamReader(fsin)) ) {
				lineExpected = compareIn.readLine();
			}
			
			Path outFile = new Path(actualFile);
			FSDataInputStream fsout = fs.open(outFile);
			try( BufferedReader outIn = new BufferedReader(new InputStreamReader(fsout)) ) {
				lineActual = outIn.readLine();
			}

			assertEquals(expectedFile + ": " + lineExpected + " vs " + actualFile + ": " + lineActual,
					Double.parseDouble(lineExpected), Double.parseDouble(lineActual), epsilon);
		} catch (IOException e) {
			fail("unable to read file: " + e.getMessage());
		}
	}
	
	/**
	 * Compares contents of an expected file with the actual file, where rows may be permuted
	 * @param expectedFile
	 * @param actualDir
	 * @param epsilon
	 */
	@SuppressWarnings("resource")
	public static void compareDMLMatrixWithJavaMatrixRowsOutOfOrder(String expectedFile, String actualDir, double epsilon)
	{
		try {
			HashMap<CellIndex, Double> expectedValues = new HashMap<>();
			
			Path outDirectory = new Path(actualDir);
			Path compareFile = new Path(expectedFile);
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
			FSDataInputStream fsin = fs.open(compareFile);
			readValuesFromFileStream(fsin, expectedValues);

			HashMap<CellIndex, Double> actualValues = new HashMap<>();

			FileStatus[] outFiles = fs.listStatus(outDirectory);

			for (FileStatus file : outFiles) {
				FSDataInputStream fsout = fs.open(file.getPath());
				readValuesFromFileStream(fsout, actualValues);
			}

			ArrayList<Double> e_list = new ArrayList<>();
			for (CellIndex index : expectedValues.keySet()) {
				Double expectedValue = expectedValues.get(index);
				if(expectedValue != 0.0)
					e_list.add(expectedValue);
			}
			
			ArrayList<Double> a_list = new ArrayList<>();
			for (CellIndex index : actualValues.keySet()) {
				Double actualValue = actualValues.get(index);
				if(actualValue != 0.0)
					a_list.add(actualValue);
			}
			
			Collections.sort(e_list);
			Collections.sort(a_list);
			
			assertTrue("Matrix nzs not equal", e_list.size() == a_list.size());
			for(int i=0; i < e_list.size(); i++)
			{
				assertTrue("Matrix values not equals", Math.abs(e_list.get(i) - a_list.get(i)) <= epsilon);
			}
			
		} catch (IOException e) {
			fail("unable to read file: " + e.getMessage());
		}
	}
	
	/**
	 * <p>
	 * Compares the expected values calculated in Java by testcase and which are
	 * in the normal filesystem, with those calculated by SystemDS located in
	 * HDFS with Matrix Market format
	 * </p>
	 * 
	 * @param expectedFile
	 *            file with expected values, which is located in OS filesystem
	 * @param actualDir
	 *            file with actual values, which is located in HDFS
	 * @param epsilon
	 *            tolerance for value comparison
	 */
	@SuppressWarnings("resource")
	public static void compareMMMatrixWithJavaMatrix(String expectedFile, String actualDir, double epsilon) {
		try {
			Path outDirectory = new Path(actualDir);
			Path compareFile = new Path(expectedFile);
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
			FSDataInputStream fsin = fs.open(compareFile);
			
			HashMap<CellIndex, Double> expectedValues = new HashMap<>();
			String[] expRcn = null;
			
			try(BufferedReader compareIn = new BufferedReader(new InputStreamReader(fsin)) ) {
				// skip the header of Matrix Market file
				String line = compareIn.readLine();
				
				// rows, cols and nnz
				line = compareIn.readLine();
				expRcn = line.split(" ");
				
				readValuesFromFileStreamAndPut(compareIn, expectedValues);
			}
			
			HashMap<CellIndex, Double> actualValues = new HashMap<>();

			FSDataInputStream fsout = fs.open(outDirectory);
			try( BufferedReader outIn = new BufferedReader(new InputStreamReader(fsout)) ) {
					
				//skip MM header
				String line = outIn.readLine();
				
				//rows, cols and nnz
				line = outIn.readLine();
				String[] rcn = line.split(" ");
				
				if (Integer.parseInt(expRcn[0]) != Integer.parseInt(rcn[0])) {
					LOG.warn(" Rows mismatch: expected " + Integer.parseInt(expRcn[0]) + ", actual " + Integer.parseInt(rcn[0]));
				}
				else if (Integer.parseInt(expRcn[1]) != Integer.parseInt(rcn[1])) {
					LOG.warn(" Cols mismatch: expected " + Integer.parseInt(expRcn[1]) + ", actual " + Integer.parseInt(rcn[1]));
				}
				else if (Integer.parseInt(expRcn[2]) != Integer.parseInt(rcn[2])) {
					LOG.warn(" Nnz mismatch: expected " + Integer.parseInt(expRcn[2]) + ", actual " + Integer.parseInt(rcn[2]));
				}

				readValuesFromFileStreamAndPut(outIn, actualValues);
			}

			Set<CellIndex> allKeys = new HashSet<>();
			allKeys.addAll(expectedValues.keySet());
			if(expectedValues.size() != actualValues.size())
				allKeys.addAll(actualValues.keySet());

			int countErrors = 0;
			for (CellIndex index : allKeys) {
				Double expectedValue = expectedValues.get(index);
				Double actualValue = actualValues.get(index);
				if (expectedValue == null)
					expectedValue = 0.0;
				if (actualValue == null)
					actualValue = 0.0;

				if (!compareCellValue(expectedValue, actualValue, epsilon, false)) {
					System.out.println(expectedFile+": "+index+" mismatch: expected " + expectedValue + ", actual " + actualValue);
					countErrors++;
				}
			}
			assertTrue("for file " + actualDir + " " + countErrors + " values are not equal", countErrors == 0);
		} catch (IOException e) {
			fail("unable to read file: " + e.getMessage());
		}
	}
	
	/**
	 * Read doubles from the input stream and put them into the given hashmap of values. 
	 * @param inputStream input stream of doubles with related indices
	 * @param values hashmap of values (initially empty)
	 * @throws IOException 
	 */
	public static void readValuesFromFileStream(FSDataInputStream inputStream, HashMap<CellIndex, Double> values)
		throws IOException
	{
		try( BufferedReader inReader = new BufferedReader(new InputStreamReader(inputStream)) ) {
			readValuesFromFileStreamAndPut(inReader, values);
		}
	}

	/**
	 * Read values from file stream and put into hashmap
	 * @param inReader BufferedReader to read values from
	 * @param values hashmap where values are put
	*/
	public static void readValuesFromFileStreamAndPut(BufferedReader inReader, HashMap<CellIndex, Double> values) 
		throws IOException
	{
		String line = null;
		while ((line = inReader.readLine()) != null) {
			StringTokenizer st = new StringTokenizer(line, " ");
			int i = Integer.parseInt(st.nextToken());
			int j = Integer.parseInt(st.nextToken());
			double v = Double.parseDouble(st.nextToken());
			values.put(new CellIndex(i, j), v);
		}
	}

	/**
	 * <p>
	 * Read the cell values of the expected file and actual files. Schema is used for correct parsing if the file is a
	 * frame and if it is null FP64 will be used for all values (useful for Matrices).
	 * </p>
	 *
	 * @param schema         the schema of the frame, can be null (for FP64)
	 * @param expectedFile   the file with expected values
	 * @param actualDir      the directory where the actual values were written
	 * @param expectedValues the HashMap where the expected values will be written to
	 * @param actualValues   the HashMap where the actual values will be written to
	 */
	private static void readActualAndExpectedFile(ValueType[] schema, String expectedFile, String actualDir,
		HashMap<CellIndex, Object> expectedValues, HashMap<CellIndex, Object> actualValues) {
		try {
			Path outDirectory = new Path(actualDir);
			Path compareFile = new Path(expectedFile);
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
			FSDataInputStream fsin = fs.open(compareFile);

			try(BufferedReader compareIn = new BufferedReader(new InputStreamReader(fsin))) {
				String line;
				while((line = compareIn.readLine()) != null) {
					StringTokenizer st = new StringTokenizer(line, " ");
					int i = Integer.parseInt(st.nextToken());
					int j = Integer.parseInt(st.nextToken());
					ValueType vt = (schema != null) ? schema[j - 1] : ValueType.FP64;
					Object obj = UtilFunctions.stringToObject(vt, st.nextToken());
					expectedValues.put(new CellIndex(i, j), obj);
				}
			}

			FileStatus[] outFiles = fs.listStatus(outDirectory);

			for(FileStatus file : outFiles) {
				FSDataInputStream fsout = fs.open(file.getPath());
				try(BufferedReader outIn = new BufferedReader(new InputStreamReader(fsout))) {
					String line;
					while((line = outIn.readLine()) != null) {
						StringTokenizer st = new StringTokenizer(line, " ");
						int i = Integer.parseInt(st.nextToken());
						int j = Integer.parseInt(st.nextToken());
						ValueType vt = (schema != null) ? schema[j - 1] : ValueType.FP64;
						Object obj = UtilFunctions.stringToObject(vt, st.nextToken());
						actualValues.put(new CellIndex(i, j), obj);
					}
				}
			}
		}
		catch(IOException e) {
			fail("unable to read file: " + e.getMessage());
		}
	}
	
	/**
	 * <p>
	 * Compares the expected values calculated in Java by testcase and which are
	 * in the normal filesystem, with those calculated by SystemDS located in
	 * HDFS
	 * </p>
	 * 
	 * @param expectedFile
	 *            file with expected values, which is located in OS filesystem
	 * @param actualDir
	 *            file with actual values, which is located in HDFS
	 * @param epsilon
	 *            tolerance for value comparison
	 */
	@SuppressWarnings("resource")
	public static void compareDMLMatrixWithJavaMatrix(String expectedFile, String actualDir, double epsilon) {
		HashMap<CellIndex, Object> expectedValues = new HashMap<>();
		HashMap<CellIndex, Object> actualValues = new HashMap<>();

		readActualAndExpectedFile(null, expectedFile, actualDir, expectedValues, actualValues);

		Set<CellIndex> allKeys = new HashSet<>();
		allKeys.addAll(expectedValues.keySet());
		if(expectedValues.size() != actualValues.size())
			allKeys.addAll(actualValues.keySet());
		int countErrors = 0;
		for(CellIndex index : allKeys) {
			Double expectedValue = (Double) expectedValues.get(index);
			Double actualValue = (Double) actualValues.get(index);
			if(expectedValue == null)
				expectedValue = 0.0;
			if(actualValue == null)
				actualValue = 0.0;

			if(!compareCellValue(expectedValue, actualValue, epsilon, false)) {
				System.out.println(
					expectedFile + ": " + index + " mismatch: expected " + expectedValue + ", actual " + actualValue);
				countErrors++;
			}
		}
		assertEquals("for file " + actualDir + " " + countErrors + " values are not equal", 0, countErrors);
	}
	
	/**
	 * <p>
	 * Compares the expected values calculated in Java by testcase and which are
	 * in the normal filesystem, with those calculated by SystemDS located in
	 * HDFS
	 * </p>
	 *
	 * @param expectedFile
	 *            file with expected values, which is located in OS filesystem
	 * @param actualDir
	 *            file with actual values, which is located in HDFS
	 */
	@SuppressWarnings("resource")
	public static void compareDMLFrameWithJavaFrame(ValueType[] schema, String expectedFile, String actualDir) {
		HashMap<CellIndex, Object> expectedValues = new HashMap<>();
		HashMap<CellIndex, Object> actualValues = new HashMap<>();

		readActualAndExpectedFile(schema, expectedFile, actualDir, expectedValues, actualValues);

		Set<CellIndex> allKeys = new HashSet<>();
		allKeys.addAll(expectedValues.keySet());
		if(expectedValues.size() != actualValues.size())
			allKeys.addAll(actualValues.keySet());
		int countErrors = 0;
		for(CellIndex index : allKeys) {
			Object expectedValue = expectedValues.get(index);
			Object actualValue = actualValues.get(index);

			int j = index.column;
			if(UtilFunctions.compareTo(schema[j - 1], expectedValue, actualValue) != 0) {
				System.out.println(
					expectedFile + ": " + index + " mismatch: expected " + expectedValue + ", actual " + actualValue);
				countErrors++;
			}
		}
		assertEquals("for file " + actualDir + " " + countErrors + " values are not equal", 0, countErrors);
	}
	
	public static void compareTensorBlocks(TensorBlock tb1, TensorBlock tb2) {
		Assert.assertEquals(tb1.getValueType(), tb2.getValueType());
		Assert.assertArrayEquals(tb1.getSchema(), tb2.getSchema());
		Assert.assertEquals(tb1.getNumRows(), tb2.getNumRows());
		Assert.assertEquals(tb1.getNumColumns(), tb2.getNumColumns());
		for (int i = 0; i < tb1.getNumRows(); i++)
			for (int j = 0; j < tb1.getNumColumns(); j++)
				Assert.assertEquals(tb1.get(new int[]{i, j}), tb2.get(new int[]{i, j}));
	}
	
	public static TensorBlock createBasicTensor(ValueType vt, int rows, int cols, double sparsity) {
		return DataConverter.convertToTensorBlock(TestUtils.round(
			MatrixBlock.randOperations(rows, cols, sparsity, 0, 10, "uniform", 7)), vt, true);
	}
	
	public static TensorBlock createDataTensor(ValueType vt, int rows, int cols, double sparsity) {
		return DataConverter.convertToTensorBlock(TestUtils.round(
			MatrixBlock.randOperations(rows, cols, sparsity, 0, 10, "uniform", 7)), vt, false);
	}

	/**
	 * Reads values from a matrix file in HDFS in DML format
	 * 
	 * @deprecated You should not use this method, it is recommended to use the
	 *             corresponding method in AutomatedTestBase
	 * @param filePath
	 * @return
	 */
	public static HashMap<CellIndex, Double> readDMLMatrixFromHDFS(String filePath) 
	{
		HashMap<CellIndex, Double> expectedValues = new HashMap<>();
		
		try 
		{
			Path outDirectory = new Path(filePath);
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);

			FileStatus[] outFiles = fs.listStatus(outDirectory);
			for (FileStatus file : outFiles) {
				FSDataInputStream outIn = fs.open(file.getPath());
				readValuesFromFileStream(outIn, expectedValues);
			}
		} 
		catch (IOException e) {
			assertTrue("could not read from file " + filePath+": "+e.getMessage(), false);
		}

		return expectedValues;
	}

	/**
	 * Reads values from a matrix file in OS's FS in R format
	 * 
	 * @deprecated You should not use this method, it is recommended to use the
	 *             corresponding method in AutomatedTestBase
	 * 
	 * @param filePath
	 * @return
	 */
	public static HashMap<CellIndex, Double> readRMatrixFromFS(String filePath) 
	{
		HashMap<CellIndex, Double> expectedValues = new HashMap<>();
		
		try(BufferedReader reader = new BufferedReader(new FileReader(filePath))) 
		{
			// skip both R header lines
			String line = reader.readLine();
			
			int matrixType = -1;
			if ( line.endsWith(" general") )
				matrixType = 1;
			if ( line.endsWith(" symmetric") )
				matrixType = 2;
			
			if ( matrixType == -1 )
				throw new RuntimeException("unknown matrix type while reading R matrix: " + line);
			
			line = reader.readLine(); // header line with dimension and nnz information
			
			while ((line = reader.readLine()) != null) {
				StringTokenizer st = new StringTokenizer(line, " ");
				int i = Integer.parseInt(st.nextToken());
				int j = Integer.parseInt(st.nextToken());
				if( st.hasMoreTokens() ) {
					double v = Double.parseDouble(st.nextToken());
					if( v==0.0 ) continue;
					expectedValues.put(new CellIndex(i, j), v);
					if ( matrixType == 2 )
						expectedValues.put(new CellIndex(j, i), v);
				}
				else { //pattern
					expectedValues.put(new CellIndex(i, j), 1.0);
					if ( matrixType == 2 )
						expectedValues.put(new CellIndex(j, i), 1.0);
				}
			}
		} 
		catch (IOException e) {
			assertTrue("could not read from file " + filePath, false);
		}
		
		return expectedValues;
	}
	
	/**
	 * Reads a scalar value in DML format from HDFS
	 */
	public static HashMap<CellIndex, Double> readDMLScalarFromHDFS(String filePath) {
		HashMap<CellIndex, Double> expectedValues = new HashMap<>();
		expectedValues.put(new CellIndex(1,1), readDMLScalar(filePath));
		return expectedValues;
	}

	public static double readDMLScalar(String filePath) {
		try {
			double d=Double.NaN;
			Path outDirectory = new Path(filePath);
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
			String line;
			FileStatus[] outFiles = fs.listStatus(outDirectory);
			for (FileStatus file : outFiles) {
				FSDataInputStream fsout = fs.open(file.getPath());
				try(BufferedReader outIn = new BufferedReader(new InputStreamReader(fsout))){
					while ((line = outIn.readLine()) != null) { // only 1 scalar value in file
						d = Double.parseDouble(line);
					}
				}
			}
			return d;
		} catch (IOException e) {
			assertTrue("could not read from file " + filePath, false);
		}
		return Double.NaN;
	}

	public static boolean readDMLBoolean(String filePath) {
		try {
			Boolean b = null;
			Path outDirectory = new Path(filePath);
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
			String line;
			FileStatus[] outFiles = fs.listStatus(outDirectory);
			for (FileStatus file : outFiles) {
				FSDataInputStream fsout = fs.open(file.getPath());
				try(BufferedReader outIn = new BufferedReader(new InputStreamReader(fsout))) {
					while ((line = outIn.readLine()) != null) { // only 1 scalar value in file
						b = Boolean.valueOf(Boolean.parseBoolean(line));
					}
				}
			}
			return b.booleanValue();
		} catch (IOException e) {
			assertTrue("could not read from file " + filePath, false);
		}
		return _AssertOccured;
	}
	
	public static String readDMLString(String filePath) {
		try {
			StringBuilder sb =  new StringBuilder();
			Path outDirectory = new Path(filePath);
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
			FileStatus[] outFiles = fs.listStatus(outDirectory);
			for (FileStatus file : outFiles) {
				FSDataInputStream fsout = fs.open(file.getPath());
				try(InputStreamReader is = new InputStreamReader(fsout)){
					sb.append(IOUtils.toString(is));
				}
			}
			return sb.toString();
		} catch (IOException e) {
			assertTrue("could not read from file " + filePath, false);
		}
		return null;
	}
		
	
	/**
	 * Reads a scalar value in R format from OS's FS
	 */
	public static HashMap<CellIndex, Double> readRScalarFromFS(String filePath) {
		HashMap<CellIndex, Double> expectedValues = new HashMap<>();
		expectedValues.put(new CellIndex(1,1), readRScalar(filePath));
		return expectedValues;
	}
	
	public static Double readRScalar(String filePath) {
		try {
			double d = Double.NaN;
			try(BufferedReader compareIn = new BufferedReader(new FileReader(filePath))) {
				String line;
				while ((line = compareIn.readLine()) != null) { // only 1 scalar value in file
					d = Double.parseDouble(line);
				}
			}
			return d;
		} catch (IOException e) {
			assertTrue("could not read from file " + filePath, false);
		}
		return Double.NaN;
	}
	
	public static String processMultiPartCSVForR(String csvFile) throws IOException {
		File csv = new File(csvFile);
		if (csv.isDirectory()) {
			File[] parts = csv.listFiles();
			
			int count=0;
			int index = -1;
			for(int i=0; i < parts.length; i++ ) {
				File f = parts[i];
				String path = f.getPath();
				if (path.startsWith(".") && path.endsWith(".crc"))
					continue;
				count++;
				index = i;
			}
			
			if ( count == 1) {
				csvFile = parts[index].toString();
			}
			else if ( count > 1 ) {
				File tmp = new File(csvFile+"_temp.csv");

				try( OutputStreamWriter out = new OutputStreamWriter(new FileOutputStream(tmp), "UTF-8") ) {
					// Directory listing may contain .crc files or may be in the
					// wrong order. Sanitize the list of names.
					ArrayList<String> partNames = new ArrayList<>();
					for (File part : parts) {
						String partName = part.getName();
						if (false == partName.endsWith(".crc")) {
							partNames.add(partName);
						}
					}
					Collections.sort(partNames);

					for (String name : partNames) {
						File part = new File(csv, name);
						// Assume that each file fits into memory.
						String fileContents = FileUtils.readFileToString(part,
								"UTF-8");
						out.append(fileContents);
					}
				}
				
				csvFile = tmp.getCanonicalPath();
			}
			else {
				throw new RuntimeException("Unexpected error while reading a CSV file in R: " + count);
			}
		}
		return csvFile;
	}

	/**
	 * Compares two double values regarding tolerance t. If one or both of them
	 * is null it is converted to 0.0.
	 * 
	 * @param v1
	 * @param v2
	 * @param t Tolerance
	 * @return
	 */
	public static boolean compareCellValue(Double v1, Double v2, double t, boolean ignoreNaN) {
		if (v1 == null)
			v1 = 0.0;
		if (v2 == null)
			v2 = 0.0;
		if( ignoreNaN && (v1.isNaN() || v1.isInfinite() || v2.isNaN() || v2.isInfinite()) )
			return true;
		if (v1.equals(v2))
			return true;

		if(AutomatedTestBase.TEST_GPU) {
			return Math.abs(v1 - v2) <= Math.max(t, AutomatedTestBase.GPU_TOLERANCE);
		}


		return Math.abs(v1 - v2) <= t;
	}
	
	public static void compareMatrices(double[] expectedMatrix, double[] actualMatrix, double epsilon) {
		compareMatrices(new double[][]{expectedMatrix}, 
			new double[][]{actualMatrix}, 1, expectedMatrix.length, epsilon);
	}
	
	/**
	 * Compares two matrices in array format.
	 * 
	 * @param expectedMatrix expected values
	 * @param actualMatrix actual values
	 * @param rows number of rows
	 * @param cols number of columns
	 * @param epsilon tolerance for value comparison
	 */
	public static void compareMatrices(double[][] expectedMatrix, double[][] actualMatrix, int rows, int cols,
			double epsilon) {
		int countErrors = 0;
		for (int i = 0; i < rows; i++) {
			for (int j = 0; j < cols; j++) {
				if (!compareCellValue(expectedMatrix[i][j], actualMatrix[i][j], epsilon, false)) {
					System.out.println(expectedMatrix[i][j] +" vs actual: "+actualMatrix[i][j]+" at "+i+" "+j);
					countErrors++;
				}
			}
		}
		assertTrue("" + countErrors + " values are not in equal", countErrors == 0);
	}
	
	public static void compareFrames(String[][] expectedFrame, String[][] actualFrame, int rows, int cols ) {
		int countErrors = 0;
		for (int i = 0; i < rows; i++) {
			for (int j = 0; j < cols; j++) {
				if( !( (expectedFrame[i][j]==null && actualFrame[i][j]==null) ||
					expectedFrame[i][j].equals(actualFrame[i][j]) || (expectedFrame[i][j]+".0").equals(actualFrame[i][j])) ) {
					System.out.println(expectedFrame[i][j] +" vs actual: "+actualFrame[i][j]+" at "+i+" "+j);
					countErrors++;
				}
			}
		}
		assertTrue("" + countErrors + " values are not in equal", countErrors == 0);
	}
	
	public static void compareScalars(double d1, double d2, double tol) {
		assertTrue("Given scalars do not match: " + d1 + " != " + d2 , compareCellValue(d1, d2, tol, false));	
	}

	public static void compareMatricesBit(double[][] expectedMatrix, double[][] actualMatrix, int rows, int cols,
		long maxUnitsOfLeastPrecision){
		int countErrors = 0;
		for (int i = 0; i < rows; i++) {
			for (int j = 0; j < cols; j++) {
				if( !compareScalarBits(expectedMatrix[i][j], actualMatrix[i][j], maxUnitsOfLeastPrecision)){
					System.out.println(expectedMatrix[i][j] +" vs actual: "+actualMatrix[i][j]+" at "+i+" "+j);
					countErrors++;
				}
			}
		}
		assertTrue("" + countErrors + " values are not in equal", countErrors == 0);
	}

	public static void compareMatricesBitAvgDistance(double[][] expectedMatrix, double[][] actualMatrix, int rows, int cols,
		long maxUnitsOfLeastPrecision, long maxAvgDistance, String message){
		int countErrors = 0;
		long sumDistance = 0;
		long distance;
		for (int i = 0; i < rows; i++) {
			for (int j = 0; j < cols; j++) {
				distance = compareScalarBits(expectedMatrix[i][j], actualMatrix[i][j]);
				sumDistance += distance;
				if(distance > maxUnitsOfLeastPrecision){
					System.out.println(expectedMatrix[i][j] +" vs actual: "+actualMatrix[i][j]+" at "+i+" "+j);
					countErrors++;
				}
			}
		}
		long avgDistance = sumDistance / (rows * cols);
		assertTrue(message + "\n" + countErrors + " values are not in equal", countErrors == 0);
		assertTrue(message + "\nThe avg distance in bits: "+ avgDistance +" was higher than max: " + maxAvgDistance,
			avgDistance <= maxAvgDistance);
	}

	public static void compareMatricesBitAvgDistance(double[][] expectedMatrix, double[][] actualMatrix, int rows,
		int cols, long maxUnitsOfLeastPrecision, long maxAvgDistance) {
			compareMatricesBitAvgDistance(expectedMatrix, actualMatrix, rows, cols, maxUnitsOfLeastPrecision, maxAvgDistance, "");
	}

	/**
	 * Compare two double precision floats for equality within a margin of error.
	 *  
	 * This can be used to compensate for inequality caused by accumulated
	 * floating point math errors.
	 * 
	 * The error margin is specified in ULPs (units of least precision).
	 * A one-ULP difference means there are no representable floats in between.
	 * E.g. 0f and 1.4e-45f are one ULP apart. So are -6.1340704f and -6.13407f.
	 * Depending on the number of calculations involved, typically a margin of
	 * 1-5 ULPs should be enough.
	 * 
	 * @param d1 The expected value.
	 * @param d2 The actual value.
	 * @return Whether distance in bits
	 */
	public static long compareScalarBits(double d1, double d2) {
		long expectedBits = Double.doubleToLongBits(d1) < 0 ? 0x8000000000000000L - Double.doubleToLongBits(d1) : Double.doubleToLongBits(d1);
		long actualBits = Double.doubleToLongBits(d2) < 0 ? 0x8000000000000000L - Double.doubleToLongBits(d2) : Double.doubleToLongBits(d2);
		long difference = expectedBits > actualBits ? expectedBits - actualBits : actualBits - expectedBits;
		return difference;
	}

	public static boolean compareScalarBits(double d1, double d2, long maxUnitsOfLeastPrecision) {
		if (Double.isNaN(d1) || Double.isNaN(d2))
			return false;
		long expectedBits = Double.doubleToLongBits(d1) < 0 ? 0x8000000000000000L - Double.doubleToLongBits(d1) : Double.doubleToLongBits(d1);
		long actualBits = Double.doubleToLongBits(d2) < 0 ? 0x8000000000000000L - Double.doubleToLongBits(d2) : Double.doubleToLongBits(d2);
		long difference = expectedBits > actualBits ? expectedBits - actualBits : actualBits - expectedBits;
		return difference <= maxUnitsOfLeastPrecision;
	}

	public static void compareScalarBitsJUnit(double d1, double d2, long maxUnitsOfLeastPrecision){

		assertTrue("Given scalars do not match: " + d1 + " != " + d2 ,compareScalarBits(d1,d2,maxUnitsOfLeastPrecision));
	}
	
	public static void compareScalars(String expected, String actual) {
			assertEquals(expected, actual);
	}

	public static boolean compareMatrices(HashMap<CellIndex, Double> m1, HashMap<CellIndex, Double> m2,
			double tolerance, String name1, String name2) 
	{
		return compareMatrices(m1, m2, tolerance, name1, name2, false);
	}
	
	public static void compareMatrices(HashMap<CellIndex, Double> m1, MatrixBlock m2, double tolerance) {
		double[][] ret1 = convertHashMapToDoubleArray(m1);
		double[][] ret2 = DataConverter.convertToDoubleMatrix(m2);
		compareMatrices(ret1, ret2, m2.getNumRows(), m2.getNumColumns(), tolerance);
	}
	
	public static void compareMatrices(MatrixBlock m1, MatrixBlock m2, double tolerance) {
		double[][] ret1 = DataConverter.convertToDoubleMatrix(m1);
		double[][] ret2 = DataConverter.convertToDoubleMatrix(m2);
		compareMatrices(ret1, ret2, m2.getNumRows(), m2.getNumColumns(), tolerance);
	}
	
	/**
	 * Compares two matrices given as HashMaps. The matrix containing more nnz
	 * is iterated and each cell value compared against the corresponding cell
	 * in the smaller matrix, to ensure that all values are compared.<br/>
	 * This method does not assert. Instead statistics are added to
	 * AssertionBuffer, at the end of the test you should call
	 * {@link TestUtils#displayAssertionBuffer()}.
	 * 
	 * @param m1
	 * @param m2
	 * @param tolerance
	 * @return True if matrices are identical regarding tolerance.
	 */
	public static boolean compareMatrices(HashMap<CellIndex, Double> m1, HashMap<CellIndex, Double> m2,
			double tolerance, String name1, String name2, boolean ignoreNaN) {
		HashMap<CellIndex, Double> first = m2;
		HashMap<CellIndex, Double> second = m1;
		String namefirst = name2;
		String namesecond = name1;
		boolean flag = true;
		
		// to ensure that always the matrix with more nnz is iterated
		if (m1.size() > m2.size()) {
			first = m1;
			second = m2;
			namefirst = name1;
			namesecond = name2;
			flag=false;
		}

		int countErrorWithinTolerance = 0;
		int countIdentical = 0;
		double minerr = Double.MAX_VALUE;
		double maxerr = -Double.MAX_VALUE;

		for (Entry<CellIndex, Double> e : first.entrySet()) {
			Double v1 = e.getValue() == null ? 0.0 : e.getValue();
			Double v2 = second.get(e.getKey());
			v2 = v2 == null ? 0.0 : v2;
			minerr = Math.min(minerr, Math.abs(v1 - v2));
			maxerr = Math.max(maxerr, Math.abs(v1 - v2));

			if (!compareCellValue(v1, v2, 0, ignoreNaN)) {
				if (!compareCellValue(v1, v2, tolerance, ignoreNaN)) {
					countErrorWithinTolerance++;
					if(!flag)
						System.out.println(e.getKey()+": "+v1+" <--> "+v2);
					else 
						System.out.println(e.getKey()+": "+v2+" <--> "+v1);
				}
			} else {
				countIdentical++;
			}
		}

		String assertPrefix = (countErrorWithinTolerance == 0) ? "    " : "!  ";
		_AssertInfos.add(assertPrefix + name1 + "<->" + name2 + " # stored values in " + namefirst + ": "
				+ first.size());
		_AssertInfos.add(assertPrefix + name1 + "<->" + name2 + " # stored values in " + namesecond + ": "
				+ second.size());
		_AssertInfos.add(assertPrefix + name1 + "<->" + name2 + " identical values(z=0): " + countIdentical);
		_AssertInfos.add(assertPrefix + name1 + "<->" + name2 + " wrong values(z=" + tolerance + "): "
				+ countErrorWithinTolerance);
		_AssertInfos.add(assertPrefix + name1 + "<->" + name2 + " min error: " + minerr);
		_AssertInfos.add(assertPrefix + name1 + "<->" + name2 + " max error: " + maxerr);

		if (countErrorWithinTolerance == 0)
			return true;

		_AssertOccured = true;
		return false;
	}
	
	
	/**
	 * 
	 * @param vt
	 * @param in1
	 * @param in2
	 * @param tolerance
	 * 
	 * @return
	 */
	public static int compareTo(ValueType vt, Object in1, Object in2, double tolerance) {
		if(in1 == null && in2 == null) return 0;
		else if(in1 == null) return -1;
		else if(in2 == null) return 1;
 
		switch( vt ) {
			case STRING:  return ((String)in1).compareTo((String)in2);
			case BOOLEAN: return ((Boolean)in1).compareTo((Boolean)in2);
			case INT64:     return ((Long)in1).compareTo((Long)in2);
			case FP64:  
				return (Math.abs((Double)in1-(Double)in2) < tolerance)?0:
					((Double)in1).compareTo((Double)in2);
			default: throw new RuntimeException("Unsupported value type: "+vt);
		}
	}
	
	/**
	 * 
	 * @param vt
	 * @param in1
	 * @param inR
	 * @return
	 */
	public static int compareToR(ValueType vt, Object in1, Object inR, double tolerance) {
		if(in1 == null && (inR == null || (inR.toString().compareTo("NA")==0))) return 0;
		else if(in1 == null && vt == ValueType.STRING) return -1;
		else if(inR == null) return 1;
 
		switch( vt ) {
			case STRING:  return ((String)in1).compareTo((String)inR);
			case BOOLEAN: 
				if(in1 == null)
					return Boolean.FALSE.compareTo(((Boolean)inR).booleanValue());
				else
					return ((Boolean)in1).compareTo((Boolean)inR);
			case INT64:     
				if(in1 == null)
					return new Long(0).compareTo(((Long)inR));
				else
					return ((Long)in1).compareTo((Long)inR);
			case FP64:  
				if(in1 == null)
					return (new Double(0)).compareTo((Double)inR);
				else
					return (Math.abs((Double)in1-(Double)inR) < tolerance)?0:	
						((Double)in1).compareTo((Double)inR);
			default: throw new RuntimeException("Unsupported value type: "+vt);
		}
	}
	
	/**
	 * Converts a 2D array into a sparse hashmap matrix.
	 * 
	 * @param matrix
	 * @return
	 */
	public static HashMap<CellIndex, Double> convert2DDoubleArrayToHashMap(double[][] matrix) {
		HashMap<CellIndex, Double> hmMatrix = new HashMap<>();
		for (int i = 0; i < matrix.length; i++) {
			for (int j = 0; j < matrix[i].length; j++) {
				if (matrix[i][j] != 0)
					hmMatrix.put(new CellIndex(i + 1, j + 1), matrix[i][j]);
			}
		}

		return hmMatrix;
	}
	
	/**
	 * Method to convert a hashmap of matrix entries into a double array
	 * @param matrix
	 * @return
	 */
	public static double[][] convertHashMapToDoubleArray(HashMap <CellIndex, Double> matrix)
	{
		int max_rows = -1, max_cols= -1;
		for(CellIndex ci :matrix.keySet())
		{
			if(ci.row > max_rows)
			{
				max_rows = ci.row;
			}
			if(ci.column > max_cols)
			{
				max_cols = ci.column;
			}
		}
		
		double [][] ret_arr = new double[max_rows][max_cols];
		
		for(CellIndex ci:matrix.keySet())
		{
			int i = ci.row-1;
			int j = ci.column-1;
			ret_arr[i][j] = matrix.get(ci);
		}
		
		return ret_arr;
		
	}
	
	public static double[][] convertHashMapToDoubleArray(HashMap <CellIndex, Double> matrix, int rows, int cols)
	{
		double [][] ret_arr = new double[rows][cols];
		
		for(CellIndex ci:matrix.keySet()) {
			int i = ci.row-1;
			int j = ci.column-1;
			ret_arr[i][j] = matrix.get(ci);
		}
		
		return ret_arr;
		
	}

	/**
	 * Converts a 2D double array into a 1D double array.
	 * 
	 * @param array
	 * @return
	 */
	public static double[] convert2Dto1DDoubleArray(double[][] array) {
		double[] ret = new double[array.length * array[0].length];
		int c = 0;
		for (int i = 0; i < array.length; i++) {
			for (int j = 0; j < array[0].length; j++) {
				ret[c++] = array[i][j];
			}
		}

		return ret;
	}

	/**
	 * Converts a 1D double array into a 2D double array.
	 * 
	 * @param array
	 * @return
	 */
	public static double[][] convert1Dto2DDoubleArray(double[] array, int rows) {
		int cols = array.length / rows;
		double[][] ret = new double[rows][cols];

		for (int c = 0; c < array.length; c++) {
			ret[c % cols][c / cols] = array[c];
		}

		return ret;
	}

	/**
	 * Asserts the content of assertion buffer, which may contain of all methods
	 * which assert not themselves but add information to that buffer.
	 */
	public static void displayAssertionBuffer() {
		String msg = "Detailed matrices characteristics:\n";
		for (String cur : _AssertInfos) {
			msg += cur + "\n";
		}

		assertTrue(msg, !_AssertOccured);
	}

	/**
	 * <p>
	 * Compares a dml matrix file in HDFS with a file in normal file system
	 * generated by R
	 * </p>
	 * 
	 * @param rFile
	 *            file with values calculated by R
	 * @param hdfsDir
	 *            file with actual values calculated by DML
	 * @param epsilon
	 *            tolerance for value comparison
	 */
	public static void compareDMLHDFSFileWithRFile(String rFile, String hdfsDir, double epsilon) {
		try {
			Path outDirectory = new Path(hdfsDir);
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
			HashMap<CellIndex, Double> expectedValues = new HashMap<>();
			HashMap<CellIndex, Double> actualValues = new HashMap<>();
			try(BufferedReader compareIn = new BufferedReader(new FileReader(rFile))) {
				// skip both R header lines
				compareIn.readLine();
				compareIn.readLine();
				readValuesFromFileStreamAndPut(compareIn, expectedValues);
			}
			
			FileStatus[] outFiles = fs.listStatus(outDirectory);

			for (FileStatus file : outFiles) {
				FSDataInputStream fsout = fs.open(file.getPath());
				readValuesFromFileStream(fsout, actualValues);
			}
			Set<CellIndex> allKeys = new HashSet<>();
			allKeys.addAll(expectedValues.keySet());
			if(expectedValues.size() != actualValues.size())
				allKeys.addAll(actualValues.keySet());

			int countErrors = 0;
			for (CellIndex index : allKeys) {
				Double expectedValue = expectedValues.get(index);
				Double actualValue = actualValues.get(index);
				if (expectedValue == null)
					expectedValue = 0.0;
				if (actualValue == null)
					actualValue = 0.0;

				if (!compareCellValue(expectedValue, actualValue, epsilon, false))
					countErrors++;
			}
			assertTrue("for file " + hdfsDir + " " + countErrors + " values are not in equal", countErrors == 0);
		} catch (IOException e) {
			fail("unable to read file: " + e.getMessage());
		}
	}

	/**
	 * <p>
	 * Checks a matrix against a number of specifications.
	 * </p>
	 * 
	 * @param data
	 *            matrix data
	 * @param mc
	 *            matrix characteristics
	 * @param rows
	 *            number of rows
	 * @param cols
	 *            number of columns
	 * @param min
	 *            minimum value
	 * @param max
	 *            maximum value
	 */
	public static void checkMatrix(double[][] data, MatrixCharacteristics mc, long rows, long cols, double min, double max) {
		assertEquals(rows, mc.getRows());
		assertEquals(cols, mc.getCols());
		for (int i = 0; i < rows; i++) {
			for (int j = 0; j < cols; j++) {
				assertTrue("invalid value",
						((data[i][j] >= min && data[i][j] <= max) || data[i][j] == 0));
			}
		}
	}

	/**
	 * <p>
	 * Checks a matrix read from a file in text format against a number of
	 * specifications.
	 * </p>
	 * 
	 * @param outDir
	 *            directory containing the matrix
	 * @param rows
	 *            number of rows
	 * @param cols
	 *            number of columns
	 * @param min
	 *            minimum value
	 * @param max
	 *            maximum value
	 */
	public static void checkMatrix(String outDir, long rows, long cols, double min, double max) {
		try {
			Path outDirectory = new Path(outDir);
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
			assertTrue(outDir + " does not exist", fs.exists(outDirectory));
			
			if( fs.getFileStatus(outDirectory).isDirectory() )
			{
				FileStatus[] outFiles = fs.listStatus(outDirectory);
				for (FileStatus file : outFiles) {
					FSDataInputStream fsout = fs.open(file.getPath());
					try( BufferedReader outIn = new BufferedReader(new InputStreamReader(fsout)) ){
						String line;
						while ((line = outIn.readLine()) != null) {
							String[] rcv = line.split(" ");
							long row = Long.parseLong(rcv[0]);
							long col = Long.parseLong(rcv[1]);
							double value = Double.parseDouble(rcv[2]);
							assertTrue("invalid row index", (row > 0 && row <= rows));
							assertTrue("invlaid column index", (col > 0 && col <= cols));
							assertTrue("invalid value", ((value >= min && value <= max) || value == 0));
						}
					}
				}
			}
			else
			{
				FSDataInputStream fsout = fs.open(outDirectory);
				try(BufferedReader outIn = new BufferedReader(new InputStreamReader(fsout))) {
					String line;
					while ((line = outIn.readLine()) != null) {
						String[] rcv = line.split(" ");
						long row = Long.parseLong(rcv[0]);
						long col = Long.parseLong(rcv[1]);
						double value = Double.parseDouble(rcv[2]);
						assertTrue("invalid row index", (row > 0 && row <= rows));
						assertTrue("invlaid column index", (col > 0 && col <= cols));
						assertTrue("invalid value", ((value >= min && value <= max) || value == 0));
					}
				}
			}
		} catch (IOException e) {
			fail("unable to read file: " + e.getMessage());
		}
	}

	/**
	 * <p>
	 * Checks for matrix in directory existence.
	 * </p>
	 * 
	 * @param outDir
	 *            directory
	 */
	@SuppressWarnings("resource")
	public static void checkForOutputExistence(String outDir) {
		try {
			Path outDirectory = new Path(outDir);
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
			FileStatus[] outFiles = fs.listStatus(outDirectory);
			assertEquals("number of files in directory not 1", 1, outFiles.length);
			FSDataInputStream fsout = fs.open(outFiles[0].getPath());
			String outLine = null;
			try(BufferedReader outIn = new BufferedReader(new InputStreamReader(fsout))) {
				outLine = outIn.readLine();
			}
			assertNotNull("file is empty", outLine);
			assertTrue("file is empty", outLine.length() > 0);
		} catch (IOException e) {
			fail("unable to read " + outDir + ": " + e.getMessage());
		}
	}

	/**
	 * <p>
	 * Removes all the directories specified in the array in HDFS
	 * </p>
	 * 
	 * @param directories
	 *            directories array
	 */
	public static void removeHDFSDirectories(String[] directories) {
		try {
			for (String directory : directories) {
				Path dir = new Path(directory);
				FileSystem fs = IOUtilFunctions.getFileSystem(dir, conf);
				if (fs.exists(dir) && fs.getFileStatus(dir).isDirectory()) {
					fs.delete(dir, true);
				}
			}
		} catch (IOException e) {
		}
	}

	/**
	 * <p>
	 * Removes all the directories specified in the array in OS filesystem
	 * </p>
	 * 
	 * @param directories
	 *            directories array
	 */
	public static void removeDirectories(String[] directories) {
		for (String directory : directories) {
			File dir = new File(directory);
			deleteDirectory(dir);
		}
	}

	private static boolean deleteDirectory(File path) {
		if (path.exists()) {
			File[] files = path.listFiles();
			for (int i = 0; i < files.length; i++) {
				if (files[i].isDirectory()) {
					deleteDirectory(files[i]);
				} else {
					files[i].delete();
				}
			}
		}
		return (path.delete());
	}

	/**
	 * <p>
	 * Removes all the files specified in the array in HDFS
	 * </p>
	 * 
	 * @param files
	 *            files array
	 */
	public static void removeHDFSFiles(String[] files) {
		try {
			for (String directory : files) {
				Path dir = new Path(directory);
				FileSystem fs = IOUtilFunctions.getFileSystem(dir, conf);
				if (fs.exists(dir) && !fs.getFileStatus(dir).isDirectory()) {
					fs.delete(dir, false);
				}
			}
		} catch (IOException e) {
		}
	}

	/**
	 * <p>
	 * Removes all the files specified in the array in OS filesystem
	 * </p>
	 * 
	 * @param files
	 *            files array
	 */
	public static void removeFiles(String[] files) {
		for (String directory : files) {
			File f = new File(directory);
			if (!f.exists() || !f.canWrite() || f.isDirectory())
				continue;

			f.delete();
		}
	}

	/**
	 * <p>
	 * Clears a complete directory.
	 * </p>
	 * 
	 * @param directory
	 *            directory
	 */
	public static void clearDirectory(String directory) {
		try {
			Path path = new Path(directory);
			FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
			FileStatus[] directoryContent = fs.listStatus(path);
			for (FileStatus content : directoryContent) {
				fs.delete(content.getPath(), true);
			}
		} catch (IOException e) {
		}
	}

	/**
	 * <p>
	 * Generates a test matrix with the specified parameters as a two
	 * dimensional array.
	 * </p>
	 * <p>
	 * Set seed to -1 to use the current time as seed.
	 * </p>
	 * 
	 * @param rows
	 *            number of rows
	 * @param cols
	 *            number of columns
	 * @param min
	 *            minimum value
	 * @param max
	 *            maximum value
	 * @param sparsity
	 *            sparsity
	 * @param seed
	 *            seed
	 * @return random matrix
	 */
	public static double[][] generateTestMatrix(int rows, int cols, double min, double max, double sparsity, long seed) {
		double[][] matrix = new double[rows][cols];
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		for (int i = 0; i < rows; i++) {
			for (int j = 0; j < cols; j++) {
				if (random.nextDouble() > sparsity)
					continue;
				matrix[i][j] = (random.nextDouble() * (max - min) + min);
			}
		}

		return matrix;
	}

	/**
	 * 
	 * Generates a test matrix, but only containing real numbers, in the range specified.
	 * 
	 * @param rows number of rows
	 * @param cols number of columns
	 * @param min minimum value whole number
	 * @param max maximum value whole number
	 * @param sparsity sparsity
	 * @param seed seed
	 * @return random matrix containing whole numbers in the range specified.
	 */
	public static int[][] generateTestMatrixIntV(int rows, int cols, int min, int max, double sparsity, long seed) {
		int[][] matrix = new int[rows][cols];
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		if (max - min != 0){
			for (int i = 0; i < rows; i++) {
				for (int j = 0; j < cols; j++) {
					if (random.nextDouble() > sparsity)
						continue;
					matrix[i][j] = (random.nextInt((max - min)) + min);
				}
			}
		} else{
			for (int i = 0; i < rows; i++) {
				for (int j = 0; j < cols; j++) {
					if (random.nextDouble() > sparsity)
						continue;
					matrix[i][j] = max;
				}
			}
		}

		return matrix;
	}

	/**
	 * <p>
	 * Generates a test matrix with the specified parameters as a two
	 * dimensional array. The matrix will not contain any zero values.
	 * </p>
	 * <p>
	 * Set seed to -1 to use the current time as seed.
	 * </p>
	 * 
	 * @param rows
	 *            number of rows
	 * @param cols
	 *            number of columns
	 * @param min
	 *            minimum value
	 * @param max
	 *            maximum value
	 * @param seed
	 *            seed
	 * @return random matrix
	 */
	public static double[][] generateNonZeroTestMatrix(int rows, int cols, double min, double max, long seed) {
		double[][] matrix = new double[rows][cols];
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		for (int i = 0; i < rows; i++) {
			for (int j = 0; j < cols; j++) {
				double randValue;
				do {
					randValue = random.nextDouble();
				} while (randValue == 0);
				matrix[i][j] = (randValue * (max - min) + min);
			}
		}

		return matrix;
	}

	/**
	 * <p>
	 * Generates a test matrix with the specified parameters and writes it to a
	 * file using the text format.
	 * </p>
	 * <p>
	 * Set seed to -1 to use the current time as seed.
	 * </p>
	 * 
	 * @param file
	 *            output file
	 * @param rows
	 *            number of rows
	 * @param cols
	 *            number of columns
	 * @param min
	 *            minimum value
	 * @param max
	 *            maximum value
	 * @param sparsity
	 *            sparsity
	 * @param seed
	 *            seed
	 */
	public static void generateTestMatrixToFile(String file, int rows, int cols, double min, double max,
			double sparsity, long seed) {
		try {
			Path inFile = new Path(file);
			FileSystem fs = IOUtilFunctions.getFileSystem(inFile, conf);
			DataOutputStream out = fs.create(inFile);
			try( PrintWriter pw = new PrintWriter(out) ) {
				Random random = (seed == -1) ? TestUtils.random : new Random(seed);
				
				for (int i = 1; i <= rows; i++) {
					for (int j = 1; j <= cols; j++) {
						if (random.nextDouble() > sparsity)
							continue;
						double value = (random.nextDouble() * (max - min) + min);
						if (value != 0)
							pw.println(i + " " + j + " " + value);
					}
				}
			}
		} catch (IOException e) {
			fail("unable to write test matrix: " + e.getMessage());
		}
	}

	/**
	 * <p>
	 *     Generates a random FrameBlock with given parameters.
	 * </p>
	 */
	public static FrameBlock generateRandomFrameBlock(int rows, int cols, ValueType[] schema, Random random){
		String[] names = new String[cols];
		for(int i = 0; i < cols; i++)
			names[i] = schema[i].toString();
		FrameBlock frameBlock = new FrameBlock(schema, names);
		frameBlock.ensureAllocatedColumns(rows);
		for(int row = 0; row < rows; row++)
			for(int col = 0; col < cols; col++)
				frameBlock.set(row, col, generateRandomValueFromValueType(schema[col], random));
		return frameBlock;
	}

	public static FrameBlock generateRandomFrameBlock(int rows, int cols, ValueType[] schema, long seed){
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		return generateRandomFrameBlock(rows, cols, schema, random);
	}

	public static FrameBlock generateRandomFrameBlock(int rows, int cols, long seed){
		ValueType[] schema = generateRandomSchema(cols, seed);
		return generateRandomFrameBlock(rows, cols,schema ,seed);
	}

	/**
	 * <p>
	 *     Generates a random Schema with given params. With no type Unknown
	 * </p>
	 *
	 * @param size
	 * 				size of the schema
	 * @param random
	 * 				random Object
	 */
	public static ValueType[] generateRandomSchema(int size, Random random){
		final List<ValueType> valueTypes = Collections.unmodifiableList(Arrays.asList(ValueType.FP64, ValueType.INT64, ValueType.BOOLEAN, ValueType.STRING));
		ValueType[] newSchema = new ValueType[size];
		for(int i = 0; i < size; i++){
			newSchema[i] = valueTypes.get(random.nextInt(valueTypes.size()));
		}
		return newSchema;
	}

	public static ValueType[] generateRandomSchema(int size, long seed){
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		return generateRandomSchema(size, random);
	}

	/**
	 * <p>
	 *     Generates a random SchemaMap with given params. Maximum name length per name is 10
	 * </p>
	 *
	 * @param size
	 * 				size of the schemaMap
	 * @param random
	 * 				random Object
	 */
	public static Map<String, Integer> generateRandomSchemaMap(int size, Random random){
		Map<String, Integer> schemaMap = new HashMap<>();
		List<String> generatedPaths = new ArrayList<>();
		for(int k = 0; k < (size/2) + 1; k++){
			generatedPaths.add(generateRandomJSONPath(0, random));
		}
		while(generatedPaths.size() < size){
			generateRandomJSONPaths(generatedPaths, random, size - generatedPaths.size());
		}
		for(int i = 0; i < generatedPaths.size(); i++){
			schemaMap.put(generatedPaths.get(i), i);
		}
		return schemaMap;
	}

	public static Map<String, Integer> generateRandomSchemaMap(int size, long seed){
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		return generateRandomSchemaMap(size, random);
	}



	/**
	 * <p>
	 *     Generates a random JSON paths from a existing set of paths, Function is probabilistic so may have to be
	 *     repeated to get the exact number of paths in size
	 * </p>
	 *
	 * @param size
	 * 				extrapolates the given paths to a MAXIMUM of size paths
	 * @param random
	 * 				random object
	 */
	public static List<String> generateRandomJSONPaths(List<String> paths, Random random, int size){
		List<String> newPaths = new LinkedList<>();
		if(paths.size() == 0 || size <= 0){
			return newPaths;
		}
		int pathslen = paths.size();
		for(int i = 0; i < pathslen; i++){
			String base = paths.get(i);
			int subEntries = random.nextInt(5) + 2;
			for(int c = 0; c < subEntries && size > 0; c++){
				String sub = base + generateRandomJSONPath(0, random);
				paths.add(sub);
				if(c == 0){
					paths.remove(base);
					pathslen--;
					size++;
				}
				size--;
				if(random.nextBoolean()){
					newPaths.add(sub);
					paths.remove(sub);
					size++;
					pathslen--;
				}
			}

		}
		List<String> ret = generateRandomJSONPaths(newPaths, random, size - newPaths.size());
		paths.addAll(ret);
		return paths;
	}

	public static List<String> generateRandomJSONPaths(List<String> paths, long seed, int size){
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		return generateRandomJSONPaths(paths, random, size);
	}

	/**
	 * <p>
	 *     Generates a random JSON path
	 * </p>
	 *
	 * @param len
	 * 				length of the new path = len + 1
	 * @param random
	 * 				random Object
	 */
	public static String generateRandomJSONPath(int len, Random random){
		String current = "/" + random.ints('a', 'z' + 1).limit(10).collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append).toString();
		if(len == 0){
			return current;
		}
		return current + generateRandomJSONPath(len - 1, random);
	}

	public static String generateRandomJSONPath(int len, long seed){
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		return generateRandomJSONPath(len, random);
	}
	/**
	 * <p>
	 *     Generates a random value for a given Value Type
	 * </p>
	 *
	 * @param valueType
	 * 				the ValueType of which to generate the value
	 * @param random
	 * 				random Object
	 */
	public static Object generateRandomValueFromValueType(ValueType valueType, Random random){
		switch (valueType){
			case FP32:	  return random.nextFloat();
			case FP64:    return random.nextDouble();
			case INT32:   return random.nextInt();
			case INT64:   return random.nextLong();
			case BOOLEAN: return random.nextBoolean();
			case STRING:
				return random.ints('a', 'z' + 1)
						.limit(10)
						.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
						.toString();
			default:
				return null;
		}
	}

	public static Object generateRandomValueFromValueType(ValueType valueType, long seed){
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		return generateRandomValueFromValueType(valueType, random);
	}

	/**
	 * Counts the number of NNZ values in a matrix
	 * 
	 * @param matrix
	 * @return
	 */
	public static int countNNZ(double[][] matrix) {
		int n = 0;
		for (int i = 0; i < matrix.length; i++) {
			for (int j = 0; j < matrix[0].length; j++) {
				if (matrix[i][j] != 0)
					n++;
			}
		}
		return n;
	}

	public static void writeCSVTestMatrix(String file, double[][] matrix) 
	{
		try 
		{
			//create outputstream to HDFS / FS and writer
			Path path = new Path(file);
			FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
			DataOutputStream out = fs.create(path, true);
			try( BufferedWriter pw = new BufferedWriter(new OutputStreamWriter(out))) {
				//writer actual matrix
				StringBuilder sb = new StringBuilder();
				for (int i = 0; i < matrix.length; i++) {
					sb.setLength(0);
					if ( matrix[i][0] != 0 )
						sb.append(matrix[i][0]);
					for (int j = 1; j < matrix[i].length; j++) {
						sb.append(",");
						if ( matrix[i][j] == 0 ) 
							continue;
						sb.append(matrix[i][j]);
					}
					sb.append('\n');
					pw.append(sb.toString());
				}
			}
		} 
		catch (IOException e) 
		{
			fail("unable to write (csv) test matrix (" + file + "): " + e.getMessage());
		}
	}

	/**
	 * <p>
	 * Writes a matrix to a file using the text format.
	 * </p>
	 * 
	 * @param file
	 *            file name
	 * @param matrix
	 *            matrix
	 * @param isR
	 *            when true, writes a R matrix to disk
	 * 
	 */
	public static void writeTestMatrix(String file, double[][] matrix, boolean isR) 
	{
		try 
		{
			//create outputstream to HDFS / FS and writer
			DataOutputStream out = null;
			if (!isR) {
				Path path = new Path(file);
				FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
				out = fs.create(path, true);
			} 
			else {
				out = new DataOutputStream(new FileOutputStream(file));
			}
			
			try( BufferedWriter pw = new BufferedWriter(new OutputStreamWriter(out))) {
				
				//write header
				if( isR ) {
					/** add R header */
					pw.append("%%MatrixMarket matrix coordinate real general\n");
					pw.append("" + matrix.length + " " + matrix[0].length + " " + matrix.length*matrix[0].length+"\n");
				}
				
				//writer actual matrix
				StringBuilder sb = new StringBuilder();
				boolean emptyOutput = true;
				for (int i = 0; i < matrix.length; i++) {
					for (int j = 0; j < matrix[i].length; j++) {
						if ( matrix[i][j] == 0 ) 
							continue;
						sb.append(i + 1);
						sb.append(' ');
						sb.append(j + 1);
						sb.append(' ');
						sb.append(matrix[i][j]);
						sb.append('\n');
						pw.append(sb.toString());
						sb.setLength(0);
						emptyOutput = false;
					}
				}
				
				//writer dummy entry if empty
				if( emptyOutput )
					pw.append("1 1 " + matrix[0][0]);
			}
		} 
		catch (IOException e) 
		{
			fail("unable to write test matrix (" + file + "): " + e.getMessage());
		}
	}

	/**
	 * <p>
	 * Writes a matrix to a file using the text format.
	 * </p>
	 * 
	 * @param file
	 *            file name
	 * @param matrix
	 *            matrix
	 */
	public static void writeTestMatrix(String file, double[][] matrix) {
		writeTestMatrix(file, matrix, false);
	}

	
	/**
	 * <p>
	 * Writes a frame to a file using the text format.
	 * </p>
	 * 
	 * @param file
	 *            file name
	 * @param data
	 *            frame data
	 * @param isR
	 * @throws IOException 
	 */
	public static void writeTestFrame(String file, double[][] data, ValueType[] schema, FileFormat fmt, boolean isR) throws IOException {
		FrameWriter writer = FrameWriterFactory.createFrameWriter(fmt);
		FrameBlock frame = new FrameBlock(schema);
		initFrameData(frame, data, schema, data.length);
		writer.writeFrameToHDFS(frame, file, data.length, schema.length);
	}
	
	/**
	 * <p>
	 * Writes a frame to a file using the text format.
	 * </p>
	 * 
	 * @param file
	 *            file name
	 * @param data
	 *            frame data
	 * @throws IOException 
	 */
	public static void writeTestFrame(String file, double[][] data, ValueType[] schema, FileFormat fmt) throws IOException {
		writeTestFrame(file, data, schema, fmt, false);
	}

	public static void initFrameData(FrameBlock frame, double[][] data, ValueType[] lschema, int rows) {
		Object[] row1 = new Object[lschema.length];
		for( int i=0; i<rows; i++ ) {
			for( int j=0; j<lschema.length; j++ ) {
				data[i][j] = UtilFunctions.objectToDouble(lschema[j], 
						row1[j] = UtilFunctions.doubleToObject(lschema[j], data[i][j]));
				if(row1[j] != null && lschema[j] == ValueType.STRING)
					row1[j] = "Str" + row1[j];
			}
			frame.appendRow(row1);
		}
	}

	
	/* Write a scalar value to a file */
	public static void writeTestScalar(String file, double value) {
		try {
			DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
			try( PrintWriter pw = new PrintWriter(out) ) {
				pw.println(value);
			}
		} catch (IOException e) {
			fail("unable to write test scalar (" + file + "): " + e.getMessage());
		}
	}

	public static void writeTestScalar(String file, long value) {
		try {
			DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
			try( PrintWriter pw = new PrintWriter(out) ) {
				pw.println(value);
			}
		} catch (IOException e) {
			fail("unable to write test scalar (" + file + "): " + e.getMessage());
		}
	}
	
	/**
	 * <p>
	 * Writes a matrix to a file using the binary cells format.
	 * </p>
	 * 
	 * @param file
	 *            file name
	 * @param matrix
	 *            matrix
	 */
	@SuppressWarnings("deprecation")
	public static void writeBinaryTestMatrixCells(String file, double[][] matrix) {
		try {
			SequenceFile.Writer writer = null;
			try {
				Path path = new Path(file);
				FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
				writer = new SequenceFile.Writer(fs, conf, path,
					MatrixIndexes.class, MatrixCell.class);

				MatrixIndexes index = new MatrixIndexes();
				MatrixCell value = new MatrixCell();
				for (int i = 0; i < matrix.length; i++) {
					for (int j = 0; j < matrix[i].length; j++) {
						if (matrix[i][j] != 0) {
							index.setIndexes((i + 1), (j + 1));
							value.setValue(matrix[i][j]);
							writer.append(index, value);
						}
					}
				}
			}
			finally {
				IOUtilFunctions.closeSilently(writer);
			}
		} catch (IOException e) {
			e.printStackTrace();
			fail("unable to write test matrix: " + e.getMessage());
		}
	}

	/**
	 * <p>
	 * Writes a matrix to a file using the binary blocks format.
	 * </p>
	 * 
	 * @param file
	 *            file name
	 * @param matrix
	 *            matrix
	 * @param rowsInBlock
	 *            rows in block
	 * @param colsInBlock
	 *            columns in block
	 * @param sparseFormat
	 *            sparse format
	 */
	@SuppressWarnings("deprecation")
	public static void writeBinaryTestMatrixBlocks(String file, double[][] matrix, int rowsInBlock, int colsInBlock,
			boolean sparseFormat) {
		SequenceFile.Writer writer = null;
			
		try {
			Path path = new Path(file);
			FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
			writer = new SequenceFile.Writer(fs, conf, path,
					MatrixIndexes.class, MatrixBlock.class);

			MatrixIndexes index = new MatrixIndexes();
			MatrixBlock value = new MatrixBlock();
			for (int i = 0; i < matrix.length; i += rowsInBlock) {
				int rows = Math.min(rowsInBlock, (matrix.length - i));
				for (int j = 0; j < matrix[i].length; j += colsInBlock) {
					int cols = Math.min(colsInBlock, (matrix[i].length - j));
					index.setIndexes(((i / rowsInBlock) + 1), ((j / colsInBlock) + 1));
					value = new MatrixBlock(rows, cols, sparseFormat);
					for (int k = 0; k < rows; k++) {
						for (int l = 0; l < cols; l++) {
							value.setValue(k, l, matrix[i + k][j + l]);
						}
					}
					writer.append(index, value);
				}
			}
		} 
		catch (IOException e) {
			e.printStackTrace();
			fail("unable to write test matrix: " + e.getMessage());
		}
		finally {
			IOUtilFunctions.closeSilently(writer);
		}
	}

	/**
	 * <p>
	 * Prints out a DML script.
	 * </p>
	 * 
	 * @param dmlScriptFile
	 *            filename of DML script
	 */
	public static void printDMLScript(String dmlScriptFile) {
		System.out.println("Running script: " + dmlScriptFile + "\n");
		System.out.println("******************* DML script *******************");
		try(BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(dmlScriptFile)))) {
			String content;
			while ((content = in.readLine()) != null) {
				System.out.println(content);
			}
		}
		catch (IOException e) {
			e.printStackTrace();
			fail("unable to print dml script: " + e.getMessage());
		}
		System.out.println("**************************************************\n\n");
	}

	/**
	 * <p>
	 * Prints out a PYDML script.
	 * </p>
	 * 
	 * @param pydmlScriptFile
	 *            filename of PYDML script
	 */
	public static void printPYDMLScript(String pydmlScriptFile) {
		System.out.println("Running script: " + pydmlScriptFile + "\n");
		System.out.println("******************* PYDML script *******************");
		try(BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(pydmlScriptFile))) ) {
			String content;
			while ((content = in.readLine()) != null) {
				System.out.println(content);
			}
		} 
		catch (IOException e) {
			e.printStackTrace();
			fail("unable to print pydml script: " + e.getMessage());
		}
		System.out.println("**************************************************\n\n");
	}
	
	/**
	 * <p>
	 * Prints out an R script.
	 * </p>
	 * 
	 * @param dmlScriptFile
	 *            filename of RL script
	 */
	public static void printRScript(String dmlScriptFile) {
		System.out.println("Running script: " + dmlScriptFile + "\n");
		System.out.println("******************* R script *******************");
		try( BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(dmlScriptFile)))) {
			String content;
			while ((content = in.readLine()) != null) {
				System.out.println(content);
			}
		}
		catch (IOException e) {
			e.printStackTrace();
			fail("unable to print R script: " + e.getMessage());
		}
		System.out.println("**************************************************\n\n");
	}

	/**
	 * <p>
	 * Renames a temporary DML script file back to it's original name.
	 * </p>
	 * 
	 * @param dmlScriptFile
	 *            temporary script file
	 */
	public static void renameTempDMLScript(String dmlScriptFile) {
		File oldPath = new File(dmlScriptFile + "t");
		File newPath = new File(dmlScriptFile);
		oldPath.renameTo(newPath);
	}

	/**
	 * <p>
	 * Removes all temporary files and directories in the current working
	 * directory.
	 * </p>
	 */
	public static void removeTemporaryFiles() {
		try {
			Path workingDir = new Path(".");
			FileSystem fs = IOUtilFunctions.getFileSystem(workingDir, conf);
			FileStatus[] files = fs.listStatus(workingDir);
			for (FileStatus file : files) {
				String fileName = file.getPath().toString().substring(
						file.getPath().getParent().toString().length() + 1);
				if (fileName.contains("temp"))
					fs.delete(file.getPath(), false);
			}
		} catch (IOException e) {
			e.printStackTrace();
			fail("unable to remove temporary files: " + e.getMessage());
		}
	}

	/**
	 * <p>
	 * Checks if any temporary files or directories exist in the current working
	 * directory.
	 * </p>
	 * 
	 * @return true if temporary files or directories are available
	 */
	@SuppressWarnings("resource")
	public static boolean checkForTemporaryFiles() {
		try {
			Path workingDir = new Path(".");
			FileSystem fs = IOUtilFunctions.getFileSystem(workingDir, conf);
			FileStatus[] files = fs.listStatus(workingDir);
			for (FileStatus file : files) {
				String fileName = file.getPath().toString().substring(
						file.getPath().getParent().toString().length() + 1);
				if (fileName.contains("temp"))
					return true;
			}
		} catch (IOException e) {
			e.printStackTrace();
			fail("unable to remove temporary files: " + e.getMessage());
		}

		return false;
	}

	/**
	 * <p>
	 * Returns the path to a file in a directory if it is the only file in the
	 * directory.
	 * </p>
	 * 
	 * @param directory
	 *            directory containing the file
	 * @return path of the file
	 */
	public static Path getFileInDirectory(String directory) {
		try {
			Path path = new Path(directory);
			FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
			FileStatus[] files = fs.listStatus(path);
			if (files.length != 1)
				throw new IOException("requires exactly one file in directory " + directory);

			return files[0].getPath();
		} catch (IOException e) {
			e.printStackTrace();
			fail("unable to open file in " + directory);
		}

		return null;
	}

	/**
	 * <p>
	 * Creates an empty file.
	 * </p>
	 * 
	 * @param filename
	 *            filename
	 */
	public static void createFile(String filename) throws IOException {
		Path path = new Path(filename);
		FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
		fs.create(path);
	}

	/**
	 * <p>
	 * Performs transpose onto a matrix and returns the result.
	 * </p>
	 * 
	 * @param a
	 *            matrix
	 * @return transposed matrix
	 */
	public static double[][] performTranspose(double[][] a) {
		int rows = a[0].length;
		int cols = a.length;
		double[][] result = new double[rows][cols];

		for (int i = 0; i < rows; i++) {
			for (int j = 0; j < cols; j++) {
				result[i][j] = a[j][i];
			}
		}

		return result;
	}

	/**
	 * <p>
	 * Performs matrix multiplication onto two matrices and returns the result.
	 * </p>
	 * 
	 * @param a
	 *            left matrix
	 * @param b
	 *            right matrix
	 * @return computed result
	 */
	public static double[][] performMatrixMultiplication(double[][] a, double[][] b) {
		int rows = a.length;
		int cols = b[0].length;
		double[][] result = new double[rows][cols];

		for (int i = 0; i < rows; i++) {
			for (int j = 0; j < cols; j++) {
				double value = 0;
				for (int k = 0; k < a[i].length; k++) {
					value += (a[i][k] * b[k][j]);
				}
				result[i][j] = value;
			}
		}

		return result;
	}

	/**
	 * <p>
	 * Returns a random integer value.
	 * </p>
	 * 
	 * @return random integer value
	 */
	public static int getRandomInt() {
		Random random = new Random(System.currentTimeMillis());
		int randomValue = random.nextInt();
		return randomValue;
	}

	/**
	 * <p>
	 * Returns a positive random integer value.
	 * </p>
	 * 
	 * @return positive random integer value
	 */
	public static int getPositiveRandomInt() {
		int randomValue = TestUtils.getRandomInt();
		if (randomValue < 0)
			randomValue = -randomValue;
		return randomValue;
	}

	/**
	 * <p>
	 * Returns a negative random integer value.
	 * </p>
	 * 
	 * @return negative random integer value
	 */
	public static int getNegativeRandomInt() {
		int randomValue = TestUtils.getRandomInt();
		if (randomValue > 0)
			randomValue = -randomValue;
		return randomValue;
	}

	/**
	 * <p>
	 * Returns a random double value.
	 * </p>
	 * 
	 * @return random double value
	 */
	public static double getRandomDouble() {
		Random random = new Random(System.currentTimeMillis());
		double randomValue = random.nextInt() * random.nextDouble();
		return randomValue;
	}

	/**
	 * <p>
	 * Returns a positive random double value.
	 * </p>
	 * 
	 * @return positive random double value
	 */
	public static double getPositiveRandomDouble() {
		double randomValue = TestUtils.getRandomDouble();
		if (randomValue < 0)
			randomValue = -randomValue;
		return randomValue;
	}

	/**
	 * <p>
	 * Returns a negative random double value.
	 * </p>
	 * 
	 * @return negative random double value
	 */
	public static double getNegativeRandomDouble() {
		double randomValue = TestUtils.getRandomDouble();
		if (randomValue > 0)
			randomValue = -randomValue;
		return randomValue;
	}

	/**
	 * <p>
	 * Returns the string representation of a double value which can be used in
	 * a DML script.
	 * </p>
	 * 
	 * @param value
	 *            double value
	 * @return string representation
	 */
	public static String getStringRepresentationForDouble(double value) {
		NumberFormat nf = NumberFormat.getInstance(new Locale("EN"));
		nf.setGroupingUsed(false);
		nf.setMinimumFractionDigits(1);
		nf.setMaximumFractionDigits(20);
		return nf.format(value);
	}
	
	public static void replaceRandom( double[][] A, int rows, int cols, double replacement, int len ) {
		Random rand = new Random();
		for( int i=0; i<len; i++ )
			A[rand.nextInt(rows-1)][rand.nextInt(cols-1)] = replacement;
	}

	/**
	 * Clears internal assertion information storage
	 */
	public static void clearAssertionInformation() {
		_AssertInfos.clear();
		_AssertOccured = false;
	}

	/**
	 * <p>
	 * Generates a matrix containing easy to debug values in its cells.
	 * </p>
	 * 
	 * @param rows
	 * @param cols
	 * @param bContainsZeros
	 *            If true, the matrix contains zeros. If false, the matrix
	 *            contains only positive values.
	 * @return
	 */
	public static double[][] createNonRandomMatrixValues(int rows, int cols, boolean bContainsZeros) {
		double[][] matrix = new double[rows][cols];
		for (int i = 0; i < rows; i++) {
			for (int j = 0; j < cols; j++) {
				if (!bContainsZeros)
					matrix[i][j] = (i + 1) * 10 + (j + 1);
				else
					matrix[i][j] = (i) * 10 + (j);
			}
		}
		return matrix;
	}
	
	public static double[][] round(double[][] data) {
		for(int i=0; i<data.length; i++)
			for(int j=0; j<data[i].length; j++)
				data[i][j]=Math.round(data[i][j]);
		return data;
	}
	
	public static double[][] round(double[][] data, int col) {
		for(int i=0; i<data.length; i++)
			data[i][col]=Math.round(data[i][col]);
		return data;
	}
	
	public static MatrixBlock round(MatrixBlock data) {
		return DataConverter.convertToMatrixBlock(
			round(DataConverter.convertToDoubleMatrix(data)));
	}
	
	public static double[][] floor(double[][] data) {
		for(int i=0; i<data.length; i++)
			for(int j=0; j<data[i].length; j++)
				data[i][j]=Math.floor(data[i][j]);
		return data;
	}
	
	public static double[][] ceil(double[][] data) {
		for(int i=0; i<data.length; i++)
			for(int j=0; j<data[i].length; j++)
				data[i][j]=Math.ceil(data[i][j]);
		return data;
	}
	
	public static double[][] floor(double[][] data, int col) {
		for(int i=0; i<data.length; i++)
			data[i][col]=Math.floor(data[i][col]);
		return data;
	}
	
	public static double sum(double[][] data, int rows, int cols) {
		double sum = 0;
		for (int i = 0; i< rows; i++){
			for (int j = 0; j < cols; j++){
				sum += data[i][j];
			}
		}
		return sum;
	}
	
	public static long computeNNZ(double[][] data) {
		long nnz = 0;
		for(int i=0; i<data.length; i++)
			nnz += UtilFunctions.computeNnz(data[i], 0, data[i].length);
		return nnz;
	}
	
	public static double[][] seq(int from, int to, int incr) {
		int len = (int)UtilFunctions.getSeqLength(from, to, incr);
		double[][] ret = new double[len][1];
		for(int i=0, val=from; val<=to; i++, val+=incr)
			ret[i][0] = val;
		return ret;
	}
	
	public static void shutdownThreads(Thread... ts) {
		for( Thread t : ts )
			shutdownThread(t);
	}

	public static void shutdownThreads(Process... ts) {
		for( Process t : ts )
			shutdownThread(t);
	}
	
	public static void shutdownThread(Thread t) {
		// kill the worker
		if( t != null ) {
			t.interrupt();
			try {
				t.join();
			}
			catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	public static void shutdownThread(Process t) {
		// kill the worker
		if( t != null ) {
			Process d = t.destroyForcibly();
			try {
				d.waitFor();
			}
			catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
	public static String federatedAddress(int port, String input) {
		return federatedAddress("localhost", port, input);
	}
	
	public static String federatedAddress(String host, int port, String input) {
		return host + ':' + port + '/' + input;
	}

	public static double gaussian_probability (double point)
	//  "Handbook of Mathematical Functions", ed. by M. Abramowitz and I.A. Stegun,
	//  U.S. Nat-l Bureau of Standards, 10th print (Dec 1972), Sec. 7.1.26, p. 299
	{
		double t_gp = 1.0 / (1.0 + Math.abs (point) * 0.231641888);  // 0.231641888 = 0.3275911 / sqrt (2.0)
		double erf_gp = 1.0 - t_gp * ( 0.254829592
				+ t_gp * (-0.284496736
				+ t_gp * ( 1.421413741
				+ t_gp * (-1.453152027
				+ t_gp *   1.061405429)))) * Math.exp (- point * point / 2.0);
		erf_gp = erf_gp * (point > 0 ? 1.0 : -1.0);
		return (0.5 + 0.5 * erf_gp);
	}

	public static double logFactorial (double x)
	//  From paper: C. Lanczos "A Precision Approximation of the Gamma Function",
	//  Journal of the SIAM: Numerical Analysis, Series B, Vol. 1, 1964, pp. 86-96
	{
		final double[] cf = {1.000000000178, 76.180091729406, -86.505320327112,
				24.014098222230, -1.231739516140, 0.001208580030, -0.000005363820};
		double a_5 = cf[0] + cf[1] / (x + 1) + cf[2] / (x + 2) + cf[3] / (x + 3)
				+ cf[4] / (x + 4) + cf[5] / (x + 5) + cf[6] / (x + 6);
		return Math.log(a_5) + (x + 0.5) * Math.log(x + 5.5) - (x + 5.5) + 0.91893853320467; // log(sqrt(2 * PI))
	}

	public static long nextPoisson (Random r, double mu)
	// Prob[k] = mu^k * exp(-mu) / k!
	// The main part is from W. H"ormann "The Transformed Rejection Method
	// for Generating Poisson Random Variables"
	{
		if (mu <= 0.0)
			return 0;
		if (mu >= 100000.0)
			return Math.round (mu + Math.sqrt (mu) * r.nextGaussian ());
		if (mu >= 10.0)
		{
			long output = 0;
			double c = mu + 0.445;
			double b = 0.931 + 2.53 * Math.sqrt (mu);
			double a = -0.059 + 0.02483 * b;
			double one_by_alpha = 1.1239 + 1.1328 / (b - 3.4);
			double u_r = 0.43;
			double v_r = 0.9277 - 3.6224 / (b - 2);
			while (true)
			{
				double U;
				double V = r.nextDouble ();
				if (V <= 2 * u_r * v_r)
				{
					U = V / v_r - u_r;
					output = (long) Math.floor ((2 * a / (0.5 - Math.abs (U)) + b) * U + c);
					break;
				}
				if (V >= v_r)
				{
					U = r.nextDouble () - 0.5;
				}
				else
				{
					U = V / v_r - (u_r + 0.5);
					U = Math.signum (U) * 0.5 - U;
					V = v_r * r.nextDouble ();
				}
				double us = 0.5 - Math.abs (U);
				if (0.487 < Math.abs (U) && us < V)
					continue;
				long k = (long) Math.floor ((2 * a / us + b) * U + c);
				double V_to_compare = (V * one_by_alpha) / (a / us / us + b);
				if (0 <= k &&  Math.log (V_to_compare) <= - mu + k * Math.log (mu) - TestUtils.logFactorial (k))
				{
					output = k;
					break;
				}
			}
			return output;
		}
		long count = 0;
		double res_mu = mu;
		while (res_mu > 0.0)
		{
			count ++;
			res_mu += Math.log (r.nextDouble ());
		}
		return count - 1;
	}

	public static double nextGamma (Random r, double alpha)
	// PDF(x) = x^(alpha-1) * exp(-x) / Gamma(alpha)
	// D.Knuth "The Art of Computer Programming", 2nd Edition, Vol. 2, Sec. 3.4.1
	{
		double x;
		if (alpha > 10000.0)
		{
			x = 1.0 - 1.0 / (9.0 * alpha) + r.nextGaussian() / Math.sqrt (9.0 * alpha);
			return alpha * x * x * x;
		}
		else if (alpha > 5.0)
		{
			x = 0.0;
			double the_root = Math.sqrt (2.0 * alpha - 1.0);
			boolean is_accepted = false;
			while (! is_accepted)
			{
				double y = Math.tan (Math.PI * r.nextDouble());
				x = the_root * y + alpha - 1.0;
				if (x <= 0)
					continue;
				double z = Math.exp ((alpha - 1.0) * (1.0 + Math.log (x / (alpha - 1.0))) - x);
				is_accepted = (r.nextDouble() <= z * (1.0 + y * y));
			}
			return x;
		}
		else if (alpha > 0.0)
		{
			x = 1.0;
			double frac_alpha = alpha;
			while (frac_alpha >= 1.0)
			{
				x *= r.nextDouble ();
				frac_alpha -= 1.0;
			}
			double output = - Math.log (x);
			if (frac_alpha > 0.0)  // Has to be between 0 and 1
			{
				double ceee = Math.E / (frac_alpha + Math.E);
				boolean is_accepted = false;
				while (! is_accepted)
				{
					double u = r.nextDouble();
					if (u <= ceee)
					{
						x = Math.pow (u / ceee, 1.0 / frac_alpha);
						is_accepted = (r.nextDouble() <= Math.exp (- x));
					}
					else
					{
						x = 1.0 - Math.log ((1.0 - u) / (1.0 - ceee));
						is_accepted = (r.nextDouble() <= Math.pow (x, frac_alpha - 1.0));
					}
				}
				output += x;
			}
			return output;
		}
		else  //  alpha <= 0.0
			return 0.0;
	}

	public static double[] scaleWeights (double[] w_unscaled, double[][] X, double icept, double meanLF, double sigmaLF)
	{
		int rows = X.length;
		int cols = w_unscaled.length;
		double[] w = new double [cols];
		for (int j = 0; j < cols; j ++)
			w [j] = w_unscaled [j];

		double sum_wx = 0.0;
		double sum_1x = 0.0;
		double sum_wxwx = 0.0;
		double sum_1x1x = 0.0;
		double sum_wx1x = 0.0;

		for (int i = 0; i < rows; i ++)
		{
			double wx = 0.0;
			double one_x = 0.0;
			for (int j = 0; j < cols; j ++)
			{
				wx += w [j] * X [i][j];
				one_x += X [i][j];
			}
			sum_wx += wx;
			sum_1x += one_x;
			sum_wxwx += wx * wx;
			sum_1x1x += one_x * one_x;
			sum_wx1x += wx * one_x;
		}

		double a0 = (meanLF - icept) * rows * sum_wx / (sum_wx * sum_wx + sum_1x * sum_1x);
		double b0 = (meanLF - icept) * rows * sum_1x / (sum_wx * sum_wx + sum_1x * sum_1x);
		double a1 = sum_1x;
		double b1 = - sum_wx;
		double qA = a1 * a1 * sum_wxwx + 2 * a1 * b1 * sum_wx1x + b1 * b1 * sum_1x1x;
		double qB = 2 * (a0 * a1 * sum_wxwx + a0 * b1 * sum_wx1x + a1 * b0 * sum_wx1x + b0 * b1 * sum_1x1x);
		double qC_nosigmaLF = a0 * a0 * sum_wxwx + 2 * a0 * b0 * sum_wx1x + b0 * b0 * sum_1x1x - rows * (meanLF - icept) * (meanLF - icept);
		double qC = qC_nosigmaLF - rows * sigmaLF * sigmaLF;
		double qD = qB * qB - 4 * qA * qC;
		if (qD < 0)
		{
			double new_sigmaLF = Math.sqrt (qC_nosigmaLF / rows - qB * qB / (4 * qA * rows));
			String error_message = String.format ("Cannot generate the weights: linear form variance demand is too tight!  Try sigmaLF >%8.4f", new_sigmaLF);
			System.out.println (error_message);
			System.out.flush ();
			throw new IllegalArgumentException (error_message);
		}
		double t = (- qB + Math.sqrt (qD)) / (2 * qA);
		double a = a0 + t * a1;
		double b = b0 + t * b1;
		for (int j = 0; j < cols; j ++)
			w [j] = a * w [j] + b;

		double sum_eta = 0.0;
		double sum_sq_eta = 0.0;
		for (int i = 0; i < rows; i ++)
		{
			double eta = 0.0;
			for (int j = 0; j < cols; j ++)
				eta +=  w [j] * X [i][j];
			sum_eta += eta;
			sum_sq_eta += eta * eta;
		}
		double mean_eta = icept + sum_eta / rows;
		double sigma_eta = Math.sqrt ((sum_sq_eta - sum_eta * sum_eta / rows) / (rows - 1));
		System.out.println (String.format ("Linear Form Mean  =%8.4f (Desired:%8.4f)",  mean_eta, meanLF));
		System.out.println (String.format ("Linear Form Sigma =%8.4f (Desired:%8.4f)", sigma_eta, sigmaLF));

		return w;
	}
	public static class GLMDist
	{
		final int dist;         // GLM distribution family type
		final double param;     // GLM parameter, typically variance power of the mean
		final int link;         // GLM link function type
		final double link_pow;  // GLM link function as power of the mean
		double dispersion = 1.0;
		long binom_n = 1;

		public GLMDist (int _dist, double _param, int _link, double _link_pow) {
			dist = _dist; param = _param; link = _link; link_pow = _link_pow;
		}

		public void set_dispersion (double _dispersion) {
			dispersion = _dispersion;
		}

		public void set_binom_n (long _n) {
			binom_n = _n;
		}

		public boolean is_binom_n_needed () {
			return (dist == 2 && param == 1.0);
		}

		public double nextGLM (Random r, double eta) {
			double mu = 0.0;
			switch (link) {
				case 1: // LINK: POWER
					if (link_pow == 0.0)	   // LINK: log
						mu = Math.exp (eta);
					else if (link_pow ==  1.0) // LINK: identity
						mu = eta;
					else if (link_pow == -1.0) // LINK: inverse
						mu = 1.0 / eta;
					else if (link_pow ==  0.5) // LINK: sqrt
						mu = eta * eta;
					else if (link_pow == -2.0) // LINK: 1/mu^2
						mu = Math.sqrt (1.0 / eta);
					else
						mu = Math.pow (eta, 1.0 / link_pow);
					break;
				case 2: // LINK: logit
					mu = 1.0 / (1.0 + Math.exp (- eta));
					break;
				case 3: // LINK: probit
					mu = TestUtils.gaussian_probability (eta);
					break;
				case 4: // LINK: cloglog
					mu = 1.0 - Math.exp (- Math.exp (eta));
					break;
				case 5: // LINK: cauchit
					mu = 0.5 + Math.atan (eta) / Math.PI;
					break;
				default:
					mu = 0.0;
			}

			double output = 0.0;
			if (dist == 1)  // POWER
			{
				double var_pow = param;
				if (var_pow == 0.0) // Gaussian, with dispersion = sigma^2
				{
					output = mu + Math.sqrt (dispersion) * r.nextGaussian ();
				}
				else if (var_pow == 1.0) // Poisson; Negative Binomial if overdispersion
				{
					double lambda = mu;
					if (dispersion > 1.000000001)
					{
						// output = Negative Binomial random variable with:
						//	 Number of failures = mu / (dispersion - 1.0)
						//	 Probability of success = 1.0 - 1.0 / dispersion
						lambda = (dispersion - 1.0) * TestUtils.nextGamma (r, mu / (dispersion - 1.0));
					}
					output = TestUtils.nextPoisson (r, lambda);
				}
				else if (var_pow == 2.0) // Gamma
				{
					double beta = dispersion * mu;
					output = beta * TestUtils.nextGamma (r, mu / beta);
				}
				else if (var_pow == 3.0) // Inverse Gaussian
				{
					// From: Raj Chhikara, J.L. Folks.  The Inverse Gaussian Distribution:
					// Theory: Methodology, and Applications.  CRC Press, 1988, Section 4.5
					double y_Gauss = r.nextGaussian ();
					double mu_y_sq = mu * y_Gauss * y_Gauss;
					double x_invG = 0.5 * dispersion * mu * (2.0 / dispersion + mu_y_sq
							- Math.sqrt (mu_y_sq * (4.0 / dispersion + mu_y_sq)));
					output = ((mu + x_invG) * r.nextDouble() < mu ? x_invG : (mu * mu / x_invG));
				}
				else
				{
					output = mu + Math.sqrt (12.0 * dispersion) * (r.nextDouble () - 0.5);
				}
			}
			else if (dist == 2 && param != 1.0) // Binomial, dispersion ignored
			{
				double bernoulli_zero = param;
				output = (r.nextDouble () < mu ? 1.0 : bernoulli_zero);
			}
			else if (dist == 2) // param == 1.0, Binomial Two-Column, dispersion used
			{
				double alpha_plus_beta = (binom_n - dispersion) / (dispersion - 1.0);
				double alpha = mu * alpha_plus_beta;
				double x = TestUtils.nextGamma (r, alpha);
				double y = TestUtils.nextGamma (r, alpha_plus_beta - alpha);
				double p = x / (x + y);
				long out = 0;
				for (long i = 0; i < binom_n; i++)
					if (r.nextDouble() < p)
						out ++;
				output = out;
			}
			return output;
		}
	}
	
	public static double[][] generateUnbalancedGLMInputDataX(int rows, int cols, double logFeatureVarianceDisbalance) {
		double[][] X = generateTestMatrix(rows, cols, -1.0, 1.0, 1.0, 34567);
		double shift_X = 1.0;
		// make the feature columns of X variance disbalanced
		for (int j = 0; j < cols; j++) {
			double varFactor = Math.pow(10.0, logFeatureVarianceDisbalance * (-0.25 + j / (double) (2 * cols - 2)));
			for (int i = 0; i < rows; i++)
				X[i][j] = shift_X + X[i][j] * varFactor;
		}
		return X;
	}
	
	public static double[] generateUnbalancedGLMInputDataB(double[][] X, int cols, double intercept, double avgLinearForm, double stdevLinearForm, Random r) {
		double[] beta_unscaled = new double[cols];
		for (int j = 0; j < cols; j++)
			beta_unscaled[j] = r.nextGaussian();
		return scaleWeights(beta_unscaled, X, intercept, avgLinearForm, stdevLinearForm);
	}
	
	public static double[][] generateUnbalancedGLMInputDataY(double[][] X, double[] beta, int rows, int cols, GLMDist glmdist, double intercept, double dispersion, Random r) {
		double[][] y = null;
		if (glmdist.is_binom_n_needed())
			y = new double[rows][2];
		else
			y = new double[rows][1];

		for (int i = 0; i < rows; i++) {
			double eta = intercept;
			for (int j = 0; j < cols; j++) {
				eta += X[i][j] * beta[j];
			}
			if (glmdist.is_binom_n_needed()) {
				long n = Math.round(dispersion * (1.0 + 2.0 * r.nextDouble()) + 1.0);
				glmdist.set_binom_n(n);
				y[i][0] = glmdist.nextGLM(r, eta);
				y[i][1] = n - y[i][0];
			}
			else {
				y[i][0] = glmdist.nextGLM(r, eta);
			}
		}
		
		return y;
	}
}
