blob: 3de61248175468094047952c880d5a2c6c48e913 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <assert.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>
#include "HiveRowSet.h"
#include "hiveclienthelper.h"
/*************************************************************************************************
* Base HiveRowSet Class Logic
************************************************************************************************/
/* Construct an empty rowset: no field text is buffered, column 0 is the "current" column,
 * and no bytes of it have been handed out yet. */
HiveRowSet::HiveRowSet() {
  m_field_buffer[0] = '\0';
  m_last_column_fetched = 0;
  m_bytes_read = 0;
  m_is_completely_read = false;
}
/* Nothing is acquired by the base-class constructor, so there is nothing to release. */
HiveRowSet::~HiveRowSet() {
  /* Intentionally empty */
}
/* Return the shared fetch state to its freshly-constructed values, then hand off to the
 * subclass so it can drop its own row references.
 * Non-Virtual Interface idiom: this non-virtual method invokes the pure virtual
 * specialized_reset() hook after the base state has been cleared. */
void HiveRowSet::reset() {
  m_field_buffer[0] = '\0';
  m_last_column_fetched = 0;
  m_bytes_read = 0;
  m_is_completely_read = false;
  /* Subclass-specific cleanup runs after the shared state is cleared */
  specialized_reset();
}
/* Reload m_field_buffer with the text of the column named by m_last_column_fetched, so the
 * invariant "buffer contents == current column" holds. Subclasses call this at the end of
 * initialize(), once extractField() has row data to work with. */
void HiveRowSet::initFieldBuffer() {
  const size_t current_column = m_last_column_fetched;
  extractField(current_column);
}
/* Report the byte length of a column's data, as given by getFieldLen() (no null terminator).
 *
 * column_idx   0-based column index; must be < getColumnCount().
 * col_len      Output: the column's data length.
 * err_buf, err_buf_len  Destination for an error message when HIVE_ERROR is returned.
 *
 * Returns HIVE_SUCCESS, or HIVE_ERROR when col_len is NULL, the rowset has no columns,
 * or column_idx is out of range. */
HiveReturn HiveRowSet::getFieldDataLen(size_t column_idx, size_t* col_len, char* err_buf,
                                       size_t err_buf_len) {
  RETURN_ON_ASSERT(col_len == NULL, __FUNCTION__,
                   "Pointer to col_len (output) cannot be NULL.", err_buf, err_buf_len, HIVE_ERROR);
  /* Only query the column count after the output pointer has been validated */
  const size_t column_count = getColumnCount();
  RETURN_ON_ASSERT(column_count == 0, __FUNCTION__,
                   "Rowset contains zero columns.", err_buf, err_buf_len, HIVE_ERROR);
  RETURN_ON_ASSERT(column_idx >= column_count, __FUNCTION__,
                   "Column index out of bounds.", err_buf, err_buf_len, HIVE_ERROR);
  const size_t field_len = getFieldLen(column_idx);
  *col_len = field_len;
  return HIVE_SUCCESS;
}
/* Copy a column value into a caller-supplied C-string buffer, supporting retrieval in chunks.
 *
 * column_idx      0-based column to read; must be < getColumnCount().
 * buffer          Output buffer that receives the (null-terminated) data; set to "" for NULL values.
 * buffer_len      Size of buffer in bytes, including room for the null terminator; must be > 0.
 * data_byte_size  Optional output (may be NULL): number of bytes that remained to be returned
 *                 *before* this call, excluding the terminator; 0 for NULL values.
 * is_null_value   Output: 1 if the field text equals the null-format string, otherwise 0.
 * err_buf, err_buf_len  Destination for an error message when HIVE_ERROR is returned.
 *
 * Returns HIVE_SUCCESS when the rest of the value fit in buffer, HIVE_SUCCESS_WITH_MORE_DATA
 * when it was truncated (call again with the same column_idx to continue), HIVE_NO_MORE_DATA
 * when the value was already completely returned, or HIVE_ERROR on invalid arguments.
 * Requesting a different column restarts reading at that column's first byte. */
