// blob: 8bd9ad7b01bc2ccb746e6ef4afca4d65400f9717 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>
#include "types.h"
namespace icp {
/**
* @brief Value objects used in the Iggy protocol.
*/
namespace model {
/**
 * @brief Common root type for every value object used in the Iggy protocol.
 *
 * The class carries no state of its own; it only contributes a virtual destructor.
 *
 * NOTE(review): the model classes in this header derive from Model with the default
 * (private) access of `class`, so they do not convert to `Model&` outside the class —
 * confirm whether public inheritance was intended.
 */
class Model {
public:
    /// Virtual, so destroying a derived object through a Model pointer is well-defined.
    virtual ~Model() = default;
};
/**
* @brief Models shared across multiple functional groups.
*/
namespace shared {
/**
 * @brief Discriminator that tells a decoder how to interpret identifier bytes.
 */
enum IdKind {
    NUMERIC = 1,  ///< discriminator value 1
    STRING = 2    ///< discriminator value 2
};
/**
 * @brief An identifier for a stream or a topic.
 *
 * The identifier keeps its own copy of the value bytes, so it stays valid after the
 * buffer handed to the constructor has gone out of scope.
 */
class Identifier : Model {
private:
    const IdKind kind;
    const uint8_t length;
    // Held by value: a reference member would dangle as soon as a temporary argument expired.
    const std::vector<unsigned char> value;

public:
    /**
     * @brief Build an identifier from its discriminator, length and raw bytes.
     *
     * @param kind   discriminator describing how @p value is to be decoded.
     * @param length length recorded for the identifier; stored as given and not checked
     *               against value.size().
     * @param value  identifier bytes; copied into the object.
     */
    Identifier(IdKind kind, uint8_t length, const std::vector<unsigned char>& value)
        : kind(kind)
        , length(length)
        , value(value) {}

    /// @brief Get the discriminator passed at construction.
    IdKind getKind() const { return kind; }

    /// @brief Get the length passed at construction.
    uint8_t getLength() const { return length; }

    /// @brief Get the identifier bytes; the reference is valid for the lifetime of this object.
    const std::vector<unsigned char>& getValue() const { return value; }
};
/**
 * @brief Discriminator that tells a decoder how to interpret consumer kind bytes.
 */
enum ConsumerKind {
    CONSUMER = 1,       ///< discriminator value 1
    CONSUMER_GROUP = 2  ///< discriminator value 2
};
/**
 * @brief A consumer of a message stream; may be part of a consumer group.
 *
 * Immutable after construction; all accessors are const so they can be used on
 * const objects and references.
 */
class Consumer : Model {
private:
    const ConsumerKind kind;
    const uint32_t id;

public:
    /**
     * @param kind discriminator for the consumer kind (CONSUMER or CONSUMER_GROUP).
     * @param id   numeric identifier stored as given.
     */
    Consumer(ConsumerKind kind, uint32_t id)
        : kind(kind)
        , id(id) {}

    /// @brief Get the consumer kind passed at construction.
    ConsumerKind getKind() const { return kind; }

    /// @brief Get the numeric identifier passed at construction.
    uint32_t getId() const { return id; }
};
}; // namespace shared
namespace partition {
/**
 * @brief Metadata describing a partition of a topic.
 *
 * Plain immutable value object: every field is set once by the constructor and
 * exposed through a const accessor.
 */
class Partition : Model {
private:
    const uint32_t id;
    const uint64_t createdAt;
    const uint32_t segmentsCount;
    const uint64_t currentOffset;
    const uint64_t sizeBytes;
    // const like its siblings; the class has no mutator for it.
    const uint64_t messagesCount;

public:
    /**
     * @param id            partition identifier.
     * @param createdAt     creation timestamp, stored as given (unit not defined in this header).
     * @param segmentsCount number of segments.
     * @param currentOffset current offset of the partition.
     * @param sizeBytes     size in bytes.
     * @param messagesCount number of messages.
     */
    Partition(uint32_t id, uint64_t createdAt, uint32_t segmentsCount, uint64_t currentOffset, uint64_t sizeBytes, uint64_t messagesCount)
        : id(id)
        , createdAt(createdAt)
        , segmentsCount(segmentsCount)
        , currentOffset(currentOffset)
        , sizeBytes(sizeBytes)
        , messagesCount(messagesCount) {}

