blob: 7db65fd0d187c1be7a37c5be4141e0a46257b5e3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/delimited-text-parser.inline.h"
#include "exec/hdfs-scanner.h"
#include "util/cpu-info.h"
#include "common/names.h"
using namespace impala;
// Constructs a parser and precomputes the SSE search registers and escape masks.
//   tuple_delim:            tuple (row) delimiter; only registered when DELIMITED_TUPLES
//                           is true. If it is '\n', '\r' is registered as well.
//   field_delim:            column delimiter.
//   collection_item_delim:  additional delimiter; '\0' means it is not registered.
//   escape_char:            escape character; '\0' turns escape processing off.
// num_cols, num_partition_keys and is_materialized_col are stored unchanged.
// Finishes by calling ParserReset().
template<bool DELIMITED_TUPLES>
DelimitedTextParser<DELIMITED_TUPLES>::DelimitedTextParser(
    int num_cols, int num_partition_keys, const bool* is_materialized_col,
    char tuple_delim, char field_delim, char collection_item_delim, char escape_char)
  : is_materialized_col_(is_materialized_col),
    num_tuple_delims_(0),
    num_delims_(0),
    num_cols_(num_cols),
    num_partition_keys_(num_partition_keys),
    column_idx_(0),
    last_row_delim_offset_(-1),
    field_delim_(field_delim),
    process_escapes_(escape_char != '\0'),
    escape_char_(escape_char),
    collection_item_delim_(collection_item_delim),
    tuple_delim_(tuple_delim),
    current_column_has_escape_(false),
    last_char_is_escape_(false),
    unfinished_tuple_(false) {
  // When escaping is enabled, the escape character must differ from every delimiter.
  DCHECK(!process_escapes_ || escape_char_ != tuple_delim_);
  DCHECK(!process_escapes_ || escape_char_ != field_delim_);
  DCHECK(!process_escapes_ || escape_char_ != collection_item_delim_);

  // Zero-filled staging buffer for the characters loaded into each search register.
  char search_chars[SSEUtil::CHARS_PER_128_BIT_REGISTER] = {0};

  if (!process_escapes_) {
    // No escape handling: the masks are never meaningful, so zero them.
    memset(low_mask_, 0, sizeof(low_mask_));
    memset(high_mask_, 0, sizeof(high_mask_));
  } else {
    search_chars[0] = escape_char_;
    xmm_escape_search_ = _mm_loadu_si128(reinterpret_cast<__m128i*>(search_chars));

    // The SSE comparison yields a 16-bit mask. To detect an escape character strictly
    // inside (col_start, col_end) the bits below col_start and above col_end have to be
    // masked off. low_mask_[i] is low_mask_[0] shifted left i times and high_mask_[i]
    // is high_mask_[15] shifted right (15 - i) times.
    low_mask_[0] = 0xffff;
    high_mask_[15] = 0xffff;
    for (int shift = 1; shift < 16; ++shift) {
      low_mask_[shift] = low_mask_[shift - 1] << 1;
      high_mask_[15 - shift] = high_mask_[16 - shift] >> 1;
    }
  }

  // Delimiters are written from index 0 onwards, overwriting the escape character
  // staged above (it has already been loaded into its own register).
  if (DELIMITED_TUPLES) {
    search_chars[num_delims_] = tuple_delim_;
    ++num_delims_;
    ++num_tuple_delims_;
    // Hive treats \r (^M) as an alternate tuple delimiter, but \r\n is a single tuple
    // delimiter.
    if (tuple_delim_ == '\n') {
      search_chars[num_delims_] = '\r';
      ++num_delims_;
      ++num_tuple_delims_;
    }
    // Snapshot taken while only tuple delimiters are staged.
    xmm_tuple_search_ = _mm_loadu_si128(reinterpret_cast<__m128i*>(search_chars));
  }

  // The field delimiter is always registered for undelimited tuples; for delimited
  // tuples it is skipped when it duplicates the tuple delimiter.
  if (!DELIMITED_TUPLES || field_delim_ != tuple_delim_) {
    search_chars[num_delims_] = field_delim_;
    ++num_delims_;
  }

  if (collection_item_delim_ != '\0') {
    search_chars[num_delims_] = collection_item_delim_;
    ++num_delims_;
  }

  DCHECK_GT(num_delims_, 0);
  xmm_delim_search_ = _mm_loadu_si128(reinterpret_cast<__m128i*>(search_chars));

  ParserReset();
}