HiveReturn HiveRowSet::getFieldAsCString(size_t column_idx, char* buffer, size_t buffer_len,
                                         size_t* data_byte_size, int* is_null_value, char* err_buf,
                                         size_t err_buf_len) {
  RETURN_ON_ASSERT(buffer == NULL, __FUNCTION__,
                   "Column data output buffer cannot be NULL.", err_buf, err_buf_len, HIVE_ERROR);
  RETURN_ON_ASSERT(is_null_value == NULL, __FUNCTION__,
                   "Column data is_null_value (output) cannot be NULL.", err_buf, err_buf_len,
                   HIVE_ERROR);
  RETURN_ON_ASSERT(getColumnCount() == 0, __FUNCTION__,
                   "Rowset contains zero columns.", err_buf, err_buf_len, HIVE_ERROR);
  RETURN_ON_ASSERT(column_idx >= getColumnCount(), __FUNCTION__,
                   "Column index out of bounds.", err_buf, err_buf_len, HIVE_ERROR);
  RETURN_ON_ASSERT(buffer_len == 0, __FUNCTION__,
                   "Output buffer cannot have a size of zero.", err_buf, err_buf_len, HIVE_ERROR);

  if (m_last_column_fetched != column_idx) {
    /* Switching columns: load the new field and restart reading from its first byte */
    extractField(column_idx);
    m_bytes_read = 0;
    m_last_column_fetched = column_idx;
    m_is_completely_read = false;
  }

  if (m_is_completely_read) {
    return HIVE_NO_MORE_DATA; /* This field has already been completely fetched by a previous call */
  }

  if (strcmp(getNullFormat(), m_field_buffer) == 0) {
    /* The field text matches the null format spec, so this value is NULL */
    *is_null_value = 1;
    if (data_byte_size != NULL) {
      *data_byte_size = 0;
    }
    buffer[0] = '\0';
  } else {
    /* This value has been determined not to be NULL */
    *is_null_value = 0;
    /* Only the C-string prefix of m_field_buffer can be delivered: safe_strncpy() copies C-string
     * data, so it stops at the first NUL byte. If the raw field contains an embedded NUL, using
     * getFieldLen() alone would leave m_bytes_read permanently short of the total. Every later
     * call would then return HIVE_SUCCESS_WITH_MORE_DATA without copying anything, an endless
     * fetch loop for callers. Clamping to strlen() keeps the reported size and the copy in sync. */
    size_t data_total_len = min(getFieldLen(column_idx), strlen(m_field_buffer));
    /* Cannot read more data than the total number of bytes available */
    assert(data_total_len >= m_bytes_read);
    size_t bytes_remaining = data_total_len - m_bytes_read; /* Excludes null char */
    if (data_byte_size != NULL) {
      /* Save the number of remaining characters to return before this fetch */
      *data_byte_size = bytes_remaining;
    }
    /* Move pointer to the read location */
    const char* src_str_ptr = m_field_buffer + m_bytes_read;
    /* The total number of bytes to read (+1 null terminator) should be no more than the
     * size of the field buffer */
    assert(m_bytes_read + bytes_remaining + 1 <= sizeof(m_field_buffer));
    /* Copy as many characters as possible from the read location (+1 for null terminator);
     * bytes_copied does not count the null terminator */
    size_t bytes_copied = safe_strncpy(buffer, src_str_ptr, min(buffer_len, bytes_remaining + 1));
    m_bytes_read += bytes_copied;
    if (m_bytes_read < data_total_len) {
      return HIVE_SUCCESS_WITH_MORE_DATA; /* Data truncated; more data to return */
    }
  }
  m_is_completely_read = true;
  return HIVE_SUCCESS; /* All data successfully read */
}
/* Fetch a column value converted to a double.
 *
 * column_idx     0-based column to read; must be < getColumnCount().
 * buffer         Output: the parsed value, or 0.0 when the value is NULL.
 * is_null_value  Output: 1 if the field text equals the null-format string, otherwise 0.
 * err_buf, err_buf_len  Destination for an error message when HIVE_ERROR is returned.
 *
 * Returns HIVE_SUCCESS, HIVE_NO_MORE_DATA if this column was already fetched since it was
 * last selected, or HIVE_ERROR on invalid arguments. */