    /// @brief Get the partition identifier.
    uint32_t getId() const { return id; }
    /// @brief Get the creation timestamp as supplied at construction.
    uint64_t getCreatedAt() const { return createdAt; }
    /// @brief Get the number of segments.
    uint32_t getSegmentsCount() const { return segmentsCount; }
    /// @brief Get the current offset.
    uint64_t getCurrentOffset() const { return currentOffset; }
    /// @brief Get the size in bytes.
    uint64_t getSizeBytes() const { return sizeBytes; }
    /// @brief Get the number of messages.
    uint64_t getMessagesCount() const { return messagesCount; }
};
}; // namespace partition
/**
* @brief Models related to topic metadata.
*/
namespace topic {
/**
 * @brief Metadata describing a topic at a summary level.
 *
 * The object owns a copy of the topic name, so it remains valid after the string
 * passed to the constructor has been destroyed.
 */
class Topic : Model {
private:
    const uint32_t id;
    const uint64_t createdAt;
    // Held by value: a reference member would dangle when built from a temporary string.
    const std::string name;
    const uint64_t sizeBytes;
    const std::optional<uint32_t> messageExpiry;
    const std::optional<uint64_t> maxTopicSize;
    const uint8_t replicationFactor;
    const uint64_t messagesCount;
    const uint32_t partitionsCount;

public:
    /**
     * @param id                topic identifier.
     * @param createdAt         creation timestamp, stored as given (unit not defined in this header).
     * @param name              topic name; copied into the object.
     * @param sizeBytes         size in bytes.
     * @param messageExpiry     optional message expiry; empty when not set.
     * @param maxTopicSize      optional maximum topic size; empty when not set.
     * @param replicationFactor replication factor.
     * @param messagesCount     number of messages.
     * @param partitionsCount   number of partitions.
     */
    Topic(uint32_t id,
          uint64_t createdAt,
          const std::string& name,
          uint64_t sizeBytes,
          std::optional<uint32_t> messageExpiry,
          std::optional<uint64_t> maxTopicSize,
          uint8_t replicationFactor,
          uint64_t messagesCount,
          uint32_t partitionsCount)
        : id(id)
        , createdAt(createdAt)
        , name(name)
        , sizeBytes(sizeBytes)
        , messageExpiry(messageExpiry)
        , maxTopicSize(maxTopicSize)
        , replicationFactor(replicationFactor)
        , messagesCount(messagesCount)
        , partitionsCount(partitionsCount) {}

