/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.core.localdictionary;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.ColumnType;
import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
import org.apache.carbondata.core.datastore.page.statistics.DummyStatsCollector;
import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException;
import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.format.LocalDictionaryChunk;

/**
 * Class to maintain page level dictionary. It will store all unique dictionary values
 * used in a page. This is required while writing blocklet level dictionary in carbondata
 * file
 */
public class PageLevelDictionary {

  /**
   * dictionary generator to generate dictionary values for page data
   */
  private LocalDictionaryGenerator localDictionaryGenerator;

  /**
   * set of dictionary surrogate key in this page
   */
  private BitSet usedDictionaryValues;

  private String columnName;

  private DataType dataType;

  private boolean isComplexTypePrimitive;
  // compressor to be used for the dictionary. The compressor is the same as column compressor.
  private String columnCompressor;

  public PageLevelDictionary(LocalDictionaryGenerator localDictionaryGenerator, String columnName,
      DataType dataType, boolean isComplexTypePrimitive, String columnCompressor) {
    this.localDictionaryGenerator = localDictionaryGenerator;
    this.usedDictionaryValues = new BitSet();
    this.columnName = columnName;
    this.dataType = dataType;
    this.isComplexTypePrimitive = isComplexTypePrimitive;
    this.columnCompressor = columnCompressor;
  }

  /**
   * Below method will be used to get the dictionary value
   *
   * @param data column data
   * @return dictionary value
   * @throws DictionaryThresholdReachedException when threshold crossed for column
   */
  public int getDictionaryValue(byte[] data) throws DictionaryThresholdReachedException {
    int dictionaryValue = localDictionaryGenerator.generateDictionary(data);
    this.usedDictionaryValues.set(dictionaryValue);
    return dictionaryValue;
  }

  /**
   * Method to merge the dictionary value across pages
   *
   * @param pageLevelDictionary other page level dictionary
   */
  public void mergerDictionaryValues(PageLevelDictionary pageLevelDictionary) {
    usedDictionaryValues.or(pageLevelDictionary.usedDictionaryValues);
  }

  /**
   * Below method will be used to get the local dictionary chunk for writing
   * @TODO Support for numeric data type dictionary exclude columns
   * @return encoded local dictionary chunk
   * @throws IOException
   * in case of problem in encoding
   */
  public LocalDictionaryChunk getLocalDictionaryChunkForBlocklet()
      throws IOException {
    // TODO support for actual data type dictionary ColumnSPEC
    ColumnType columnType = ColumnType.PLAIN_VALUE;
    boolean isVarcharType = false;
    int lvSize = CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
    if (DataTypes.VARCHAR == dataType) {
      columnType = ColumnType.PLAIN_LONG_VALUE;
      lvSize = CarbonCommonConstants.INT_SIZE_IN_BYTE;
      isVarcharType = true;
    }
    TableSpec.ColumnSpec spec =
        TableSpec.ColumnSpec.newInstance(columnName, DataTypes.BYTE_ARRAY, columnType);
    ColumnPage dictionaryColumnPage = ColumnPage.newPage(
        new ColumnPageEncoderMeta(spec, DataTypes.BYTE_ARRAY, columnCompressor),
        usedDictionaryValues.cardinality());
    // TODO support data type specific stats collector for numeric data types
    dictionaryColumnPage.setStatsCollector(new DummyStatsCollector());
    int rowId = 0;
    ByteBuffer byteBuffer = null;
    for (int i = usedDictionaryValues.nextSetBit(0);
         i >= 0; i = usedDictionaryValues.nextSetBit(i + 1)) {
      if (!isComplexTypePrimitive) {
        dictionaryColumnPage
            .putData(rowId++, localDictionaryGenerator.getDictionaryKeyBasedOnValue(i));
      } else {
        byte[] dictionaryKeyBasedOnValue = localDictionaryGenerator.getDictionaryKeyBasedOnValue(i);
        byteBuffer = ByteBuffer.allocate(lvSize + dictionaryKeyBasedOnValue.length);
        if (!isVarcharType) {
          byteBuffer.putShort((short) dictionaryKeyBasedOnValue.length);
        } else {
          byteBuffer.putInt(dictionaryKeyBasedOnValue.length);
        }
        byteBuffer.put(dictionaryKeyBasedOnValue);
        dictionaryColumnPage.putData(rowId++, byteBuffer.array());
      }
    }
    // creating a encoder
    ColumnPageEncoder encoder = new DirectCompressCodec(DataTypes.BYTE_ARRAY).createEncoder(null);
    // get encoded dictionary values
    LocalDictionaryChunk localDictionaryChunk = encoder.encodeDictionary(dictionaryColumnPage);
    // set compressed dictionary values
    localDictionaryChunk.setDictionary_values(
        CompressorFactory.getInstance().getCompressor(columnCompressor).compressByte(
            usedDictionaryValues.toByteArray()));
    // free the dictionary page memory
    dictionaryColumnPage.freeMemory();
    return localDictionaryChunk;
  }
}
