blob: 0779346b6e04344b23a6cc45a2e861749489a90f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNullable.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Common/HashTable/ClearableHashSet.h>
#include <Common/SipHash.h>
#include <Common/assert_cast.h>
#include <Columns/IColumn.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
/// Spark-compatible variant of arrayDistinct, copied from
/// src/Functions/array/arrayDistinct.cpp with one behavioral difference:
/// the first NULL value of each array is kept in the result,
/// e.g. select arrayDistinct([1, null, 2, null, 3, 3]) should return [1, null, 2, 3]
class FunctionArrayDistinctSpark : public IFunction
{
public:
    static constexpr auto name = "arrayDistinctSpark";

    static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionArrayDistinctSpark>(); }

    String getName() const override { return name; }

    bool isVariadic() const override { return false; }

    bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }

    size_t getNumberOfArguments() const override { return 1; }

    bool useDefaultImplementationForConstants() const override { return true; }

    DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
    {
        const auto * array_type = checkAndGetDataType<DataTypeArray>(arguments[0].get());
        if (!array_type)
            throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Argument for function {} must be array but it has type {}.",
                getName(), arguments[0]->getName());
        /// Unlike arrayDistinct we keep the nested type as-is, so the result
        /// can be Array(Nullable(...)) — NULLs may survive deduplication.
        return std::make_shared<DataTypeArray>(array_type->getNestedType());
    }

    ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override;