    /// @brief Get the topic identifier.
    uint32_t getId() const { return id; }
    /// @brief Get the creation timestamp as supplied at construction.
    uint64_t getCreatedAt() const { return createdAt; }
    /// @brief Get the topic name; the reference is valid for the lifetime of this object.
    const std::string& getName() const { return name; }
    /// @brief Get the size in bytes.
    uint64_t getSizeBytes() const { return sizeBytes; }
    /// @brief Get the optional message expiry.
    std::optional<uint32_t> getMessageExpiry() const { return messageExpiry; }
    /// @brief Get the optional maximum topic size.
    std::optional<uint64_t> getMaxTopicSize() const { return maxTopicSize; }
    /// @brief Get the replication factor.
    uint8_t getReplicationFactor() const { return replicationFactor; }
    /// @brief Get the number of messages.
    uint64_t getMessagesCount() const { return messagesCount; }
    /// @brief Get the number of partitions.
    uint32_t getPartitionsCount() const { return partitionsCount; }
};
/**
 * @brief Metadata describing a topic, including partition details.
 *
 * The object owns copies of the name and of the partition list, so it remains valid
 * after the arguments passed to the constructor have been destroyed.
 */
class TopicDetails : Model {
private:
    uint32_t id;
    uint64_t createdAt;
    std::string name;
    uint64_t sizeBytes;
    const std::optional<uint32_t> messageExpiry;
    const std::optional<uint64_t> maxTopicSize;
    uint8_t replicationFactor;
    uint64_t messagesCount;
    uint32_t partitionsCount;
    // Held by value: a reference member would dangle when built from a temporary vector.
    const std::vector<partition::Partition> partitions;

public:
    /**
     * @param id                topic identifier.
     * @param createdAt         creation timestamp, stored as given (unit not defined in this header).
     * @param name              topic name; copied into the object.
     * @param sizeBytes         size in bytes.
     * @param messageExpiry     optional message expiry; empty when not set.
     * @param maxTopicSize      optional maximum topic size; empty when not set.
     * @param replicationFactor replication factor.
     * @param messagesCount     number of messages.
     * @param partitionsCount   number of partitions; stored as given, not checked against
     *                          partitions.size().
     * @param partitions        per-partition metadata; copied into the object.
     */
    TopicDetails(uint32_t id,
                 uint64_t createdAt,
                 const std::string& name,
                 uint64_t sizeBytes,
                 std::optional<uint32_t> messageExpiry,
                 std::optional<uint64_t> maxTopicSize,
                 uint8_t replicationFactor,
                 uint64_t messagesCount,
                 uint32_t partitionsCount,
                 const std::vector<partition::Partition>& partitions)
        : id(id)
        , createdAt(createdAt)
        , name(name)
        , sizeBytes(sizeBytes)
        , messageExpiry(messageExpiry)
        , maxTopicSize(maxTopicSize)
        , replicationFactor(replicationFactor)
        , messagesCount(messagesCount)
        , partitionsCount(partitionsCount)
        , partitions(partitions) {}

    /// @brief Get the topic identifier.
    uint32_t getId() const { return id; }
    /// @brief Get the creation timestamp as supplied at construction.
    uint64_t getCreatedAt() const { return createdAt; }
    /// @brief Get the topic name; the reference is valid for the lifetime of this object.
    const std::string& getName() const { return name; }
    /// @brief Get the size in bytes.
    uint64_t getSizeBytes() const { return sizeBytes; }
    /// @brief Get the optional message expiry.
    std::optional<uint32_t> getMessageExpiry() const { return messageExpiry; }
    /// @brief Get the optional maximum topic size.
    std::optional<uint64_t> getMaxTopicSize() const { return maxTopicSize; }
    /// @brief Get the replication factor.
    uint8_t getReplicationFactor() const { return replicationFactor; }
    /// @brief Get the number of messages.
    uint64_t getMessagesCount() const { return messagesCount; }
    /// @brief Get the number of partitions as supplied at construction.
    uint32_t getPartitionsCount() const { return partitionsCount; }
    /// @brief Get the partition metadata; the reference is valid for the lifetime of this object.
    const std::vector<partition::Partition>& getPartitions() const { return partitions; }
};
}; // namespace topic
/**
* @brief Models related to message stream metadata.
*/
namespace stream {
/**
 * @brief Metadata describing a message stream including topic details.
 *
 * The object owns copies of the name and of the topic list, so it remains valid
 * after the arguments passed to the constructor have been destroyed.
 */
class StreamDetails : Model {
private:
    const uint32_t id;
    const uint64_t createdAt;
    // Held by value: reference members would dangle when built from temporaries.
    const std::string name;
    const uint64_t sizeBytes;
    const uint64_t messagesCount;
    const uint32_t topicsCount;
    const std::vector<topic::Topic> topics;

public:
    /**
     * @param id            stream identifier.
     * @param createdAt     creation timestamp, stored as given (unit not defined in this header).
     * @param name          stream name; copied into the object.
     * @param sizeBytes     size in bytes.
     * @param messagesCount number of messages.
     * @param topicsCount   number of topics; stored as given, not checked against topics.size().
     * @param topics        per-topic summaries; copied into the object.
     */
    StreamDetails(uint32_t id,
                  uint64_t createdAt,
                  const std::string& name,
                  uint64_t sizeBytes,
                  uint64_t messagesCount,
                  uint32_t topicsCount,
                  const std::vector<topic::Topic>& topics)
        : id(id)
        , createdAt(createdAt)
        , name(name)
        , sizeBytes(sizeBytes)
        , messagesCount(messagesCount)
        , topicsCount(topicsCount)
        , topics(topics) {}

