| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // This file is copied from |
| // https://github.com/ClickHouse/ClickHouse/blob/master/src/Functions/FunctionsCodingIP.cpp |
| // and modified by Doris |
| |
| #pragma once |
| #include <glog/logging.h> |
| |
| #include <cstddef> |
| #include <memory> |
| |
| #include "common/cast_set.h" |
| #include "vec/columns/column.h" |
| #include "vec/columns/column_const.h" |
| #include "vec/columns/column_nullable.h" |
| #include "vec/columns/column_string.h" |
| #include "vec/columns/column_struct.h" |
| #include "vec/columns/column_vector.h" |
| #include "vec/common/assert_cast.h" |
| #include "vec/common/format_ip.h" |
| #include "vec/common/ipv6_to_binary.h" |
| #include "vec/common/unaligned.h" |
| #include "vec/core/column_with_type_and_name.h" |
| #include "vec/core/types.h" |
| #include "vec/data_types/data_type.h" |
| #include "vec/data_types/data_type_ipv4.h" |
| #include "vec/data_types/data_type_ipv6.h" |
| #include "vec/data_types/data_type_nullable.h" |
| #include "vec/data_types/data_type_number.h" |
| #include "vec/data_types/data_type_string.h" |
| #include "vec/data_types/data_type_struct.h" |
| #include "vec/functions/function.h" |
| #include "vec/functions/function_helpers.h" |
| #include "vec/runtime/ip_address_cidr.h" |
| |
| namespace doris::vectorized { |
| #include "common/compile_check_begin.h" |
| |
| class FunctionIPv4NumToString : public IFunction { |
| private: |
| template <PrimitiveType ArgPType> |
| Status execute_type(Block& block, const ColumnWithTypeAndName& argument, size_t result) const { |
| using ColumnType = ColumnVector<ArgPType>; |
| const ColumnPtr& column = argument.column; |
| |
| const auto* col = assert_cast<const ColumnType*>(column.get()); |
| const typename ColumnType::Container& vec_in = col->get_data(); |
| auto col_res = ColumnString::create(); |
| |
| ColumnString::Chars& vec_res = col_res->get_chars(); |
| ColumnString::Offsets& offsets_res = col_res->get_offsets(); |
| |
| vec_res.resize(vec_in.size() * |
| (IPV4_MAX_TEXT_LENGTH + 1)); /// the longest value is: 255.255.255.255\0 |
| offsets_res.resize(vec_in.size()); |
| char* begin = reinterpret_cast<char*>(vec_res.data()); |
| char* pos = begin; |
| |
| auto null_map = ColumnUInt8::create(vec_in.size(), 0); |
| size_t src_size = |
| std::min(sizeof(typename PrimitiveTypeTraits<ArgPType>::CppType), (unsigned long)4); |
| for (size_t i = 0; i < vec_in.size(); ++i) { |
| auto value = vec_in[i]; |
| if (value < IPV4_MIN_NUM_VALUE || value > IPV4_MAX_NUM_VALUE) { |
| null_map->get_data()[i] = 1; |
| } else { |
| format_ipv4(reinterpret_cast<const unsigned char*>(&vec_in[i]), src_size, pos); |
| } |
| offsets_res[i] = cast_set<uint32_t>(pos - begin); |
| } |
| |
| vec_res.resize(pos - begin); |
| block.replace_by_position(result, |
| ColumnNullable::create(std::move(col_res), std::move(null_map))); |
| return Status::OK(); |
| } |
| |
| public: |
| static constexpr auto name = "ipv4_num_to_string"; |
| static FunctionPtr create() { return std::make_shared<FunctionIPv4NumToString>(); } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 1; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| return make_nullable(std::make_shared<DataTypeString>()); |
| } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| ColumnWithTypeAndName& argument = block.get_by_position(arguments[0]); |
| |
| switch (argument.type->get_primitive_type()) { |
| case PrimitiveType::TYPE_TINYINT: |
| return execute_type<TYPE_TINYINT>(block, argument, result); |
| break; |
| case PrimitiveType::TYPE_SMALLINT: |
| return execute_type<TYPE_SMALLINT>(block, argument, result); |
| break; |
| case PrimitiveType::TYPE_INT: |
| return execute_type<TYPE_INT>(block, argument, result); |
| break; |
| case PrimitiveType::TYPE_BIGINT: |
| return execute_type<TYPE_BIGINT>(block, argument, result); |
| break; |
| default: |
| break; |
| } |
| |
| return Status::InternalError( |
| "Illegal column {} of argument of function {}, expected Int8 or Int16 or Int32 or " |
| "Int64", |
| argument.name, get_name()); |
| } |
| }; |
| |
/// Since IPExceptionMode means wider scope, we use more specific name here.
/// Selects how the string -> IP conversions in this file treat a NULL or unparsable row.
enum class IPConvertExceptionMode : uint8_t {
    Throw,   ///< the conversion throws an Exception
    Default, ///< the row is replaced by an all-zero value
    Null     ///< the row is marked NULL in the result null map
};
| |
| static inline bool try_parse_ipv4(const char* pos, Int64& result_value) { |
| return parse_ipv4_whole(pos, reinterpret_cast<unsigned char*>(&result_value)); |
| } |
| |
| template <IPConvertExceptionMode exception_mode, typename ToColumn> |
| ColumnPtr convert_to_ipv4(ColumnPtr column, const PaddedPODArray<UInt8>* null_map = nullptr) { |
| const auto* column_string = assert_cast<const ColumnString*>(column.get()); |
| |
| size_t column_size = column_string->size(); |
| |
| ColumnUInt8::MutablePtr col_null_map_to; |
| ColumnUInt8::Container* vec_null_map_to = nullptr; |
| |
| if constexpr (exception_mode == IPConvertExceptionMode::Null) { |
| col_null_map_to = ColumnUInt8::create(column_size, false); |
| vec_null_map_to = &col_null_map_to->get_data(); |
| } |
| |
| auto col_res = ToColumn::create(); |
| |
| auto& vec_res = col_res->get_data(); |
| vec_res.resize(column_size); |
| |
| const ColumnString::Chars& vec_src = column_string->get_chars(); |
| const ColumnString::Offsets& offsets_src = column_string->get_offsets(); |
| size_t prev_offset = 0; |
| |
| for (size_t i = 0; i < vec_res.size(); ++i) { |
| if (null_map && (*null_map)[i]) { |
| if constexpr (exception_mode == IPConvertExceptionMode::Throw) { |
| throw Exception( |
| ErrorCode::INVALID_ARGUMENT, |
| "Null Input, you may consider convert it to a valid default IPv4 value " |
| "like '0.0.0.0' first"); |
| } |
| vec_res[i] = 0; |
| prev_offset = offsets_src[i]; |
| if constexpr (exception_mode == IPConvertExceptionMode::Null) { |
| (*vec_null_map_to)[i] = true; |
| } |
| continue; |
| } |
| const char* src_start = reinterpret_cast<const char*>(&vec_src[prev_offset]); |
| size_t src_length = (i < vec_res.size() - 1) ? (offsets_src[i] - prev_offset) |
| : (vec_src.size() - prev_offset); |
| std::string src(src_start, src_length); |
| bool parse_result = try_parse_ipv4(src.c_str(), vec_res[i]); |
| |
| if (!parse_result) { |
| if constexpr (exception_mode == IPConvertExceptionMode::Throw) { |
| throw Exception(ErrorCode::INVALID_ARGUMENT, "Invalid IPv4 value"); |
| } else if constexpr (exception_mode == IPConvertExceptionMode::Default) { |
| vec_res[i] = 0; |
| } else if constexpr (exception_mode == IPConvertExceptionMode::Null) { |
| (*vec_null_map_to)[i] = true; |
| vec_res[i] = 0; |
| } |
| } |
| |
| prev_offset = offsets_src[i]; |
| } |
| |
| if constexpr (exception_mode == IPConvertExceptionMode::Null) { |
| return ColumnNullable::create(std::move(col_res), std::move(col_null_map_to)); |
| } |
| return col_res; |
| } |
| |
| template <IPConvertExceptionMode exception_mode> |
| class FunctionIPv4StringToNum : public IFunction { |
| public: |
| static constexpr auto name = exception_mode == IPConvertExceptionMode::Throw |
| ? "ipv4_string_to_num" |
| : (exception_mode == IPConvertExceptionMode::Default |
| ? "ipv4_string_to_num_or_default" |
| : "ipv4_string_to_num_or_null"); |
| |
| static FunctionPtr create() { |
| return std::make_shared<FunctionIPv4StringToNum<exception_mode>>(); |
| } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 1; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| auto result_type = std::make_shared<DataTypeInt64>(); |
| |
| if constexpr (exception_mode == IPConvertExceptionMode::Null) { |
| return make_nullable(result_type); |
| } |
| |
| return result_type; |
| } |
| |
| bool use_default_implementation_for_nulls() const override { return false; } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| ColumnPtr column = block.get_by_position(arguments[0]).column; |
| ColumnPtr null_map_column; |
| const NullMap* null_map = nullptr; |
| if (column->is_nullable()) { |
| const auto* column_nullable = assert_cast<const ColumnNullable*>(column.get()); |
| column = column_nullable->get_nested_column_ptr(); |
| null_map_column = column_nullable->get_null_map_column_ptr(); |
| null_map = &column_nullable->get_null_map_data(); |
| } |
| |
| auto col_res = convert_to_ipv4<exception_mode, ColumnInt64>(column, null_map); |
| |
| if (null_map && exception_mode == IPConvertExceptionMode::Null) { |
| block.replace_by_position( |
| result, ColumnNullable::create(std::move(col_res), std::move(null_map_column))); |
| } else { |
| block.replace_by_position(result, std::move(col_res)); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| template <typename T> |
| void process_ipv6_column(const ColumnPtr& column, size_t input_rows_count, |
| ColumnString::Chars& vec_res, ColumnString::Offsets& offsets_res, |
| ColumnUInt8::MutablePtr& null_map, unsigned char* ipv6_address_data) { |
| auto* begin = reinterpret_cast<char*>(vec_res.data()); |
| auto* pos = begin; |
| |
| const auto* col = assert_cast<const T*>(column.get()); |
| |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| bool is_empty = false; |
| |
| if constexpr (std::is_same_v<T, ColumnIPv6>) { |
| const auto& vec_in = col->get_data(); |
| memcpy(ipv6_address_data, reinterpret_cast<const unsigned char*>(&vec_in[i]), |
| IPV6_BINARY_LENGTH); |
| } else { // ColumnString |
| const auto str_ref = col->get_data_at(i); |
| const char* value = str_ref.data; |
| size_t value_size = str_ref.size; |
| |
| if (value_size > IPV6_BINARY_LENGTH || value == nullptr || value_size == 0) { |
| is_empty = true; |
| } else { |
| memcpy(ipv6_address_data, value, value_size); |
| memset(ipv6_address_data + value_size, 0, IPV6_BINARY_LENGTH - value_size); |
| } |
| } |
| |
| if (is_empty) { |
| null_map->get_data()[i] = 1; |
| } else { |
| if constexpr (std::is_same_v<T, ColumnIPv6>) { |
| // ipv6 is little-endian byte order storage in doris |
| // so parsing ipv6 in little-endian byte order |
| format_ipv6(ipv6_address_data, pos); |
| } else { |
| // 16 bytes ipv6 string is big-endian byte order storage in doris |
| // so transfer to little-endian firstly |
| std::reverse(ipv6_address_data, ipv6_address_data + IPV6_BINARY_LENGTH); |
| format_ipv6(ipv6_address_data, pos); |
| } |
| } |
| offsets_res[i] = cast_set<uint32_t>(pos - begin); |
| } |
| } |
| |
| class FunctionIPv6NumToString : public IFunction { |
| public: |
| static constexpr auto name = "ipv6_num_to_string"; |
| static FunctionPtr create() { return std::make_shared<FunctionIPv6NumToString>(); } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 1; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| return make_nullable(std::make_shared<DataTypeString>()); |
| } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| const ColumnPtr& column = block.get_by_position(arguments[0]).column; |
| |
| auto col_res = ColumnString::create(); |
| ColumnString::Chars& vec_res = col_res->get_chars(); |
| ColumnString::Offsets& offsets_res = col_res->get_offsets(); |
| vec_res.resize(input_rows_count * (IPV6_MAX_TEXT_LENGTH + 1)); |
| offsets_res.resize(input_rows_count); |
| |
| auto null_map = ColumnUInt8::create(input_rows_count, 0); |
| |
| unsigned char ipv6_address_data[IPV6_BINARY_LENGTH]; |
| |
| if (check_and_get_column<ColumnIPv6>(column.get())) { |
| process_ipv6_column<ColumnIPv6>(column, input_rows_count, vec_res, offsets_res, |
| null_map, ipv6_address_data); |
| } else { //ColumnString |
| process_ipv6_column<ColumnString>(column, input_rows_count, vec_res, offsets_res, |
| null_map, ipv6_address_data); |
| } |
| vec_res.resize(offsets_res[offsets_res.size() - 1]); |
| |
| block.replace_by_position(result, |
| ColumnNullable::create(std::move(col_res), std::move(null_map))); |
| return Status::OK(); |
| } |
| }; |
| |
| namespace detail { |
| template <IPConvertExceptionMode exception_mode, typename ToColumn = ColumnIPv6, |
| typename StringColumnType> |
| ColumnPtr convert_to_ipv6(const StringColumnType& string_column, |
| const PaddedPODArray<UInt8>* null_map = nullptr) { |
| const size_t column_size = string_column.size(); |
| |
| ColumnUInt8::MutablePtr col_null_map_to; |
| ColumnUInt8::Container* vec_null_map_to = nullptr; |
| |
| if constexpr (exception_mode == IPConvertExceptionMode::Null) { |
| col_null_map_to = ColumnUInt8::create(column_size, false); |
| vec_null_map_to = &col_null_map_to->get_data(); |
| } |
| |
| auto column_create = [](size_t column_size) -> typename ToColumn::MutablePtr { |
| if constexpr (std::is_same_v<ToColumn, ColumnString>) { |
| auto column_string = ColumnString::create(); |
| column_string->get_chars().reserve(column_size * IPV6_BINARY_LENGTH); |
| column_string->get_offsets().reserve(column_size); |
| return column_string; |
| } else { |
| return ColumnIPv6::create(); |
| } |
| }; |
| |
| auto get_vector = [](auto& col_res, size_t col_size) -> decltype(auto) { |
| if constexpr (std::is_same_v<ToColumn, ColumnString>) { |
| auto& vec_res = col_res->get_chars(); |
| vec_res.resize(col_size * IPV6_BINARY_LENGTH); |
| return (vec_res); |
| } else { |
| auto& vec_res = col_res->get_data(); |
| vec_res.resize(col_size); |
| return (vec_res); |
| } |
| }; |
| |
| auto col_res = column_create(column_size); |
| auto& vec_res = get_vector(col_res, column_size); |
| |
| using Chars = typename StringColumnType::Chars; |
| const Chars& vec_src = string_column.get_chars(); |
| |
| size_t src_offset = 0; |
| |
| /// ColumnString contains not null terminated strings. But functions parseIPv6, parseIPv4 expect null terminated string. |
| /// TODO fix this - now parseIPv6/parseIPv4 accept end iterator, so can be parsed in-place |
| std::string string_buffer; |
| |
| int offset_inc = 1; |
| ColumnString* column_string = nullptr; |
| if constexpr (std::is_same_v<ToColumn, ColumnString>) { |
| offset_inc = IPV6_BINARY_LENGTH; |
| column_string = assert_cast<ColumnString*>(col_res.get()); |
| } |
| |
| for (size_t out_offset = 0, i = 0; i < column_size; out_offset += offset_inc, ++i) { |
| char src_ipv4_buf[sizeof("::ffff:") + IPV4_MAX_TEXT_LENGTH + 1] = "::ffff:"; |
| size_t src_next_offset = src_offset; |
| |
| const char* src_value = nullptr; |
| auto* res_value = reinterpret_cast<unsigned char*>(&vec_res[out_offset]); |
| |
| if constexpr (std::is_same_v<StringColumnType, ColumnString>) { |
| src_value = reinterpret_cast<const char*>(&vec_src[src_offset]); |
| src_next_offset = string_column.get_offsets()[i]; |
| |
| string_buffer.assign(src_value, src_next_offset - src_offset); |
| src_value = string_buffer.c_str(); |
| } |
| |
| if (null_map && (*null_map)[i]) { |
| if (exception_mode == IPConvertExceptionMode::Throw) { |
| throw Exception( |
| ErrorCode::INVALID_ARGUMENT, |
| "Null Input, you may consider convert it to a valid default IPv6 value " |
| "like '::' first"); |
| } else if (exception_mode == IPConvertExceptionMode::Default) { |
| std::fill_n(&vec_res[out_offset], offset_inc, 0); |
| } else { |
| std::fill_n(&vec_res[out_offset], offset_inc, 0); |
| (*vec_null_map_to)[i] = true; |
| } |
| if constexpr (std::is_same_v<ToColumn, ColumnString>) { |
| DCHECK(column_string != nullptr); |
| column_string->get_offsets().push_back((i + 1) * IPV6_BINARY_LENGTH); |
| } |
| src_offset = src_next_offset; |
| continue; |
| } |
| |
| bool parse_result = false; |
| Int64 dummy_result = 0; |
| |
| /// For both cases below: In case of failure, the function parseIPv6 fills vec_res with zero bytes. |
| |
| /// If the source IP address is parsable as an IPv4 address, then transform it into a valid IPv6 address. |
| /// Keeping it simple by just prefixing `::ffff:` to the IPv4 address to represent it as a valid IPv6 address. |
| size_t string_length = src_next_offset - src_offset; |
| if (string_length != 0) { |
| if (try_parse_ipv4(src_value, dummy_result)) { |
| strncat(src_ipv4_buf, src_value, sizeof(src_ipv4_buf) - strlen(src_ipv4_buf) - 1); |
| parse_result = parse_ipv6_whole(src_ipv4_buf, res_value); |
| } else { |
| parse_result = parse_ipv6_whole(src_value, res_value); |
| } |
| } |
| |
| if (parse_result && string_length != 0) { |
| if constexpr (std::is_same_v<ToColumn, ColumnString>) { |
| // handling 16 bytes ipv6 string in the big-endian byte order |
| // is aimed at conforming to human reading habits |
| std::reverse(res_value, res_value + IPV6_BINARY_LENGTH); |
| } |
| if constexpr (std::is_same_v<ToColumn, ColumnString>) { |
| auto* column_string_res = assert_cast<ColumnString*>(col_res.get()); |
| std::copy(res_value, res_value + IPV6_BINARY_LENGTH, |
| column_string_res->get_chars().begin() + i * IPV6_BINARY_LENGTH); |
| column_string_res->get_offsets().push_back((i + 1) * IPV6_BINARY_LENGTH); |
| } else { |
| col_res->insert_data(reinterpret_cast<const char*>(res_value), IPV6_BINARY_LENGTH); |
| } |
| } else { |
| if (exception_mode == IPConvertExceptionMode::Throw) { |
| throw Exception(ErrorCode::INVALID_ARGUMENT, "Invalid IPv6 value"); |
| } |
| std::fill_n(&vec_res[out_offset], offset_inc, 0); |
| if constexpr (std::is_same_v<ToColumn, ColumnString>) { |
| auto* column_string_res = assert_cast<ColumnString*>(col_res.get()); |
| column_string_res->get_offsets().push_back((i + 1) * IPV6_BINARY_LENGTH); |
| } |
| if constexpr (exception_mode == IPConvertExceptionMode::Null) { |
| (*vec_null_map_to)[i] = true; |
| } |
| } |
| src_offset = src_next_offset; |
| } |
| |
| if constexpr (exception_mode == IPConvertExceptionMode::Null) { |
| return ColumnNullable::create(std::move(col_res), std::move(col_null_map_to)); |
| } |
| return col_res; |
| } |
| } // namespace detail |
| |
| template <IPConvertExceptionMode exception_mode, typename ToColumn = ColumnIPv6> |
| ColumnPtr convert_to_ipv6(ColumnPtr column, const PaddedPODArray<UInt8>* null_map = nullptr) { |
| const auto* column_input_string = assert_cast<const ColumnString*>(column.get()); |
| auto result = detail::convert_to_ipv6<exception_mode, ToColumn>(*column_input_string, null_map); |
| return result; |
| } |
| |
| template <IPConvertExceptionMode exception_mode> |
| class FunctionIPv6StringToNum : public IFunction { |
| public: |
| static constexpr auto name = exception_mode == IPConvertExceptionMode::Throw |
| ? "ipv6_string_to_num" |
| : (exception_mode == IPConvertExceptionMode::Default |
| ? "ipv6_string_to_num_or_default" |
| : "ipv6_string_to_num_or_null"); |
| |
| static FunctionPtr create() { |
| return std::make_shared<FunctionIPv6StringToNum<exception_mode>>(); |
| } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 1; } |
| |
| bool use_default_implementation_for_nulls() const override { return false; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| auto result_type = std::make_shared<DataTypeString>(); |
| |
| if constexpr (exception_mode == IPConvertExceptionMode::Null) { |
| return make_nullable(result_type); |
| } |
| |
| return result_type; |
| } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| ColumnPtr column = block.get_by_position(arguments[0]).column; |
| ColumnPtr null_map_column; |
| const NullMap* null_map = nullptr; |
| |
| if (column->is_nullable()) { |
| const auto* column_nullable = assert_cast<const ColumnNullable*>(column.get()); |
| column = column_nullable->get_nested_column_ptr(); |
| null_map_column = column_nullable->get_null_map_column_ptr(); |
| null_map = &column_nullable->get_null_map_data(); |
| } |
| |
| auto col_res = convert_to_ipv6<exception_mode, ColumnString>(column, null_map); |
| |
| if (null_map && exception_mode == IPConvertExceptionMode::Null) { |
| block.replace_by_position( |
| result, ColumnNullable::create(std::move(col_res), std::move(null_map_column))); |
| } else { |
| block.replace_by_position(result, std::move(col_res)); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| template <typename Type> |
| class FunctionIsIPString : public IFunction { |
| static_assert(std::is_same_v<Type, IPv4> || std::is_same_v<Type, IPv6>); |
| |
| public: |
| static constexpr auto name = std::is_same_v<Type, IPv4> ? "is_ipv4_string" : "is_ipv6_string"; |
| static FunctionPtr create() { return std::make_shared<FunctionIsIPString<Type>>(); } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 1; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| return std::make_shared<DataTypeUInt8>(); |
| } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| const auto& addr_column_with_type_and_name = block.get_by_position(arguments[0]); |
| const ColumnPtr& addr_column = addr_column_with_type_and_name.column; |
| const auto* str_addr_column = assert_cast<const ColumnString*>(addr_column.get()); |
| auto col_res = ColumnUInt8::create(input_rows_count, 0); |
| auto& col_res_data = col_res->get_data(); |
| |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| if constexpr (std::is_same_v<Type, IPv4>) { |
| StringRef ipv4_str = str_addr_column->get_data_at(i); |
| if (IPv4Value::is_valid_string(ipv4_str.data, ipv4_str.size)) { |
| col_res_data[i] = 1; |
| } |
| } else { |
| StringRef ipv6_str = str_addr_column->get_data_at(i); |
| if (IPv6Value::is_valid_string(ipv6_str.data, ipv6_str.size)) { |
| col_res_data[i] = 1; |
| } |
| } |
| } |
| |
| block.replace_by_position(result, std::move(col_res)); |
| return Status::OK(); |
| } |
| }; |
| |
| class FunctionIsIPAddressInRange : public IFunction { |
| public: |
| static constexpr auto name = "is_ip_address_in_range"; |
| static FunctionPtr create() { return std::make_shared<FunctionIsIPAddressInRange>(); } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 2; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| return std::make_shared<DataTypeUInt8>(); |
| } |
| |
| template <PrimitiveType PT, typename ColumnType> |
| void execute_impl_with_ip(size_t input_rows_count, bool addr_const, bool cidr_const, |
| const ColumnString* str_cidr_column, const ColumnPtr addr_column, |
| ColumnUInt8* col_res) const { |
| auto& col_res_data = col_res->get_data(); |
| const auto& ip_data = assert_cast<const ColumnType*>(addr_column.get())->get_data(); |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| auto addr_idx = index_check_const(i, addr_const); |
| auto cidr_idx = index_check_const(i, cidr_const); |
| auto cidr_data = str_cidr_column->get_data_at(cidr_idx); |
| // cidr_data maybe NULL, But the input column is nested column, so check here avoid throw exception |
| if (cidr_data.data == nullptr || cidr_data.size == 0) { |
| col_res_data[i] = 0; |
| continue; |
| } |
| const auto cidr = parse_ip_with_cidr(cidr_data.to_string_view()); |
| if constexpr (PT == PrimitiveType::TYPE_IPV4) { |
| if (cidr._address.as_v4()) { |
| col_res_data[i] = match_ipv4_subnet(ip_data[addr_idx], cidr._address.as_v4(), |
| cidr._prefix) |
| ? 1 |
| : 0; |
| } else { |
| col_res_data[i] = 0; |
| } |
| } else if constexpr (PT == PrimitiveType::TYPE_IPV6) { |
| if (cidr._address.as_v6()) { |
| col_res_data[i] = match_ipv6_subnet((uint8*)(&ip_data[addr_idx]), |
| cidr._address.as_v6(), cidr._prefix) |
| ? 1 |
| : 0; |
| } else { |
| col_res_data[i] = 0; |
| } |
| } |
| } |
| } |
| |
| Status evaluate_inverted_index( |
| const ColumnsWithTypeAndName& arguments, |
| const std::vector<vectorized::IndexFieldNameAndTypePair>& data_type_with_names, |
| std::vector<segment_v2::IndexIterator*> iterators, uint32_t num_rows, |
| segment_v2::InvertedIndexResultBitmap& bitmap_result) const override { |
| DCHECK(arguments.size() == 1); |
| DCHECK(data_type_with_names.size() == 1); |
| DCHECK(iterators.size() == 1); |
| auto* iter = iterators[0]; |
| auto data_type_with_name = data_type_with_names[0]; |
| if (iter == nullptr) { |
| return Status::OK(); |
| } |
| |
| if (!iter->get_reader()->is_bkd_index()) { |
| // Not support only bkd index |
| return Status::Error<ErrorCode::INVERTED_INDEX_EVALUATE_SKIPPED>( |
| "Inverted index evaluate skipped, ip range reader can only support by bkd " |
| "reader"); |
| } |
| // Get the is_ip_address_in_range from the arguments: cidr |
| const auto& cidr_column_with_type_and_name = arguments[0]; |
| // in is_ip_address_in_range param is const Field |
| ColumnPtr arg_column = cidr_column_with_type_and_name.column; |
| DataTypePtr arg_type = cidr_column_with_type_and_name.type; |
| if ((is_column_nullable(*arg_column) && !is_column_const(*remove_nullable(arg_column))) || |
| (!is_column_nullable(*arg_column) && !is_column_const(*arg_column))) { |
| // if not we should skip inverted index and evaluate in expression |
| return Status::Error<ErrorCode::INVERTED_INDEX_EVALUATE_SKIPPED>( |
| "Inverted index evaluate skipped, is_ip_address_in_range only support const " |
| "value"); |
| } |
| // check param type is string |
| if (!is_string_type(arg_type->get_primitive_type())) { |
| return Status::Error<ErrorCode::INVERTED_INDEX_EVALUATE_SKIPPED>( |
| "Inverted index evaluate skipped, is_ip_address_in_range only support string " |
| "type"); |
| } |
| // min && max ip address |
| Field min_ip, max_ip; |
| IPAddressCIDR cidr = parse_ip_with_cidr(arg_column->get_data_at(0)); |
| if (data_type_with_name.second->get_primitive_type() == TYPE_IPV4 && |
| cidr._address.as_v4()) { |
| auto range = apply_cidr_mask(cidr._address.as_v4(), cidr._prefix); |
| min_ip = Field::create_field<TYPE_IPV4>(range.first); |
| max_ip = Field::create_field<TYPE_IPV4>(range.second); |
| } else if (data_type_with_name.second->get_primitive_type() == TYPE_IPV6 && |
| cidr._address.as_v6()) { |
| auto cidr_range_ipv6_col = ColumnIPv6::create(2, 0); |
| auto& cidr_range_ipv6_data = cidr_range_ipv6_col->get_data(); |
| apply_cidr_mask(reinterpret_cast<const char*>(cidr._address.as_v6()), |
| reinterpret_cast<char*>(&cidr_range_ipv6_data[0]), |
| reinterpret_cast<char*>(&cidr_range_ipv6_data[1]), cidr._prefix); |
| min_ip = Field::create_field<TYPE_IPV6>(cidr_range_ipv6_data[0]); |
| max_ip = Field::create_field<TYPE_IPV6>(cidr_range_ipv6_data[1]); |
| } else { |
| // if here param is invalid for current column to calcute min_ip|max_ip we just return |
| return Status::Error<ErrorCode::INVERTED_INDEX_EVALUATE_SKIPPED>( |
| "Inverted index evaluate skipped, data type " + arg_type->get_name() + |
| " can not support this cidr " + arg_column->get_data_at(0).to_string()); |
| } |
| // apply for inverted index |
| std::shared_ptr<roaring::Roaring> null_bitmap = std::make_shared<roaring::Roaring>(); |
| |
| auto param_type = data_type_with_name.second->get_primitive_type(); |
| std::unique_ptr<segment_v2::InvertedIndexQueryParamFactory> query_param = nullptr; |
| |
| // >= min ip |
| segment_v2::InvertedIndexParam res_param; |
| res_param.column_name = data_type_with_name.first; |
| res_param.query_type = segment_v2::InvertedIndexQueryType::GREATER_EQUAL_QUERY; |
| res_param.query_value = query_param->get_value(); |
| res_param.num_rows = num_rows; |
| res_param.roaring = std::make_shared<roaring::Roaring>(); |
| RETURN_IF_ERROR(segment_v2::InvertedIndexQueryParamFactory::create_query_value( |
| param_type, &min_ip, query_param)); |
| RETURN_IF_ERROR(iter->read_from_index(&res_param)); |
| |
| // <= max ip |
| segment_v2::InvertedIndexParam max_param; |
| res_param.column_name = data_type_with_name.first; |
| res_param.query_type = segment_v2::InvertedIndexQueryType::LESS_EQUAL_QUERY; |
| res_param.query_value = query_param->get_value(); |
| res_param.num_rows = num_rows; |
| res_param.roaring = std::make_shared<roaring::Roaring>(); |
| RETURN_IF_ERROR(segment_v2::InvertedIndexQueryParamFactory::create_query_value( |
| param_type, &max_ip, query_param)); |
| RETURN_IF_ERROR(iter->read_from_index(&res_param)); |
| |
| DBUG_EXECUTE_IF("ip.inverted_index_filtered", { |
| auto req_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>( |
| "ip.inverted_index_filtered", "req_id", 0); |
| LOG(INFO) << "execute inverted index req_id: " << req_id |
| << " min: " << res_param.roaring->cardinality(); |
| }); |
| *res_param.roaring &= *max_param.roaring; |
| DBUG_EXECUTE_IF("ip.inverted_index_filtered", { |
| auto req_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>( |
| "ip.inverted_index_filtered", "req_id", 0); |
| LOG(INFO) << "execute inverted index req_id: " << req_id |
| << " max: " << max_param.roaring->cardinality() |
| << " result: " << res_param.roaring->cardinality(); |
| }); |
| segment_v2::InvertedIndexResultBitmap result(res_param.roaring, null_bitmap); |
| bitmap_result = result; |
| bitmap_result.mask_out_null(); |
| return Status::OK(); |
| } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| DBUG_EXECUTE_IF("ip.inverted_index_filtered", { |
| auto req_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>( |
| "ip.inverted_index_filtered", "req_id", 0); |
| return Status::Error<ErrorCode::INTERNAL_ERROR>( |
| "{} has already execute inverted index req_id {} , should not execute expr " |
| "with rows: {}", |
| get_name(), req_id, input_rows_count); |
| }); |
| const auto& addr_column_with_type_and_name = block.get_by_position(arguments[0]); |
| const auto& cidr_column_with_type_and_name = block.get_by_position(arguments[1]); |
| const auto& [addr_column, addr_const] = |
| unpack_if_const(addr_column_with_type_and_name.column); |
| const auto& [cidr_column, cidr_const] = |
| unpack_if_const(cidr_column_with_type_and_name.column); |
| |
| auto col_res = ColumnUInt8::create(input_rows_count, 0); |
| auto& col_res_data = col_res->get_data(); |
| |
| if (addr_column_with_type_and_name.type->get_primitive_type() == TYPE_IPV4) { |
| execute_impl_with_ip<PrimitiveType::TYPE_IPV4, ColumnIPv4>( |
| input_rows_count, addr_const, cidr_const, |
| assert_cast<const ColumnString*>(cidr_column.get()), addr_column, |
| col_res.get()); |
| } else if (addr_column_with_type_and_name.type->get_primitive_type() == TYPE_IPV6) { |
| execute_impl_with_ip<PrimitiveType::TYPE_IPV6, ColumnIPv6>( |
| input_rows_count, addr_const, cidr_const, |
| assert_cast<const ColumnString*>(cidr_column.get()), addr_column, |
| col_res.get()); |
| } else { |
| const auto* str_addr_column = assert_cast<const ColumnString*>(addr_column.get()); |
| const auto* str_cidr_column = assert_cast<const ColumnString*>(cidr_column.get()); |
| |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| auto addr_idx = index_check_const(i, addr_const); |
| auto cidr_idx = index_check_const(i, cidr_const); |
| auto addr_data = str_addr_column->get_data_at(addr_idx); |
| auto cidr_data = str_cidr_column->get_data_at(cidr_idx); |
| // cidr_data maybe NULL, But the input column is nested column, so check here avoid throw exception |
| if (cidr_data.data == nullptr || cidr_data.size == 0) { |
| col_res_data[i] = 0; |
| continue; |
| } |
| const auto addr = IPAddressVariant(addr_data.to_string_view()); |
| const auto cidr = parse_ip_with_cidr(cidr_data.to_string_view()); |
| col_res_data[i] = is_address_in_range(addr, cidr) ? 1 : 0; |
| } |
| } |
| |
| block.replace_by_position(result, std::move(col_res)); |
| return Status::OK(); |
| } |
| }; |
| |
| class FunctionIPv4CIDRToRange : public IFunction { |
| public: |
| static constexpr auto name = "ipv4_cidr_to_range"; |
| static FunctionPtr create() { return std::make_shared<FunctionIPv4CIDRToRange>(); } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 2; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| DataTypePtr element = std::make_shared<DataTypeIPv4>(); |
| return std::make_shared<DataTypeStruct>(DataTypes {element, element}, |
| Strings {"min", "max"}); |
| } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| ColumnWithTypeAndName& ip_column = block.get_by_position(arguments[0]); |
| ColumnWithTypeAndName& cidr_column = block.get_by_position(arguments[1]); |
| |
| const auto& [ip_column_ptr, ip_col_const] = unpack_if_const(ip_column.column); |
| const auto& [cidr_column_ptr, cidr_col_const] = unpack_if_const(cidr_column.column); |
| |
| const auto* col_ip_column = assert_cast<const ColumnIPv4*>(ip_column_ptr.get()); |
| const auto* col_cidr_column = assert_cast<const ColumnInt16*>(cidr_column_ptr.get()); |
| |
| const typename ColumnIPv4::Container& vec_ip_input = col_ip_column->get_data(); |
| const ColumnInt16::Container& vec_cidr_input = col_cidr_column->get_data(); |
| auto col_lower_range_output = ColumnIPv4::create(input_rows_count, 0); |
| auto col_upper_range_output = ColumnIPv4::create(input_rows_count, 0); |
| |
| ColumnIPv4::Container& vec_lower_range_output = col_lower_range_output->get_data(); |
| ColumnIPv4::Container& vec_upper_range_output = col_upper_range_output->get_data(); |
| |
| static constexpr UInt8 max_cidr_mask = IPV4_BINARY_LENGTH * 8; |
| |
| if (ip_col_const) { |
| auto ip = vec_ip_input[0]; |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| auto cidr = vec_cidr_input[i]; |
| if (cidr < 0 || cidr > max_cidr_mask) { |
| throw Exception(ErrorCode::INVALID_ARGUMENT, "Illegal cidr value '{}'", |
| std::to_string(cidr)); |
| } |
| auto range = apply_cidr_mask(ip, cast_set<UInt8>(cidr)); |
| vec_lower_range_output[i] = range.first; |
| vec_upper_range_output[i] = range.second; |
| } |
| } else if (cidr_col_const) { |
| auto cidr = vec_cidr_input[0]; |
| if (cidr < 0 || cidr > max_cidr_mask) { |
| throw Exception(ErrorCode::INVALID_ARGUMENT, "Illegal cidr value '{}'", |
| std::to_string(cidr)); |
| } |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| auto ip = vec_ip_input[i]; |
| auto range = apply_cidr_mask(ip, cast_set<UInt8>(cidr)); |
| vec_lower_range_output[i] = range.first; |
| vec_upper_range_output[i] = range.second; |
| } |
| } else { |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| auto ip = vec_ip_input[i]; |
| auto cidr = vec_cidr_input[i]; |
| if (cidr < 0 || cidr > max_cidr_mask) { |
| throw Exception(ErrorCode::INVALID_ARGUMENT, "Illegal cidr value '{}'", |
| std::to_string(cidr)); |
| } |
| auto range = apply_cidr_mask(ip, cast_set<UInt8>(cidr)); |
| vec_lower_range_output[i] = range.first; |
| vec_upper_range_output[i] = range.second; |
| } |
| } |
| |
| block.replace_by_position( |
| result, ColumnStruct::create(Columns {std::move(col_lower_range_output), |
| std::move(col_upper_range_output)})); |
| return Status::OK(); |
| } |
| }; |
| |
| /** |
| * this function accepts two arguments: an IPv6 address and a CIDR mask |
| * IPv6 address can be either ipv6 type or string type as ipv6 string address |
| * FE: PropagateNullable is used to handle nullable columns |
| */ |
| class FunctionIPv6CIDRToRange : public IFunction { |
| public: |
| static constexpr auto name = "ipv6_cidr_to_range"; |
| static FunctionPtr create() { return std::make_shared<FunctionIPv6CIDRToRange>(); } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 2; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| DataTypePtr element = std::make_shared<DataTypeIPv6>(); |
| return std::make_shared<DataTypeStruct>(DataTypes {element, element}, |
| Strings {"min", "max"}); |
| } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| const auto& addr_column_with_type_and_name = block.get_by_position(arguments[0]); |
| const auto& cidr_column_with_type_and_name = block.get_by_position(arguments[1]); |
| const auto& [addr_column, add_col_const] = |
| unpack_if_const(addr_column_with_type_and_name.column); |
| const auto& [cidr_column, col_const] = |
| unpack_if_const(cidr_column_with_type_and_name.column); |
| |
| const auto* cidr_col = assert_cast<const ColumnInt16*>(cidr_column.get()); |
| ColumnPtr col_res = nullptr; |
| |
| if (addr_column_with_type_and_name.type->get_primitive_type() == TYPE_IPV6) { |
| const auto* ipv6_addr_column = assert_cast<const ColumnIPv6*>(addr_column.get()); |
| col_res = execute_impl(*ipv6_addr_column, *cidr_col, input_rows_count, add_col_const, |
| col_const); |
| } else if (is_string_type(addr_column_with_type_and_name.type->get_primitive_type())) { |
| ColumnPtr col_ipv6 = |
| convert_to_ipv6<IPConvertExceptionMode::Throw>(addr_column, nullptr); |
| const auto* ipv6_addr_column = assert_cast<const ColumnIPv6*>(col_ipv6.get()); |
| col_res = execute_impl(*ipv6_addr_column, *cidr_col, input_rows_count, add_col_const, |
| col_const); |
| } else { |
| return Status::RuntimeError( |
| "Illegal column {} of argument of function {}, Expected IPv6 or String", |
| addr_column->get_name(), get_name()); |
| } |
| |
| block.replace_by_position(result, std::move(col_res)); |
| return Status::OK(); |
| } |
| |
| static ColumnPtr execute_impl(const ColumnIPv6& from_column, const ColumnInt16& cidr_column, |
| size_t input_rows_count, bool is_addr_const = false, |
| bool is_cidr_const = false) { |
| auto col_res_lower_range = ColumnIPv6::create(input_rows_count, 0); |
| auto col_res_upper_range = ColumnIPv6::create(input_rows_count, 0); |
| auto& vec_res_lower_range = col_res_lower_range->get_data(); |
| auto& vec_res_upper_range = col_res_upper_range->get_data(); |
| |
| static constexpr UInt8 max_cidr_mask = IPV6_BINARY_LENGTH * 8; |
| |
| if (is_addr_const) { |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| auto cidr = cidr_column.get_int(i); |
| if (cidr < 0 || cidr > max_cidr_mask) { |
| throw Exception(ErrorCode::INVALID_ARGUMENT, "Illegal cidr value '{}'", |
| std::to_string(cidr)); |
| } |
| apply_cidr_mask(from_column.get_data_at(0).data, |
| reinterpret_cast<char*>(&vec_res_lower_range[i]), |
| reinterpret_cast<char*>(&vec_res_upper_range[i]), |
| cast_set<UInt8>(cidr)); |
| } |
| } else if (is_cidr_const) { |
| auto cidr = cidr_column.get_int(0); |
| if (cidr < 0 || cidr > max_cidr_mask) { |
| throw Exception(ErrorCode::INVALID_ARGUMENT, "Illegal cidr value '{}'", |
| std::to_string(cidr)); |
| } |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| apply_cidr_mask(from_column.get_data_at(i).data, |
| reinterpret_cast<char*>(&vec_res_lower_range[i]), |
| reinterpret_cast<char*>(&vec_res_upper_range[i]), |
| cast_set<UInt8>(cidr)); |
| } |
| } else { |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| auto cidr = cidr_column.get_int(i); |
| if (cidr < 0 || cidr > max_cidr_mask) { |
| throw Exception(ErrorCode::INVALID_ARGUMENT, "Illegal cidr value '{}'", |
| std::to_string(cidr)); |
| } |
| apply_cidr_mask(from_column.get_data_at(i).data, |
| reinterpret_cast<char*>(&vec_res_lower_range[i]), |
| reinterpret_cast<char*>(&vec_res_upper_range[i]), |
| cast_set<UInt8>(cidr)); |
| } |
| } |
| return ColumnStruct::create( |
| Columns {std::move(col_res_lower_range), std::move(col_res_upper_range)}); |
| } |
| }; |
| |
| class FunctionIsIPv4Compat : public IFunction { |
| public: |
| static constexpr auto name = "is_ipv4_compat"; |
| static FunctionPtr create() { return std::make_shared<FunctionIsIPv4Compat>(); } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 1; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| return std::make_shared<DataTypeUInt8>(); |
| } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| const ColumnPtr& column = block.get_by_position(arguments[0]).column; |
| const auto* col_in = assert_cast<const ColumnString*>(column.get()); |
| |
| size_t col_size = col_in->size(); |
| auto col_res = ColumnUInt8::create(col_size, 0); |
| auto& col_res_data = col_res->get_data(); |
| |
| for (size_t i = 0; i < col_size; ++i) { |
| auto ipv4_in = col_in->get_data_at(i); |
| if (is_ipv4_compat(reinterpret_cast<const UInt8*>(ipv4_in.data))) { |
| col_res_data[i] = 1; |
| } |
| } |
| |
| block.replace_by_position(result, std::move(col_res)); |
| return Status::OK(); |
| } |
| |
| private: |
| static bool is_ipv4_compat(const UInt8* address) { |
| return (unaligned_load_little_endian<UInt64>(address) == 0) && |
| (unaligned_load_little_endian<UInt32>(address + 8) == 0) && |
| (unaligned_load_little_endian<UInt32>(address + 12) != 0); |
| } |
| }; |
| |
| class FunctionIsIPv4Mapped : public IFunction { |
| public: |
| static constexpr auto name = "is_ipv4_mapped"; |
| static FunctionPtr create() { return std::make_shared<FunctionIsIPv4Mapped>(); } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 1; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| return std::make_shared<DataTypeUInt8>(); |
| } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| const ColumnPtr& column = block.get_by_position(arguments[0]).column; |
| const auto* col_in = assert_cast<const ColumnString*>(column.get()); |
| |
| size_t col_size = col_in->size(); |
| auto col_res = ColumnUInt8::create(col_size, 0); |
| auto& col_res_data = col_res->get_data(); |
| |
| for (size_t i = 0; i < col_size; ++i) { |
| auto ipv4_in = col_in->get_data_at(i); |
| if (is_ipv4_mapped(reinterpret_cast<const UInt8*>(ipv4_in.data))) { |
| col_res_data[i] = 1; |
| } |
| } |
| |
| block.replace_by_position(result, std::move(col_res)); |
| return Status::OK(); |
| } |
| |
| private: |
| static bool is_ipv4_mapped(const UInt8* address) { |
| return (unaligned_load_little_endian<UInt64>(address) == 0) && |
| ((unaligned_load_little_endian<UInt64>(address + 8) & 0x00000000FFFFFFFFULL) == |
| 0x00000000FFFF0000ULL); |
| } |
| }; |
| |
| template <IPConvertExceptionMode exception_mode, PrimitiveType PType> |
| inline constexpr auto to_ip_func_name() { |
| if constexpr (PType == TYPE_IPV4) { |
| return exception_mode == IPConvertExceptionMode::Throw |
| ? "to_ipv4" |
| : (exception_mode == IPConvertExceptionMode::Default ? "to_ipv4_or_default" |
| : "to_ipv4_or_null"); |
| } else { |
| return exception_mode == IPConvertExceptionMode::Throw |
| ? "to_ipv6" |
| : (exception_mode == IPConvertExceptionMode::Default ? "to_ipv6_or_default" |
| : "to_ipv6_or_null"); |
| } |
| } |
| |
| template <IPConvertExceptionMode exception_mode, PrimitiveType PType> |
| class FunctionToIP : public IFunction { |
| static_assert(is_ip(PType)); |
| |
| public: |
| static constexpr auto name = to_ip_func_name<exception_mode, PType>(); |
| |
| static FunctionPtr create() { return std::make_shared<FunctionToIP<exception_mode, PType>>(); } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 1; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| DataTypePtr result_type; |
| |
| if constexpr (PType == TYPE_IPV4) { |
| result_type = std::make_shared<DataTypeIPv4>(); |
| } else { |
| result_type = std::make_shared<DataTypeIPv6>(); |
| } |
| |
| if constexpr (exception_mode == IPConvertExceptionMode::Null) { |
| return make_nullable(result_type); |
| } else { |
| return result_type; |
| } |
| } |
| |
| bool use_default_implementation_for_nulls() const override { return false; } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| const auto& addr_column_with_type_and_name = block.get_by_position(arguments[0]); |
| const ColumnPtr& addr_column = addr_column_with_type_and_name.column; |
| const ColumnString* str_addr_column = nullptr; |
| const NullMap* addr_null_map = nullptr; |
| |
| if (addr_column_with_type_and_name.type->is_nullable()) { |
| const auto* addr_column_nullable = |
| assert_cast<const ColumnNullable*>(addr_column.get()); |
| str_addr_column = assert_cast<const ColumnString*>( |
| addr_column_nullable->get_nested_column_ptr().get()); |
| addr_null_map = &addr_column_nullable->get_null_map_data(); |
| } else { |
| str_addr_column = assert_cast<const ColumnString*>(addr_column.get()); |
| } |
| |
| auto col_res = ColumnVector<PType>::create(input_rows_count, 0); |
| auto res_null_map = ColumnUInt8::create(input_rows_count, 0); |
| auto& col_res_data = col_res->get_data(); |
| auto& res_null_map_data = res_null_map->get_data(); |
| |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| if (addr_null_map && (*addr_null_map)[i]) { |
| if constexpr (exception_mode == IPConvertExceptionMode::Throw) { |
| throw Exception(ErrorCode::INVALID_ARGUMENT, |
| "The arguments of function {} must be String, not NULL", |
| get_name()); |
| } else if constexpr (exception_mode == IPConvertExceptionMode::Default) { |
| col_res_data[i] = 0; // '0.0.0.0' or '::' |
| continue; |
| } else { |
| res_null_map_data[i] = 1; |
| continue; |
| } |
| } |
| |
| if constexpr (PType == TYPE_IPV4) { |
| StringRef ipv4_str = str_addr_column->get_data_at(i); |
| IPv4 ipv4_val = 0; |
| if (IPv4Value::from_string(ipv4_val, ipv4_str.data, ipv4_str.size)) { |
| col_res_data[i] = ipv4_val; |
| } else { |
| if constexpr (exception_mode == IPConvertExceptionMode::Throw) { |
| throw Exception(ErrorCode::INVALID_ARGUMENT, "Invalid IPv4 value '{}'", |
| ipv4_str.to_string_view()); |
| } else if constexpr (exception_mode == IPConvertExceptionMode::Default) { |
| col_res_data[i] = 0; // '0.0.0.0' |
| } else { |
| res_null_map_data[i] = 1; |
| } |
| } |
| } else { |
| StringRef ipv6_str = str_addr_column->get_data_at(i); |
| IPv6 ipv6_val = 0; |
| if (IPv6Value::from_string(ipv6_val, ipv6_str.data, ipv6_str.size)) { |
| col_res_data[i] = ipv6_val; |
| } else { |
| if constexpr (exception_mode == IPConvertExceptionMode::Throw) { |
| throw Exception(ErrorCode::INVALID_ARGUMENT, "Invalid IPv6 value '{}'", |
| ipv6_str.to_string_view()); |
| } else if constexpr (exception_mode == IPConvertExceptionMode::Default) { |
| col_res_data[i] = 0; // '::' |
| } else if constexpr (exception_mode == IPConvertExceptionMode::Null) { |
| res_null_map_data[i] = 1; |
| } |
| } |
| } |
| } |
| |
| if constexpr (exception_mode == IPConvertExceptionMode::Null) { |
| block.replace_by_position( |
| result, ColumnNullable::create(std::move(col_res), std::move(res_null_map))); |
| } else { |
| block.replace_by_position(result, std::move(col_res)); |
| } |
| |
| return Status::OK(); |
| } |
| }; |
| |
| class FunctionIPv4ToIPv6 : public IFunction { |
| public: |
| static constexpr auto name = "ipv4_to_ipv6"; |
| static FunctionPtr create() { return std::make_shared<FunctionIPv4ToIPv6>(); } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 1; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| return std::make_shared<DataTypeIPv6>(); |
| } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| const auto& ipv4_column_with_type_and_name = block.get_by_position(arguments[0]); |
| const auto& [ipv4_column, ipv4_const] = |
| unpack_if_const(ipv4_column_with_type_and_name.column); |
| const auto* ipv4_addr_column = assert_cast<const ColumnIPv4*>(ipv4_column.get()); |
| const auto& ipv4_column_data = ipv4_addr_column->get_data(); |
| auto col_res = ColumnIPv6::create(input_rows_count, 0); |
| auto& col_res_data = col_res->get_data(); |
| |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| auto ipv4_idx = index_check_const(i, ipv4_const); |
| map_ipv4_to_ipv6(ipv4_column_data[ipv4_idx], |
| reinterpret_cast<UInt8*>(&col_res_data[i])); |
| } |
| |
| block.replace_by_position(result, std::move(col_res)); |
| return Status::OK(); |
| } |
| }; |
| |
| class FunctionCutIPv6 : public IFunction { |
| public: |
| static constexpr auto name = "cut_ipv6"; |
| static FunctionPtr create() { return std::make_shared<FunctionCutIPv6>(); } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 3; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| return std::make_shared<DataTypeString>(); |
| } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| const auto& ipv6_column_with_type_and_name = block.get_by_position(arguments[0]); |
| const auto& bytes_to_cut_for_ipv6_column_with_type_and_name = |
| block.get_by_position(arguments[1]); |
| const auto& bytes_to_cut_for_ipv4_column_with_type_and_name = |
| block.get_by_position(arguments[2]); |
| |
| const auto& [ipv6_column, ipv6_const] = |
| unpack_if_const(ipv6_column_with_type_and_name.column); |
| const auto& [bytes_to_cut_for_ipv6_column, bytes_to_cut_for_ipv6_const] = |
| unpack_if_const(bytes_to_cut_for_ipv6_column_with_type_and_name.column); |
| const auto& [bytes_to_cut_for_ipv4_column, bytes_to_cut_for_ipv4_const] = |
| unpack_if_const(bytes_to_cut_for_ipv4_column_with_type_and_name.column); |
| |
| const auto* ipv6_addr_column = assert_cast<const ColumnIPv6*>(ipv6_column.get()); |
| const auto* to_cut_for_ipv6_bytes_column = |
| assert_cast<const ColumnInt8*>(bytes_to_cut_for_ipv6_column.get()); |
| const auto* to_cut_for_ipv4_bytes_column = |
| assert_cast<const ColumnInt8*>(bytes_to_cut_for_ipv4_column.get()); |
| |
| const auto& ipv6_addr_column_data = ipv6_addr_column->get_data(); |
| const auto& to_cut_for_ipv6_bytes_column_data = to_cut_for_ipv6_bytes_column->get_data(); |
| const auto& to_cut_for_ipv4_bytes_column_data = to_cut_for_ipv4_bytes_column->get_data(); |
| |
| auto col_res = ColumnString::create(); |
| ColumnString::Chars& chars_res = col_res->get_chars(); |
| ColumnString::Offsets& offsets_res = col_res->get_offsets(); |
| chars_res.resize(input_rows_count * (IPV6_MAX_TEXT_LENGTH + 1)); // + 1 for ending '\0' |
| offsets_res.resize(input_rows_count); |
| auto* begin = reinterpret_cast<char*>(chars_res.data()); |
| auto* pos = begin; |
| |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| auto ipv6_idx = index_check_const(i, ipv6_const); |
| auto bytes_to_cut_for_ipv6_idx = index_check_const(i, bytes_to_cut_for_ipv6_const); |
| auto bytes_to_cut_for_ipv4_idx = index_check_const(i, bytes_to_cut_for_ipv4_const); |
| // the current function logic is processed in big endian manner |
| // But ipv6 in doris is stored in little-endian byte order |
| // need transfer to big-endian byte order first, so we can't deal this process in column |
| auto val_128 = ipv6_addr_column_data[ipv6_idx]; |
| auto* address = reinterpret_cast<unsigned char*>(&val_128); |
| |
| Int8 bytes_to_cut_for_ipv6_count = |
| to_cut_for_ipv6_bytes_column_data[bytes_to_cut_for_ipv6_idx]; |
| Int8 bytes_to_cut_for_ipv4_count = |
| to_cut_for_ipv4_bytes_column_data[bytes_to_cut_for_ipv4_idx]; |
| |
| if (bytes_to_cut_for_ipv6_count > IPV6_BINARY_LENGTH) [[unlikely]] { |
| throw Exception(ErrorCode::INVALID_ARGUMENT, |
| "Illegal value for argument 2 {} of function {}", |
| bytes_to_cut_for_ipv6_column_with_type_and_name.type->get_name(), |
| get_name()); |
| } |
| |
| if (bytes_to_cut_for_ipv4_count > IPV6_BINARY_LENGTH) [[unlikely]] { |
| throw Exception(ErrorCode::INVALID_ARGUMENT, |
| "Illegal value for argument 3 {} of function {}", |
| bytes_to_cut_for_ipv4_column_with_type_and_name.type->get_name(), |
| get_name()); |
| } |
| |
| UInt8 bytes_to_cut_count = is_ipv4_mapped(address) ? bytes_to_cut_for_ipv4_count |
| : bytes_to_cut_for_ipv6_count; |
| cut_address(address, pos, bytes_to_cut_count); |
| offsets_res[i] = cast_set<uint32_t>(pos - begin); |
| } |
| |
| chars_res.resize(offsets_res[offsets_res.size() - 1]); |
| |
| block.replace_by_position(result, std::move(col_res)); |
| return Status::OK(); |
| } |
| |
| private: |
| static bool is_ipv4_mapped(const UInt8* address) { |
| return (unaligned_load_little_endian<UInt64>(address + 8) == 0) && |
| ((unaligned_load_little_endian<UInt64>(address) & 0xFFFFFFFF00000000ULL) == |
| 0x0000FFFF00000000ULL); |
| } |
| |
| static void cut_address(unsigned char* address, char*& dst, UInt8 zeroed_tail_bytes_count) { |
| format_ipv6(address, dst, zeroed_tail_bytes_count); |
| } |
| }; |
| |
| class FunctionIPv6FromUInt128StringOrNull : public IFunction { |
| public: |
| static constexpr auto name = "ipv6_from_uint128_string_or_null"; |
| static FunctionPtr create() { return std::make_shared<FunctionIPv6FromUInt128StringOrNull>(); } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 1; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| return std::make_shared<DataTypeNullable>(std::make_shared<DataTypeIPv6>()); |
| } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| const auto& ipv6_column_with_type_and_name = block.get_by_position(arguments[0]); |
| const auto& [ipv6_column, ipv6_const] = |
| unpack_if_const(ipv6_column_with_type_and_name.column); |
| const auto* ipv6_addr_column = assert_cast<const ColumnString*>(ipv6_column.get()); |
| // result is nullable column |
| auto col_res = ColumnNullable::create(ColumnIPv6::create(input_rows_count, 0), |
| ColumnUInt8::create(input_rows_count, 1)); |
| auto& col_res_data = assert_cast<ColumnIPv6*>(&col_res->get_nested_column())->get_data(); |
| auto& res_null_map_data = col_res->get_null_map_data(); |
| |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| IPv6 ipv6 = 0; |
| auto ipv6_idx = index_check_const(i, ipv6_const); |
| StringRef uint128_string = ipv6_addr_column->get_data_at(ipv6_idx); |
| if (!IPv6Value::from_uint128_string(ipv6, uint128_string.data, uint128_string.size)) { |
| VLOG_DEBUG << "Invalid uin128 IPv6 value '" << uint128_string.to_string_view() |
| << "'"; |
| // we should set null to the result not throw exception for load senior |
| } else { |
| col_res_data[i] = ipv6; |
| res_null_map_data[i] = 0; |
| } |
| } |
| |
| block.replace_by_position(result, std::move(col_res)); |
| return Status::OK(); |
| } |
| }; |
| |
| } // namespace doris::vectorized |
| |
| #include "common/compile_check_end.h" |