| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #pragma once |
| #include <memory> |
| #include <mutex> |
| #include <stack> |
| #include <jni.h> |
| #include <Core/ColumnWithTypeAndName.h> |
| #include <Core/Defines.h> |
| #include <Core/NamesAndTypes.h> |
| #include <DataTypes/Serializations/ISerialization.h> |
| #include <Interpreters/Context_fwd.h> |
| #include <Processors/Chunk.h> |
| #include <Shuffle/SelectorBuilder.h> |
| #include <Shuffle/ShuffleCommon.h> |
| #include <base/types.h> |
| #include <Common/BlockIterator.h> |
| |
| namespace local_engine |
| { |
| class NativeSplitter : BlockIterator |
| { |
| public: |
| struct Options |
| { |
| size_t buffer_size = DB::DEFAULT_BLOCK_SIZE; |
| size_t partition_num; |
| std::string exprs_buffer; |
| std::string schema_buffer; |
| std::string hash_algorithm; |
| }; |
| |
| struct Holder |
| { |
| std::unique_ptr<NativeSplitter> splitter = nullptr; |
| }; |
| |
| static jclass iterator_class; |
| static jmethodID iterator_has_next; |
| static jmethodID iterator_next; |
| static std::unique_ptr<NativeSplitter> create(const std::string & short_name, Options options, jobject input); |
| |
| NativeSplitter(Options options, jobject input); |
| virtual ~NativeSplitter(); |
| |
| bool hasNext(); |
| DB::Block * next(); |
| int32_t nextPartitionId() const; |
| |
| protected: |
| virtual void computePartitionId(DB::Block &) = 0; |
| |
| Options options; |
| PartitionInfo partition_info; |
| std::vector<size_t> output_columns_indicies; |
| DB::Block output_header; |
| |
| private: |
| void split(DB::Block & block); |
| int64_t inputNext(); |
| bool inputHasNext(); |
| |
| std::vector<std::shared_ptr<ColumnsBuffer>> partition_buffer; |
| std::stack<std::pair<int32_t, std::unique_ptr<DB::Block>>> output_buffer; |
| int32_t next_partition_id = -1; |
| jobject input; |
| }; |
| |
| class HashNativeSplitter : public NativeSplitter |
| { |
| public: |
| HashNativeSplitter(NativeSplitter::Options options_, jobject input); |
| ~HashNativeSplitter() override = default; |
| |
| private: |
| void computePartitionId(DB::Block & block) override; |
| |
| std::unique_ptr<HashSelectorBuilder> selector_builder; |
| }; |
| |
| class RoundRobinNativeSplitter : public NativeSplitter |
| { |
| public: |
| RoundRobinNativeSplitter(NativeSplitter::Options options_, jobject input); |
| ~RoundRobinNativeSplitter() override = default; |
| |
| private: |
| void computePartitionId(DB::Block & block) override; |
| |
| std::unique_ptr<RoundRobinSelectorBuilder> selector_builder; |
| }; |
| |
| class RangePartitionNativeSplitter : public NativeSplitter |
| { |
| public: |
| RangePartitionNativeSplitter(NativeSplitter::Options options_, jobject input); |
| ~RangePartitionNativeSplitter() override = default; |
| |
| private: |
| void computePartitionId(DB::Block & block) override; |
| |
| std::unique_ptr<RangeSelectorBuilder> selector_builder; |
| }; |
| |
| } |