    /// @brief Get the stream identifier.
    uint32_t getId() const { return id; }
    /// @brief Get the creation timestamp as supplied at construction.
    uint64_t getCreatedAt() const { return createdAt; }
    /**
     * @brief Get the stream name; the reference is valid for the lifetime of this object.
     *
     * The previous signature `const std::string getName() &` had a misplaced `&`: it
     * returned a copy and was callable on lvalues only. It now matches the name
     * accessors of the topic classes.
     */
    const std::string& getName() const { return name; }
    /// @brief Get the size in bytes.
    uint64_t getSizeBytes() const { return sizeBytes; }
    /// @brief Get the number of messages.
    uint64_t getMessagesCount() const { return messagesCount; }
    /// @brief Get the number of topics as supplied at construction.
    uint32_t getTopicsCount() const { return topicsCount; }
    /// @brief Get the topic summaries; the reference is valid for the lifetime of this object.
    const std::vector<topic::Topic>& getTopics() const { return topics; }
};
}; // namespace stream
/**
* @brief Models related to messages consumed and sent to the Iggy server.
*/
namespace message {
/**
 * @brief State of a message as tracked on the server.
 */
enum MessageState {
    AVAILABLE = 1,
    UNAVAILABLE = 10,
    POISONED = 20,
    MARKED_FOR_DELETION = 30
};
/// @brief Key type used to look up a message header value.
using HeaderKey = std::string;
/**
 * @brief Discriminator selecting how the bytes of a header value are to be decoded.
 *
 * Every enumerator keeps an explicit numeric value.
 */
enum HeaderKind {
    // Untyped bytes, text and boolean.
    RAW = 1,
    STRING = 2,
    BOOL = 3,

    // Signed integers, by width.
    INT8 = 4,
    INT16 = 5,
    INT32 = 6,
    INT64 = 7,
    INT128 = 8,

    // Unsigned integers, by width.
    UINT8 = 9,
    UINT16 = 10,
    UINT32 = 11,
    UINT64 = 12,
    UINT128 = 13,

    // Floating point, by width.
    FLOAT32 = 14,
    FLOAT64 = 15
};
/**
 * @brief A value of various types associated with the message header -- message metadata.
 *
 * The object owns a copy of the value bytes, so it remains valid after the buffer
 * passed to the constructor has been destroyed (for example when stored in a map).
 */
class HeaderValue : Model {
private:
    const HeaderKind kind;
    // Held by value: a reference member would dangle when built from a temporary vector.
    const std::vector<unsigned char> value;

public:
    /**
     * @param kind  discriminator describing how @p value is to be decoded.
     * @param value raw value bytes; copied into the object.
     */
    HeaderValue(HeaderKind kind, const std::vector<unsigned char>& value)
        : kind(kind)
        , value(value) {}

    /// @brief Get the discriminator passed at construction.
    HeaderKind getKind() const { return kind; }

    /// @brief Get the raw value bytes; the reference is valid for the lifetime of this object.
    const std::vector<unsigned char>& getValue() const { return value; }
};
/**
 * @brief A message consumed or sent to the server, with binary payload and flexible metadata.
 *
 * The message owns copies of its headers and payload, so it remains valid after the
 * containers passed to the constructor have been destroyed.
 */
class Message : Model {
private:
    // core message state
    const uint128_t id;
    // Held by value: reference members would dangle when built from temporaries.
    const std::unordered_map<HeaderKey, HeaderValue> headers;
    const uint32_t length;
    const std::vector<unsigned char> payload;