HiveReturn HiveRowSet::getFieldAsDouble(size_t column_idx, double* buffer, int* is_null_value,
                                        char* err_buf, size_t err_buf_len) {
  RETURN_ON_ASSERT(buffer == NULL, __FUNCTION__,
                   "Column data output buffer cannot be NULL.", err_buf, err_buf_len, HIVE_ERROR);
  RETURN_ON_ASSERT(is_null_value == NULL, __FUNCTION__,
                   "Column data is_null_value (output) cannot be NULL.", err_buf, err_buf_len,
                   HIVE_ERROR);
  RETURN_ON_ASSERT(getColumnCount() == 0, __FUNCTION__,
                   "Rowset contains zero columns.", err_buf, err_buf_len, HIVE_ERROR);
  RETURN_ON_ASSERT(column_idx >= getColumnCount(), __FUNCTION__,
                   "Column index out of bounds.", err_buf, err_buf_len, HIVE_ERROR);
  if (m_last_column_fetched != column_idx) {
    /* Reset if this column was not fetched on the last attempt */
    extractField(column_idx);
    m_bytes_read = 0; /* Reset the read offset if different from the last column fetched */
    m_last_column_fetched = column_idx;
    m_is_completely_read = false;
  }
  if (m_is_completely_read) {
    return HIVE_NO_MORE_DATA; /* This column has already been completely fetched */
  }
  /* If the column data is the same as the nullformat spec... */
  if (strcmp(getNullFormat(), m_field_buffer) == 0) {
    *is_null_value = 1;
    *buffer = 0.0;
  } else {
    *is_null_value = 0;
    /* strtod() rather than atof(): atof() has undefined behavior when the value cannot be
     * represented, while strtod() has defined results (e.g. +/-HUGE_VAL on overflow).
     * Text with no numeric prefix still yields 0.0, matching atof(). */
    *buffer = strtod(m_field_buffer, NULL);
  }
  m_is_completely_read = true;
  return HIVE_SUCCESS;
}
/* Fetch a column value converted to an int (base-10).
 *
 * column_idx     0-based column to read; must be < getColumnCount().
 * buffer         Output: the parsed value, or 0 when the value is NULL. Values outside the
 *                int range saturate to INT_MIN / INT_MAX.
 * is_null_value  Output: 1 if the field text equals the null-format string, otherwise 0.
 * err_buf, err_buf_len  Destination for an error message when HIVE_ERROR is returned.
 *
 * Returns HIVE_SUCCESS, HIVE_NO_MORE_DATA if this column was already fetched since it was
 * last selected, or HIVE_ERROR on invalid arguments. */
HiveReturn HiveRowSet::getFieldAsInt(size_t column_idx, int* buffer, int* is_null_value,
                                     char* err_buf, size_t err_buf_len) {
  RETURN_ON_ASSERT(buffer == NULL, __FUNCTION__,
                   "Column data output buffer cannot be NULL.", err_buf, err_buf_len, HIVE_ERROR);
  RETURN_ON_ASSERT(is_null_value == NULL, __FUNCTION__,
                   "Column data is_null_value (output) cannot be NULL.", err_buf, err_buf_len,
                   HIVE_ERROR);
  RETURN_ON_ASSERT(getColumnCount() == 0, __FUNCTION__,
                   "Rowset contains zero columns.", err_buf, err_buf_len, HIVE_ERROR);
  RETURN_ON_ASSERT(column_idx >= getColumnCount(), __FUNCTION__,
                   "Column index out of bounds.", err_buf, err_buf_len, HIVE_ERROR);
  if (m_last_column_fetched != column_idx) {
    extractField(column_idx);
    m_bytes_read = 0; /* Reset the read offset if different from the last column fetched */
    m_last_column_fetched = column_idx;
    m_is_completely_read = false;
  }
  if (m_is_completely_read) {
    return HIVE_NO_MORE_DATA; /* This column has already been completely fetched */
  }
  /* If the column data is the same as the null format spec... */
  if (strcmp(getNullFormat(), m_field_buffer) == 0) {
    *is_null_value = 1;
    *buffer = 0;
  } else {
    *is_null_value = 0;
    /* Parse with strtol() instead of atoi(): atoi() has undefined behavior when the value does
     * not fit in an int. Text with no leading digits still yields 0, matching atoi(). */
    long parsed = strtol(m_field_buffer, NULL, 10);
    /* Saturate: where long is wider than int, a plain cast would silently wrap */
    if (parsed > INT_MAX) {
      parsed = INT_MAX;
    } else if (parsed < INT_MIN) {
      parsed = INT_MIN;
    }
    *buffer = static_cast<int>(parsed);
  }
  m_is_completely_read = true;
  return HIVE_SUCCESS;
}
/* Fetch a column value converted to a long (base-10).
 *
 * column_idx     0-based column to read; must be < getColumnCount().
 * buffer         Output: the parsed value, or 0 when the value is NULL. Values outside the
 *                long range saturate to LONG_MIN / LONG_MAX.
 * is_null_value  Output: 1 if the field text equals the null-format string, otherwise 0.
 * err_buf, err_buf_len  Destination for an error message when HIVE_ERROR is returned.
 *
 * Returns HIVE_SUCCESS, HIVE_NO_MORE_DATA if this column was already fetched since it was
 * last selected, or HIVE_ERROR on invalid arguments. */
