/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.sysds.runtime.compress.estim;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.CompressionSettings;
import org.apache.sysds.runtime.compress.UncompressedBitmap;
import org.apache.sysds.runtime.compress.colgroup.ColGroup.CompressionType;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.CommonThreadPool;

/**
 * Main abstract class for estimating size of compressions on columns.
 */
public abstract class CompressedSizeEstimator {
	protected static final Log LOG = LogFactory.getLog(CompressedSizeEstimator.class.getName());

	/** The Matrix Block to extract the compression estimates from */
	protected MatrixBlock _data;
	/** The number of rows in the matrix block, extracted to a field because the matrix could be transposed */
	protected final int _numRows;
	/** The number of columns in the matrix block, extracted to a field because the matrix could be transposed */
	protected final int _numCols;
	/** The compression settings to use, for estimating the size, and compress the ColGroups. */
	protected final CompressionSettings _compSettings;

	/**
	 * Main Constructor for Compression Estimator.
	 * 
	 * protected because the factory should be used to construct the CompressedSizeEstimator
	 * 
	 * @param data         The matrix block to extract information from
	 * @param compSettings The Compression settings used.
	 */
	protected CompressedSizeEstimator(MatrixBlock data, CompressionSettings compSettings) {
		_data = data;
		_numRows = compSettings.transposeInput ? _data.getNumColumns() : _data.getNumRows();
		_numCols = compSettings.transposeInput ? _data.getNumRows() : _data.getNumColumns();
		_compSettings = compSettings;
	}

	/**
	 * Single threaded version of extracting Compression Size info
	 * 
	 * @return The Compression Size info of each Column compressed isolated.
	 */
	public CompressedSizeInfo computeCompressedSizeInfos() {
		return computeCompressedSizeInfos(1);
	}

	/**
	 * Multi threaded version of extracting Compression Size info
	 * 
	 * @param k The concurrency degree.
	 * @return The Compression Size info of each Column compressed isolated.
	 */
	public CompressedSizeInfo computeCompressedSizeInfos(int k) {
		CompressedSizeInfoColGroup[] sizeInfos = estimateIndividualColumnGroupSizes(k);
		return computeCompressedSizeInfos(sizeInfos);
	}

	private CompressedSizeInfo computeCompressedSizeInfos(CompressedSizeInfoColGroup[] sizeInfos) {
		List<Integer> colsC = new ArrayList<>();
		List<Integer> colsUC = new ArrayList<>();
		HashMap<Integer, Double> compRatios = new HashMap<>();
		int nnzUC = 0;

		for(int col = 0; col < _numCols; col++) {
			double uncompSize = sizeInfos[col].getCompressionSize(CompressionType.UNCOMPRESSED);
			double minCompressedSize = (double) sizeInfos[col].getMinSize();
			double compRatio = uncompSize / minCompressedSize;

			if(compRatio > 1000) {
				StringBuilder sb = new StringBuilder();
				sb.append("Very good CompressionRatio: " +String.format("%10.1f", compRatio));
				sb.append(" UncompressedSize: " + String.format("%14.0f",uncompSize));
				sb.append(" tCompressedSize: " + String.format("%14.0f",minCompressedSize));
				sb.append(" type: " + sizeInfos[col].getBestCompressionType());
				LOG.warn(sb.toString());
			}

			if(compRatio > 1) {
				colsC.add(col);
				compRatios.put(col, compRatio);
			}
			else {
				colsUC.add(col);
				// TODO nnzUC not incrementing as intended outside this function.
				nnzUC += sizeInfos[col].getEstNnz();
			}
		}

		// correction of column classification (reevaluate dense estimates if necessary)
		if(!MatrixBlock.evalSparseFormatInMemory(_numRows, colsUC.size(), nnzUC) && !colsUC.isEmpty()) {
			for(int i = 0; i < colsUC.size(); i++) {
				int col = colsUC.get(i);
				double uncompSize = MatrixBlock.estimateSizeInMemory(_numRows, 1, 1.0);
				// CompressedMatrixBlock.getUncompressedSize(numRows, 1, 1.0);
				double compRatio = uncompSize / sizeInfos[col].getMinSize();
				if(compRatio > 1) {
					colsC.add(col);
					colsUC.remove(i);
					i--;
					compRatios.put(col, compRatio);
					nnzUC -= sizeInfos[col].getEstNnz();
				}
			}
		}

		if(LOG.isTraceEnabled()) {
			LOG.trace("C: " + Arrays.toString(colsC.toArray(new Integer[0])));
			LOG.trace(
				"-- compression ratios: " + Arrays.toString(colsC.stream().map(c -> compRatios.get(c)).toArray()));
			LOG.trace("UC: " + Arrays.toString(colsUC.toArray(new Integer[0])));
			LOG.trace(
				"-- compression ratios: " + Arrays.toString(colsUC.stream().map(c -> compRatios.get(c)).toArray()));
		}

		return new CompressedSizeInfo(sizeInfos, colsC, colsUC, compRatios, nnzUC);

	}