    // message state set on the server-side
    const std::optional<uint64_t> offset;
    const std::optional<MessageState> state;
    const std::optional<uint64_t> timestamp;
    const std::optional<uint32_t> checksum;

public:
    /**
     * @brief Fully-qualified message constructor.
     *
     * @ref isComplete reports true only when all four optional server-side fields
     * hold a value.
     *
     * @param id        message identifier.
     * @param headers   header key/value pairs; copied into the object.
     * @param length    length recorded for the message; stored as given and not checked
     *                  against payload.size().
     * @param payload   payload bytes; copied into the object.
     * @param offset    optional server-side offset.
     * @param state     optional server-side message state.
     * @param timestamp optional server-side timestamp.
     * @param checksum  optional server-side checksum.
     */
    Message(uint128_t id,
            const std::unordered_map<HeaderKey, HeaderValue>& headers,
            uint32_t length,
            const std::vector<unsigned char>& payload,
            std::optional<uint64_t> offset,
            std::optional<MessageState> state,
            std::optional<uint64_t> timestamp,
            std::optional<uint32_t> checksum)
        : id(id)
        , headers(headers)
        , length(length)
        , payload(payload)
        , offset(offset)
        , state(state)
        , timestamp(timestamp)
        , checksum(checksum) {}

    /**
     * @brief Simpler constructor for a message to be delivered to the server; @ref isComplete will be false.
     *
     * Delegates to the full constructor with every server-side field left empty.
     */
    Message(uint128_t id,
            const std::unordered_map<HeaderKey, HeaderValue>& headers,
            uint32_t length,
            const std::vector<unsigned char>& payload)
        : Message(id,
                  headers,
                  length,
                  payload,
                  std::optional<uint64_t>(),
                  std::optional<MessageState>(),
                  std::optional<uint64_t>(),
                  std::optional<uint32_t>()) {}

    /// @brief Get the message identifier.
    uint128_t getId() const { return id; }
    /// @brief Get the headers; the reference is valid for the lifetime of this object.
    const std::unordered_map<HeaderKey, HeaderValue>& getHeaders() const { return headers; }
    /// @brief Get the length passed at construction.
    uint32_t getLength() const { return length; }
    /// @brief Get the payload bytes; the reference is valid for the lifetime of this object.
    const std::vector<unsigned char>& getPayload() const { return payload; }
    /// @brief Get the optional server-side offset.
    std::optional<uint64_t> getOffset() const { return offset; }
    /// @brief Get the optional server-side message state.
    std::optional<MessageState> getState() const { return state; }
    /// @brief Get the optional server-side timestamp.
    std::optional<uint64_t> getTimestamp() const { return timestamp; }
    /// @brief Get the optional server-side checksum.
    std::optional<uint32_t> getChecksum() const { return checksum; }

    /**
     * @brief Check if the message has all the server-side fields set.
     * @return true when offset, state, timestamp and checksum all hold a value.
     */
    bool isComplete() const { return offset.has_value() && state.has_value() && timestamp.has_value() && checksum.has_value(); }
};
/**
 * @brief Carrier object for all the messages polled from the server.
 *
 * The object owns a copy of the message list, so it remains valid after the vector
 * passed to the constructor has been destroyed.
 */
class PolledMessages : Model {
private:
    const uint32_t partition_id;
    const uint64_t current_offset;
    // Held by value: a reference member would dangle when built from a temporary vector.
    const std::vector<Message> messages;

public:
    /**
     * @param partition_id   identifier of the partition the messages belong to.
     * @param current_offset current offset reported with the poll result.
     * @param messages       polled messages; copied into the object.
     */
    PolledMessages(uint32_t partition_id, uint64_t current_offset, const std::vector<Message>& messages)
        : partition_id(partition_id)
        , current_offset(current_offset)
        , messages(messages) {}

    /// @brief Get the partition identifier.
    uint32_t getPartitionId() const { return partition_id; }
    /// @brief Get the current offset.
    uint64_t getCurrentOffset() const { return current_offset; }
    /// @brief Get the polled messages; the reference is valid for the lifetime of this object.
    const std::vector<Message>& getMessages() const { return messages; }
};
} // namespace message
/**
* @brief Information about server-managed stream consumer offsets.
*/
namespace consumeroffset {
/**
 * @brief Current state of the client's position reading the stream via a consumer or consumer group.
 *
 * Immutable after construction; all accessors are const so they can be used on
 * const objects and references.
 */
class ConsumerOffsetInfo : Model {
private:
    const uint32_t partitionId;
    const uint64_t currentOffset;
    const uint64_t storedOffset;

public:
    /**
     * @param partitionId   identifier of the partition the offsets refer to.
     * @param currentOffset current offset.
     * @param storedOffset  stored offset.
     */
    ConsumerOffsetInfo(uint32_t partitionId, uint64_t currentOffset, uint64_t storedOffset)
        : partitionId(partitionId)
        , currentOffset(currentOffset)
        , storedOffset(storedOffset) {}