HiveReturn HiveRowSet::getFieldAsLong(size_t column_idx, long* buffer, int* is_null_value,
                                      char* err_buf, size_t err_buf_len) {
  RETURN_ON_ASSERT(buffer == NULL, __FUNCTION__,
                   "Column data output buffer cannot be NULL.", err_buf, err_buf_len, HIVE_ERROR);
  RETURN_ON_ASSERT(is_null_value == NULL, __FUNCTION__,
                   "Column data is_null_value (output) cannot be NULL.", err_buf, err_buf_len,
                   HIVE_ERROR);
  RETURN_ON_ASSERT(getColumnCount() == 0, __FUNCTION__,
                   "Rowset contains zero columns.", err_buf, err_buf_len, HIVE_ERROR);
  RETURN_ON_ASSERT(column_idx >= getColumnCount(), __FUNCTION__,
                   "Column index out of bounds.", err_buf, err_buf_len, HIVE_ERROR);
  if (m_last_column_fetched != column_idx) {
    extractField(column_idx);
    m_bytes_read = 0; /* Reset the read offset if different from the last column fetched */
    m_last_column_fetched = column_idx;
    m_is_completely_read = false;
  }
  if (m_is_completely_read) {
    return HIVE_NO_MORE_DATA; /* This column has already been completely fetched */
  }
  /* If the column data is the same as the null format spec... */
  if (strcmp(getNullFormat(), m_field_buffer) == 0) {
    *is_null_value = 1;
    *buffer = 0;
  } else {
    *is_null_value = 0;
    /* strtol() instead of atol(): atol() has undefined behavior on overflow, whereas strtol()
     * saturates to LONG_MIN/LONG_MAX. Text with no leading digits still yields 0. */
    *buffer = strtol(m_field_buffer, NULL, 10);
  }
  m_is_completely_read = true;
  return HIVE_SUCCESS;
}
/* Fetch a column value converted to an unsigned long (base-10, parsed with strtoul()).
 * Writes 0 to *buffer and 1 to *is_null_value when the field text equals the null-format
 * string; otherwise writes the parsed value and 0.
 * Returns HIVE_SUCCESS, HIVE_NO_MORE_DATA if this column was already fetched since it was
 * last selected, or HIVE_ERROR (message written to err_buf) on invalid arguments. */
HiveReturn HiveRowSet::getFieldAsULong(size_t column_idx, unsigned long* buffer,
                                       int* is_null_value, char* err_buf, size_t err_buf_len) {
  /* Validate the output pointers first, then the column index */
  RETURN_ON_ASSERT(buffer == NULL, __FUNCTION__,
                   "Column data output buffer cannot be NULL.", err_buf, err_buf_len, HIVE_ERROR);
  RETURN_ON_ASSERT(is_null_value == NULL, __FUNCTION__,
                   "Column data is_null_value (output) cannot be NULL.", err_buf, err_buf_len,
                   HIVE_ERROR);
  RETURN_ON_ASSERT(getColumnCount() == 0, __FUNCTION__,
                   "Rowset contains zero columns.", err_buf, err_buf_len, HIVE_ERROR);
  RETURN_ON_ASSERT(column_idx >= getColumnCount(), __FUNCTION__,
                   "Column index out of bounds.", err_buf, err_buf_len, HIVE_ERROR);
  if (m_last_column_fetched != column_idx) {
    /* A newly selected column: load its text and clear the per-column read state */
    extractField(column_idx);
    m_bytes_read = 0;
    m_last_column_fetched = column_idx;
    m_is_completely_read = false;
  } else if (m_is_completely_read) {
    /* Same column as the previous call, and its value was already handed out */
    return HIVE_NO_MORE_DATA;
  }
  const bool field_is_null = (strcmp(getNullFormat(), m_field_buffer) == 0);
  *is_null_value = field_is_null ? 1 : 0;
  *buffer = field_is_null ? 0UL : strtoul(m_field_buffer, NULL, 10);
  m_is_completely_read = true;
  return HIVE_SUCCESS;
}
/* Fetch a column value converted to a signed 64-bit integer via the ATOI64() macro.
 * Writes 0 to *buffer and 1 to *is_null_value when the field text equals the null-format
 * string; otherwise writes the parsed value and 0.
 * Returns HIVE_SUCCESS, HIVE_NO_MORE_DATA if this column was already fetched since it was
 * last selected, or HIVE_ERROR (message written to err_buf) on invalid arguments. */