private:
    /// Initially allocate a piece of memory for 2^9 = 512 elements. NOTE: This is just a guess.
    static constexpr size_t INITIAL_SIZE_DEGREE = 9;

    /// Fast path for numeric nested types; returns false if src_data is not ColumnVector<T>.
    template <typename T>
    static bool executeNumber(
        const IColumn & src_data,
        const ColumnArray::Offsets & src_offsets,
        IColumn & res_data_col,
        ColumnArray::Offsets & res_offsets,
        const ColumnNullable * nullable_col);

    /// Fast path for String nested type; returns false if src_data is not ColumnString.
    static bool executeString(
        const IColumn & src_data,
        const ColumnArray::Offsets & src_offsets,
        IColumn & res_data_col,
        ColumnArray::Offsets & res_offsets,
        const ColumnNullable * nullable_col);

    /// Generic fallback: deduplicates by a 128-bit SipHash of each element.
    static void executeHashed(
        const IColumn & src_data,
        const ColumnArray::Offsets & src_offsets,
        IColumn & res_data_col,
        ColumnArray::Offsets & res_offsets,
        const ColumnNullable * nullable_col);
};
/// Dispatches to the typed fast paths (numbers, strings) and falls back to the
/// hashed generic implementation. Handles a Nullable nested column by passing
/// the null map alongside the unwrapped inner column.
ColumnPtr FunctionArrayDistinctSpark::executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t /*input_rows_count*/) const
{
    ColumnPtr array_ptr = arguments[0].column;
    const ColumnArray * array = checkAndGetColumn<ColumnArray>(array_ptr.get());
    /// getReturnTypeImpl guarantees an Array *type*, but be defensive about the
    /// concrete column: fail loudly instead of dereferencing a null pointer.
    if (!array)
        throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Argument for function {} must be array but it has type {}.",
            getName(), arguments[0].column->getName());
    const auto & return_type = result_type;
    auto res_ptr = return_type->createColumn();
    ColumnArray & res = assert_cast<ColumnArray &>(*res_ptr);
    const IColumn & src_data = array->getData();
    const ColumnArray::Offsets & offsets = array->getOffsets();
    IColumn & res_data = res.getData();
    ColumnArray::Offsets & res_offsets = res.getOffsets();
    /// If the nested type is Nullable, work on the inner column and keep the
    /// ColumnNullable around so the execute* helpers can consult the null map.
    const ColumnNullable * nullable_col = checkAndGetColumn<ColumnNullable>(&src_data);
    const IColumn * inner_col;
    if (nullable_col)
    {
        inner_col = &nullable_col->getNestedColumn();
    }
    else
    {
        inner_col = &src_data;
    }
    /// Try each typed fast path in turn; the hashed fallback handles everything else.
    if (!(executeNumber<UInt8>(*inner_col, offsets, res_data, res_offsets, nullable_col)
        || executeNumber<UInt16>(*inner_col, offsets, res_data, res_offsets, nullable_col)
        || executeNumber<UInt32>(*inner_col, offsets, res_data, res_offsets, nullable_col)
        || executeNumber<UInt64>(*inner_col, offsets, res_data, res_offsets, nullable_col)
        || executeNumber<Int8>(*inner_col, offsets, res_data, res_offsets, nullable_col)
        || executeNumber<Int16>(*inner_col, offsets, res_data, res_offsets, nullable_col)
        || executeNumber<Int32>(*inner_col, offsets, res_data, res_offsets, nullable_col)
        || executeNumber<Int64>(*inner_col, offsets, res_data, res_offsets, nullable_col)
        || executeNumber<Float32>(*inner_col, offsets, res_data, res_offsets, nullable_col)
        || executeNumber<Float64>(*inner_col, offsets, res_data, res_offsets, nullable_col)
        || executeString(*inner_col, offsets, res_data, res_offsets, nullable_col)))
        executeHashed(*inner_col, offsets, res_data, res_offsets, nullable_col);
    return res_ptr;
}
/// Deduplicates numeric array elements of vector type T, keeping only the
/// first NULL of each array (Spark semantics). Returns false when src_data
/// is not a ColumnVector<T>, so the caller can try the next type.
template <typename T>
bool FunctionArrayDistinctSpark::executeNumber(
    const IColumn & src_data,
    const ColumnArray::Offsets & src_offsets,
    IColumn & res_data_col,
    ColumnArray::Offsets & res_offsets,
    const ColumnNullable * nullable_col)
{
    const auto * typed_col = checkAndGetColumn<ColumnVector<T>>(&src_data);
    if (!typed_col)
        return false;

    const PaddedPODArray<T> & elems = typed_col->getData();
    const PaddedPODArray<UInt8> * null_map = nullable_col ? &nullable_col->getNullMapData() : nullptr;

    using Set = ClearableHashSetWithStackMemory<T, DefaultHash<T>,
        INITIAL_SIZE_DEGREE>;
    Set seen;

    ColumnArray::Offset array_begin = 0;
    ColumnArray::Offset accumulated = 0;
    for (const auto array_end : src_offsets)
    {
        seen.clear();
        bool null_emitted = false;
        for (ColumnArray::Offset pos = array_begin; pos < array_end; ++pos)
        {
            /// difference vs. arrayDistinct: keep the first NULL of the array
            if (null_map && (*null_map)[pos])
            {
                if (!null_emitted)
                {
                    res_data_col.insertDefault();
                    null_emitted = true;
                }
                continue;
            }
            if (seen.find(elems[pos]))
                continue;
            seen.insert(elems[pos]);
            res_data_col.insert(elems[pos]);
        }
        /// Offsets are cumulative; NULL (if any) occupies one extra slot.
        accumulated += seen.size() + null_emitted;
        res_offsets.emplace_back(accumulated);
        array_begin = array_end;
    }
    return true;
}
/// Deduplicates String array elements, keeping only the first NULL of each
/// array (Spark semantics). Returns false when src_data is not a ColumnString,
/// so the caller can fall through to the hashed implementation.
bool FunctionArrayDistinctSpark::executeString(
    const IColumn & src_data,
    const ColumnArray::Offsets & src_offsets,
    IColumn & res_data_col,
    ColumnArray::Offsets & res_offsets,
    const ColumnNullable * nullable_col)
{
    const auto * string_col = checkAndGetColumn<ColumnString>(&src_data);
    if (!string_col)
        return false;

    const PaddedPODArray<UInt8> * null_map = nullable_col ? &nullable_col->getNullMapData() : nullptr;

    using Set = ClearableHashSetWithStackMemory<StringRef, StringRefHash,
        INITIAL_SIZE_DEGREE>;
    Set seen;

    ColumnArray::Offset array_begin = 0;
    ColumnArray::Offset accumulated = 0;
    for (const auto array_end : src_offsets)
    {
        seen.clear();
        bool null_emitted = false;
        for (ColumnArray::Offset pos = array_begin; pos < array_end; ++pos)
        {
            /// difference vs. arrayDistinct: keep the first NULL of the array
            if (null_map && (*null_map)[pos])
            {
                if (!null_emitted)
                {
                    res_data_col.insertDefault();
                    null_emitted = true;
                }
                continue;
            }
            const StringRef value = string_col->getDataAt(pos);
            if (seen.find(value))
                continue;
            seen.insert(value);
            res_data_col.insertData(value.data, value.size);
        }
        /// Offsets are cumulative; NULL (if any) occupies one extra slot.
        accumulated += seen.size() + null_emitted;
        res_offsets.emplace_back(accumulated);
        array_begin = array_end;
    }
    return true;
}
/// Generic fallback for any nested type: elements are deduplicated by their
/// 128-bit SipHash, keeping only the first NULL of each array (Spark semantics).
/// NOTE: distinct values colliding on the full 128-bit hash would be merged;
/// this matches the approach of the original arrayDistinct.
void FunctionArrayDistinctSpark::executeHashed(
    const IColumn & src_data,
    const ColumnArray::Offsets & src_offsets,
    IColumn & res_data_col,
    ColumnArray::Offsets & res_offsets,
    const ColumnNullable * nullable_col)
{
    using Set = ClearableHashSetWithStackMemory<UInt128, UInt128TrivialHash,
        INITIAL_SIZE_DEGREE>;

    const PaddedPODArray<UInt8> * null_map = nullable_col ? &nullable_col->getNullMapData() : nullptr;
    Set seen;

    ColumnArray::Offset array_begin = 0;
    ColumnArray::Offset accumulated = 0;
    for (const auto array_end : src_offsets)
    {
        seen.clear();
        bool null_emitted = false;
        for (ColumnArray::Offset pos = array_begin; pos < array_end; ++pos)
        {
            /// difference vs. arrayDistinct: keep the first NULL of the array
            if (null_map && (*null_map)[pos])
            {
                if (!null_emitted)
                {
                    res_data_col.insertDefault();
                    null_emitted = true;
                }
                continue;
            }
            SipHash hasher;
            src_data.updateHashWithValue(pos, hasher);
            const UInt128 digest = hasher.get128();
            if (seen.find(digest))
                continue;
            seen.insert(digest);
            /// Copy from the Nullable wrapper when present so the result's
            /// null map stays in sync with its nested data.
            if (nullable_col)
                res_data_col.insertFrom(*nullable_col, pos);
            else
                res_data_col.insertFrom(src_data, pos);
        }
        /// Offsets are cumulative; NULL (if any) occupies one extra slot.
        accumulated += seen.size() + null_emitted;
        res_offsets.emplace_back(accumulated);
        array_begin = array_end;
    }
}
/// Registers the function with the factory under the name "arrayDistinctSpark"
/// (taken from FunctionArrayDistinctSpark::name).
REGISTER_FUNCTION(ArrayDistinctSpark)
{
    factory.registerFunction<FunctionArrayDistinctSpark>();
}
}