    /// @brief Get the partition identifier.
    uint32_t getPartitionId() const { return partitionId; }
    /// @brief Get the current offset.
    uint64_t getCurrentOffset() const { return currentOffset; }
    /// @brief Get the stored offset.
    uint64_t getStoredOffset() const { return storedOffset; }
};
}; // namespace consumeroffset
/**
* @brief Models related to consumer groups, which are used to coordinate message consumption across multiple clients.
*/
namespace consumergroup {
/**
 * @brief A member of a consumer group together with the partitions listed for it.
 *
 * The object owns a copy of the partition list, so it remains valid after the vector
 * passed to the constructor has been destroyed.
 */
class ConsumerGroupMember : Model {
private:
    const uint32_t id;
    const uint32_t partitionsCount;
    // Held by value: a reference member would dangle when built from a temporary vector.
    const std::vector<uint32_t> partitions;

public:
    /**
     * @param id              member identifier.
     * @param partitionsCount number of partitions; stored as given, not checked against
     *                        partitions.size().
     * @param partitions      partition identifiers (presumably those assigned to this
     *                        member — confirm against the protocol); copied into the object.
     */
    ConsumerGroupMember(uint32_t id, uint32_t partitionsCount, const std::vector<uint32_t>& partitions)
        : id(id)
        , partitionsCount(partitionsCount)
        , partitions(partitions) {}

    /// @brief Get the member identifier.
    uint32_t getId() const { return id; }
    /// @brief Get the number of partitions as supplied at construction.
    uint32_t getPartitionsCount() const { return partitionsCount; }
    /// @brief Get the partition identifiers; the reference is valid for the lifetime of this object.
    const std::vector<uint32_t>& getPartitions() const { return partitions; }
};
/**
 * @brief Details of a consumer group, including its members.
 *
 * The object owns copies of the name and of the member list, so it remains valid
 * after the arguments passed to the constructor have been destroyed.
 */
class ConsumerGroupDetails : Model {
private:
    uint32_t id;
    // Held by value: reference members would dangle when built from temporaries.
    const std::string name;
    uint32_t partitionsCount;
    uint32_t membersCount;
    const std::vector<ConsumerGroupMember> members;

public:
    /**
     * @param id              consumer group identifier.
     * @param name            consumer group name; copied into the object.
     * @param partitionsCount number of partitions.
     * @param membersCount    number of members; stored as given, not checked against
     *                        members.size().
     * @param members         group members; copied into the object.
     */
    ConsumerGroupDetails(uint32_t id,
                         const std::string& name,
                         uint32_t partitionsCount,
                         uint32_t membersCount,
                         const std::vector<ConsumerGroupMember>& members)
        : id(id)
        , name(name)
        , partitionsCount(partitionsCount)
        , membersCount(membersCount)
        , members(members) {}

    /// @brief Get the consumer group identifier.
    uint32_t getId() const { return id; }
    /// @brief Get the consumer group name; the reference is valid for the lifetime of this object.
    const std::string& getName() const { return name; }
    /// @brief Get the number of partitions.
    uint32_t getPartitionsCount() const { return partitionsCount; }
    /// @brief Misspelled accessor kept for existing callers; prefer getPartitionsCount().
    uint32_t getParitionsCount() const { return partitionsCount; }
    /// @brief Get the number of members as supplied at construction.
    uint32_t getMembersCount() const { return membersCount; }
    /// @brief Get the group members; the reference is valid for the lifetime of this object.
    const std::vector<ConsumerGroupMember>& getMembers() const { return members; }
};
}; // namespace consumergroup
/**
* @brief Models related to global system state.
*/
namespace sys {
/**
 * @brief Summary of the consumer groups that a client has joined.
 *
 * Immutable after construction; all accessors are const so they can be used on
 * const objects and references.
 */
class ConsumerGroupInfo : Model {
private:
    const uint32_t streamId;
    const uint32_t topicId;
    const uint32_t consumerGroupId;

public:
    /**
     * @param streamId        stream identifier.
     * @param topicId         topic identifier.
     * @param consumerGroupId consumer group identifier.
     */
    ConsumerGroupInfo(uint32_t streamId, uint32_t topicId, uint32_t consumerGroupId)
        : streamId(streamId)
        , topicId(topicId)
        , consumerGroupId(consumerGroupId) {}