HiveReturn HiveRowSet::getFieldAsI64(size_t column_idx, int64_t* buffer, int* is_null_value,
                                     char* err_buf, size_t err_buf_len) {
  /* Validate the output pointers first, then the column index */
  RETURN_ON_ASSERT(buffer == NULL, __FUNCTION__,
                   "Column data output buffer cannot be NULL.", err_buf, err_buf_len, HIVE_ERROR);
  RETURN_ON_ASSERT(is_null_value == NULL, __FUNCTION__,
                   "Column data is_null_value (output) cannot be NULL.", err_buf, err_buf_len,
                   HIVE_ERROR);
  RETURN_ON_ASSERT(getColumnCount() == 0, __FUNCTION__,
                   "Rowset contains zero columns.", err_buf, err_buf_len, HIVE_ERROR);
  RETURN_ON_ASSERT(column_idx >= getColumnCount(), __FUNCTION__,
                   "Column index out of bounds.", err_buf, err_buf_len, HIVE_ERROR);
  if (m_last_column_fetched != column_idx) {
    /* A newly selected column: load its text and clear the per-column read state */
    extractField(column_idx);
    m_bytes_read = 0;
    m_last_column_fetched = column_idx;
    m_is_completely_read = false;
  } else if (m_is_completely_read) {
    /* Same column as the previous call, and its value was already handed out */
    return HIVE_NO_MORE_DATA;
  }
  if (strcmp(getNullFormat(), m_field_buffer) != 0) {
    /* Field text differs from the null format: it carries a real value */
    *is_null_value = 0;
    *buffer = ATOI64(m_field_buffer);
  } else {
    *is_null_value = 1;
    *buffer = 0;
  }
  m_is_completely_read = true;
  return HIVE_SUCCESS;
}
/* Fetch a column value converted to an unsigned 64-bit integer via the ATOI64U() macro.
 * Writes 0 to *buffer and 1 to *is_null_value when the field text equals the null-format
 * string; otherwise writes the parsed value and 0.
 * Returns HIVE_SUCCESS, HIVE_NO_MORE_DATA if this column was already fetched since it was
 * last selected, or HIVE_ERROR (message written to err_buf) on invalid arguments. */