// Explicit instantiations of the constructor for both parser flavours.
template
DelimitedTextParser<true>::DelimitedTextParser(
    int num_cols, int num_partition_keys, const bool* is_materialized_col,
    char tuple_delim, char field_delim, char collection_item_delim, char escape_char);

template
DelimitedTextParser<false>::DelimitedTextParser(
    int num_cols, int num_partition_keys, const bool* is_materialized_col,
    char tuple_delim, char field_delim, char collection_item_delim, char escape_char);
// Resets the parsing state: column_idx_ goes back to num_partition_keys_, the
// remembered row-delimiter offset is invalidated (-1) and both escape flags are
// cleared. unfinished_tuple_ is not modified here.
template<bool DELIMITED_TUPLES>
void DelimitedTextParser<DELIMITED_TUPLES>::ParserReset() {
  column_idx_ = num_partition_keys_;
  last_row_delim_offset_ = -1;
  last_char_is_escape_ = false;
  current_column_has_escape_ = false;
}

// Only the delimited-tuples flavour is explicitly instantiated here.
template void DelimitedTextParser<true>::ParserReset();
// Parses raw delimited text into FieldLocation descriptors.
//
// Scans 'remaining_len' bytes starting at *byte_buffer_ptr. When SSE4.2 is available
// the bulk of the buffer is handed to ParseSse(); whatever it leaves behind is then
// processed one byte at a time by the loop below.
//
//   max_tuples:         parsing stops as soon as *num_tuples reaches this value.
//   remaining_len:      number of bytes available at *byte_buffer_ptr.
//   byte_buffer_ptr:    in/out cursor; advanced past every byte consumed.
//   row_end_locations:  out; entry [*num_tuples] receives the address of the delimiter
//                       that ended each tuple completed by the byte-wise loop.
//   field_locations:    out; filled through AddColumn()/FillColumns().
//   num_tuples:         in/out; incremented per completed tuple. The code visible here
//                       never zeroes it, so callers presumably initialize it.
//   num_fields:         in/out; forwarded to AddColumn()/FillColumns().
//   next_column_start:  out; set to the start of this batch and then maintained by
//                       AddColumn() (and bumped past the '\n' of a "\r\n" pair).
//
// Returns the first non-OK status produced by ParseSse() or AddColumn(), otherwise OK.
template<bool DELIMITED_TUPLES>
Status DelimitedTextParser<DELIMITED_TUPLES>::ParseFieldLocations(int max_tuples,
    int64_t remaining_len, char** byte_buffer_ptr, char** row_end_locations,
    FieldLocation* field_locations,
    int* num_tuples, int* num_fields, char** next_column_start) {
  // Start of this batch.
  *next_column_start = *byte_buffer_ptr;

  // If there was a '\r' at the end of the last batch, set the offset to
  // just before the beginning. Otherwise make it invalid.
  if (last_row_delim_offset_ == 0) {
    last_row_delim_offset_ = remaining_len;
  } else {
    last_row_delim_offset_ = -1;
  }

  if (CpuInfo::IsSupported(CpuInfo::SSE4_2)) {
    if (process_escapes_) {
      RETURN_IF_ERROR(ParseSse<true>(max_tuples, &remaining_len, byte_buffer_ptr,
          row_end_locations, field_locations, num_tuples, num_fields, next_column_start));
    } else {
      RETURN_IF_ERROR(ParseSse<false>(max_tuples, &remaining_len, byte_buffer_ptr,
          row_end_locations, field_locations, num_tuples, num_fields, next_column_start));
    }
  }

  if (*num_tuples == max_tuples) return Status::OK();

  // Handle the remaining characters
  while (remaining_len > 0) {
    bool new_tuple = false;
    bool new_col = false;
    if (DELIMITED_TUPLES) unfinished_tuple_ = true;

    // A delimiter only counts when the previous character did not escape it.
    if (!last_char_is_escape_) {
      if (DELIMITED_TUPLES && (**byte_buffer_ptr == tuple_delim_ ||
          (tuple_delim_ == '\n' && **byte_buffer_ptr == '\r'))) {
        new_tuple = true;
        new_col = true;
      } else if (**byte_buffer_ptr == field_delim_ ||
          (collection_item_delim_ != '\0' &&
           **byte_buffer_ptr == collection_item_delim_)) {
        // A collection_item_delim_ of '\0' means "no collection delimiter": the
        // constructor leaves it out of the SSE delimiter search set, so a NUL data
        // byte must not end a column here either. A field_delim_ of '\0' is still
        // honoured, matching the SSE search set.
        new_col = true;
      }
    }

    // Track escape state: two consecutive escape characters cancel each other out.
    if (process_escapes_ && **byte_buffer_ptr == escape_char_) {
      current_column_has_escape_ = true;
      last_char_is_escape_ = !last_char_is_escape_;
    } else {
      last_char_is_escape_ = false;
    }

    if (new_tuple) {
      if (last_row_delim_offset_ == remaining_len && **byte_buffer_ptr == '\n') {
        // If the row ended in \r\n then move to the \n
        ++*next_column_start;
      } else {
        RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start,
            next_column_start, num_fields, field_locations));
        Status status = FillColumns<false>(0, NULL, num_fields, field_locations);
        DCHECK(status.ok());
        column_idx_ = num_partition_keys_;
        row_end_locations[*num_tuples] = *byte_buffer_ptr;
        ++(*num_tuples);
      }
      DCHECK(DELIMITED_TUPLES);
      unfinished_tuple_ = false;
      // Remember where a '\r' was seen so a directly following '\n' can be folded into
      // the same row delimiter.
      last_row_delim_offset_ = **byte_buffer_ptr == '\r' ? remaining_len - 1 : -1;
      if (*num_tuples == max_tuples) {
        ++*byte_buffer_ptr;
        --remaining_len;
        // A trailing '\r' is recorded as offset 0 so the next call can pick it up.
        if (last_row_delim_offset_ == remaining_len) last_row_delim_offset_ = 0;
        return Status::OK();
      }
    } else if (new_col) {
      RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start,
          next_column_start, num_fields, field_locations));
    }

    --remaining_len;
    ++*byte_buffer_ptr;
  }

  // For formats that store the length of the row, the row is not delimited:
  // e.g. Sequence files.
  if (!DELIMITED_TUPLES) {
    DCHECK_EQ(remaining_len, 0);
    RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start,
        next_column_start, num_fields, field_locations));
    Status status = FillColumns<false>(0, NULL, num_fields, field_locations);
    DCHECK(status.ok());
    column_idx_ = num_partition_keys_;
    ++(*num_tuples);
  }
  return Status::OK();
}