	private CompressedSizeInfoColGroup[] estimateIndividualColumnGroupSizes(int k) {
		return (k > 1) ? CompressedSizeInfoColGroup(_numCols, k) : CompressedSizeInfoColGroup(_numCols);
	}

	/**
	 * Method used for compressing into one type of colGroup
	 * 
	 * @return CompressedSizeInfo on a compressed colGroup compressing the entire matrix into a single colGroup type.
	 */
	public CompressedSizeInfoColGroup estimateCompressedColGroupSize() {
		int[] colIndexes = makeColIndexes();
		return estimateCompressedColGroupSize(colIndexes);
	}

	/**
	 * Abstract method for extracting Compressed Size Info of specified columns, together in a single ColGroup
	 * 
	 * @param colIndexes The Colums to group together inside a ColGroup
	 * @return The CompressedSizeInformation associated with the selected ColGroups.
	 */
	public abstract CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes);

	/**
	 * Method used to extract the CompressedSizeEstimationFactors from an constructed UncompressedBitMap. Note this
	 * method works both for the sample based estimator and the exact estimator, since the bitmap, can be extracted from
	 * a sample or from the entire dataset.
	 * 
	 * @param ubm the UncompressedBitMap, either extracted from a sample or from the entier dataset
	 * @return The size factors estimated from the Bit Map.
	 */
	public CompressedSizeEstimationFactors estimateCompressedColGroupSize(UncompressedBitmap ubm) {
		return CompressedSizeEstimationFactors.computeSizeEstimationFactors(ubm,
			_compSettings.validCompressions.contains(CompressionType.RLE),
			_numRows,
			ubm.getNumColumns());
	}

	// ------------------------------------------------
	// PARALLEL CODE
	// ------------------------------------------------

	private CompressedSizeInfoColGroup[] CompressedSizeInfoColGroup(int clen) {
		CompressedSizeInfoColGroup[] ret = new CompressedSizeInfoColGroup[clen];
		for(int col = 0; col < clen; col++)
			ret[col] = estimateCompressedColGroupSize(new int[] {col});
		return ret;
	}

	private CompressedSizeInfoColGroup[] CompressedSizeInfoColGroup(int clen, int k) {
		try {
			ExecutorService pool = CommonThreadPool.get(k);
			ArrayList<SizeEstimationTask> tasks = new ArrayList<>();
			for(int col = 0; col < clen; col++)
				tasks.add(new SizeEstimationTask(this, col));
			List<Future<CompressedSizeInfoColGroup>> rtask = pool.invokeAll(tasks);
			ArrayList<CompressedSizeInfoColGroup> ret = new ArrayList<>();
			for(Future<CompressedSizeInfoColGroup> lrtask : rtask)
				ret.add(lrtask.get());
			pool.shutdown();
			return ret.toArray(new CompressedSizeInfoColGroup[0]);
		}
		catch(InterruptedException | ExecutionException e) {
			throw new DMLRuntimeException(e);
		}
	}

	private static class SizeEstimationTask implements Callable<CompressedSizeInfoColGroup> {
		private final CompressedSizeEstimator _estimator;
		private final int _col;

		protected SizeEstimationTask(CompressedSizeEstimator estimator, int col) {
			_estimator = estimator;
			_col = col;
		}

		@Override
		public CompressedSizeInfoColGroup call() {
			return _estimator.estimateCompressedColGroupSize(new int[] {_col});
		}
	}

	// ------------------------------------------------
	// PARALLEL CODE END
	// ------------------------------------------------

	// UTIL

	private int[] makeColIndexes() {
		int[] colIndexes = new int[_numCols];
		for(int i = 0; i < _numCols; i++) {
			colIndexes[i] = i;
		}
		return colIndexes;
	}
}