HiveReturn HiveRowSet::getFieldAsI64U(size_t column_idx, uint64_t* buffer, int* is_null_value,
                                      char* err_buf, size_t err_buf_len) {
  /* Validate the output pointers first, then the column index */
  RETURN_ON_ASSERT(buffer == NULL, __FUNCTION__,
                   "Column data output buffer cannot be NULL.", err_buf, err_buf_len, HIVE_ERROR);
  RETURN_ON_ASSERT(is_null_value == NULL, __FUNCTION__,
                   "Column data is_null_value (output) cannot be NULL.", err_buf, err_buf_len,
                   HIVE_ERROR);
  RETURN_ON_ASSERT(getColumnCount() == 0, __FUNCTION__,
                   "Rowset contains zero columns.", err_buf, err_buf_len, HIVE_ERROR);
  RETURN_ON_ASSERT(column_idx >= getColumnCount(), __FUNCTION__,
                   "Column index out of bounds.", err_buf, err_buf_len, HIVE_ERROR);
  if (m_last_column_fetched != column_idx) {
    /* A newly selected column: load its text and clear the per-column read state */
    extractField(column_idx);
    m_bytes_read = 0;
    m_last_column_fetched = column_idx;
    m_is_completely_read = false;
  } else if (m_is_completely_read) {
    /* Same column as the previous call, and its value was already handed out */
    return HIVE_NO_MORE_DATA;
  }
  if (strcmp(getNullFormat(), m_field_buffer) != 0) {
    /* Field text differs from the null format: it carries a real value */
    *is_null_value = 0;
    *buffer = ATOI64U(m_field_buffer);
  } else {
    *is_null_value = 1;
    *buffer = 0;
  }
  m_is_completely_read = true;
  return HIVE_SUCCESS;
}
/*************************************************************************************************
* HiveSerializedRowSet Subclass Definition
************************************************************************************************/
/* Start with no borrowed row and no borrowed null-format string; initialize() sets both. */
HiveSerializedRowSet::HiveSerializedRowSet() {
  m_null_format_weak_ptr = NULL;
  m_row_weak_ptr = NULL;
}
/* The row and null-format pointers are borrowed, not owned, so nothing is freed here. */
HiveSerializedRowSet::~HiveSerializedRowSet() {
  /* Intentionally empty */
}
/* Bind this rowset to one delimiter-separated serialized row.
 *
 * schema          Supplies the column list plus the FIELD_DELIM and SERIALIZATION_NULL_FORMAT
 *                 properties.
 * serialized_row  The row text. Only its address is stored, so it must outlive this binding.
 *
 * Computes the per-column start offsets, then loads the current column into m_field_buffer. */
void HiveSerializedRowSet::initialize(Apache::Hadoop::Hive::Schema& schema, string& serialized_row) {
  m_row_weak_ptr = &serialized_row;
  /* One offset per schema column is expected; size the vector up front to avoid regrowth */
  const size_t expected_columns = schema.fieldSchemas.size();
  m_field_offsets.reserve(expected_columns);
  initializeOffsets(schema, serialized_row); /* Fills m_field_offsets */
  assert(m_field_offsets.size() == expected_columns);
  string& null_format = schema.properties[SERIALIZATION_NULL_FORMAT];
  assert(null_format.length() > 0);
  m_null_format_weak_ptr = &null_format;
  /* extractField() now has data to work on, so bring m_field_buffer in sync */
  initFieldBuffer();
}
/* Drop every reference to the current row. Only the inherited HiveRowSet::reset() should
 * call this; reset() clears the shared base-class fetch state before invoking it. */
void HiveSerializedRowSet::specialized_reset() {
  m_null_format_weak_ptr = NULL;
  m_field_offsets.clear();
  m_row_weak_ptr = NULL;
}
/* Append to m_field_offsets the starting index of every field in serialized_row.
 * The first field always starts at 0, and each subsequent field starts one character past
 * a delimiter. Delimiters are assumed to be exactly one character wide. find_first_of()
 * matches any single character of the FIELD_DELIM property, and getFieldLen() subtracts one
 * byte per delimiter. */
void HiveSerializedRowSet::initializeOffsets(Apache::Hadoop::Hive::Schema& schema, string& serialized_row) {
  m_field_offsets.push_back(0); /* There is always at least one column */
  /* Cache the delimiter so the properties map is consulted only once */
  string& field_delim(schema.properties[FIELD_DELIM]);
  assert(field_delim.length() > 0);
  for (size_t delim_pos = serialized_row.find_first_of(field_delim);
       delim_pos != string::npos;
       delim_pos = serialized_row.find_first_of(field_delim, delim_pos + 1)) {
    /* The next field begins right after this delimiter */
    m_field_offsets.push_back(delim_pos + 1);
  }
}
/* Each recorded field offset marks the start of one column. */
size_t HiveSerializedRowSet::getColumnCount() {
  const size_t column_count = m_field_offsets.size();
  return column_count;
}
const char* HiveSerializedRowSet::getNullFormat() {
assert(m_null_format_weak_ptr != NULL);
return m_null_format_weak_ptr->c_str();
}
/* Return the byte length of column column_idx within the serialized row, excluding its
 * trailing one-character delimiter, clamped to at most MAX_BYTE_LENGTH. */
