blob: c6e8ae11f849b556167698832261bf5bdd303f4c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is porting from
// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java
// to cpp and modified by Doris
/**
* Helps in distributing big or skewed partitions across available tasks to improve the performance of
* partitioned writes.
* <p>
* This rebalancer initialize a bunch of buckets for each task based on a given taskBucketCount and then tries to
* uniformly distribute partitions across those buckets. This helps to mitigate two problems:
* 1. Mitigate skewness across tasks.
* 2. Scale few big partitions across tasks even if there's no skewness among them. This will essentially speed the
* local scaling without impacting much overall resource utilization.
* <p>
* Example:
* <p>
* Before: 3 tasks, 3 buckets per task, and 2 skewed partitions
* Task1 Task2 Task3
* Bucket1 (Part 1) Bucket1 (Part 2) Bucket1
* Bucket2 Bucket2 Bucket2
* Bucket3 Bucket3 Bucket3
* <p>
* After rebalancing:
* Task1 Task2 Task3
* Bucket1 (Part 1) Bucket1 (Part 2) Bucket1 (Part 1)
* Bucket2 (Part 2) Bucket2 (Part 1) Bucket2 (Part 2)
* Bucket3 Bucket3 Bucket3
*/
#pragma once
#include <glog/logging.h>
#include <algorithm>
#include <iostream>
#include <list>
#include <optional>
#include <vector>
#include "util/indexed_priority_queue.hpp"
namespace doris::vectorized {
class SkewedPartitionRebalancer {
private:
struct TaskBucket {
// `task_bucket_count_` is always 1.
int task_id;
int id;
TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_)
: task_id(task_id_), id(task_id_ * task_bucket_count_ + bucket_id_) {
DCHECK_LT(bucket_id_, task_bucket_count_);
}
bool operator==(const TaskBucket& other) const { return id == other.id; }
bool operator<(const TaskBucket& other) const { return id < other.id; }
bool operator>(const TaskBucket& other) const { return id > other.id; }
};
public:
SkewedPartitionRebalancer(int partition_count, int task_count, int task_bucket_count,
long min_partition_data_processed_rebalance_threshold,
long min_data_processed_rebalance_threshold);
int get_task_id(int partition_id, int64_t index);
void add_data_processed(long data_size);
void add_partition_row_count(int partition, long row_count);
void rebalance();
private:
void _calculate_partition_data_size(long data_processed);
long _calculate_task_bucket_data_size_since_last_rebalance(
IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
max_partitions);
void _rebalance_based_on_task_bucket_skewness(
IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
max_task_buckets,
IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
min_task_buckets,
std::vector<
IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>&
task_bucket_max_partitions);
std::vector<TaskBucket> _find_skewed_min_task_buckets(
const TaskBucket& max_task_bucket,
const IndexedPriorityQueue<TaskBucket,
IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
min_task_buckets);
bool _rebalance_partition(
int partition_id, const TaskBucket& to_task_bucket,
IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
max_task_buckets,
IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
min_task_buckets);
bool _should_rebalance(long data_processed);
void _rebalance_partitions(long data_processed);
static constexpr double TASK_BUCKET_SKEWNESS_THRESHOLD = 0.7;
// One or more tasks in one partition. `_task_count` equals to the number of channels and `_task_bucket_count` is always 1.
const int _partition_count;
const int _task_count;
const int _task_bucket_count;
long _min_partition_data_processed_rebalance_threshold;
long _min_data_processed_rebalance_threshold;
std::vector<long> _partition_row_count;
long _data_processed;
long _data_processed_at_last_rebalance;
std::vector<long> _partition_data_size;
std::vector<long> _partition_data_size_at_last_rebalance;
std::vector<long> _partition_data_size_since_last_rebalance_per_task;
std::vector<long> _estimated_task_bucket_data_size_since_last_rebalance;
std::vector<std::vector<TaskBucket>> _partition_assignments;
};
} // namespace doris::vectorized