    /// @brief Get the stream identifier.
    uint32_t getStreamId() const { return streamId; }
    /// @brief Get the topic identifier.
    uint32_t getTopicId() const { return topicId; }
    /// @brief Get the consumer group identifier.
    uint32_t getConsumerGroupId() const { return consumerGroupId; }
};
/**
 * @brief Information about a client connected to the server.
 *
 * The object owns copies of its strings and of the consumer group list, so it remains
 * valid after the arguments passed to the constructor have been destroyed.
 */
class ClientInfoDetails : Model {
private:
    const uint32_t clientId;
    const std::optional<uint32_t> userId;
    // Held by value: reference members would dangle when built from temporaries.
    const std::string address;
    const std::string transport;
    const uint32_t consumerGroupsCount;
    const std::vector<ConsumerGroupInfo> consumerGroups;

public:
    /**
     * @param clientId            client identifier.
     * @param userId              optional user identifier; empty when not set.
     * @param address             client address; copied into the object.
     * @param transport           transport name; copied into the object.
     * @param consumerGroupsCount number of consumer groups; stored as given, not checked
     *                            against consumerGroups.size().
     * @param consumerGroups      consumer groups joined by the client; copied into the object.
     */
    ClientInfoDetails(uint32_t clientId,
                      std::optional<uint32_t> userId,
                      const std::string& address,
                      const std::string& transport,
                      uint32_t consumerGroupsCount,
                      const std::vector<ConsumerGroupInfo>& consumerGroups)
        : clientId(clientId)
        , userId(userId)
        , address(address)
        , transport(transport)
        , consumerGroupsCount(consumerGroupsCount)
        , consumerGroups(consumerGroups) {}