// Explicit instantiations for both parser flavours.
template
Status DelimitedTextParser<true>::ParseFieldLocations(int max_tuples,
    int64_t remaining_len, char** byte_buffer_ptr, char** row_end_locations,
    FieldLocation* field_locations,
    int* num_tuples, int* num_fields, char** next_column_start);

template
Status DelimitedTextParser<false>::ParseFieldLocations(int max_tuples,
    int64_t remaining_len, char** byte_buffer_ptr, char** row_end_locations,
    FieldLocation* field_locations,
    int* num_tuples, int* num_fields, char** next_column_start);
// Finds the start of the first tuple that begins inside 'buffer'.
//
//   buffer: bytes to search.
//   len:    number of valid bytes in 'buffer'.
//
// Returns the offset of the byte following the first unescaped tuple delimiter (a
// "\r\n" pair counts as one delimiter), or -1 if no tuple delimiter was found in the
// 'len' bytes. Must only be called on the DELIMITED_TUPLES flavour (DCHECKed).
template<bool DELIMITED_TUPLES>
int64_t DelimitedTextParser<DELIMITED_TUPLES>::FindFirstInstance(const char* buffer,
    int64_t len) {
  int64_t tuple_start = 0;
  const char* buffer_start = buffer;
  bool found = false;

  DCHECK(DELIMITED_TUPLES);
  // If the last char in the previous buffer was \r then either return the start of
  // this buffer or skip a \n at the beginning of the buffer.
  if (last_row_delim_offset_ != -1) {
    // Only peek at the first byte when the buffer actually has one; an empty buffer
    // must not be dereferenced.
    if (len > 0 && *buffer_start == '\n') return 1;
    return 0;
  }

restart:
  found = false;

  if (CpuInfo::IsSupported(CpuInfo::SSE4_2)) {
    __m128i xmm_buffer, xmm_tuple_mask;
    while (len - tuple_start >= SSEUtil::CHARS_PER_128_BIT_REGISTER) {
      // TODO: can we parallelize this as well? Are there multiple sse execution units?
      // Load the next 16 bytes into the xmm register and do strchr for the
      // tuple delimiter.
      xmm_buffer = _mm_loadu_si128(reinterpret_cast<const __m128i*>(buffer));
      xmm_tuple_mask = SSE4_cmpestrm<SSEUtil::STRCHR_MODE>(xmm_tuple_search_,
          num_tuple_delims_, xmm_buffer, SSEUtil::CHARS_PER_128_BIT_REGISTER);
      int tuple_mask = _mm_extract_epi16(xmm_tuple_mask, 0);
      if (tuple_mask != 0) {
        found = true;
        // Find first set bit (1-based), so advancing by i lands just past the
        // delimiter.
        int i = ffs(tuple_mask);
        tuple_start += i;
        buffer += i;
        break;
      }
      tuple_start += SSEUtil::CHARS_PER_128_BIT_REGISTER;
      buffer += SSEUtil::CHARS_PER_128_BIT_REGISTER;
    }
  }

  // Byte-wise scan of whatever the SSE loop did not cover.
  if (!found) {
    for (; tuple_start < len; ++tuple_start) {
      char c = *buffer++;
      if (c == tuple_delim_ || (c == '\r' && tuple_delim_ == '\n')) {
        // Step past the delimiter, mirroring the 1-based ffs() result above.
        ++tuple_start;
        found = true;
        break;
      }
    }
  }

  if (!found) return -1;

  if (process_escapes_) {
    // Scan backwards for escape characters. We do this after
    // finding the tuple break rather than during the (above)
    // forward scan to make the forward scan faster. This will
    // perform worse if there are many characters right before the
    // tuple break that are all escape characters, but that is
    // unlikely.
    int num_escape_chars = 0;
    // tuple_start is one past the delimiter, so tuple_start - 2 is the byte before it.
    int64_t before_tuple_end = tuple_start - 2;
    // TODO: If scan range is split between escape character and tuple delimiter,
    // before_tuple_end will be -1. Need to scan previous range for escape characters
    // in this case.
    for (; before_tuple_end >= 0; --before_tuple_end) {
      if (buffer_start[before_tuple_end] == escape_char_) {
        ++num_escape_chars;
      } else {
        break;
      }
    }

    // TODO: All the characters preceding the tuple delimiter in this buffer were
    // escape characters. We need to read from the previous block to see what to do.
    if (before_tuple_end < 0) {
      // Logged at most once per process to avoid flooding the log.
      static bool warning_logged = false;
      if (!warning_logged) {
        LOG(WARNING) << "Unhandled code path. This might cause a tuple to be "
                     << "skipped or repeated.";
        warning_logged = true;
      }
    }

    // An even number of escape characters means they cancel out and this tuple break
    // is *not* escaped. An odd number means the delimiter is escaped: keep searching
    // from the current position.
    if (num_escape_chars % 2 != 0) goto restart;
  }

  if (tuple_start < len && buffer_start[tuple_start] == '\n' &&
      buffer_start[tuple_start - 1] == '\r') {
    // We have \r\n, move to the next character.
    ++tuple_start;
  }
  return tuple_start;
}

// Only the delimited-tuples flavour is instantiated; see the DCHECK above.
template
int64_t DelimitedTextParser<true>::FindFirstInstance(const char* buffer, int64_t len);