// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.


#ifndef IMPALA_EXEC_DELIMITED_TEXT_PARSER_INLINE_H
#define IMPALA_EXEC_DELIMITED_TEXT_PARSER_INLINE_H

#include "delimited-text-parser.h"
#include "util/cpu-info.h"
#include "util/sse-util.h"

namespace impala {

/// Removes escaped delimiters from 'delim_mask'.
/// 'escape_mask' has one bit per character of the current 16 character block, set
/// where that character is the escape character. An escape character at position n
/// cancels a delimiter (tuple/field/escape character) at position n+1.
/// 'last_char_is_escape' carries the state across blocks: on entry it says whether the
/// character just before this block was an effective escape, on exit it says whether
/// the last character of this block is one.
inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape,
    uint16_t* delim_mask) {
  const bool carried_escape = *last_char_is_escape;

  // Walk the block in order and drop every escape character that is itself escaped by
  // its predecessor, so only effective escapes stay set in 'escape_mask'.
  bool prev_is_escape = carried_escape;
  for (int pos = 0; pos < SSEUtil::CHARS_PER_128_BIT_REGISTER; ++pos) {
    const auto bit = SSEUtil::SSE_BITMASK[pos];
    if (prev_is_escape) escape_mask &= ~bit;
    prev_is_escape = escape_mask & bit;
  }

  // After the loop 'prev_is_escape' describes the final character of the block; hand
  // it over to the next iteration.
  *last_char_is_escape = prev_is_escape;

  // Move every escape bit up by one position so that it sits on the character it
  // escapes. Bit 0 is supplied by the escape carried in from the previous block.
  const uint16_t escaped_positions = (escape_mask << 1) | (carried_escape ? 1 : 0);

  // Any delimiter sitting on an escaped position is not a real delimiter.
  *delim_mask &= ~escaped_positions;
}

/// Finishes the column of length 'len' that begins at '*next_column_start'.
/// Returns a TEXT_PARSER_TRUNCATED_COLUMN error if 'len' fails the
/// BitUtil::IsNonNegative32Bit() check. If ReturnCurrentColumn() is true, the column is
/// recorded in 'field_locations[*num_fields]' and '*num_fields' is incremented; the
/// recorded length is negated when PROCESS_ESCAPES is set and the column contained an
/// escape character. In all non-error cases '*next_column_start' is advanced past the
/// column and the one character that follows it, and 'column_idx_' is incremented.
template <bool DELIMITED_TUPLES>
template <bool PROCESS_ESCAPES>
inline Status DelimitedTextParser<DELIMITED_TUPLES>::AddColumn(int64_t len,
    char** next_column_start, int* num_fields, FieldLocation* field_locations) {
  const bool len_is_valid = BitUtil::IsNonNegative32Bit(len);
  if (UNLIKELY(!len_is_valid)) {
    return Status(TErrorCode::TEXT_PARSER_TRUNCATED_COLUMN, len);
  }

  if (ReturnCurrentColumn()) {
    // This column has to be parsed later on: remember where it starts and how long it
    // is. A negative length flags a column that contains escape characters.
    FieldLocation& location = field_locations[*num_fields];
    location.start = *next_column_start;
    const bool has_escape = PROCESS_ESCAPES && current_column_has_escape_;
    const int64_t recorded_len = has_escape ? -len : len;
    location.len = static_cast<int32_t>(recorded_len);
    ++(*num_fields);
  }

  // The escape flag is tracked per column, so clear it for the next one.
  if (PROCESS_ESCAPES) current_column_has_escape_ = false;

  // Step over the column contents plus the single character following it.
  *next_column_start += len + 1;
  ++column_idx_;
  return Status::OK();
}

/// Adds columns until 'column_idx_' reaches 'num_cols_'.
/// The first column added uses 'len' and '*last_column' (or a NULL start if
/// 'last_column' is NULL); every following column is added with a NULL start and a
/// length of 0. Returns the first error reported by AddColumn(), if any.
template <bool DELIMITED_TUPLES>
template <bool PROCESS_ESCAPES>
inline Status DelimitedTextParser<DELIMITED_TUPLES>::FillColumns(int64_t len,
    char** last_column, int* num_fields, FieldLocation* field_locations) {
  // Stand-in column start used for every padded column (and for the first column when
  // the caller did not supply one).
  char* null_column = NULL;
  char** column_start = (last_column != NULL) ? last_column : &null_column;
  int64_t column_len = len;

  // After the first pass only empty, NULL-started columns are appended.
  for (; column_idx_ < num_cols_; column_start = &null_column, column_len = 0) {
    RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(column_len, column_start,
        num_fields, field_locations));
  }
  return Status::OK();
}

