blob: 0da8ed51c2f7b52a2b901fa708a153f558d4b866 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <chrono>
#include <string>
#include <unordered_map>
namespace doris {
// Helper class for tracking metrics in data consumers
class ConsumerMetrics {
public:
ConsumerMetrics() = default;
// Track a single message fetch operation
void track_get_msg(int64_t latency_us);
// Track consumed bytes
void track_consume_bytes(int64_t bytes);
// Track consumed rows
void track_consume_rows(int64_t rows);
// Get accumulated metrics
int64_t get_msg_count() const { return _get_msg_count; }
int64_t consume_bytes() const { return _consume_bytes; }
int64_t consume_rows() const { return _consume_rows; }
private:
int64_t _get_msg_count = 0;
int64_t _consume_bytes = 0;
int64_t _consume_rows = 0;
};
// Helper class for retry logic with configurable backoff
class RetryPolicy {
public:
explicit RetryPolicy(int max_retries = 3, int backoff_ms = 200)
: _max_retries(max_retries), _backoff_ms(backoff_ms), _retry_count(0) {}
// Check if should retry
bool should_retry() const { return _retry_count < _max_retries; }
// Increment retry count and sleep with backoff
void retry_with_backoff();
// Reset retry counter (call on success)
void reset() { _retry_count = 0; }
// Get current retry count
int retry_count() const { return _retry_count; }
private:
int _max_retries;
int _backoff_ms;
int _retry_count;
};
// Helper class for exponential backoff (used for throttling)
class ThrottleBackoff {
public:
explicit ThrottleBackoff(int initial_backoff_ms = 1000, int max_backoff_ms = 10000)
: _initial_backoff_ms(initial_backoff_ms),
_max_backoff_ms(max_backoff_ms),
_throttle_count(0) {}
// Increment throttle count and sleep with exponential backoff
void backoff_and_sleep();
// Reset throttle counter (call on success)
void reset() { _throttle_count = 0; }
// Get current throttle count
int throttle_count() const { return _throttle_count; }
private:
int _initial_backoff_ms;
int _max_backoff_ms;
int _throttle_count;
};
// Helper class for comparing custom properties between consumers
class PropertyMatcher {
public:
// Check if two property maps match exactly
template <typename MapType1, typename MapType2>
static bool properties_match(const MapType1& props1, const MapType2& props2) {
if (props1.size() != props2.size()) {
return false;
}
for (const auto& [key, value] : props1) {
auto it = props2.find(key);
if (it == props2.end() || it->second != value) {
return false;
}
}
return true;
}
};
} // namespace doris