size_t HiveSerializedRowSet::getFieldLen(size_t column_idx) {
  assert(column_idx < getColumnCount());
  assert(m_row_weak_ptr != NULL);
  const size_t field_start = m_field_offsets[column_idx];
  const bool is_last_field = (column_idx + 1 == getColumnCount());
  /* The last field runs to the end of the row; any other field ends just before the
   * delimiter that precedes the next field's offset */
  size_t field_end;
  if (is_last_field) {
    field_end = m_row_weak_ptr->length();
    assert(field_end >= field_start);
  } else {
    assert(m_field_offsets[column_idx + 1] > field_start);
    field_end = m_field_offsets[column_idx + 1] - 1;
  }
  const size_t raw_len = field_end - field_start;
  /* Enforce the constraint that no data exceed MAX_BYTE_LENGTH */
  return min(raw_len, static_cast<size_t>(MAX_BYTE_LENGTH));
}
/* Copy column column_idx out of the serialized row into m_field_buffer and null-terminate it.
 * The copy is capped at the buffer capacity minus one byte for the terminator. */
void HiveSerializedRowSet::extractField(size_t column_idx) {
  assert(column_idx < getColumnCount());
  assert(m_row_weak_ptr != NULL);
  const size_t field_len = getFieldLen(column_idx);
  /* The field buffer should always be large enough to hold the field */
  assert(field_len < sizeof(m_field_buffer));
  /* Safety cap against overflow; one byte is reserved for the null terminator */
  const size_t buffer_capacity = sizeof(m_field_buffer) - 1;
  const size_t extract_len = min(field_len, buffer_capacity);
  size_t copied = m_row_weak_ptr->copy(m_field_buffer, extract_len, m_field_offsets[column_idx]);
  assert(copied == extract_len);
  m_field_buffer[extract_len] = '\0';
}
/*************************************************************************************************
* HiveStringVectorRowSet Subclass Definition
************************************************************************************************/
/* Start with no borrowed field vector and no borrowed null-format string; initialize() sets both. */
HiveStringVectorRowSet::HiveStringVectorRowSet() {
  m_null_format_weak_ptr = NULL;
  m_fields_weak_ptr = NULL;
}
/* The field vector and null-format pointers are borrowed, not owned, so nothing is freed here. */
HiveStringVectorRowSet::~HiveStringVectorRowSet() {
  /* Intentionally empty */
}
/* Bind this rowset to a row that is already split into one string per column.
 *
 * schema  Supplies the SERIALIZATION_NULL_FORMAT property.
 * fields  Non-NULL vector of column strings. Only the pointer is stored, so it must outlive
 *         this binding.
 *
 * Finishes by loading the current column into m_field_buffer. */
void HiveStringVectorRowSet::initialize(Apache::Hadoop::Hive::Schema& schema, vector<string>* fields) {
  assert(fields != NULL);
  m_fields_weak_ptr = fields;
  string& null_format = schema.properties[SERIALIZATION_NULL_FORMAT];
  assert(null_format.length() > 0);
  m_null_format_weak_ptr = &null_format;
  /* extractField() now has data to work on, so bring m_field_buffer in sync */
  initFieldBuffer();
}
/* Drop the borrowed field vector and null-format string. Only the inherited
 * HiveRowSet::reset() should call this; reset() clears the shared fetch state first. */
void HiveStringVectorRowSet::specialized_reset() {
  m_null_format_weak_ptr = NULL;
  m_fields_weak_ptr = NULL;
}
size_t HiveStringVectorRowSet::getColumnCount() {
assert(m_fields_weak_ptr != NULL);
return m_fields_weak_ptr->size();
}
const char* HiveStringVectorRowSet::getNullFormat() {
assert(m_null_format_weak_ptr != NULL);
return m_null_format_weak_ptr->c_str();
}
size_t HiveStringVectorRowSet::getFieldLen(size_t column_idx) {
assert(column_idx < getColumnCount());
assert(m_fields_weak_ptr != NULL);
size_t len = m_fields_weak_ptr->at(column_idx).length();
/* Enforce the constraint that no data exceed MAX_BYTE_LENGTH */
len = min(len, (size_t) MAX_BYTE_LENGTH);
return len;
}
/* Copy column column_idx's string into m_field_buffer via safe_strncpy(), bounded by the
 * size of the buffer. */
void HiveStringVectorRowSet::extractField(size_t column_idx) {
  assert(column_idx < getColumnCount());
  assert(m_fields_weak_ptr != NULL);
  const char* field_text = m_fields_weak_ptr->at(column_idx).c_str();
  safe_strncpy(m_field_buffer, field_text, sizeof(m_field_buffer));
}