/// SSE optimized raw text file parsing.  SSE4_2 added an instruction (with 3 modes) for
/// text processing.  The modes mimic strchr, strstr and strcmp.  For text parsing, we can
/// leverage the strchr functionality.
///
/// The instruction operates on two sse registers:
///  - the needle (what you are searching for)
///  - the haystack (where you are searching in)
/// Both registers can contain up to 16 characters.  The result is a 16-bit mask with a bit
/// set for each character in the haystack that matched any character in the needle.
/// For example:
///  Needle   = 'abcd000000000000' (we're searching for any a's, b's, c's or d's)
///  Haystack = 'asdfghjklhjbdwwc' (the raw string)
///  Result   = '1010000000011001'
///
/// Consumes the input at '*byte_buffer_ptr' in blocks of 16 characters for as long as
/// '*remaining_len' is at least 16; a shorter tail is left unprocessed for the caller.
/// For every unescaped field/collection item delimiter a column is added via AddColumn().
/// For every unescaped tuple delimiter (only when DELIMITED_TUPLES) the current column
/// is added, the tuple is padded via FillColumns(), the delimiter position is stored in
/// 'row_end_locations' and '*num_tuples' is incremented.
/// '*byte_buffer_ptr' and '*remaining_len' are advanced past everything consumed.
/// Returns early, positioned right after the last tuple delimiter, once '*num_tuples'
/// reaches 'max_tuples'. Returns a non-OK status if AddColumn() fails.
template <bool DELIMITED_TUPLES>
template <bool PROCESS_ESCAPES>
inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
    int64_t* remaining_len, char** byte_buffer_ptr,
    char** row_end_locations, FieldLocation* field_locations,
    int* num_tuples, int* num_fields, char** next_column_start) {
  DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));

  // To parse using SSE, we:
  //  1. Load into different sse registers the different characters we need to search for
  //        tuple breaks, field breaks, escape characters
  //  2. Load 16 characters at a time into the sse register
  //  3. Use the SSE instruction to do strchr on those 16 chars, the result is a bitmask
  //  4. Compute the bitmask for tuple breaks, field breaks and escape characters.
  //  5. If there are escape characters, fix up the matching masked bits in the
  //        field/tuple mask
  //  6. Go through the mask bit by bit and write the parsed data.

  // xmm registers:
  //  - xmm_buffer: the register holding the current (16 chars) we're working on from the
  //        file
  //  - xmm_delim_search_: the delim search register.  Contains field delimiter,
  //        collection_item delim_char and tuple delimiter
  //  - xmm_escape_search_: the escape search register. Only contains escape char
  //  - xmm_delim_mask: the result of doing strchr for the delimiters
  //  - xmm_escape_mask: the result of doing strchr for the escape char
  __m128i xmm_buffer, xmm_delim_mask, xmm_escape_mask;

  while (LIKELY(*remaining_len >= SSEUtil::CHARS_PER_128_BIT_REGISTER)) {
    // Load the next 16 bytes into the xmm register
    xmm_buffer = _mm_loadu_si128(reinterpret_cast<__m128i*>(*byte_buffer_ptr));

    // Do the strchr for tuple and field breaks
    // The strchr sse instruction returns the result in the lower bits of the sse
    // register.  Since we only process 16 characters at a time, only the lower 16 bits
    // can contain non-zero values.
    // _mm_extract_epi16 will extract 16 bits out of the xmm register.  The second
    // parameter specifies which 16 bits to extract (0 for the lowest 16 bits).
    xmm_delim_mask = SSE4_cmpestrm<SSEUtil::STRCHR_MODE>(xmm_delim_search_,
        num_delims_, xmm_buffer, SSEUtil::CHARS_PER_128_BIT_REGISTER);
    uint16_t delim_mask = _mm_extract_epi16(xmm_delim_mask, 0);

    uint16_t escape_mask = 0;
    // If the table does not use escape characters, skip processing for it.
    if (PROCESS_ESCAPES) {
      DCHECK(escape_char_ != '\0');
      xmm_escape_mask = SSE4_cmpestrm<SSEUtil::STRCHR_MODE>(xmm_escape_search_, 1,
          xmm_buffer, SSEUtil::CHARS_PER_128_BIT_REGISTER);
      escape_mask = _mm_extract_epi16(xmm_escape_mask, 0);
      // Clears the bits of escaped delimiters in 'delim_mask' and updates
      // 'last_char_is_escape_' for the next block. 'escape_mask' is passed by value
      // and keeps the raw escape character positions.
      ProcessEscapeMask(escape_mask, &last_char_is_escape_, &delim_mask);
    }

    // The tuple is considered finished only if the last character of this block is an
    // unescaped tuple delimiter ('\r' also counts when the tuple delimiter is '\n').
    char* last_char = *byte_buffer_ptr + 15;
    bool last_char_is_unescaped_delim = delim_mask >> 15;
    if (DELIMITED_TUPLES) {
      unfinished_tuple_ = !(last_char_is_unescaped_delim &&
          (*last_char == tuple_delim_ || (tuple_delim_ == '\n' && *last_char == '\r')));
    }

    int last_col_idx = 0;
    // Process all non-zero bits in the delim_mask from lsb->msb.  If a bit
    // is set, the character in that spot is either a field or tuple delimiter.
    while (delim_mask != 0) {
      // ffs is a libc function that returns the index of the first set bit (1-indexed)
      int n = ffs(delim_mask) - 1;
      DCHECK_GE(n, 0);
      DCHECK_LT(n, 16);
      // clear current bit
      delim_mask &= ~(SSEUtil::SSE_BITMASK[n]);

      if (PROCESS_ESCAPES) {
        // Determine if there was an escape character between [last_col_idx, n]
        bool escaped = (escape_mask & low_mask_[last_col_idx] & high_mask_[n]) != 0;
        current_column_has_escape_ |= escaped;
        last_col_idx = n;
      }

      char* delim_ptr = *byte_buffer_ptr + n;

      if (IsFieldOrCollectionItemDelimiter(*delim_ptr)) {
        RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(delim_ptr - *next_column_start,
            next_column_start, num_fields, field_locations));
        continue;
      }

      if (DELIMITED_TUPLES &&
          (*delim_ptr == tuple_delim_ || (tuple_delim_ == '\n' && *delim_ptr == '\r'))) {
        // 'last_row_delim_offset_' holds the number of bytes that remained after the
        // most recent row-ending '\r' (or -1). It equals '*remaining_len - n' exactly
        // when this character immediately follows that '\r'.
        if (UNLIKELY(
                last_row_delim_offset_ == *remaining_len - n && *delim_ptr == '\n')) {
          // If the row ended in \r\n then move the next start past the \n
          ++*next_column_start;
          last_row_delim_offset_ = -1;
          continue;
        }
        RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(delim_ptr - *next_column_start,
            next_column_start, num_fields, field_locations));
        // Pad the tuple with any columns that are missing from its end.
        Status status = FillColumns<false>(0, NULL, num_fields, field_locations);
        DCHECK(status.ok());
        // Start the next tuple at the first non-partition-key column.
        column_idx_ = num_partition_keys_;
        row_end_locations[*num_tuples] = delim_ptr;
        ++(*num_tuples);
        // Remember where we saw the last \r.
        last_row_delim_offset_ = *delim_ptr == '\r' ? *remaining_len - n - 1 : -1;
        if (UNLIKELY(*num_tuples == max_tuples)) {
          // The output is full: stop right after this tuple delimiter, in the middle
          // of the current block.
          (*byte_buffer_ptr) += (n + 1);
          if (PROCESS_ESCAPES) last_char_is_escape_ = false;
          *remaining_len -= (n + 1);
          // If the last character we processed was \r then set the offset to 0
          // so that we will use it at the beginning of the next batch.
          if (last_row_delim_offset_ == *remaining_len) last_row_delim_offset_ = 0;
          return Status::OK();
        }
      }
    }

    if (PROCESS_ESCAPES) {
      // Determine if there was an escape character between (last_col_idx, 15)
      bool unprocessed_escape = escape_mask & low_mask_[last_col_idx] & high_mask_[15];
      current_column_has_escape_ |= unprocessed_escape;
    }

    *remaining_len -= SSEUtil::CHARS_PER_128_BIT_REGISTER;
    *byte_buffer_ptr += SSEUtil::CHARS_PER_128_BIT_REGISTER;
  }
  return Status::OK();
}