    /// @brief Get the client identifier.
    uint32_t getClientId() const { return clientId; }
    /// @brief Get the optional user identifier.
    std::optional<uint32_t> getUserId() const { return userId; }
    /// @brief Get the client address; the reference is valid for the lifetime of this object.
    const std::string& getAddress() const { return address; }
    /// @brief Get the transport name; the reference is valid for the lifetime of this object.
    const std::string& getTransport() const { return transport; }
    /// @brief Get the number of consumer groups as supplied at construction.
    uint32_t getConsumerGroupsCount() const { return consumerGroupsCount; }
    /// @brief Get the consumer groups; the reference is valid for the lifetime of this object.
    const std::vector<ConsumerGroupInfo>& getConsumerGroups() const { return consumerGroups; }
};
/**
* @brief Model class holding server performance statistics.
*
* This class is used to represent the result of the GetStats call,
* which gets a snapshot of latest server performance statistics.
*
* @see [GetStats definition](https://iggy.apache.org/docs/server/schema#get-stats)
* @see [stats.rs](https://github.com/apache/iggy/blob/master/core/server/src/streaming/systems/stats.rs)
*/
class Stats : Model {
private:
const pid_t process_id;
const percent_t cpu_usage;
const byte_cnt_t memory_usage;
const byte_cnt_t total_memory;
const byte_cnt_t available_memory;
const time_val_t run_time;
const time_val_t start_time;
const byte_cnt_t read_bytes;
const byte_cnt_t written_bytes;
const byte_cnt_t messages_size_bytes;
const obj_cnt_t streams_count;
const obj_cnt_t topics_count;
const obj_cnt_t partitions_count;
const obj_cnt_t segments_count;
const msg_cnt_t messages_count;
const obj_cnt_t clients_count;
const obj_cnt_t consumer_groups_count;
const std::string& hostname;
const std::string& os_name;
const std::string& os_version;
const std::string& kernel_version;
public:
Stats(pid_t process_id,
percent_t cpu_usage,
byte_cnt_t memory_usage,
byte_cnt_t total_memory,
byte_cnt_t available_memory,
time_val_t run_time,
time_val_t start_time,
byte_cnt_t read_bytes,
byte_cnt_t written_bytes,
byte_cnt_t messages_size_bytes,
obj_cnt_t streams_count,
obj_cnt_t topics_count,
obj_cnt_t partitions_count,
obj_cnt_t segments_count,
msg_cnt_t messages_count,
obj_cnt_t clients_count,
obj_cnt_t consumer_groups_count,
const std::string& hostname,
const std::string& os_name,
const std::string& os_version,
const std::string& kernel_version)
: process_id(process_id)
, cpu_usage(cpu_usage)
, memory_usage(memory_usage)
, total_memory(total_memory)
, available_memory(available_memory)
, run_time(run_time)
, start_time(start_time)
, read_bytes(read_bytes)
, written_bytes(written_bytes)
, messages_size_bytes(messages_size_bytes)
, streams_count(streams_count)
, topics_count(topics_count)
, partitions_count(partitions_count)
, segments_count(segments_count)
, messages_count(messages_count)
, clients_count(clients_count)
, consumer_groups_count(consumer_groups_count)
, hostname(hostname)
, os_name(os_name)
, os_version(os_version)
, kernel_version(kernel_version) {}
/// @brief Get the server process ID (PID)
pid_t getProcessId() const { return process_id; }
/// @brief Get the server process CPU usage.
percent_t getCpuUsage() const { return cpu_usage; }
/// @brief Get the server total memory usage.
byte_cnt_t getMemoryUsage() const { return memory_usage; }
/// @brief Get the server total memory.
byte_cnt_t getTotalMemory() const { return total_memory; }
/// @brief Get the server available memory.
byte_cnt_t getAvailableMemory() const { return available_memory; }
/// @brief Get how long the server has been running, in seconds.
time_val_t getRunTime() const { return run_time; }
/// @brief Get the server start time, in seconds since the UNIX epoch.
time_val_t getStartTime() const { return start_time; }
/// @brief Get the number of bytes read since process start.
byte_cnt_t getReadBytes() const { return read_bytes; }
/// @brief Get the number of bytes written since process start.
byte_cnt_t getWrittenBytes() const { return written_bytes; }
/// @brief Get the total size of all messages processed.
byte_cnt_t getMessagesSizeBytes() const { return messages_size_bytes; }
/// @brief Get the total number of streams in the server.
obj_cnt_t getStreamsCount() const { return streams_count; }
/// @brief Get the total number of topics on the server.
obj_cnt_t getTopicsCount() const { return topics_count; }
/// @brief Get the total number of topic partitions on the server across all topics.
obj_cnt_t getPartitionsCount() const { return partitions_count; }
/// @brief Get the total number of fixed-size segments used for topic storage on disk.
obj_cnt_t getSegmentsCount() const { return segments_count; }
/// @brief Get the total number of messages processed by the server across all topics.
msg_cnt_t getMessagesCount() const { return messages_count; }
/// @brief Get the total number of active connected clients.
obj_cnt_t getClientsCount() const { return clients_count; }
/// @brief Get the total number of active consumer groups on the server.
obj_cnt_t getConsumerGroupsCount() const { return consumer_groups_count; }
/// @brief Get the name of the host that the server process is running on.
const std::string& getHostname() const { return hostname; }
/// @brief Get the name of the operating system that the server process is running on.
const std::string& getOsName() const { return os_name; }
/// @brief Get the version of the operating system that the server process is running on.
const std::string& getOsVersion() const { return os_version; }
/// @brief Get the version of the OS kernel that the server process is running on.
const std::string& getKernelVersion() const { return kernel_version; }
};
} // namespace sys
} // namespace model
} // namespace icp