/// Simplified version of ParseSse which does not handle tuple delimiters: the
/// 'remaining_len' bytes starting at 'buffer' are treated as exactly one tuple.
/// Every unescaped field/collection item delimiter ends a column, which is added via
/// AddColumn(). The data after the last delimiter forms the final column, and any
/// columns still missing afterwards are padded via FillColumns().
/// Blocks of 16 characters are processed with SSE4.2 when the CPU supports it; the
/// remaining characters (or the whole input without SSE4.2) are processed one at a time.
/// Locations of the columns to materialize are written to 'field_locations' and
/// counted in '*num_fields'. Returns a non-OK status if AddColumn() fails.
template<>
template <bool PROCESS_ESCAPES>
inline Status DelimitedTextParser<false>::ParseSingleTuple(int64_t remaining_len,
    char* buffer, FieldLocation* field_locations, int* num_fields) {
  char* next_column_start = buffer;
  __m128i xmm_buffer, xmm_delim_mask, xmm_escape_mask;

  // Each call starts a fresh tuple at the first non-partition-key column.
  column_idx_ = num_partition_keys_;
  current_column_has_escape_ = false;
  if (LIKELY(CpuInfo::IsSupported(CpuInfo::SSE4_2))) {
    while (LIKELY(remaining_len >= SSEUtil::CHARS_PER_128_BIT_REGISTER)) {
      // Load the next 16 bytes into the xmm register
      xmm_buffer = _mm_loadu_si128(reinterpret_cast<__m128i*>(buffer));

      // strchr for the delimiters; the result mask is in the lowest 16 bits.
      xmm_delim_mask = SSE4_cmpestrm<SSEUtil::STRCHR_MODE>(xmm_delim_search_,
          num_delims_, xmm_buffer, SSEUtil::CHARS_PER_128_BIT_REGISTER);
      uint16_t delim_mask = _mm_extract_epi16(xmm_delim_mask, 0);

      uint16_t escape_mask = 0;
      // If the table does not use escape characters, skip processing for it.
      if (PROCESS_ESCAPES) {
        DCHECK(escape_char_ != '\0');
        xmm_escape_mask = SSE4_cmpestrm<SSEUtil::STRCHR_MODE>(xmm_escape_search_, 1,
            xmm_buffer, SSEUtil::CHARS_PER_128_BIT_REGISTER);
        escape_mask = _mm_extract_epi16(xmm_escape_mask, 0);
        // Clears escaped delimiters from 'delim_mask' and updates
        // 'last_char_is_escape_' for the next block / the scalar loop below.
        ProcessEscapeMask(escape_mask, &last_char_is_escape_, &delim_mask);
      }

      int last_col_idx = 0;
      // Process all non-zero bits in the delim_mask from lsb->msb.  If a bit
      // is set, the character in that spot is a field.
      while (delim_mask != 0) {
        // ffs is a libc function that returns the index of the first set bit (1-indexed)
        int n = ffs(delim_mask) - 1;
        DCHECK_GE(n, 0);
        DCHECK_LT(n, 16);

        if (PROCESS_ESCAPES) {
          // Determine if there was an escape character between [last_col_idx, n]
          bool escaped = (escape_mask & low_mask_[last_col_idx] & high_mask_[n]) != 0;
          current_column_has_escape_ |= escaped;
          last_col_idx = n;
        }

        // clear current bit
        delim_mask &= ~(SSEUtil::SSE_BITMASK[n]);

        RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(buffer + n - next_column_start,
            &next_column_start, num_fields, field_locations));
      }

      if (PROCESS_ESCAPES) {
        // Determine if there was an escape character between (last_col_idx, 15)
        bool unprocessed_escape = escape_mask & low_mask_[last_col_idx] & high_mask_[15];
        current_column_has_escape_ |= unprocessed_escape;
      }

      remaining_len -= SSEUtil::CHARS_PER_128_BIT_REGISTER;
      buffer += SSEUtil::CHARS_PER_128_BIT_REGISTER;
    }
  }

  // Handle the remaining characters one at a time.
  while (remaining_len > 0) {
    // Whether the current character is escaped is decided by the character before it,
    // so capture that state before it is updated for the current character. This
    // mirrors the SSE loop, where an escape at position n cancels a delimiter at n+1.
    const bool is_escaped = PROCESS_ESCAPES && last_char_is_escape_;

    // Only look for escape characters when the table uses them, matching the SSE loop
    // above.
    if (PROCESS_ESCAPES && *buffer == escape_char_) {
      current_column_has_escape_ = true;
      // An escape character that is itself escaped does not escape what follows.
      last_char_is_escape_ = !last_char_is_escape_;
    } else {
      last_char_is_escape_ = false;
    }

    if (!is_escaped && IsFieldOrCollectionItemDelimiter(*buffer)) {
      RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(buffer - next_column_start,
          &next_column_start, num_fields, field_locations));
    }

    --remaining_len;
    ++buffer;
  }

  // Last column does not have a delimiter after it.  Add that column and also
  // pad with empty cols if the input is ragged.
  return FillColumns<PROCESS_ESCAPES>(buffer - next_column_start,
      &next_column_start, num_fields, field_locations);
}

}

#endif
